Files
momentry_core/scripts/rescan_single_frame_traces.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

181 lines
6.4 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Rescan cut scenes at 1-frame interval to find more face detections
for single-frame traces.
Usage:
    python3 scripts/rescan_single_frame_traces.py --file-uuid <uuid> [--video-path <path>]
"""
import os, sys, json, subprocess, tempfile, argparse, time, psycopg2
from pathlib import Path
from collections import defaultdict
# Runtime configuration. Every value can be overridden through the environment
# so the script is not tied to a single machine's directory layout.

# PostgreSQL connection string passed to psycopg2.connect().
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
# Directory holding the per-file "<uuid>.face.json" results; rescan temp
# directories are created underneath it.
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
# Directory that contains face_processor.py.
SCRIPTS_DIR = os.environ.get("MOMENTRY_SCRIPTS_DIR", "/Users/accusys/momentry_core_0.1/scripts")
# Interpreter used to launch face_processor.py. Overridable like the settings
# above; the default is the previously hard-coded path.
VENV_PYTHON = os.environ.get(
    "MOMENTRY_VENV_PYTHON", "/Users/accusys/momentry_core_0.1/venv/bin/python"
)
def get_cut_scenes_with_single_traces(file_uuid):
    """Return the cut scenes of a file that contain single-detection traces.

    A trace counts as "single" when its ``trace_id`` appears on exactly one
    row of ``dev.face_detections`` for the file. Each chunk of type ``'cut'``
    is matched against the detections whose ``frame_number`` lies within the
    chunk's ``[start_frame, end_frame]`` range, and only chunks containing at
    least one such single trace are returned.

    Args:
        file_uuid: UUID of the file whose chunks and detections are queried.

    Returns:
        A list of ``(chunk_id, start_frame, end_frame, start_time, end_time,
        single_traces)`` tuples, ordered by ``single_traces`` descending.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        # The cursor context manager closes the cursor; the connection itself
        # must still be closed explicitly, hence the surrounding try/finally.
        with conn.cursor() as cur:
            cur.execute("SET search_path TO dev")
            # NOTE(review): the commit notes shipped with this file mention a
            # dev.chunks -> dev.chunk rename; confirm the table name below
            # still matches the live schema.
            cur.execute("""
                SELECT c.chunk_id, c.start_frame, c.end_frame, c.start_time, c.end_time,
                       COUNT(DISTINCT s.trace_id) as single_traces
                FROM dev.chunks c
                JOIN dev.face_detections fd ON fd.file_uuid=c.file_uuid
                    AND fd.frame_number >= c.start_frame AND fd.frame_number <= c.end_frame
                JOIN (
                    SELECT trace_id FROM dev.face_detections
                    WHERE file_uuid=%s AND trace_id IS NOT NULL
                    GROUP BY trace_id HAVING COUNT(*) = 1
                ) s ON s.trace_id = fd.trace_id
                WHERE c.file_uuid=%s AND c.chunk_type='cut'
                GROUP BY c.id, c.chunk_id, c.start_frame, c.end_frame, c.start_time, c.end_time
                ORDER BY single_traces DESC
            """, (file_uuid, file_uuid))
            return cur.fetchall()
    finally:
        conn.close()
