385 lines
15 KiB
Python
385 lines
15 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Store Traced Faces - Pipeline integration for face trace + position data
|
|
|
|
Flow:
|
|
1. Reads face.json output from face_processor.py
|
|
2. Runs face_tracker.py to assign trace_id per face (IoU + embedding), then merges traces of the same person within a cut
|
|
3. Updates existing face_detections rows (matched by file_uuid, frame number and bbox x,y,w,h) with their trace_id
|
|
|
|
Usage:
|
|
python store_traced_faces.py --file-uuid <uuid> [--face-json <path>] [--schema <name>] [--filter-eyes]
|
|
|
|
TKG Export:
|
|
trace_id + position (x,y,w,h) per frame enables spatial-temporal graph construction.
|
|
Each trace is a temporal entity; position tracks movement across frames.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
from collections import defaultdict
|
|
import numpy as np
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from datetime import datetime
|
|
|
|
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Make project-local modules importable without a package install
# (face_tracker is imported by bare name further down).
# Because both inserts go to index 0, utils/ ends up ahead of the script dir.
sys.path.insert(0, _SCRIPT_DIR)
sys.path.insert(0, os.path.join(_SCRIPT_DIR, "utils"))

# Config -- each value can be overridden through an environment variable.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
|
|
|
|
|
|
def get_conn():
    """Open a new psycopg2 connection to the database configured by ``DB_URL``.

    The caller owns the returned connection and must close it.
    """
    dsn = DB_URL
    return psycopg2.connect(dsn)
|
|
|
|
|
|
def merge_traces_within_cuts(face_data: dict, cut_scenes: list) -> dict:
    """Merge traces within the same cut if they have similar embeddings (same person re-appeared).

    Each trace is assigned to the scene/cut that contains the majority of its
    frames. Within a scene, two traces are merged when the cosine similarity of
    their L2-normalised mean embeddings is >= 0.75. Traces that appear in the
    same frame are never merged, since one person cannot be two faces in a
    single frame.

    Args:
        face_data: Tracker output whose ``frames`` is a dict keyed by
            frame-number strings. Each frame holds a ``faces`` list whose
            entries may carry ``trace_id``, ``embedding``,
            ``x``/``y``/``width``/``height`` and ``confidence``.
            Mutated in place.
        cut_scenes: Scene dicts with ``scene_number``, ``start_frame`` and
            ``end_frame``; ``end_frame`` is treated as inclusive.

    Returns:
        ``face_data``. If at least one merge happened, ``face_data["traces"]``
        and ``face_data["metadata"]["trace_stats"]`` are rebuilt from the
        updated frames. Otherwise the data is returned untouched.
    """
    SIMILARITY_THRESHOLD = 0.75

    frames = face_data.get("frames", {})
    if not frames:
        return face_data

    # Map each frame number to its scene/cut number (end_frame inclusive).
    frame_to_scene = {}
    for s in cut_scenes:
        for f in range(s["start_frame"], s["end_frame"] + 1):
            frame_to_scene[f] = s["scene_number"]

    # Collect per-trace frame numbers and embeddings.
    trace_frames = defaultdict(list)
    trace_embeddings = defaultdict(list)
    for fnum_str, frm_data in frames.items():
        fnum = int(fnum_str)
        for face in frm_data.get("faces", []):
            tid = face.get("trace_id")
            if tid is None:
                continue
            trace_frames[tid].append(fnum)
            emb = face.get("embedding")
            if emb is not None:
                trace_embeddings[tid].append(emb)

    if len(trace_embeddings) < 2:
        return face_data

    # Unit-normalised centroid per trace, so a dot product is cosine similarity.
    trace_centroids = {}
    for tid, embs in trace_embeddings.items():
        centroid = np.mean(embs, axis=0)
        norm = np.linalg.norm(centroid)
        trace_centroids[tid] = centroid / norm if norm > 0 else centroid

    # Assign each trace to the scene holding the majority of its frames.
    # Frames outside every scene vote for -1, which is never merged.
    scene_traces = defaultdict(list)
    for tid, fns in trace_frames.items():
        scene_votes = defaultdict(int)
        for fn in fns:
            scene_votes[frame_to_scene.get(fn, -1)] += 1
        scene = max(scene_votes, key=scene_votes.get)
        if scene >= 0 and tid in trace_centroids:
            scene_traces[scene].append(tid)

    # Decide merges first, then apply them in a single pass over the frames.
    # A kept trace is never itself absorbed later, so the remap is flat.
    remap = {}  # absorbed trace_id -> trace_id it is merged into
    for tids in scene_traces.values():
        if len(tids) < 2:
            continue
        used = set()
        for i, keep_tid in enumerate(tids):
            if keep_tid in used:
                continue
            group_frames = set(trace_frames[keep_tid])
            for other_tid in tids[i + 1:]:
                if other_tid in used:
                    continue
                # Traces visible in the same frame are different people.
                if group_frames.intersection(trace_frames[other_tid]):
                    continue
                sim = float(np.dot(trace_centroids[keep_tid], trace_centroids[other_tid]))
                if sim >= SIMILARITY_THRESHOLD:
                    remap[other_tid] = keep_tid
                    used.add(other_tid)
                    group_frames.update(trace_frames[other_tid])

    merged = len(remap)
    if merged == 0:
        return face_data

    for frm_data in frames.values():
        for face in frm_data.get("faces", []):
            tid = face.get("trace_id")
            if tid in remap:
                face["trace_id"] = remap[tid]

    # Rebuild trace metadata from the relabelled frames.
    fps = face_data.get("metadata", {}).get("fps", 25.0)
    paths = defaultdict(list)
    for fnum_str, frm_data in frames.items():
        fnum = int(fnum_str)
        for face_index, face in enumerate(frm_data.get("faces", [])):
            tid = face.get("trace_id")
            if tid is None:
                continue
            paths[tid].append({
                "frame": fnum,
                "face_index": face_index,
                "bbox": {"x": face.get("x", 0), "y": face.get("y", 0),
                         "width": face.get("width", 0), "height": face.get("height", 0)},
                "confidence": face.get("confidence", 0.0),
            })

    new_traces = {}
    for tid, path in paths.items():
        frame_nums = sorted({p["frame"] for p in path})
        first, last = frame_nums[0], frame_nums[-1]
        new_traces[str(tid)] = {
            "trace_id": tid,
            "start_frame": first,
            "end_frame": last,
            "duration_frames": last - first + 1,
            "duration_seconds": (last - first) / fps,
            "total_appearances": len(path),
            "path": path,
        }

    face_data["traces"] = new_traces
    # metadata may be absent for inputs that never went through list conversion.
    face_data.setdefault("metadata", {})["trace_stats"] = {
        "total_traces": len(new_traces),
        "active_traces": len(new_traces),
        "long_traces": len([t for t in new_traces.values() if t["duration_frames"] >= 2]),
    }
    print(f"[TRACE] Post-merge: {merged} traces merged, {len(new_traces)} total traces")

    return face_data
