#!/opt/homebrew/bin/python3.11 """ Confirm Identity - Confirm suggested identity binding for a trace Flow: 1. Mark TKG face_track node as 'confirmed' 2. Update Qdrant _faces: all points with trace_id → identity_uuid 3. Update PG face_detections: identity_id 4. Add trace centroid to _seeds (source='propagation') 5. Auto-trigger Round 2 matching (propagation) Usage: python confirm_identity.py --file-uuid --trace-id --identity-id --identity-uuid --name "Tom Hanks" python confirm_identity.py --file-uuid --json suggestions.json # Batch confirm from file Output: JSON with confirm status and Round 2 propagation results """ import os import sys import json import argparse import subprocess from typing import Dict, Optional sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils")) from tkg_helper import ( mark_face_track_confirmed, get_suggested_face_tracks, get_face_track_nodes, ) from qdrant_faces import ( update_identity_in_faces, get_trace_centroid, push_seed_embedding, get_trace_representatives, ) def confirm_single_trace( file_uuid: str, trace_id: int, identity_id: int, identity_uuid: str, name: str, propagate: bool = True, ) -> Dict: """Confirm identity binding for a single trace Args: file_uuid: Video file UUID trace_id: Face trace ID identity_id: PG identity.id identity_uuid: Identity UUID name: Identity name propagate: Auto-trigger Round 2 matching Returns: Result dict with status and propagation info """ result = { "file_uuid": file_uuid, "trace_id": trace_id, "identity_id": identity_id, "identity_uuid": identity_uuid, "name": name, "status": "success", "steps": {}, } # Step 1: Update TKG node tkg_updated = mark_face_track_confirmed(file_uuid, trace_id, identity_id, identity_uuid, name) result["steps"]["tkg_updated"] = tkg_updated # Step 2: Update Qdrant _faces try: qdrant_updated = update_identity_in_faces(file_uuid, trace_id, identity_id, identity_uuid) result["steps"]["qdrant_updated"] = qdrant_updated except Exception as e: print(f"[CONFIRM] Qdrant update failed: {e}") result["steps"]["qdrant_updated"] = 0 result["steps"]["qdrant_error"] = str(e) # Step 3: Update PG face_detections import psycopg2 DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry") SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev") if SCHEMA == "public": fd_table = "face_detections" else: fd_table = f"{SCHEMA}.face_detections" try: conn = psycopg2.connect(DB_URL) cur = conn.cursor() cur.execute( f"UPDATE {fd_table} SET identity_id = %s WHERE file_uuid = %s AND trace_id = %s", (identity_id, file_uuid, trace_id), ) conn.commit() pg_updated = cur.rowcount cur.close() conn.close() result["steps"]["pg_updated"] = pg_updated print(f"[CONFIRM] PG updated: {pg_updated} face_detections") except Exception as e: print(f"[CONFIRM] PG update failed: {e}") result["steps"]["pg_updated"] = 0 result["steps"]["pg_error"] = str(e) # Step 4: Add to _seeds as propagation seed try: centroid = get_trace_centroid(file_uuid, trace_id) if centroid and not all(v == 0.0 for v in centroid): push_seed_embedding( identity_id=identity_id, identity_uuid=identity_uuid, name=name, embedding=centroid, source="propagation", file_uuid=file_uuid, trace_id=trace_id, ) result["steps"]["seed_added"] = True else: result["steps"]["seed_added"] = False result["steps"]["seed_error"] = "No valid centroid" except Exception as e: print(f"[CONFIRM] Seed addition failed: {e}") result["steps"]["seed_added"] = False result["steps"]["seed_error"] = str(e) # Step 5: Auto-trigger Round 2 matching if propagate: try: propagation_result = run_round_2_propagation(file_uuid) result["propagation"] = propagation_result except Exception as e: print(f"[CONFIRM] Propagation failed: {e}") result["propagation"] = {"error": str(e)} return result def run_round_2_propagation(file_uuid: str) -> Dict: """Run Round 2 matching after confirmation Args: file_uuid: Video file UUID Returns: Round 2 matching results """ # Get confirmed traces (identity_map) face_track_nodes = get_face_track_nodes(file_uuid) confirmed_traces = [] identity_map = {} for node in face_track_nodes: props = node.get("properties", {}) status = props.get("status") if status == "confirmed": trace_id_str = node.get("external_id", "").replace("face_track_", "") if trace_id_str: trace_id = int(trace_id_str) confirmed_traces.append(trace_id) identity_map[trace_id] = { "identity_id": props.get("identity_id"), "identity_uuid": props.get("identity_uuid"), "name": props.get("identity_name"), } if not confirmed_traces: return {"matched": 0, "message": "No confirmed traces for propagation"} # Run identity_matcher.py Round 2 confirmed_str = ",".join(str(t) for t in confirmed_traces) import tempfile identity_map_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) json.dump(identity_map, identity_map_file) identity_map_file.close() cmd = [ sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), "identity_matcher.py"), "--file-uuid", file_uuid, "--round", "2", "--confirmed-traces", confirmed_str, "--identity-map", identity_map_file.name, ] try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True) propagation_result = json.loads(output.split("\n")[-1]) # Clean up temp file os.unlink(identity_map_file.name) return propagation_result except subprocess.CalledProcessError as e: print(f"[PROPAGATE] Command failed: {e.output}") os.unlink(identity_map_file.name) return {"error": str(e), "matched": 0} except Exception as e: os.unlink(identity_map_file.name) return {"error": str(e), "matched": 0} def batch_confirm_from_json(file_uuid: str, suggestions_file: str, propagate: bool = True) -> Dict: """Batch confirm suggestions from JSON file Args: file_uuid: Video file UUID suggestions_file: JSON file with suggestions propagate: Auto-trigger Round 2 after all confirmations Returns: Batch confirm results """ with open(suggestions_file) as f: data = json.load(f) suggestions = data.get("suggestions", {}) results = [] confirmed_traces = [] for trace_id_str, suggestion in suggestions.items(): trace_id = int(trace_id_str) result = confirm_single_trace( file_uuid, trace_id, suggestion.get("identity_id"), suggestion.get("identity_uuid"), suggestion.get("name"), propagate=False, # Don't propagate each one ) results.append(result) if result.get("status") == "success": confirmed_traces.append(trace_id) # Final propagation after all confirmations if propagate and confirmed_traces: propagation_result = run_round_2_propagation(file_uuid) batch_result = { "file_uuid": file_uuid, "confirmed_count": len(confirmed_traces), "total_suggestions": len(suggestions), "results": results, "propagation": propagation_result, } else: batch_result = { "file_uuid": file_uuid, "confirmed_count": len(confirmed_traces), "total_suggestions": len(suggestions), "results": results, } return batch_result def main(): parser = argparse.ArgumentParser(description="Confirm Identity Binding") parser.add_argument("--file-uuid", required=True, help="Video file UUID") parser.add_argument("--trace-id", type=int, help="Trace ID to confirm") parser.add_argument("--identity-id", type=int, help="Identity ID") parser.add_argument("--identity-uuid", help="Identity UUID") parser.add_argument("--name", help="Identity name") parser.add_argument("--json", help="JSON file with suggestions (batch confirm)") parser.add_argument("--no-propagate", action="store_true", help="Skip auto propagation") parser.add_argument("--output", help="Output JSON file path") args = parser.parse_args() propagate = not args.no_propagate if args.json: result = batch_confirm_from_json(args.file_uuid, args.json, propagate) elif args.trace_id is not None and args.identity_id is not None and args.identity_uuid and args.name: result = confirm_single_trace( args.file_uuid, args.trace_id, args.identity_id, args.identity_uuid, args.name, propagate, ) else: print("Error: Need either --json or --trace-id/--identity-id/--identity-uuid/--name") sys.exit(1) output_json = json.dumps(result, indent=2, ensure_ascii=False) if args.output: with open(args.output, "w") as f: f.write(output_json) print(f"[CONFIRM] Output saved to {args.output}") else: print(output_json) if __name__ == "__main__": main()