def process_scene(file_uuid, video_path, chunk_id, start_frame, end_frame, start_time, end_time):
    """Re-run face detection on every frame of one cut scene.

    The scene is cut out of ``video_path`` with ffmpeg (starting one second
    before ``start_time``, clamped at 0, and lasting the scene length plus two
    seconds), then ``face_processor.py`` is run on the clip with
    ``--sample-interval 1``. Frame numbers in its output are shifted back into
    the source video's numbering and anything outside
    ``[start_frame, end_frame]`` is discarded.

    Args:
        file_uuid: UUID of the file; its first 8 characters name the temp dir.
        video_path: Path of the source video handed to ffmpeg.
        chunk_id: Scene identifier, used for the temp file names.
        start_frame: First frame of the scene (inclusive).
        end_frame: Last frame of the scene (inclusive).
        start_time: Scene start in seconds.
        end_time: Scene end in seconds.

    Returns:
        ``(detections, None)`` on success, where ``detections`` is a list of
        ``{"frame": int, "faces": list}`` dicts for frames with at least one
        face; ``(None, message)`` when a step fails.
    """
    temp_dir = Path(OUTPUT_DIR) / f"rescan_{file_uuid[:8]}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    seg_path = temp_dir / f"{chunk_id}.mp4"
    out_path = temp_dir / f"{chunk_id}.face.json"

    try:
        # Remove leftovers of an interrupted earlier run so that the
        # existence checks below only ever see files produced by this call.
        seg_path.unlink(missing_ok=True)
        out_path.unlink(missing_ok=True)

        # Extract segment
        duration = end_time - start_time + 2  # pad 2 seconds
        result = subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-ss", str(max(0, start_time - 1)),
            "-t", str(duration),
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
            "-an",  # no audio
            str(seg_path)
        ], capture_output=True, text=True)
        if not seg_path.exists():
            return None, f"ffmpeg failed: {result.stderr[:200]}"

        # Run face processor
        try:
            subprocess.run([
                VENV_PYTHON, str(Path(SCRIPTS_DIR) / "face_processor.py"),
                str(seg_path), str(out_path),
                "--sample-interval", "1",
                "--uuid", file_uuid,
            ], capture_output=True, text=True, timeout=180)
        except subprocess.TimeoutExpired:
            # Report a slow scene like any other failure instead of letting
            # the exception abort the caller's whole batch.
            return None, "face processor timed out"
        if not out_path.exists():
            return None, "face processor failed"

        try:
            with open(out_path) as f:
                data = json.load(f)
        except json.JSONDecodeError as exc:
            return None, f"face processor wrote invalid JSON: {exc}"
    finally:
        # Cleanup temp files on every exit path, including errors.
        seg_path.unlink(missing_ok=True)
        out_path.unlink(missing_ok=True)

    # Re-map clip-relative frame numbers to source-video frame numbers.
    # NOTE(review): the clip starts one *second* before start_time (or at 0
    # when start_time < 1), while this offset shifts by one *frame*. Whether
    # that matches face_processor.py's frame numbering cannot be verified
    # from this file - confirm against the video's fps.
    frame_offset = start_frame - 1
    new_detections = []
    for entry in data.get("frames", []):
        orig_frame = int(entry.get("frame", 0)) + frame_offset
        if orig_frame < start_frame or orig_frame > end_frame:
            continue  # belongs to the padding, not to the scene
        faces = entry.get("faces", [])
        if faces:
            new_detections.append({"frame": orig_frame, "faces": faces})
    return new_detections, None
