#!/opt/homebrew/bin/python3.11
"""
Store Traced Faces - Pipeline integration for face trace + position data

Flow:
1. Reads face.json output from face_processor.py
2. Runs face_tracker.py to assign trace_id per face (IoU + embedding)
3. Updates the matching rows of the face_detections table with the trace_id
   (rows are matched on file_uuid, frame_number and the x,y,w,h position)

Usage:
    python store_traced_faces.py --file-uuid <uuid> [--face-json <path>]

TKG Export:
    trace_id + position (x,y,w,h) per frame enables spatial-temporal graph construction.
    Each trace is a temporal entity; position tracks movement across frames.
"""

import sys
import os
import json
import argparse
from collections import Counter, defaultdict
from datetime import datetime

import numpy as np
import psycopg2
import psycopg2.extras

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))

# Config
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")


def get_conn():
    """Open a new psycopg2 connection to the database at ``DB_URL``."""
    return psycopg2.connect(DB_URL)


def merge_traces_within_cuts(face_data: dict, cut_scenes: list) -> dict:
    """Merge traces within the same cut if they have similar embeddings.

    Intended for the case where the same person re-appears inside one cut and
    the tracker assigned a new trace_id.

    Args:
        face_data: Tracker output. ``face_data["frames"]`` maps a frame number
            (as a string) to a dict with a ``"faces"`` list; each face may
            carry ``trace_id``, ``embedding``, ``x``, ``y``, ``width``,
            ``height`` and ``confidence``.
        cut_scenes: Scene dicts with ``start_frame``, ``end_frame`` (inclusive)
            and ``scene_number`` keys.

    Returns:
        The same ``face_data`` object, mutated in place. When at least one
        merge happened, face ``trace_id`` values are rewritten and
        ``face_data["traces"]`` and ``face_data["metadata"]["trace_stats"]``
        are rebuilt; otherwise it is returned untouched.
    """
    frames = face_data.get("frames", {})
    if not frames:
        return face_data

    # Map each frame to its scene/cut number
    frame_to_scene = {}
    for scene in cut_scenes:
        for frame_no in range(scene["start_frame"], scene["end_frame"] + 1):
            frame_to_scene[frame_no] = scene["scene_number"]

    # Collect per-trace data: frame numbers and embeddings
    trace_frames = defaultdict(list)
    trace_embeddings = defaultdict(list)
    for fnum_str, frm_data in frames.items():
        fnum = int(fnum_str)
        for face in frm_data.get("faces", []):
            tid = face.get("trace_id")
            if tid is None:
                continue
            trace_frames[tid].append(fnum)
            emb = face.get("embedding")
            if emb is not None:
                trace_embeddings[tid].append(emb)

    # Nothing to compare unless at least two traces carry embeddings
    if len(trace_embeddings) < 2:
        return face_data

    # Compute a unit-length centroid per trace so a dot product is the cosine
    trace_centroids = {}
    for tid, embs in trace_embeddings.items():
        centroid = np.mean(embs, axis=0)
        norm = np.linalg.norm(centroid)
        trace_centroids[tid] = centroid / norm if norm > 0 else centroid

    # Determine which scene each trace belongs to (majority of frames);
    # frames outside every scene vote for -1.
    trace_scene = {}
    for tid, fns in trace_frames.items():
        votes = Counter(frame_to_scene.get(fn, -1) for fn in fns)
        trace_scene[tid] = max(votes, key=votes.get) if votes else -1

    # Group traces that have a centroid by their scene
    scene_traces = defaultdict(list)
    for tid, scene in trace_scene.items():
        if scene >= 0 and tid in trace_centroids:
            scene_traces[scene].append(tid)

    SIMILARITY_THRESHOLD = 0.75

    # Decide all merges first (absorbed trace_id -> kept trace_id), then
    # relabel the faces in a single pass instead of rescanning every frame
    # once per merge. A kept trace is never absorbed later, so the mapping
    # contains no chains.
    # NOTE(review): two traces that co-occur in the same frame can still be
    # merged here, giving two faces of one frame the same trace_id - confirm
    # whether that is intended.
    remap = {}
    for tids in scene_traces.values():
        if len(tids) < 2:
            continue
        absorbed = set()
        for i, keep_tid in enumerate(tids):
            if keep_tid in absorbed:
                continue
            keep_centroid = trace_centroids[keep_tid]
            for other_tid in tids[i + 1:]:
                if other_tid in absorbed:
                    continue
                sim = float(np.dot(keep_centroid, trace_centroids[other_tid]))
                if sim >= SIMILARITY_THRESHOLD:
                    remap[other_tid] = keep_tid
                    absorbed.add(other_tid)

    merged = len(remap)
    if merged == 0:
        return face_data

    for frm_data in frames.values():
        for face in frm_data.get("faces", []):
            tid = face.get("trace_id")
            if tid is not None and tid in remap:
                face["trace_id"] = remap[tid]

    # Rebuild trace metadata from the relabelled faces
    new_trace_frames = defaultdict(list)
    for fnum_str, frm_data in frames.items():
        fnum = int(fnum_str)
        for face in frm_data.get("faces", []):
            tid = face.get("trace_id")
            if tid is not None:
                new_trace_frames[tid].append({
                    "frame": fnum,
                    "face_index": 0,
                    "bbox": {
                        "x": face.get("x", 0),
                        "y": face.get("y", 0),
                        "width": face.get("width", 0),
                        "height": face.get("height", 0),
                    },
                    "confidence": face.get("confidence", 0.0),
                })

    # Create the metadata dict if the input had none, and fall back to 25 fps
    # when fps is missing or zero so the duration division cannot fail.
    metadata = face_data.setdefault("metadata", {})
    fps = metadata.get("fps") or 25.0

    new_traces = {}
    for tid, path in new_trace_frames.items():
        frames_sorted = sorted({p["frame"] for p in path})
        first, last = frames_sorted[0], frames_sorted[-1]
        new_traces[str(tid)] = {
            "trace_id": tid,
            "start_frame": first,
            "end_frame": last,
            "duration_frames": last - first + 1,
            "duration_seconds": (last - first) / fps,
            "total_appearances": len(path),
            "path": path,
        }

    face_data["traces"] = new_traces
    metadata["trace_stats"] = {
        "total_traces": len(new_traces),
        "active_traces": len(new_traces),
        "long_traces": len([t for t in new_traces.values() if t["duration_frames"] >= 2]),
    }
    print(f"[TRACE] Post-merge: {merged} traces merged, {len(new_traces)} total traces")

    return face_data


