#!/opt/homebrew/bin/python3.11
"""
Manual Seed Creation - Create an identity seed from a user-selected face trace

Flow:
    1. Get the trace centroid embedding from Qdrant _faces
    2. Create the identity in PG (source='manual')
    3. Push the embedding to Qdrant _seeds
    4. Confirm the trace binding (update TKG, Qdrant, PG)
    5. Auto-trigger propagation (identity_matcher.py, round 2)

Usage:
    # List pending traces
    python manual_seed.py --file-uuid <FILE_UUID> --list

    # Create seed from trace
    python manual_seed.py --file-uuid <FILE_UUID> --trace-id <TRACE_ID> --name "John Doe"

    # Create with custom identity_uuid
    python manual_seed.py --file-uuid <FILE_UUID> --trace-id <TRACE_ID> --name "John Doe" --identity-uuid <UUID>

    # Skip propagation and/or write the JSON result to a file
    python manual_seed.py --file-uuid <FILE_UUID> --trace-id <TRACE_ID> --name "John Doe" --no-propagate --output result.json

Output:
    JSON with created identity info and propagation results
"""
import os
import sys
import json
import argparse
import uuid as uuid_lib
from typing import Dict, Optional

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))

from tkg_helper import (
    mark_face_track_confirmed,
    get_pending_face_tracks,
    get_face_track_nodes,
)
from qdrant_faces import (
    get_trace_centroid,
    get_trace_representatives,
    push_seed_embedding,
    update_identity_in_faces,
)

# Config
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")

# Prefix of TKG face-track node external ids ("face_track_<trace_id>").
FACE_TRACK_PREFIX = "face_track_"


def get_conn():
    """Open a new PostgreSQL connection to DB_URL (caller must close it)."""
    import psycopg2

    return psycopg2.connect(DB_URL)


def table_name(table: str) -> str:
    """Return the schema-qualified table name for the configured SCHEMA.

    NOTE: the result is interpolated directly into SQL text. SCHEMA comes from
    the DATABASE_SCHEMA environment variable (trusted deployment config), never
    from user input.
    """
    if SCHEMA == "public":
        return table
    return f"{SCHEMA}.{table}"


def _parse_trace_id(external_id: Optional[str]) -> Optional[int]:
    """Return the integer trace id encoded in a TKG face-track external id.

    ``"face_track_12"`` -> ``12``. Returns None when the id is missing, is empty
    after stripping the prefix, or is not an integer, so callers can skip the
    node instead of crashing on ``int()``.
    """
    trace_id_str = str(external_id or "").removeprefix(FACE_TRACK_PREFIX)
    if not trace_id_str:
        return None
    try:
        return int(trace_id_str)
    except ValueError:
        print(
            f"[MANUAL] Skipping face-track node with non-integer external_id: {external_id!r}",
            file=sys.stderr,
        )
        return None


def create_identity(name: str, identity_uuid: Optional[str] = None) -> Dict:
    """Create an identity row in PG with source='manual'.

    Args:
        name: Identity name
        identity_uuid: Optional UUID (auto-generated if None or empty)

    Returns:
        {identity_id, identity_uuid, name}

    Raises:
        Whatever the driver raises for the connect/INSERT; the transaction is
        rolled back and the connection closed before re-raising.
    """
    if not identity_uuid:
        identity_uuid = str(uuid_lib.uuid4())

    id_table = table_name("identities")
    conn = get_conn()
    try:
        # Cursor context manager closes the cursor even if execute() fails.
        with conn.cursor() as cur:
            cur.execute(
                f"""
                INSERT INTO {id_table} (uuid, name, identity_type, source, status)
                VALUES (%s, %s, 'person', 'manual', 'active')
                RETURNING id
                """,
                (identity_uuid, name),
            )
            identity_id = cur.fetchone()[0]
        conn.commit()
    except Exception as e:
        print(f"[MANUAL] Identity creation failed: {e}")
        conn.rollback()
        raise
    finally:
        conn.close()

    print(f"[MANUAL] Created identity: {name} (id={identity_id}, uuid={identity_uuid})")
    return {
        "identity_id": identity_id,
        "identity_uuid": identity_uuid,
        "name": name,
    }