|
|
|
|
|
|
def _frames_list_to_dict(face_data: dict) -> None:
    """Convert V2.0 list-format ``frames`` (FaceResult) into the dict format face_tracker uses.

    Mutates ``face_data`` in place. ``frames`` becomes a dict keyed by the
    frame number as a string. A minimal ``metadata`` block (fps, total_frames)
    is added when none exists, because face_tracker needs fps.
    """
    frames_dict = {}
    for frame in face_data["frames"]:
        faces = []
        for f in frame.get("faces", []):
            # The bbox may be nested under "bbox" or flattened onto the face;
            # a null "bbox" falls back to the face itself.
            bbox = f.get("bbox") or f
            face = {
                "x": bbox.get("x", f.get("x", 0)),
                "y": bbox.get("y", f.get("y", 0)),
                "width": bbox.get("width", f.get("width", 0)),
                "height": bbox.get("height", f.get("height", 0)),
                "confidence": f.get("confidence", 0.0),
            }
            for key in ("landmarks", "embedding"):
                if key in f:
                    face[key] = f[key]
            faces.append(face)
        frames_dict[str(frame["frame"])] = {
            "frame_number": frame["frame"],
            "time_seconds": frame.get("timestamp", 0),
            "faces": faces,
        }
    face_data["frames"] = frames_dict
    face_data.setdefault("metadata", {
        "fps": face_data.get("fps", 30.0),
        "total_frames": face_data.get("frame_count", 0),
    })


def _filter_faces_without_eyes(frames: dict) -> int:
    """Drop faces that have neither a left_eye nor a right_eye landmark; return the removed count."""
    removed = 0
    for frm_data in frames.values():
        kept = []
        for face in frm_data.get("faces", []):
            # A null "landmarks" value is treated as no landmarks.
            lm = face.get("landmarks") or {}
            if lm.get("left_eye") or lm.get("right_eye"):
                kept.append(face)
            else:
                removed += 1
        frm_data["faces"] = kept
    return removed


def _coerce_embedding(emb):
    """Return ``emb`` as a list when the DB driver handed it back as text.

    NOTE(review): assumes a pgvector ``vector`` column read without a
    registered adapter, which yields text like ``"[0.1,0.2]"``; confirm the
    column type. Other values are returned unchanged.
    """
    if isinstance(emb, str):
        try:
            return json.loads(emb)
        except ValueError:
            return emb
    return emb