def _frames_list_to_dict(frame_list: list) -> dict:
    """Convert the V2.0 list-of-frames layout into the dict keyed by frame number."""
    frames_dict = {}
    for frame in frame_list:
        faces = []
        for raw in frame.get("faces", []):
            # The position may be nested under "bbox" or stored flat on the face
            bbox = raw.get("bbox", raw)
            face = {
                "x": bbox.get("x", raw.get("x", 0)),
                "y": bbox.get("y", raw.get("y", 0)),
                "width": bbox.get("width", raw.get("width", 0)),
                "height": bbox.get("height", raw.get("height", 0)),
                "confidence": raw.get("confidence", 0.0),
            }
            if "landmarks" in raw:
                face["landmarks"] = raw["landmarks"]
            if "embedding" in raw:
                face["embedding"] = raw["embedding"]
            faces.append(face)
        frames_dict[str(frame["frame"])] = {
            "frame_number": frame["frame"],
            "time_seconds": frame.get("timestamp", 0),
            "faces": faces,
        }
    return frames_dict


def _attach_db_embeddings(face_data: dict, file_uuid: str) -> None:
    """Copy embeddings stored in face_detections onto the matching faces in ``face_data``."""
    conn = get_conn()
    try:
        cur = conn.cursor()
        cur.execute(f"""
            SELECT frame_number, x, y, width, height, embedding
            FROM {SCHEMA}.face_detections
            WHERE file_uuid = %s AND embedding IS NOT NULL
        """, (file_uuid,))
        emb_rows = cur.fetchall()
    finally:
        # Always release the connection, including when the query fails
        conn.close()

    # Build lookup: frame_number -> list of (bbox, embedding)
    emb_map = {}
    for fn, x, y, w, h, emb in emb_rows:
        emb_map.setdefault(fn, []).append(((x, y, w, h), emb))
    print(f"[TRACE] Loaded {len(emb_rows)} embeddings from DB")

    # Attach the first stored embedding whose bbox is within 10 units on
    # every component of the face's bbox
    attached = 0
    for fnum_str, frm_data in face_data.get("frames", {}).items():
        candidates = emb_map.get(int(fnum_str), [])
        for face in frm_data.get("faces", []):
            x = face.get("x", 0)
            y = face.get("y", 0)
            w = face.get("width", 0)
            h = face.get("height", 0)
            for (ex, ey, ew, eh), emb in candidates:
                if abs(x - ex) < 10 and abs(y - ey) < 10 and abs(w - ew) < 10 and abs(h - eh) < 10:
                    face["embedding"] = emb
                    attached += 1
                    break
    print(f"[TRACE] Attached {attached} embeddings to faces")