def _update_face_detections(file_uuid: str, trace_id: int, identity_id: int) -> int:
    """Set identity_id on all PG face_detections rows of one trace; return the row count.

    Raises on connection/SQL errors after rolling back; the connection is always closed.
    """
    fd_table = table_name("face_detections")
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                f"UPDATE {fd_table} SET identity_id = %s WHERE file_uuid = %s AND trace_id = %s",
                (identity_id, file_uuid, trace_id),
            )
            updated = cur.rowcount
        conn.commit()
        return updated
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def create_manual_seed(
    file_uuid: str,
    trace_id: int,
    name: str,
    identity_uuid: Optional[str] = None,
    propagate: bool = True,
) -> Dict:
    """Create a manual identity seed from one face trace.

    Steps after identity creation are best-effort: their failures are recorded
    under ``result["steps"]`` (``*_error`` keys) instead of being raised.

    Args:
        file_uuid: Video file UUID
        trace_id: Face trace ID
        name: Identity name
        identity_uuid: Optional UUID (auto-generated if None)
        propagate: Auto-trigger propagation

    Returns:
        Result dict with identity info, per-step outcomes and propagation results.
        ``status`` is "failed" (with ``error``) only when no usable centroid exists.

    Raises:
        Errors from create_identity() and mark_face_track_confirmed() propagate.
    """
    result = {
        "file_uuid": file_uuid,
        "trace_id": trace_id,
        "name": name,
        "status": "success",
        "steps": {},
    }

    # Step 1: Get trace centroid embedding.
    centroid = get_trace_centroid(file_uuid, trace_id)
    # Use None/len() rather than `not centroid`: presumably a list of floats,
    # but a truthiness test would raise if the helper ever returns an array type.
    if centroid is None or len(centroid) == 0 or all(v == 0.0 for v in centroid):
        result["status"] = "failed"
        result["error"] = "No valid centroid for trace"
        return result
    result["steps"]["centroid_extracted"] = True
    print(f"[MANUAL] Centroid extracted: trace_id={trace_id}")

    # Step 2: Create identity in PG.
    identity = create_identity(name, identity_uuid)
    identity_id = identity["identity_id"]
    identity_uuid = identity["identity_uuid"]
    result["identity"] = identity
    result["steps"]["identity_created"] = True

    # Step 3: Push the centroid to _seeds.
    try:
        push_seed_embedding(
            identity_id=identity_id,
            identity_uuid=identity_uuid,
            name=name,
            embedding=centroid,
            source="manual",
            file_uuid=file_uuid,
            trace_id=trace_id,
        )
        result["steps"]["seed_pushed"] = True
    except Exception as e:
        result["steps"]["seed_pushed"] = False
        result["steps"]["seed_error"] = str(e)

    # Step 4: Confirm trace binding in TKG, Qdrant _faces and PG.
    tkg_updated = mark_face_track_confirmed(file_uuid, trace_id, identity_id, identity_uuid, name)
    result["steps"]["tkg_updated"] = tkg_updated

    try:
        qdrant_updated = update_identity_in_faces(file_uuid, trace_id, identity_id, identity_uuid)
        result["steps"]["qdrant_updated"] = qdrant_updated
    except Exception as e:
        result["steps"]["qdrant_updated"] = False
        result["steps"]["qdrant_error"] = str(e)

    try:
        pg_updated = _update_face_detections(file_uuid, trace_id, identity_id)
    except Exception as e:
        result["steps"]["pg_error"] = str(e)
    else:
        result["steps"]["pg_updated"] = pg_updated
        print(f"[MANUAL] PG updated: {pg_updated} face_detections")

    # Step 5: Auto propagation.
    if propagate:
        try:
            result["propagation"] = run_propagation(file_uuid)
        except Exception as e:
            result["propagation"] = {"error": str(e)}

    return result


def _extract_json_result(output: str) -> Optional[Dict]:
    """Return the last JSON object printed in ``output``, or None if there is none.

    Handles both a single-line object (``json.dumps(obj)``) and a pretty-printed
    multi-line object (``json.dumps(obj, indent=2)``, whose first line is a bare
    ``{``) at the end of the output, after any log lines.
    """
    lines = output.splitlines()
    starts = [i for i, line in enumerate(lines) if line.strip().startswith("{")]

    # Pass 1: an object running from a "{" line to the end of the output.
    # Scanning bottom-up, nested "{" lines fail with "extra data" until the
    # top-level opening line is reached.
    for i in reversed(starts):
        try:
            parsed = json.loads("\n".join(lines[i:]))
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    # Pass 2: the last single-line object (tolerates trailing log lines).
    for i in reversed(starts):
        try:
            parsed = json.loads(lines[i])
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    return None


