Files
momentry_core/scripts/manual_seed.py
Accusys d20819b03b feat: add manual_seed.py for user-selected face trace seed creation
Implements:
- create_identity(): Create PG identity (source='manual')
- create_manual_seed(): Full flow from trace → seed → confirm
  - Get trace centroid embedding from Qdrant _faces
  - Create identity in PG
  - Push to Qdrant _seeds
  - Confirm trace binding (TKG + Qdrant + PG)
  - Auto-trigger Round 2 propagation
- list_pending_traces(): List traces for user selection
- run_propagation(): Auto propagation trigger

Usage:
  # List pending traces
  python manual_seed.py --file-uuid <uuid> --list

  # Create seed from trace
  python manual_seed.py --file-uuid <uuid> --trace-id 1 --name 'John Doe'

  # Custom UUID
  python manual_seed.py --file-uuid <uuid> --trace-id 1 --name 'John Doe' --identity-uuid xxx

  # No propagation
  python manual_seed.py --file-uuid <uuid> --trace-id 1 --name 'John Doe' --no-propagate

Flow: select trace → label → create identity → push seed → auto-bind → propagate
2026-06-25 01:49:53 +08:00

370 lines
11 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Manual Seed Creation - Create identity seed from user-selected face trace
Flow:
1. Get trace centroid embedding from Qdrant _faces
2. Create identity in PG (source='manual')
3. Push embedding to Qdrant _seeds
4. Confirm trace binding (update TKG, Qdrant, PG)
5. Auto-trigger propagation
Usage:
# List pending traces
python manual_seed.py --file-uuid <uuid> --list
# Create seed from trace
python manual_seed.py --file-uuid <uuid> --trace-id <id> --name "John Doe"
# Create with custom identity_uuid
python manual_seed.py --file-uuid <uuid> --trace-id <id> --name "John Doe" --identity-uuid xxx
Output:
JSON with created identity info and propagation results
"""
import os
import sys
import json
import argparse
import uuid as uuid_lib
from typing import Dict, Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
from tkg_helper import (
mark_face_track_confirmed,
get_pending_face_tracks,
get_face_track_nodes,
)
from qdrant_faces import (
get_trace_centroid,
get_trace_representatives,
push_seed_embedding,
update_identity_in_faces,
)
# Config
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
def get_conn():
"""Get PostgreSQL connection"""
import psycopg2
return psycopg2.connect(DB_URL)
def table_name(table: str) -> str:
"""Get schema-prefixed table name"""
if SCHEMA == "public":
return table
return f"{SCHEMA}.{table}"
def create_identity(name: str, identity_uuid: str = None) -> Dict:
"""Create identity in PG with source='manual'
Args:
name: Identity name
identity_uuid: Optional UUID (auto-generated if None)
Returns:
{identity_id, identity_uuid, name}
"""
if not identity_uuid:
identity_uuid = str(uuid_lib.uuid4())
conn = get_conn()
cur = conn.cursor()
id_table = table_name("identities")
try:
cur.execute(
f"""
INSERT INTO {id_table} (uuid, name, identity_type, source, status)
VALUES (%s, %s, 'person', 'manual', 'active')
RETURNING id
""",
(identity_uuid, name),
)
identity_id = cur.fetchone()[0]
conn.commit()
print(f"[MANUAL] Created identity: {name} (id={identity_id}, uuid={identity_uuid})")
return {
"identity_id": identity_id,
"identity_uuid": identity_uuid,
"name": name,
}
except Exception as e:
print(f"[MANUAL] Identity creation failed: {e}")
conn.rollback()
raise
finally:
cur.close()
conn.close()
def create_manual_seed(
file_uuid: str,
trace_id: int,
name: str,
identity_uuid: str = None,
propagate: bool = True,
) -> Dict:
"""Create manual seed from trace
Args:
file_uuid: Video file UUID
trace_id: Face trace ID
name: Identity name
identity_uuid: Optional UUID (auto-generated if None)
propagate: Auto-trigger propagation
Returns:
Result dict with identity info and propagation results
"""
result = {
"file_uuid": file_uuid,
"trace_id": trace_id,
"name": name,
"status": "success",
"steps": {},
}
# Step 1: Get trace centroid embedding
centroid = get_trace_centroid(file_uuid, trace_id)
if not centroid or all(v == 0.0 for v in centroid):
result["status"] = "failed"
result["error"] = "No valid centroid for trace"
return result
result["steps"]["centroid_extracted"] = True
print(f"[MANUAL] Centroid extracted: trace_id={trace_id}")
# Step 2: Create identity in PG
identity = create_identity(name, identity_uuid)
identity_id = identity["identity_id"]
identity_uuid = identity["identity_uuid"]
result["identity"] = identity
result["steps"]["identity_created"] = True
# Step 3: Push to _seeds
try:
push_seed_embedding(
identity_id=identity_id,
identity_uuid=identity_uuid,
name=name,
embedding=centroid,
source="manual",
file_uuid=file_uuid,
trace_id=trace_id,
)
result["steps"]["seed_pushed"] = True
except Exception as e:
result["steps"]["seed_pushed"] = False
result["steps"]["seed_error"] = str(e)
# Step 4: Confirm trace binding
# Update TKG
tkg_updated = mark_face_track_confirmed(file_uuid, trace_id, identity_id, identity_uuid, name)
result["steps"]["tkg_updated"] = tkg_updated
# Update Qdrant _faces
try:
qdrant_updated = update_identity_in_faces(file_uuid, trace_id, identity_id, identity_uuid)
result["steps"]["qdrant_updated"] = qdrant_updated
except Exception as e:
result["steps"]["qdrant_error"] = str(e)
# Update PG face_detections
conn = get_conn()
cur = conn.cursor()
fd_table = table_name("face_detections")
try:
cur.execute(
f"UPDATE {fd_table} SET identity_id = %s WHERE file_uuid = %s AND trace_id = %s",
(identity_id, file_uuid, trace_id),
)
conn.commit()
pg_updated = cur.rowcount
result["steps"]["pg_updated"] = pg_updated
print(f"[MANUAL] PG updated: {pg_updated} face_detections")
except Exception as e:
result["steps"]["pg_error"] = str(e)
conn.rollback()
finally:
cur.close()
conn.close()
# Step 5: Auto propagation
if propagate:
try:
propagation_result = run_propagation(file_uuid)
result["propagation"] = propagation_result
except Exception as e:
result["propagation"] = {"error": str(e)}
return result
def run_propagation(file_uuid: str) -> Dict:
"""Run Round 2 propagation after manual seed creation
Args:
file_uuid: Video file UUID
Returns:
Propagation results
"""
import subprocess
import tempfile
# Get confirmed traces
face_track_nodes = get_face_track_nodes(file_uuid)
confirmed_traces = []
identity_map = {}
for node in face_track_nodes:
props = node.get("properties", {})
status = props.get("status")
if status == "confirmed":
trace_id_str = node.get("external_id", "").replace("face_track_", "")
if trace_id_str:
trace_id = int(trace_id_str)
confirmed_traces.append(trace_id)
identity_map[trace_id] = {
"identity_id": props.get("identity_id"),
"identity_uuid": props.get("identity_uuid"),
"name": props.get("identity_name"),
}
if not confirmed_traces:
return {"matched": 0, "message": "No confirmed traces for propagation"}
# Run identity_matcher.py Round 2
confirmed_str = ",".join(str(t) for t in confirmed_traces)
identity_map_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
json.dump(identity_map, identity_map_file)
identity_map_file.close()
cmd = [
sys.executable,
os.path.join(os.path.dirname(os.path.abspath(__file__)), "identity_matcher.py"),
"--file-uuid", file_uuid,
"--round", "2",
"--confirmed-traces", confirmed_str,
"--identity-map", identity_map_file.name,
"--mark-tkg",
]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
# Find JSON output (last line that starts with {)
for line in output.split("\n"):
if line.strip().startswith("{"):
propagation_result = json.loads(line)
break
else:
propagation_result = {"output": output}
os.unlink(identity_map_file.name)
return propagation_result
except subprocess.CalledProcessError as e:
os.unlink(identity_map_file.name)
return {"error": str(e), "output": e.output}
except Exception as e:
os.unlink(identity_map_file.name)
return {"error": str(e)}
def list_pending_traces(file_uuid: str) -> Dict:
"""List pending traces for user selection
Args:
file_uuid: Video file UUID
Returns:
{traces: [{trace_id, frame_count, avg_bbox}], count}
"""
# Get TKG pending nodes
pending_nodes = get_pending_face_tracks(file_uuid)
# Get trace representatives (for centroid availability check)
trace_reps = get_trace_representatives(file_uuid)
traces = []
for node in pending_nodes:
ext_id = node.get("external_id", "")
trace_id_str = ext_id.replace("face_track_", "")
if not trace_id_str:
continue
trace_id = int(trace_id_str)
props = node.get("properties", {})
# Check if centroid available
has_centroid = trace_id in trace_reps and len(trace_reps[trace_id]) > 0
traces.append({
"trace_id": trace_id,
"frame_count": props.get("frame_count", 0),
"start_frame": props.get("start_frame"),
"end_frame": props.get("end_frame"),
"avg_bbox": props.get("avg_bbox"),
"has_centroid": has_centroid,
})
return {
"file_uuid": file_uuid,
"traces": traces,
"count": len(traces),
}
def main():
parser = argparse.ArgumentParser(description="Manual Seed Creation")
parser.add_argument("--file-uuid", required=True, help="Video file UUID")
parser.add_argument("--trace-id", type=int, help="Trace ID to create seed from")
parser.add_argument("--name", help="Identity name")
parser.add_argument("--identity-uuid", help="Custom identity UUID (optional)")
parser.add_argument("--list", action="store_true", help="List pending traces")
parser.add_argument("--no-propagate", action="store_true", help="Skip auto propagation")
parser.add_argument("--output", help="Output JSON file path")
args = parser.parse_args()
propagate = not args.no_propagate
if args.list:
result = list_pending_traces(args.file_uuid)
elif args.trace_id and args.name:
result = create_manual_seed(
args.file_uuid,
args.trace_id,
args.name,
args.identity_uuid,
propagate,
)
else:
print("Error: Need either --list or --trace-id/--name")
sys.exit(1)
output_json = json.dumps(result, indent=2, ensure_ascii=False)
if args.output:
with open(args.output, "w") as f:
f.write(output_json)
print(f"[MANUAL] Output saved to {args.output}")
else:
print(output_json)
if __name__ == "__main__":
main()