feat: add confirm_identity.py for identity binding confirmation
Implements: - confirm_single_trace(): Confirm identity binding for one trace - Update TKG face_track node: status='confirmed' - Update Qdrant _faces: identity_uuid for all points - Update PG face_detections: identity_id - Add trace centroid to _seeds (source='propagation') - Auto-trigger Round 2 matching - batch_confirm_from_json(): Batch confirm from suggestions file - Confirm multiple suggestions from identity_matcher output - Final propagation after all confirmations - run_round_2_propagation(): Auto propagation trigger - Get confirmed traces from TKG nodes - Build identity_map for propagation - Run identity_matcher.py Round 2 Usage: python confirm_identity.py --file-uuid <uuid> --trace-id 1 --identity-id 1 --identity-uuid xxx --name 'Tom Hanks' python confirm_identity.py --file-uuid <uuid> --json suggestions.json python confirm_identity.py --file-uuid <uuid> --json suggestions.json --no-propagate
This commit is contained in:
311
scripts/confirm_identity.py
Normal file
311
scripts/confirm_identity.py
Normal file
@@ -0,0 +1,311 @@
|
||||
#!/opt/homebrew/bin/python3.11
|
||||
"""
|
||||
Confirm Identity - Confirm suggested identity binding for a trace
|
||||
|
||||
Flow:
|
||||
1. Mark TKG face_track node as 'confirmed'
|
||||
2. Update Qdrant _faces: all points with trace_id → identity_uuid
|
||||
3. Update PG face_detections: identity_id
|
||||
4. Add trace centroid to _seeds (source='propagation')
|
||||
5. Auto-trigger Round 2 matching (propagation)
|
||||
|
||||
Usage:
|
||||
python confirm_identity.py --file-uuid <uuid> --trace-id <id> --identity-id <id> --identity-uuid <uuid> --name "Tom Hanks"
|
||||
python confirm_identity.py --file-uuid <uuid> --json suggestions.json # Batch confirm from file
|
||||
|
||||
Output:
|
||||
JSON with confirm status and Round 2 propagation results
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import subprocess
|
||||
from typing import Dict, Optional
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
|
||||
|
||||
from tkg_helper import (
|
||||
mark_face_track_confirmed,
|
||||
get_suggested_face_tracks,
|
||||
get_face_track_nodes,
|
||||
)
|
||||
from qdrant_faces import (
|
||||
update_identity_in_faces,
|
||||
get_trace_centroid,
|
||||
push_seed_embedding,
|
||||
get_trace_representatives,
|
||||
)
|
||||
|
||||
|
||||
def confirm_single_trace(
|
||||
file_uuid: str,
|
||||
trace_id: int,
|
||||
identity_id: int,
|
||||
identity_uuid: str,
|
||||
name: str,
|
||||
propagate: bool = True,
|
||||
) -> Dict:
|
||||
"""Confirm identity binding for a single trace
|
||||
|
||||
Args:
|
||||
file_uuid: Video file UUID
|
||||
trace_id: Face trace ID
|
||||
identity_id: PG identity.id
|
||||
identity_uuid: Identity UUID
|
||||
name: Identity name
|
||||
propagate: Auto-trigger Round 2 matching
|
||||
|
||||
Returns:
|
||||
Result dict with status and propagation info
|
||||
"""
|
||||
result = {
|
||||
"file_uuid": file_uuid,
|
||||
"trace_id": trace_id,
|
||||
"identity_id": identity_id,
|
||||
"identity_uuid": identity_uuid,
|
||||
"name": name,
|
||||
"status": "success",
|
||||
"steps": {},
|
||||
}
|
||||
|
||||
# Step 1: Update TKG node
|
||||
tkg_updated = mark_face_track_confirmed(file_uuid, trace_id, identity_id, identity_uuid, name)
|
||||
result["steps"]["tkg_updated"] = tkg_updated
|
||||
|
||||
# Step 2: Update Qdrant _faces
|
||||
try:
|
||||
qdrant_updated = update_identity_in_faces(file_uuid, trace_id, identity_id, identity_uuid)
|
||||
result["steps"]["qdrant_updated"] = qdrant_updated
|
||||
except Exception as e:
|
||||
print(f"[CONFIRM] Qdrant update failed: {e}")
|
||||
result["steps"]["qdrant_updated"] = 0
|
||||
result["steps"]["qdrant_error"] = str(e)
|
||||
|
||||
# Step 3: Update PG face_detections
|
||||
import psycopg2
|
||||
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
|
||||
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
|
||||
|
||||
if SCHEMA == "public":
|
||||
fd_table = "face_detections"
|
||||
else:
|
||||
fd_table = f"{SCHEMA}.face_detections"
|
||||
|
||||
try:
|
||||
conn = psycopg2.connect(DB_URL)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"UPDATE {fd_table} SET identity_id = %s WHERE file_uuid = %s AND trace_id = %s",
|
||||
(identity_id, file_uuid, trace_id),
|
||||
)
|
||||
conn.commit()
|
||||
pg_updated = cur.rowcount
|
||||
cur.close()
|
||||
conn.close()
|
||||
result["steps"]["pg_updated"] = pg_updated
|
||||
print(f"[CONFIRM] PG updated: {pg_updated} face_detections")
|
||||
except Exception as e:
|
||||
print(f"[CONFIRM] PG update failed: {e}")
|
||||
result["steps"]["pg_updated"] = 0
|
||||
result["steps"]["pg_error"] = str(e)
|
||||
|
||||
# Step 4: Add to _seeds as propagation seed
|
||||
try:
|
||||
centroid = get_trace_centroid(file_uuid, trace_id)
|
||||
if centroid and not all(v == 0.0 for v in centroid):
|
||||
push_seed_embedding(
|
||||
identity_id=identity_id,
|
||||
identity_uuid=identity_uuid,
|
||||
name=name,
|
||||
embedding=centroid,
|
||||
source="propagation",
|
||||
file_uuid=file_uuid,
|
||||
trace_id=trace_id,
|
||||
)
|
||||
result["steps"]["seed_added"] = True
|
||||
else:
|
||||
result["steps"]["seed_added"] = False
|
||||
result["steps"]["seed_error"] = "No valid centroid"
|
||||
except Exception as e:
|
||||
print(f"[CONFIRM] Seed addition failed: {e}")
|
||||
result["steps"]["seed_added"] = False
|
||||
result["steps"]["seed_error"] = str(e)
|
||||
|
||||
# Step 5: Auto-trigger Round 2 matching
|
||||
if propagate:
|
||||
try:
|
||||
propagation_result = run_round_2_propagation(file_uuid)
|
||||
result["propagation"] = propagation_result
|
||||
except Exception as e:
|
||||
print(f"[CONFIRM] Propagation failed: {e}")
|
||||
result["propagation"] = {"error": str(e)}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def run_round_2_propagation(file_uuid: str) -> Dict:
|
||||
"""Run Round 2 matching after confirmation
|
||||
|
||||
Args:
|
||||
file_uuid: Video file UUID
|
||||
|
||||
Returns:
|
||||
Round 2 matching results
|
||||
"""
|
||||
# Get confirmed traces (identity_map)
|
||||
face_track_nodes = get_face_track_nodes(file_uuid)
|
||||
|
||||
confirmed_traces = []
|
||||
identity_map = {}
|
||||
|
||||
for node in face_track_nodes:
|
||||
props = node.get("properties", {})
|
||||
status = props.get("status")
|
||||
|
||||
if status == "confirmed":
|
||||
trace_id_str = node.get("external_id", "").replace("face_track_", "")
|
||||
if trace_id_str:
|
||||
trace_id = int(trace_id_str)
|
||||
confirmed_traces.append(trace_id)
|
||||
identity_map[trace_id] = {
|
||||
"identity_id": props.get("identity_id"),
|
||||
"identity_uuid": props.get("identity_uuid"),
|
||||
"name": props.get("identity_name"),
|
||||
}
|
||||
|
||||
if not confirmed_traces:
|
||||
return {"matched": 0, "message": "No confirmed traces for propagation"}
|
||||
|
||||
# Run identity_matcher.py Round 2
|
||||
confirmed_str = ",".join(str(t) for t in confirmed_traces)
|
||||
|
||||
import tempfile
|
||||
identity_map_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
|
||||
json.dump(identity_map, identity_map_file)
|
||||
identity_map_file.close()
|
||||
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.path.dirname(os.path.abspath(__file__)), "identity_matcher.py"),
|
||||
"--file-uuid", file_uuid,
|
||||
"--round", "2",
|
||||
"--confirmed-traces", confirmed_str,
|
||||
"--identity-map", identity_map_file.name,
|
||||
]
|
||||
|
||||
try:
|
||||
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
|
||||
propagation_result = json.loads(output.split("\n")[-1])
|
||||
|
||||
# Clean up temp file
|
||||
os.unlink(identity_map_file.name)
|
||||
|
||||
return propagation_result
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"[PROPAGATE] Command failed: {e.output}")
|
||||
os.unlink(identity_map_file.name)
|
||||
return {"error": str(e), "matched": 0}
|
||||
except Exception as e:
|
||||
os.unlink(identity_map_file.name)
|
||||
return {"error": str(e), "matched": 0}
|
||||
|
||||
|
||||
def batch_confirm_from_json(file_uuid: str, suggestions_file: str, propagate: bool = True) -> Dict:
|
||||
"""Batch confirm suggestions from JSON file
|
||||
|
||||
Args:
|
||||
file_uuid: Video file UUID
|
||||
suggestions_file: JSON file with suggestions
|
||||
propagate: Auto-trigger Round 2 after all confirmations
|
||||
|
||||
Returns:
|
||||
Batch confirm results
|
||||
"""
|
||||
with open(suggestions_file) as f:
|
||||
data = json.load(f)
|
||||
|
||||
suggestions = data.get("suggestions", {})
|
||||
|
||||
results = []
|
||||
confirmed_traces = []
|
||||
|
||||
for trace_id_str, suggestion in suggestions.items():
|
||||
trace_id = int(trace_id_str)
|
||||
result = confirm_single_trace(
|
||||
file_uuid,
|
||||
trace_id,
|
||||
suggestion.get("identity_id"),
|
||||
suggestion.get("identity_uuid"),
|
||||
suggestion.get("name"),
|
||||
propagate=False, # Don't propagate each one
|
||||
)
|
||||
results.append(result)
|
||||
|
||||
if result.get("status") == "success":
|
||||
confirmed_traces.append(trace_id)
|
||||
|
||||
# Final propagation after all confirmations
|
||||
if propagate and confirmed_traces:
|
||||
propagation_result = run_round_2_propagation(file_uuid)
|
||||
batch_result = {
|
||||
"file_uuid": file_uuid,
|
||||
"confirmed_count": len(confirmed_traces),
|
||||
"total_suggestions": len(suggestions),
|
||||
"results": results,
|
||||
"propagation": propagation_result,
|
||||
}
|
||||
else:
|
||||
batch_result = {
|
||||
"file_uuid": file_uuid,
|
||||
"confirmed_count": len(confirmed_traces),
|
||||
"total_suggestions": len(suggestions),
|
||||
"results": results,
|
||||
}
|
||||
|
||||
return batch_result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Confirm Identity Binding")
|
||||
parser.add_argument("--file-uuid", required=True, help="Video file UUID")
|
||||
parser.add_argument("--trace-id", type=int, help="Trace ID to confirm")
|
||||
parser.add_argument("--identity-id", type=int, help="Identity ID")
|
||||
parser.add_argument("--identity-uuid", help="Identity UUID")
|
||||
parser.add_argument("--name", help="Identity name")
|
||||
parser.add_argument("--json", help="JSON file with suggestions (batch confirm)")
|
||||
parser.add_argument("--no-propagate", action="store_true", help="Skip auto propagation")
|
||||
parser.add_argument("--output", help="Output JSON file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
propagate = not args.no_propagate
|
||||
|
||||
if args.json:
|
||||
result = batch_confirm_from_json(args.file_uuid, args.json, propagate)
|
||||
elif args.trace_id and args.identity_id and args.identity_uuid and args.name:
|
||||
result = confirm_single_trace(
|
||||
args.file_uuid,
|
||||
args.trace_id,
|
||||
args.identity_id,
|
||||
args.identity_uuid,
|
||||
args.name,
|
||||
propagate,
|
||||
)
|
||||
else:
|
||||
print("Error: Need either --json or --trace-id/--identity-id/--identity-uuid/--name")
|
||||
sys.exit(1)
|
||||
|
||||
output_json = json.dumps(result, indent=2, ensure_ascii=False)
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(output_json)
|
||||
print(f"[CONFIRM] Output saved to {args.output}")
|
||||
else:
|
||||
print(output_json)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user