#!/opt/homebrew/bin/python3.11
"""
Trace Face Aggregator - standalone Python script.

Scheduled and executed by the Rust worker (PythonExecutor).

What it does:
    Reads the face_detections table, groups the rows by face_id, and writes
    one aggregated row per face into pre_chunks (source_type: trace_face).

Usage (CLI):
    python3 scripts/trace_face_aggregator.py --file-uuid FILE_UUID [--fps FPS]

Usage (scheduled from Rust):
    executor.run("trace_face_aggregator.py", &["--file-uuid", uuid], Some(uuid), "TRACE_FACE", timeout)
"""

import sys
import json
import os
import argparse
import time

# Make the sibling helper modules importable when run as a standalone script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def get_pg_conn():
    """Open and return a new PostgreSQL connection.

    The connection string comes from the DATABASE_URL environment variable and
    falls back to a local development database. psycopg2 is imported lazily so
    that importing this module does not require the driver.
    """
    import psycopg2

    db_url = os.getenv(
        "DATABASE_URL",
        "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev",
    )
    return psycopg2.connect(db_url)


def aggregate_traces(file_uuid: str, fps: float = 30.0) -> int:
    """Aggregate face detections of one file into ``trace_face`` pre-chunks.

    For every distinct ``face_id`` of the file, the first and last frame are
    computed, the highest-confidence detection is chosen as representative
    frame, and one row is upserted into ``pre_chunks``.

    Args:
        file_uuid: UUID of the file whose detections are aggregated.
        fps: Frame rate used when the ``videos`` row has no (or a
            non-positive) fps value.

    Returns:
        Number of trace rows upserted; 0 when the file has no face detections
        or is not present in the ``videos`` table.

    Raises:
        Exception: Any error raised while querying/writing is reported through
            the publisher, the transaction is rolled back, and the error is
            re-raised.
    """
    pub = RedisPublisher(file_uuid)
    pub.info("TRACE_FACE", f"Starting trace aggregation for {file_uuid}")

    conn = get_pg_conn()
    try:
        # The cursor is closed automatically when the ``with`` block exits,
        # including on the early returns below.
        with conn.cursor() as cur:
            # 1. Count distinct face_ids
            cur.execute(
                "SELECT COUNT(DISTINCT face_id) FROM face_detections "
                "WHERE file_uuid = %s AND face_id IS NOT NULL",
                (file_uuid,),
            )
            distinct_count = cur.fetchone()[0]
            if distinct_count == 0:
                pub.info("TRACE_FACE", "No face detections found, skipping")
                return 0

            # 2. Get file_id from videos table
            cur.execute(
                "SELECT id, COALESCE(fps, %s) FROM videos WHERE file_uuid = %s",
                (fps, file_uuid),
            )
            row = cur.fetchone()
            if not row:
                pub.error(f"File {file_uuid} not found in videos table")
                return 0

            file_id, actual_fps = row[0], float(row[1])
            if actual_fps <= 0:
                actual_fps = fps

            # 3. Aggregate traces: group by face_id, compute start/end frame
            cur.execute(
                """
                SELECT face_id,
                       MIN(frame_number) AS start_frame,
                       MAX(frame_number) AS end_frame
                FROM face_detections
                WHERE file_uuid = %s AND face_id IS NOT NULL
                GROUP BY face_id
                """,
                (file_uuid,),
            )
            traces = cur.fetchall()

            created = 0
            for face_id, start_frame, end_frame in traces:
                # 4. Find representative frame (highest confidence).
                # Ordering (instead of matching ``confidence = MAX(...)`` by
                # float equality with an unordered LIMIT 1) makes the choice
                # deterministic and still yields a row when confidences are
                # NULL; ties are broken by the earliest frame.
                cur.execute(
                    """
                    SELECT frame_number, bbox
                    FROM face_detections
                    WHERE file_uuid = %s AND face_id = %s
                    ORDER BY confidence DESC NULLS LAST, frame_number ASC
                    LIMIT 1
                    """,
                    (file_uuid, face_id),
                )
                rep_row = cur.fetchone()
                rep_frame = rep_row[0] if rep_row else start_frame
                rep_bbox = rep_row[1] if rep_row and rep_row[1] else {}

                raw_json = json.dumps({
                    "face_id": face_id,
                    "start_frame": int(start_frame),
                    "end_frame": int(end_frame),
                    "representative_frame": {
                        "frame_number": int(rep_frame),
                        "bbox": rep_bbox,
                    },
                })

                start_time = float(start_frame) / actual_fps
                end_time = float(end_frame) / actual_fps

                # 5. Insert into pre_chunks (upsert).
                # start_time is refreshed together with end_time/fps so that a
                # re-run with a different fps cannot leave a stale start_time.
                cur.execute(
                    """
                    INSERT INTO pre_chunks
                        (file_id, source_type, source_file, chunk_type,
                         start_time, end_time, start_frame, end_frame,
                         fps, raw_json, text_content, processed, chunk_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s, %s)
                    ON CONFLICT (file_id, source_type, start_frame, end_frame)
                    DO UPDATE SET
                        end_frame = EXCLUDED.end_frame,
                        start_time = EXCLUDED.start_time,
                        end_time = EXCLUDED.end_time,
                        raw_json = EXCLUDED.raw_json,
                        fps = EXCLUDED.fps
                    """,
                    (
                        file_id,
                        "trace_face",
                        None,
                        "trace",
                        start_time,
                        end_time,
                        int(start_frame),
                        int(end_frame),
                        actual_fps,
                        raw_json,
                        None,
                        False,
                        None,
                    ),
                )
                created += 1

        conn.commit()
        pub.info(
            "TRACE_FACE",
            f"Created {created} trace_face pre-chunks for {file_uuid}",
        )
        return created
    except Exception as e:
        conn.rollback()
        pub.error(f"Trace aggregation failed: {e}")
        raise
    finally:
        conn.close()


def main() -> None:
    """Parse the command line, run the aggregation, and print a JSON summary."""
    parser = argparse.ArgumentParser(description="Aggregate face detections into traces")
    # Not marked ``required`` so that the --uuid fallback below is reachable;
    # the "at least one" rule is enforced explicitly after parsing.
    parser.add_argument("--file-uuid", help="File UUID to process")
    parser.add_argument("--fps", type=float, default=30.0, help="Frames per second (optional)")
    parser.add_argument(
        "--uuid",
        help="Generic UUID (passed by Rust executor, used when --file-uuid is omitted)",
    )
    args = parser.parse_args()

    file_uuid = args.file_uuid or args.uuid
    if not file_uuid:
        parser.error("one of --file-uuid or --uuid is required")

    start = time.time()
    count = aggregate_traces(file_uuid, args.fps)
    elapsed = time.time() - start

    print(
        json.dumps({
            "success": True,
            "file_uuid": file_uuid,
            "traces_generated": count,
            "duration_sec": round(elapsed, 2),
        })
    )


if __name__ == "__main__":
    main()