Files
momentry_core/scripts/store_traced_faces.py
Accusys 9fbb4f9b48 feat: add Qdrant _faces collection embedding push
- Add qdrant_faces.py utility module for _faces collection operations
- Modify face_processor.py to push embeddings to Qdrant (CoreML extraction re-enabled)
- Modify store_traced_faces.py to update trace_id in Qdrant after face tracking
- Collection schema: 512D vectors, Cosine distance, fixed name '_faces'
- Payload: file_uuid, frame, trace_id, bbox, confidence, identity_id/uuid, stranger_id
- Batch size: 100 (default), configurable via QDRANT_BATCH_SIZE env var
- Error handling: face_processor.py exits with error if Qdrant push fails
2026-06-25 00:23:20 +08:00

280 lines
9.7 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Store Traced Faces - Pipeline integration for face trace + position data
Flow:
1. Reads face.json output from face_processor.py
2. Runs face_tracker.py to assign trace_id per face (IoU + embedding)
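3. Updates the matching face_detections rows (same file_uuid, frame and position x,y,w,h) with trace_id and face_id, then pushes the frame/bbox -> trace_id mapping to the Qdrant _faces collection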
Usage:
python store_traced_faces.py --file-uuid <uuid> [--face-json <path>]
TKG Export:
trace_id + position (x,y,w,h) per frame enables spatial-temporal graph construction.
Each trace is a temporal entity; position tracks movement across frames.
"""
import sys
import os
import json
import argparse
from collections import defaultdict
import numpy as np
import psycopg2
import psycopg2.extras
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
from qdrant_faces import update_trace_ids
# Runtime configuration; each value can be overridden through its environment variable.
DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.getenv("MOMENTRY_DB_SCHEMA", "dev")
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
def get_conn():
    """Open and return a new psycopg2 connection to the database at DB_URL."""
    connection = psycopg2.connect(DB_URL)
    return connection
def merge_traces_within_cuts(face_data: dict, cut_scenes: list) -> dict:
    """Placeholder for merging traces inside one cut; currently a no-op.

    Merging is disabled because no embeddings are available, so the input
    is handed back untouched and ``cut_scenes`` is ignored.
    """
    # TODO: Reimplement with Qdrant _faces collection
    unchanged = face_data
    return unchanged
def _frames_list_to_dict(frame_list):
    """Convert V2.0 list-style frames into a dict keyed by frame-number string."""
    frames_dict = {}
    for frame in frame_list:
        converted = []
        for raw_face in frame.get("faces", []):
            # Coordinates may be nested under "bbox" or sit directly on the face.
            bbox = raw_face.get("bbox", raw_face)
            face = {
                "x": bbox.get("x", raw_face.get("x", 0)),
                "y": bbox.get("y", raw_face.get("y", 0)),
                "width": bbox.get("width", raw_face.get("width", 0)),
                "height": bbox.get("height", raw_face.get("height", 0)),
                "confidence": raw_face.get("confidence", 0.0),
            }
            # Optional payloads are copied only when the detector produced them.
            for optional_key in ("landmarks", "embedding"):
                if optional_key in raw_face:
                    face[optional_key] = raw_face[optional_key]
            converted.append(face)
        frames_dict[str(frame["frame"])] = {
            "frame_number": frame["frame"],
            "time_seconds": frame.get("timestamp", 0),
            "faces": converted,
        }
    return frames_dict


def _has_eye_landmark(face):
    """Return True when the face has at least one non-empty eye landmark."""
    # "landmarks" may be absent or null; both count as "no eyes".
    landmarks = face.get("landmarks") or {}
    return bool(landmarks.get("left_eye") or landmarks.get("right_eye"))


def run_face_tracker(
    face_json_path: str, traced_json_path: str, filter_eyes: bool = False
) -> str:
    """Run IoU-only face tracking on a face JSON file and write the traced result.

    Steps:
      1. Load ``face_json_path``; list-style ``frames`` are converted to the
         dict layout keyed by frame-number string.
      2. Make sure a ``metadata`` entry exists (``fps`` / ``total_frames``
         taken from the top-level ``fps`` / ``frame_count`` fields, with
         defaults 30.0 and 0).
      3. Optionally drop faces that have no eye landmark.
      4. Read scene cuts from the sibling ``.cut.json`` file when it exists
         and pass the positive ``start_frame`` values as cut boundaries.
      5. Call ``track_faces`` with embeddings disabled, run
         ``merge_traces_within_cuts``, stamp the tracking metadata and dump
         the result to ``traced_json_path``.

    Args:
        face_json_path: Path to the face detection JSON to read.
        traced_json_path: Path the traced JSON is written to.
        filter_eyes: When True, remove faces without eye landmarks first.

    Returns:
        ``traced_json_path``, unchanged.
    """
    from face_tracker import track_faces

    with open(face_json_path, encoding="utf-8") as src:
        face_data = json.load(src)

    # V2.0 uses list format (FaceResult), convert to dict for face_tracker
    if isinstance(face_data.get("frames"), list):
        face_data["frames"] = _frames_list_to_dict(face_data["frames"])

    # Preserve metadata (fps needed by face_tracker). Applied to either input
    # layout so the tracker always finds an fps value.
    if "metadata" not in face_data:
        face_data["metadata"] = {
            "fps": face_data.get("fps", 30.0),
            "total_frames": face_data.get("frame_count", 0),
        }

    # Eye filter: remove faces without at least one eye landmark
    if filter_eyes:
        removed = 0
        for frame_entry in face_data.get("frames", {}).values():
            faces = frame_entry.get("faces", [])
            kept = [face for face in faces if _has_eye_landmark(face)]
            removed += len(faces) - len(kept)
            frame_entry["faces"] = kept
        print(f"[TRACE] Eye filter: {removed} faces without eyes removed")

    print(f"[TRACE] Processing {len(face_data.get('frames', {}))} frames")

    # Embeddings no longer loaded from DB - use IoU-only tracking

    # Load cut boundaries from cut.json (same directory as face.json)
    cut_boundaries = None
    cut_scenes = None
    cuts_path = face_json_path.replace("_traced.json", ".cut.json").replace(
        ".face.json", ".cut.json"
    )
    # If neither suffix matched, cuts_path is the input file itself; never
    # parse the face JSON as if it were the cut file.
    if cuts_path != face_json_path and os.path.exists(cuts_path):
        with open(cuts_path, encoding="utf-8") as cuts_file:
            cuts = json.load(cuts_file)
        cut_scenes = cuts.get("scenes", [])
        # Frame 0 is the start of the video, not a cut; scenes without a
        # start_frame cannot define a boundary and are ignored.
        cut_boundaries = {
            scene["start_frame"]
            for scene in cut_scenes
            if (scene.get("start_frame") or 0) > 0
        }
        print(f"[TRACE] Loaded {len(cut_boundaries)} cut boundaries")

    face_data = track_faces(
        face_data, use_embedding=False, cut_boundaries=cut_boundaries
    )

    # Merge traces within same cut (same person re-appearing after occlusion/pose change)
    if cut_scenes:
        face_data = merge_traces_within_cuts(face_data, cut_scenes)

    metadata = face_data.get("metadata", {})
    metadata["tracking_method"] = "iou_only"
    metadata["tracked_at"] = datetime.now().isoformat()
    face_data["metadata"] = metadata

    # ensure_ascii=False may emit non-ASCII text, so the encoding is pinned
    # instead of depending on the process locale.
    with open(traced_json_path, "w", encoding="utf-8") as out:
        json.dump(face_data, out, indent=2, ensure_ascii=False)

    trace_count = len(face_data.get("traces", {}))
    print(f"[TRACE] Completed: {trace_count} traces -> {traced_json_path}")
    return traced_json_path
def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHEMA):
    """Write trace ids from a traced-face JSON onto existing face_detections rows.

    Every face that carries a ``trace_id`` is matched to a row by file_uuid,
    frame number and exact bounding box (x, y, width, height); that row's
    ``trace_id`` and ``face_id`` are updated. The same frame/bbox -> trace_id
    mapping is then passed to ``update_trace_ids`` for the Qdrant ``_faces``
    collection; a failure there is logged and does not abort the run.

    Args:
        file_uuid: UUID of the video whose detections are updated.
        traced_json_path: Path to the traced JSON produced by the tracking step.
        schema: Database schema holding ``face_detections``. It is
            interpolated into the SQL text as an identifier (identifiers
            cannot be bound as parameters), so it must come from a trusted
            source.

    Returns:
        Tuple ``(total_stored, db_trace_count)``: the number of faces whose
        UPDATE matched at least one row, and the number of distinct non-null
        trace ids stored in the table for ``file_uuid``.
    """
    # Parse the input before connecting so a missing/invalid file cannot
    # leave an open connection behind.
    with open(traced_json_path, encoding="utf-8") as src:
        data = json.load(src)
    frames = data.get("frames", {})

    update_sql = f"""
        UPDATE {schema}.face_detections
        SET trace_id = %s, face_id = %s
        WHERE file_uuid = %s AND frame_number = %s
        AND x = %s AND y = %s AND width = %s AND height = %s
    """

    total_stored = 0
    trace_mapping = {}  # {frame: {bbox_key: trace_id}}

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            for frame_num_str, frame_data in sorted(
                frames.items(), key=lambda item: int(item[0])
            ):
                frame_num = int(frame_num_str)
                frame_traces = {}
                trace_mapping[frame_num] = frame_traces
                for face in frame_data.get("faces", []):
                    trace_id = face.get("trace_id")
                    if trace_id is None:
                        continue
                    x = face.get("x", 0)
                    y = face.get("y", 0)
                    w = face.get("width", 0)
                    h = face.get("height", 0)
                    face_id = face.get("face_id")
                    if face_id is None:
                        face_id = f"face_{trace_id}"

                    # Recorded regardless of the DB outcome, using the same
                    # coordinates that are sent to the database.
                    frame_traces[f"{x}_{y}_{w}_{h}"] = trace_id

                    # A savepoint confines a failing UPDATE to this one face.
                    # A plain rollback here would also throw away every
                    # earlier, still uncommitted update of this run.
                    cur.execute("SAVEPOINT face_update")
                    try:
                        cur.execute(
                            update_sql,
                            (trace_id, face_id, file_uuid, frame_num, x, y, w, h),
                        )
                        matched = cur.rowcount
                    except psycopg2.Error as e:
                        print(f"[TRACE] Error storing face at frame {frame_num}: {e}")
                        cur.execute("ROLLBACK TO SAVEPOINT face_update")
                        cur.execute("RELEASE SAVEPOINT face_update")
                        continue
                    cur.execute("RELEASE SAVEPOINT face_update")
                    if matched > 0:
                        total_stored += 1
            conn.commit()

            # Update Qdrant _faces collection with trace_id (best effort)
            try:
                qdrant_updated = update_trace_ids(file_uuid, trace_mapping)
            except Exception as e:
                print(f"[TRACE] Warning: Qdrant trace_id update failed: {e}")
                qdrant_updated = 0

            # Log trace summary
            cur.execute(
                f"SELECT COUNT(DISTINCT trace_id) FROM {schema}.face_detections WHERE file_uuid = %s AND trace_id IS NOT NULL",
                (file_uuid,),
            )
            db_trace_count = cur.fetchone()[0]
    finally:
        conn.close()

    print(
        f"[TRACE] Stored {total_stored} face detections, {db_trace_count} unique traces in DB"
    )
    # Tolerate a helper that reports nothing (None) instead of a count.
    if (qdrant_updated or 0) > 0:
        print(f"[TRACE] Updated {qdrant_updated} Qdrant points with trace_id")
    return total_stored, db_trace_count
def main():
    """Command-line entry point: trace faces for one video, then persist the trace ids.

    Exits with status 1 when the face JSON to process does not exist.
    """
    cli = argparse.ArgumentParser(description="Store traced faces in DB")
    option_specs = (
        ("--file-uuid", {"required": True, "help": "Video file UUID"}),
        ("--face-json", {"help": "Path to face.json (default: auto-detect)"}),
        ("--schema", {"default": SCHEMA, "help": "DB schema name"}),
        ("--uuid", {"help": "UUID for Redis tracking (accepted by executor)"}),
        (
            "--filter-eyes",
            {
                "action": "store_true",
                "help": "Remove faces without eye landmarks before tracking",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args()

    # An explicit --face-json wins; otherwise look in the output directory.
    if opts.face_json:
        source_path = opts.face_json
    else:
        source_path = os.path.join(OUTPUT_DIR, f"{opts.file_uuid}.face.json")
    traced_path = os.path.join(OUTPUT_DIR, f"{opts.file_uuid}.face_traced.json")

    if not os.path.exists(source_path):
        print(f"[TRACE] face.json not found: {source_path}", file=sys.stderr)
        raise SystemExit(1)

    # Step 1: Run face tracker
    run_face_tracker(source_path, traced_path, filter_eyes=opts.filter_eyes)

    # Step 2: Store in DB with trace_id
    stored, trace_total = store_traced_faces(opts.file_uuid, traced_path, opts.schema)
    print(f"[TRACE] Done: {stored} detections, {trace_total} traces")
# Script entry point: run the pipeline only when executed directly, not on import.
if __name__ == "__main__":
    main()