#!/opt/homebrew/bin/python3.11
"""
Rescan cut scenes at 1-frame interval to find more face detections
for single-frame traces.

For every 'cut' chunk that contains at least one trace made of a single
detection, the scene (plus a short pad on both sides) is re-encoded with
ffmpeg, run through face_processor.py at sample interval 1, and the
resulting detections are merged into <OUTPUT_DIR>/<file_uuid>.face.json.

Usage:
    python3 scripts/rescan_single_frame_traces.py --file-uuid <uuid> [--video-path <path>]
"""

import argparse
import json
import os
import subprocess
import sys
import tempfile
import time
from collections import defaultdict
from contextlib import closing
from fractions import Fraction
from pathlib import Path

try:
    import psycopg2
except ImportError:  # Keep --help and the pure helpers usable without the driver.
    psycopg2 = None

DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
SCRIPTS_DIR = os.environ.get("MOMENTRY_SCRIPTS_DIR", "/Users/accusys/momentry_core_0.1/scripts")
VENV_PYTHON = "/Users/accusys/momentry_core_0.1/venv/bin/python"

# Seconds of video kept before and after the scene when cutting the segment.
SEGMENT_PAD_SECONDS = 1.0
# Upper bound, in seconds, for one face_processor.py run.
FACE_PROCESSOR_TIMEOUT = 180


def _connect():
    """Open a new connection to DB_URL, failing clearly if psycopg2 is missing."""
    if psycopg2 is None:
        raise RuntimeError("psycopg2 is required for database access but is not installed")
    return psycopg2.connect(DB_URL)