def _attach_db_embeddings(face_data: dict, file_uuid: str) -> None:
    """Copy stored embeddings from face_detections onto faces whose bbox is within 10px on every side.

    Raises whatever the DB layer raises. The caller treats this step as
    best-effort.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        cur.execute(f"""
            SELECT frame_number, x, y, width, height, embedding
            FROM {SCHEMA}.face_detections
            WHERE file_uuid = %s AND embedding IS NOT NULL
        """, (file_uuid,))
        emb_rows = cur.fetchall()
    finally:
        # Close even when the query fails so the connection does not leak.
        conn.close()

    # Lookup: frame_number -> list of ((x, y, w, h), embedding)
    emb_map = {}
    for fn, x, y, w, h, emb in emb_rows:
        emb_map.setdefault(fn, []).append(((x, y, w, h), _coerce_embedding(emb)))
    print(f"[TRACE] Loaded {len(emb_rows)} embeddings from DB")

    attached = 0
    for fnum_str, frm_data in face_data.get("frames", {}).items():
        candidates = emb_map.get(int(fnum_str), [])
        for face in frm_data.get("faces", []):
            x, y, w, h = face.get("x", 0), face.get("y", 0), face.get("width", 0), face.get("height", 0)
            # First stored detection whose bbox is within 10px on every side.
            for (ex, ey, ew, eh), emb in candidates:
                if abs(x - ex) < 10 and abs(y - ey) < 10 and abs(w - ew) < 10 and abs(h - eh) < 10:
                    face["embedding"] = emb
                    attached += 1
                    break
    print(f"[TRACE] Attached {attached} embeddings to faces")


def _load_cut_scenes(face_json_path: str):
    """Return the ``scenes`` list from the sibling .cut.json, or None if that file does not exist."""
    cuts_path = face_json_path.replace("_traced.json", ".cut.json").replace(".face.json", ".cut.json")
    if not os.path.exists(cuts_path):
        return None
    with open(cuts_path) as f:
        cuts = json.load(f)
    return cuts.get("scenes", [])


def run_face_tracker(face_json_path: str, traced_json_path: str, filter_eyes: bool = False) -> str:
    """Run face_tracker.py on face.json, returns path to face_traced.json

    Steps:
    1. Load face.json, converting V2.0 list-format frames to dict format.
    2. Optionally remove faces without eye landmarks.
    3. Attach embeddings stored in the DB. This is best-effort; failures only
       print a warning.
    4. Run ``track_faces`` with cut boundaries from the sibling .cut.json.
    5. Merge traces of the same person within each cut.
    6. Write the result to ``traced_json_path``.

    Args:
        face_json_path: Path to ``<uuid>.face.json``. The file UUID and the
            .cut.json path are derived from it.
        traced_json_path: Output path for the traced JSON.
        filter_eyes: Drop faces lacking both eye landmarks before tracking.

    Returns:
        ``traced_json_path``.
    """
    from face_tracker import track_faces

    with open(face_json_path) as f:
        face_data = json.load(f)

    if isinstance(face_data.get("frames"), list):
        _frames_list_to_dict(face_data)

    if filter_eyes:
        removed = _filter_faces_without_eyes(face_data.get("frames", {}))
        print(f"[TRACE] Eye filter: {removed} faces without eyes removed")

    print(f"[TRACE] Processing {len(face_data.get('frames', {}))} frames")

    file_uuid = os.path.basename(face_json_path).replace(".face.json", "").replace("_traced.json", "")
    try:
        _attach_db_embeddings(face_data, file_uuid)
    except Exception as e:
        # Deliberately best-effort: tracking can still proceed on IoU alone.
        print(f"[TRACE] WARNING: Could not load embeddings: {e}")

    cut_scenes = _load_cut_scenes(face_json_path)
    cut_boundaries = None
    if cut_scenes is not None:
        # Frame 0 always starts the video, so it is not a cut.
        cut_boundaries = {s["start_frame"] for s in cut_scenes if s["start_frame"] > 0}
        print(f"[TRACE] Loaded {len(cut_boundaries)} cut boundaries")

    face_data = track_faces(face_data, use_embedding=True, cut_boundaries=cut_boundaries)

    # Merge traces within the same cut (same person re-appearing after occlusion/pose change).
    if cut_scenes:
        face_data = merge_traces_within_cuts(face_data, cut_scenes)

    metadata = face_data.get("metadata", {})
    metadata["tracking_method"] = "iou_embedding"
    metadata["tracked_at"] = datetime.now().isoformat()
    face_data["metadata"] = metadata

    with open(traced_json_path, "w") as f:
        json.dump(face_data, f, indent=2, ensure_ascii=False)

    trace_count = len(face_data.get("traces", {}))
    print(f"[TRACE] Completed: {trace_count} traces -> {traced_json_path}")
    return traced_json_path
|
|
|
|
|
|
def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHEMA):
    """Write trace_ids from a traced face JSON onto existing face_detections rows.

    No rows are inserted. For every face that has a ``trace_id``, the rows in
    ``{schema}.face_detections`` are updated when they match the file_uuid, the
    frame number and the exact bbox (x, y, width, height).

    Each UPDATE runs inside its own savepoint. A face that fails is rolled back
    alone and logged; earlier successful updates in the same transaction are
    kept and committed.

    Args:
        file_uuid: Video file UUID whose rows are updated.
        traced_json_path: Path to the ``*.face_traced.json`` output of run_face_tracker.
        schema: DB schema holding ``face_detections``. It is interpolated into
            the SQL, so pass only trusted schema names.

    Returns:
        ``(total_stored, db_trace_count)``:
        - ``total_stored``: number of traced faces that matched at least one row.
        - ``db_trace_count``: number of distinct non-null trace_ids now stored
          for ``file_uuid``.
    """
    # Read the input before touching the DB so a bad path opens no connection.
    with open(traced_json_path) as f:
        data = json.load(f)
    frames = data.get("frames", {})

    update_sql = f"""
        UPDATE {schema}.face_detections
        SET trace_id = %s
        WHERE file_uuid = %s AND frame_number = %s
        AND x = %s AND y = %s AND width = %s AND height = %s
    """

    total_stored = 0
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            for frame_num_str, frame_data in sorted(frames.items(), key=lambda kv: int(kv[0])):
                frame_num = int(frame_num_str)
                for face in frame_data.get("faces", []):
                    trace_id = face.get("trace_id")
                    if trace_id is None:
                        continue
                    params = (
                        trace_id,
                        file_uuid, frame_num,
                        face.get("x", 0), face.get("y", 0),
                        face.get("width", 0), face.get("height", 0),
                    )
                    # Without a savepoint, a failing statement would force a
                    # full rollback and silently discard every prior update.
                    cur.execute("SAVEPOINT store_trace")
                    try:
                        cur.execute(update_sql, params)
                        matched = cur.rowcount
                    except psycopg2.Error as e:
                        print(f"[TRACE] Error storing face at frame {frame_num}: {e}")
                        cur.execute("ROLLBACK TO SAVEPOINT store_trace")
                        continue
                    cur.execute("RELEASE SAVEPOINT store_trace")
                    if matched > 0:
                        total_stored += 1

            conn.commit()

            # Log trace summary
            cur.execute(
                f"SELECT COUNT(DISTINCT trace_id) FROM {schema}.face_detections WHERE file_uuid = %s AND trace_id IS NOT NULL",
                (file_uuid,),
            )
            db_trace_count = cur.fetchone()[0]
    finally:
        conn.close()

    print(f"[TRACE] Stored {total_stored} face detections, {db_trace_count} unique traces in DB")
    return total_stored, db_trace_count
|
|
|
|
|
|
def main():
    """Command-line entry point: trace faces from face.json, then write trace_ids to the DB.

    Exits with status 1 if the face.json file does not exist.
    """
    parser = argparse.ArgumentParser(description="Store traced faces in DB")
    parser.add_argument("--file-uuid", required=True, help="Video file UUID")
    parser.add_argument("--face-json", help="Path to face.json (default: auto-detect)")
    parser.add_argument("--schema", default=SCHEMA, help="DB schema name")
    # Parsed but not used here; accepted so the executor can pass it.
    parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)")
    parser.add_argument("--filter-eyes", action="store_true", help="Remove faces without eye landmarks before tracking")
    args = parser.parse_args()

    file_uuid = args.file_uuid
    if args.face_json:
        face_json = args.face_json
    else:
        face_json = os.path.join(OUTPUT_DIR, f"{file_uuid}.face.json")

    if not os.path.exists(face_json):
        print(f"[TRACE] face.json not found: {face_json}", file=sys.stderr)
        sys.exit(1)

    # The traced output always goes to OUTPUT_DIR, even for a custom --face-json.
    traced_json = os.path.join(OUTPUT_DIR, f"{file_uuid}.face_traced.json")

    # Step 1: assign trace_ids and write the traced JSON.
    run_face_tracker(face_json, traced_json, filter_eyes=args.filter_eyes)

    # Step 2: copy trace_ids onto the matching DB rows.
    stored_count, trace_total = store_traced_faces(file_uuid, traced_json, args.schema)
    print(f"[TRACE] Done: {stored_count} detections, {trace_total} traces")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI only when executed directly, not when imported as a module.
    main()
|