def run_face_tracker(face_json_path: str, traced_json_path: str) -> str:
    """Run face_tracker.py on face.json, returns path to face_traced.json.

    Loads ``face_json_path``, normalises the V2.0 list layout to the dict
    layout, tries to attach embeddings from the DB (best effort - a failure
    only prints a warning), runs ``track_faces`` with the cut boundaries read
    from the sibling ``.cut.json`` file when it exists, merges similar traces
    within each cut, and writes the result to ``traced_json_path``.

    Args:
        face_json_path: Path to the face.json produced by the face processor.
        traced_json_path: Destination path for the traced JSON.

    Returns:
        ``traced_json_path``.
    """
    from face_tracker import track_faces

    with open(face_json_path, encoding="utf-8") as fh:
        face_data = json.load(fh)

    # V2.0 uses list format (FaceResult), convert to dict for face_tracker
    if isinstance(face_data.get("frames"), list):
        face_data["frames"] = _frames_list_to_dict(face_data["frames"])
        # Preserve metadata (fps needed by face_tracker)
        if "metadata" not in face_data:
            face_data["metadata"] = {
                "fps": face_data.get("fps", 30.0),
                "total_frames": face_data.get("frame_count", 0),
            }

    print(f"[TRACE] Processing {len(face_data.get('frames', {}))} frames")

    # Load embeddings from DB for the face tracker
    file_uuid = (
        os.path.basename(face_json_path)
        .replace(".face.json", "")
        .replace("_traced.json", "")
    )
    try:
        _attach_db_embeddings(face_data, file_uuid)
    except Exception as e:
        # Deliberately best effort: tracking can proceed without DB embeddings
        print(f"[TRACE] WARNING: Could not load embeddings: {e}")

    # Load cut boundaries from cut.json (same directory as face.json)
    cut_boundaries = None
    cut_scenes = None
    cuts_path = face_json_path.replace("_traced.json", ".cut.json").replace(".face.json", ".cut.json")
    if os.path.exists(cuts_path):
        with open(cuts_path, encoding="utf-8") as fh:
            cuts = json.load(fh)
        cut_scenes = cuts.get("scenes", [])
        # A scene starting at frame 0 is the start of the video, not a cut
        cut_boundaries = {s["start_frame"] for s in cut_scenes if s["start_frame"] > 0}
        print(f"[TRACE] Loaded {len(cut_boundaries)} cut boundaries")

    face_data = track_faces(face_data, use_embedding=True, cut_boundaries=cut_boundaries)

    # Merge traces within same cut (same person re-appearing after occlusion/pose change)
    if cut_scenes:
        face_data = merge_traces_within_cuts(face_data, cut_scenes)

    metadata = face_data.get("metadata", {})
    metadata["tracking_method"] = "iou_embedding"
    metadata["tracked_at"] = datetime.now().isoformat()
    face_data["metadata"] = metadata

    with open(traced_json_path, "w", encoding="utf-8") as fh:
        json.dump(face_data, fh, indent=2, ensure_ascii=False)

    trace_count = len(face_data.get("traces", {}))
    print(f"[TRACE] Completed: {trace_count} traces -> {traced_json_path}")
    return traced_json_path