def merge_into_face_json(file_uuid, scene_detections):
    """Merge rescanned detections into the file's ``<uuid>.face.json``.

    For a frame already present in the JSON, only faces whose ``face_id`` is
    not among that frame's existing ``face_id`` values are appended. Frames
    not yet present are added whole. The frame list is then re-sorted by
    frame number and the file is rewritten.

    Args:
        file_uuid: UUID naming the ``<OUTPUT_DIR>/<uuid>.face.json`` file.
        scene_detections: Iterable of ``{"frame": int, "faces": list}`` dicts.

    Returns:
        The number of face entries added to the file.
    """
    face_path = Path(OUTPUT_DIR) / f"{file_uuid}.face.json"
    with open(face_path) as f:
        face_data = json.load(f)

    # setdefault keeps the list attached to face_data even when the key was
    # missing, so the appends below cannot raise KeyError.
    frames = face_data.setdefault("frames", [])
    # Index existing frames: frame number -> position in `frames`.
    index_by_frame = {entry["frame"]: pos for pos, entry in enumerate(frames)}

    new_faces = 0
    for entry in scene_detections:
        fn = entry["frame"]
        pos = index_by_frame.get(fn)
        if pos is None:
            # Record the new position so a repeated frame in scene_detections
            # merges into this entry instead of creating a duplicate frame.
            index_by_frame[fn] = len(frames)
            # Copy the list so later merges never mutate the caller's input.
            frames.append({"frame": fn, "faces": list(entry["faces"])})
            new_faces += len(entry["faces"])
            continue
        # Add new faces not already present
        target_faces = frames[pos]["faces"]
        known_ids = {face.get("face_id") for face in target_faces}
        for face in entry["faces"]:
            if face.get("face_id") not in known_ids:
                target_faces.append(face)
                new_faces += 1

    # Re-sort by frame
    frames.sort(key=lambda item: item["frame"])

    # Write to a sibling temp file and swap it in, so an interruption
    # mid-write cannot leave a truncated master face JSON behind.
    tmp_path = face_path.with_name(face_path.name + ".tmp")
    try:
        with open(tmp_path, "w") as f:
            json.dump(face_data, f)
        os.replace(tmp_path, face_path)
    finally:
        # No-op after a successful replace; removes the partial file on error.
        tmp_path.unlink(missing_ok=True)
    return new_faces
def main(argv=None):
    """Rescan every cut scene that contains single-frame traces.

    Resolves the video path (from ``--video-path`` or, failing that, from
    ``dev.videos``), fetches the affected scenes, re-processes each one and
    merges any detections into the file's face JSON, printing progress.

    Args:
        argv: Command-line arguments to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` when the run completed, ``1`` when the video could not be found.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--file-uuid", required=True)
    parser.add_argument("--video-path", default=None)
    args = parser.parse_args(argv)
    file_uuid = args.file_uuid

    video_path = args.video_path
    if not video_path:
        # Try to find video path from DB
        conn = psycopg2.connect(DB_URL)
        try:
            with conn.cursor() as cur:
                cur.execute("SET search_path TO dev")
                cur.execute("SELECT file_path FROM dev.videos WHERE file_uuid=%s", (file_uuid,))
                row = cur.fetchone()
        finally:
            # Close the connection even when the lookup raises.
            conn.close()
        if not row:
            print(f"Video not found for UUID {file_uuid}")
            return 1
        video_path = row[0]

    print(f"Scanning for single-frame traces in {file_uuid}")
    scenes = get_cut_scenes_with_single_traces(file_uuid)
    total = len(scenes)
    print(f"Found {total} cut scenes with single-frame traces")

    total_new = 0
    start_time = time.time()
    for i, (chunk_id, sf, ef, st, et, _n_traces) in enumerate(scenes, start=1):
        t0 = time.time()
        try:
            detections, error = process_scene(file_uuid, video_path, chunk_id, sf, ef, st, et)
        except subprocess.TimeoutExpired as exc:
            # One slow scene must not abort the remaining ones.
            detections, error = None, f"timed out after {exc.timeout}s"
        if error:
            print(f"[{i}/{total}] {chunk_id}: ERROR - {error}")
            continue
        if not detections:
            print(f"[{i}/{total}] {chunk_id}: no new detections")
            continue
        added = merge_into_face_json(file_uuid, detections)
        total_new += added
        elapsed = time.time() - t0
        # Extrapolate from the average time per scene so far; a single
        # unusually fast or slow scene no longer dominates the estimate.
        eta = (total - i) * (time.time() - start_time) / i
        print(f"[{i}/{total}] {chunk_id}: +{added} faces ({len(detections)} frames, {elapsed:.0f}s, ETA {eta/60:.0f}min)")

    print(f"\nDone! Added {total_new} new face detections across {total} scenes")
    print(f"Total time: {(time.time()-start_time)/60:.1f} min")
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shell callers can detect a failed run.
    sys.exit(main())