def run_propagation(file_uuid: str) -> Dict:
    """Run Round 2 propagation (identity_matcher.py) after manual seed creation.

    Collects all TKG face-track nodes with status "confirmed", writes their
    trace_id -> identity mapping to a temporary JSON file, and invokes
    ``identity_matcher.py --round 2`` with it.

    Args:
        file_uuid: Video file UUID

    Returns:
        The JSON object printed by identity_matcher.py; ``{"output": ...}`` if it
        printed none; ``{"error": ...}`` (plus ``"output"`` on a non-zero exit)
        if the subprocess could not be run successfully.
    """
    import subprocess
    import tempfile

    # Gather confirmed traces and their bound identities.
    confirmed_traces = []
    identity_map = {}
    for node in get_face_track_nodes(file_uuid):
        props = node.get("properties") or {}
        if props.get("status") != "confirmed":
            continue
        trace_id = _parse_trace_id(node.get("external_id"))
        if trace_id is None:
            continue
        confirmed_traces.append(trace_id)
        identity_map[trace_id] = {
            "identity_id": props.get("identity_id"),
            "identity_uuid": props.get("identity_uuid"),
            "name": props.get("identity_name"),
        }

    if not confirmed_traces:
        return {"matched": 0, "message": "No confirmed traces for propagation"}

    confirmed_str = ",".join(str(t) for t in confirmed_traces)
    # delete=False: the file must survive close() so the subprocess can read it;
    # it is removed in the finally clause below on every path.
    map_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
    try:
        with map_file:
            json.dump(identity_map, map_file)

        cmd = [
            sys.executable,
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "identity_matcher.py"),
            "--file-uuid", file_uuid,
            "--round", "2",
            "--confirmed-traces", confirmed_str,
            "--identity-map", map_file.name,
            "--mark-tkg",
        ]

        try:
            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
        except subprocess.CalledProcessError as e:
            return {"error": str(e), "output": e.output}
        except Exception as e:
            return {"error": str(e)}

        parsed = _extract_json_result(output)
        return parsed if parsed is not None else {"output": output}
    finally:
        try:
            os.unlink(map_file.name)
        except OSError:
            # Best-effort cleanup of a temp file; nothing useful to do on failure.
            pass


def list_pending_traces(file_uuid: str) -> Dict:
    """List pending face traces so a user can pick one to seed from.

    Args:
        file_uuid: Video file UUID

    Returns:
        {file_uuid, traces: [{trace_id, frame_count, start_frame, end_frame,
        avg_bbox, has_centroid}], count}
    """
    # Pending face-track nodes from the TKG.
    pending_nodes = get_pending_face_tracks(file_uuid)
    # Trace representatives from Qdrant; a trace with at least one representative
    # is reported as having a centroid available.
    trace_reps = get_trace_representatives(file_uuid)

    traces = []
    for node in pending_nodes:
        trace_id = _parse_trace_id(node.get("external_id"))
        if trace_id is None:
            continue
        props = node.get("properties") or {}
        has_centroid = trace_id in trace_reps and len(trace_reps[trace_id]) > 0
        traces.append({
            "trace_id": trace_id,
            "frame_count": props.get("frame_count", 0),
            "start_frame": props.get("start_frame"),
            "end_frame": props.get("end_frame"),
            "avg_bbox": props.get("avg_bbox"),
            "has_centroid": has_centroid,
        })

    return {
        "file_uuid": file_uuid,
        "traces": traces,
        "count": len(traces),
    }


def main():
    """CLI entry point: list pending traces or create a manual seed, emitting JSON."""
    parser = argparse.ArgumentParser(description="Manual Seed Creation")
    parser.add_argument("--file-uuid", required=True, help="Video file UUID")
    parser.add_argument("--trace-id", type=int, help="Trace ID to create seed from")
    parser.add_argument("--name", help="Identity name")
    parser.add_argument("--identity-uuid", help="Custom identity UUID (optional)")
    parser.add_argument("--list", action="store_true", help="List pending traces")
    parser.add_argument("--no-propagate", action="store_true", help="Skip auto propagation")
    parser.add_argument("--output", help="Output JSON file path")
    args = parser.parse_args()

    propagate = not args.no_propagate

    if args.list:
        result = list_pending_traces(args.file_uuid)
    # `is not None`: trace id 0 is valid but falsy.
    elif args.trace_id is not None and args.name:
        result = create_manual_seed(
            args.file_uuid,
            args.trace_id,
            args.name,
            args.identity_uuid,
            propagate,
        )
    else:
        print("Error: Need either --list or --trace-id/--name", file=sys.stderr)
        sys.exit(1)

    output_json = json.dumps(result, indent=2, ensure_ascii=False)

    if args.output:
        # ensure_ascii=False may emit non-ASCII names; don't depend on the locale.
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output_json)
        print(f"[MANUAL] Output saved to {args.output}")
    else:
        print(output_json)


if __name__ == "__main__":
    main()