def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHEMA):
    """Write trace_id values from the traced JSON onto existing face_detections rows.

    No rows are inserted: each traced face UPDATEs the row with the same
    file_uuid, frame_number, x, y, width and height.

    Args:
        file_uuid: UUID of the video file whose detections are updated.
        traced_json_path: Path to the traced JSON written by ``run_face_tracker``.
        schema: DB schema holding the ``face_detections`` table.

    Returns:
        ``(total_stored, db_trace_count)``: the number of faces whose UPDATE
        matched at least one row, and the number of distinct non-null
        trace_id values in the table for ``file_uuid`` after the commit.
    """
    # Read the input before opening the connection so a bad path leaks nothing
    with open(traced_json_path, encoding="utf-8") as fh:
        data = json.load(fh)
    frames = data.get("frames", {})

    # NOTE(review): `schema` is interpolated into the SQL text; it comes from
    # the CLI/environment - confirm it is never taken from untrusted input.
    update_sql = f"""
        UPDATE {schema}.face_detections
        SET trace_id = %s
        WHERE file_uuid = %s
          AND frame_number = %s
          AND x = %s AND y = %s AND width = %s AND height = %s
    """

    total_stored = 0
    conn = get_conn()
    try:
        cur = conn.cursor()
        for frame_num_str, frame_data in sorted(frames.items(), key=lambda x: int(x[0])):
            frame_num = int(frame_num_str)
            for face in frame_data.get("faces", []):
                trace_id = face.get("trace_id")
                if trace_id is None:
                    continue

                params = (
                    trace_id,
                    file_uuid,
                    frame_num,
                    face.get("x", 0),
                    face.get("y", 0),
                    face.get("width", 0),
                    face.get("height", 0),
                )

                # A savepoint confines a failed UPDATE to this one face. A
                # full rollback would silently discard every earlier
                # uncommitted update while total_stored still counted them.
                cur.execute("SAVEPOINT trace_update")
                try:
                    cur.execute(update_sql, params)
                except psycopg2.Error as e:
                    print(f"[TRACE] Error storing face at frame {frame_num}: {e}")
                    cur.execute("ROLLBACK TO SAVEPOINT trace_update")
                    continue
                if cur.rowcount > 0:
                    total_stored += 1
                cur.execute("RELEASE SAVEPOINT trace_update")

        conn.commit()

        # Log trace summary
        cur.execute(
            f"SELECT COUNT(DISTINCT trace_id) FROM {schema}.face_detections WHERE file_uuid = %s AND trace_id IS NOT NULL",
            (file_uuid,),
        )
        db_trace_count = cur.fetchone()[0]
        cur.close()
    finally:
        # Close on every path so an exception cannot leak the connection
        conn.close()

    print(f"[TRACE] Stored {total_stored} face detections, {db_trace_count} unique traces in DB")
    return total_stored, db_trace_count


def main():
    """CLI entry point: run the tracker on face.json, then store trace ids in the DB."""
    parser = argparse.ArgumentParser(description="Store traced faces in DB")
    parser.add_argument("--file-uuid", required=True, help="Video file UUID")
    parser.add_argument("--face-json", help="Path to face.json (default: auto-detect)")
    parser.add_argument("--schema", default=SCHEMA, help="DB schema name")
    parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)")
    args = parser.parse_args()

    face_json = args.face_json or os.path.join(
        OUTPUT_DIR, f"{args.file_uuid}.face.json"
    )
    traced_json = os.path.join(OUTPUT_DIR, f"{args.file_uuid}.face_traced.json")

    if not os.path.exists(face_json):
        print(f"[TRACE] face.json not found: {face_json}", file=sys.stderr)
        sys.exit(1)

    # Step 1: Run face tracker
    run_face_tracker(face_json, traced_json)

    # Step 2: Store in DB with trace_id
    total, traces = store_traced_faces(args.file_uuid, traced_json, args.schema)
    print(f"[TRACE] Done: {total} detections, {traces} traces")


if __name__ == "__main__":
    main()