def get_cut_scenes_with_single_traces(file_uuid):
    """Return the 'cut' chunks of *file_uuid* that contain single-detection traces.

    Each row is ``(chunk_id, start_frame, end_frame, start_time, end_time,
    single_traces)``, ordered by ``single_traces`` descending. The connection
    and cursor are closed even when a query raises.
    """
    with closing(_connect()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SET search_path TO dev")
        cur.execute("""
            SELECT c.chunk_id, c.start_frame, c.end_frame, c.start_time, c.end_time,
                   COUNT(DISTINCT s.trace_id) as single_traces
            FROM dev.chunks c
            JOIN dev.face_detections fd ON fd.file_uuid=c.file_uuid
                AND fd.frame_number >= c.start_frame AND fd.frame_number <= c.end_frame
            JOIN (
                SELECT trace_id FROM dev.face_detections
                WHERE file_uuid=%s AND trace_id IS NOT NULL
                GROUP BY trace_id HAVING COUNT(*) = 1
            ) s ON s.trace_id = fd.trace_id
            WHERE c.file_uuid=%s AND c.chunk_type='cut'
            GROUP BY c.id, c.chunk_id, c.start_frame, c.end_frame, c.start_time, c.end_time
            ORDER BY single_traces DESC
        """, (file_uuid, file_uuid))
        return cur.fetchall()


def _lookup_video_path(file_uuid):
    """Return ``dev.videos.file_path`` for *file_uuid*, or None when there is no row."""
    with closing(_connect()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SET search_path TO dev")
        cur.execute("SELECT file_path FROM dev.videos WHERE file_uuid=%s", (file_uuid,))
        row = cur.fetchone()
    return row[0] if row else None


def _parse_frame_rate(text):
    """Parse an ffprobe rate such as ``"30000/1001"``; None if absent or not positive."""
    tokens = text.split()
    token = tokens[0] if tokens else ""
    try:
        rate = float(Fraction(token))
    except (ValueError, ZeroDivisionError):
        return None
    return rate if rate > 0 else None


def _probe_fps(video_path):
    """Ask ffprobe for the average frame rate of the first video stream (None on failure)."""
    try:
        result = subprocess.run([
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=avg_frame_rate",
            "-of", "default=noprint_wrappers=1:nokey=1",
            str(video_path),
        ], capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.TimeoutExpired):
        return None
    return _parse_frame_rate(result.stdout)


def _estimate_fps(start_frame, end_frame, start_time, end_time):
    """Estimate frames per second from a chunk's frame span and time span.

    Returns None when either span is not positive (e.g. a one-frame chunk).
    The estimate can be off by roughly one frame over the chunk length, so a
    probed frame rate should be preferred when available.
    """
    frame_span = int(end_frame) - int(start_frame)
    time_span = float(end_time) - float(start_time)
    if frame_span <= 0 or time_span <= 0:
        return None
    return frame_span / time_span


def _segment_frame_offset(start_frame, start_time, fps):
    """Return the value to add to a segment frame index to get the source frame number.

    The segment starts ``SEGMENT_PAD_SECONDS`` before the scene, or at 0 when
    the scene starts earlier than that, so the lead-in must be converted from
    seconds to frames before it is subtracted from *start_frame*.
    """
    lead_in_seconds = min(max(float(start_time), 0.0), SEGMENT_PAD_SECONDS)
    return int(start_frame) - round(lead_in_seconds * fps)


def _remap_detections(frames, frame_offset, start_frame, end_frame):
    """Shift segment frame numbers by *frame_offset*, keeping non-empty in-scene entries."""
    remapped = []
    for entry in frames:
        orig_frame = int(entry.get("frame", 0)) + frame_offset
        if not start_frame <= orig_frame <= end_frame:
            continue  # detection lies in the padding, not in the scene itself
        faces = entry.get("faces", [])
        if faces:
            remapped.append({"frame": orig_frame, "faces": faces})
    return remapped


def process_scene(file_uuid, video_path, chunk_id, start_frame, end_frame,
                  start_time, end_time, fps=None):
    """Re-detect faces on every frame of one scene.

    Args:
        file_uuid: UUID of the video; its first 8 characters name the temp dir.
        video_path: Path of the source video handed to ffmpeg.
        chunk_id: Chunk identifier, used for temp file names.
        start_frame, end_frame: Inclusive source frame range of the scene.
        start_time, end_time: Scene bounds in seconds (any real number type).
        fps: Source frame rate. When None it is estimated from the chunk's
            frame and time spans.

    Returns:
        ``(detections, None)`` on success, where detections is a list of
        ``{"frame": <source frame>, "faces": [...]}``; ``(None, message)``
        on failure. Temp files are removed on every path.
    """
    # DB drivers may hand back Decimal; normalise before mixing with floats.
    start_time = float(start_time)
    end_time = float(end_time)

    if fps is None:
        fps = _estimate_fps(start_frame, end_frame, start_time, end_time)
    if not fps:
        return None, "cannot determine frame rate for frame re-mapping"

    temp_dir = Path(OUTPUT_DIR) / f"rescan_{file_uuid[:8]}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    seg_path = temp_dir / f"{chunk_id}.mp4"
    out_path = temp_dir / f"{chunk_id}.face.json"

    # Leftovers from an interrupted run must not be mistaken for fresh output.
    seg_path.unlink(missing_ok=True)
    out_path.unlink(missing_ok=True)

    seg_start = max(0.0, start_time - SEGMENT_PAD_SECONDS)
    duration = end_time - start_time + 2 * SEGMENT_PAD_SECONDS

    try:
        # Extract segment
        try:
            ffmpeg = subprocess.run([
                "ffmpeg", "-y",
                "-i", video_path,
                "-ss", str(seg_start),
                "-t", str(duration),
                "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
                "-an",  # no audio
                str(seg_path),
            ], capture_output=True, text=True)
        except OSError as exc:
            return None, f"ffmpeg failed: {exc}"
        if not seg_path.exists():
            # The tail of stderr holds the error; the head is ffmpeg's banner.
            return None, f"ffmpeg failed: {ffmpeg.stderr[-200:]}"

        # Run face processor
        try:
            processor = subprocess.run([
                VENV_PYTHON, str(Path(SCRIPTS_DIR) / "face_processor.py"),
                str(seg_path), str(out_path),
                "--sample-interval", "1",
                "--uuid", file_uuid,
            ], capture_output=True, text=True, timeout=FACE_PROCESSOR_TIMEOUT)
        except subprocess.TimeoutExpired:
            return None, f"face processor timed out after {FACE_PROCESSOR_TIMEOUT}s"
        except OSError as exc:
            return None, f"face processor failed: {exc}"
        if not out_path.exists():
            detail = processor.stderr[-200:].strip()
            return None, f"face processor failed: {detail}" if detail else "face processor failed"

        # Read results
        try:
            with open(out_path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            return None, f"face processor output unreadable: {exc}"
    finally:
        # Cleanup temp files
        seg_path.unlink(missing_ok=True)
        out_path.unlink(missing_ok=True)

    frames = data.get("frames", []) if isinstance(data, dict) else []
    # NOTE(review): assumes face_processor.py reports 0-based frame indices
    # relative to the segment it was given - confirm against that script.
    frame_offset = _segment_frame_offset(start_frame, start_time, fps)
    return _remap_detections(frames, frame_offset, start_frame, end_frame), None


def _merge_frames(face_data, scene_detections):
    """Merge *scene_detections* into ``face_data["frames"]`` in place.

    Faces are appended to an existing frame entry only when their ``face_id``
    is not already present there; unseen frames are appended whole. The frame
    list is re-sorted by frame number. Returns the number of faces added.
    """
    frames = face_data.setdefault("frames", [])
    index_by_frame = {entry["frame"]: i for i, entry in enumerate(frames)}

    added = 0
    for entry in scene_detections:
        frame_no = entry["frame"]
        idx = index_by_frame.get(frame_no)
        if idx is None:
            frames.append({"frame": frame_no, "faces": list(entry["faces"])})
            # Register the new entry so a repeated frame merges instead of duplicating.
            index_by_frame[frame_no] = len(frames) - 1
            added += len(entry["faces"])
            continue

        target_faces = frames[idx].setdefault("faces", [])
        known_ids = {face.get("face_id") for face in target_faces}
        for face in entry["faces"]:
            face_id = face.get("face_id")
            if face_id in known_ids:
                continue
            target_faces.append(face)
            if face_id is not None:
                known_ids.add(face_id)  # avoid adding the same id twice in one batch
            added += 1

    # Indices in index_by_frame are stale after this; it is not used again.
    frames.sort(key=lambda item: item["frame"])
    return added


def merge_into_face_json(file_uuid, scene_detections, output_dir=None):
    """Merge new detections into ``<output_dir>/<file_uuid>.face.json``.

    Args:
        file_uuid: UUID naming the face JSON file.
        scene_detections: List of ``{"frame": int, "faces": [...]}`` entries.
        output_dir: Directory holding the face JSON; defaults to OUTPUT_DIR.

    Returns:
        The number of faces added.

    Raises:
        OSError: The face JSON cannot be read or written.
        ValueError: The face JSON is not valid JSON.
    """
    base_dir = Path(OUTPUT_DIR if output_dir is None else output_dir)
    face_path = base_dir / f"{file_uuid}.face.json"
    with open(face_path, encoding="utf-8") as f:
        face_data = json.load(f)

    new_faces = _merge_frames(face_data, scene_detections)

    # Write to a sibling file and swap it in, so an interrupted run cannot
    # leave a truncated face JSON behind.
    tmp_path = face_path.with_name(face_path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(face_data, f)
        os.replace(tmp_path, face_path)
    finally:
        tmp_path.unlink(missing_ok=True)
    return new_faces


def main():
    """Command-line entry point; returns a process exit status (0 on success)."""
    parser = argparse.ArgumentParser(
        description="Rescan cut scenes frame by frame to extend single-frame face traces."
    )
    parser.add_argument("--file-uuid", required=True)
    parser.add_argument("--video-path", default=None)
    args = parser.parse_args()

    file_uuid = args.file_uuid
    # Fall back to the path recorded in the DB when none is given.
    video_path = args.video_path or _lookup_video_path(file_uuid)
    if not video_path:
        print(f"Video not found for UUID {file_uuid}")
        return 1

    # Probed once per run; None makes process_scene estimate it per chunk.
    fps = _probe_fps(video_path)

    print(f"Scanning for single-frame traces in {file_uuid}")
    scenes = get_cut_scenes_with_single_traces(file_uuid)
    print(f"Found {len(scenes)} cut scenes with single-frame traces")

    total_new = 0
    run_start = time.time()
    for i, (chunk_id, sf, ef, st, et, _n_traces) in enumerate(scenes):
        t0 = time.time()
        label = f"[{i+1}/{len(scenes)}] {chunk_id}"
        detections, error = process_scene(file_uuid, video_path, chunk_id, sf, ef, st, et, fps=fps)
        if error:
            print(f"{label}: ERROR - {error}")
            continue
        if not detections:
            print(f"{label}: no new detections")
            continue

        added = merge_into_face_json(file_uuid, detections)
        total_new += added
        elapsed = time.time() - t0
        # Average over all scenes so far; one slow or fast scene skews it less.
        eta = (len(scenes) - i - 1) * (time.time() - run_start) / (i + 1)
        print(f"{label}: +{added} faces ({len(detections)} frames, {elapsed:.0f}s, ETA {eta/60:.0f}min)")

    print(f"\nDone! Added {total_new} new face detections across {len(scenes)} scenes")
    print(f"Total time: {(time.time()-run_start)/60:.1f} min")
    return 0


if __name__ == "__main__":
    sys.exit(main())