Files
momentry_core/scripts/trace_face_aggregator.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

183 lines
5.7 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Trace Face Aggregator - 獨立 Python 腳本
由 Rust Worker (PythonExecutor) 排程執行
功能: 讀取 face_detections 表 → 按 face_id 分組聚合 → 寫入 pre_chunks (source_type: trace_face)
用法 (CLI):
python3 scripts/trace_face_aggregator.py --file-uuid <uuid> [--fps <fps>]
用法 (由 Rust 排程):
executor.run("trace_face_aggregator.py", &["--file-uuid", uuid], Some(uuid), "TRACE_FACE", timeout)
"""
import sys
import json
import os
import argparse
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def get_pg_conn():
    """Open and return a new psycopg2 connection.

    The connection string is read from the ``DATABASE_URL`` environment
    variable; when it is unset, the built-in local development URL is used.
    """
    # Imported lazily so merely importing this module does not need psycopg2.
    import psycopg2

    default_url = (
        "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev"
    )
    dsn = os.environ.get("DATABASE_URL", default_url)
    return psycopg2.connect(dsn)
def aggregate_traces(file_uuid: str, fps: float = 30.0) -> int:
    """Aggregate per-frame face detections into one trace pre-chunk per face.

    Reads the ``face_detections`` rows of ``file_uuid``, groups them by
    ``face_id`` and upserts one ``pre_chunks`` row (``source_type`` =
    ``trace_face``) per face, spanning the first to the last frame in which
    that face was detected. Progress and failures are reported through
    ``RedisPublisher``.

    Args:
        file_uuid: UUID of the file whose detections are aggregated.
        fps: Fallback frame rate, used when the ``videos`` row has no
            positive ``fps`` value.

    Returns:
        Number of trace rows upserted. 0 when the file has no detections
        with a ``face_id`` or is missing from the ``videos`` table.

    Raises:
        ValueError: If neither the ``videos`` row nor ``fps`` yields a
            positive frame rate.
        Exception: Any other failure is rolled back, published as an error
            and re-raised unchanged.
    """
    pub = RedisPublisher(file_uuid)
    pub.info("TRACE_FACE", f"Starting trace aggregation for {file_uuid}")

    conn = get_pg_conn()
    try:
        with conn.cursor() as cur:
            # 1. Bail out early when there is nothing to aggregate.
            cur.execute(
                "SELECT COUNT(DISTINCT face_id) FROM face_detections "
                "WHERE file_uuid = %s AND face_id IS NOT NULL",
                (file_uuid,),
            )
            if cur.fetchone()[0] == 0:
                pub.info("TRACE_FACE", "No face detections found, skipping")
                return 0

            # 2. Resolve the internal file id and the frame rate to use.
            cur.execute(
                "SELECT id, COALESCE(fps, %s) FROM videos WHERE file_uuid = %s",
                (fps, file_uuid),
            )
            row = cur.fetchone()
            if not row:
                pub.error(f"File {file_uuid} not found in videos table")
                return 0
            file_id, actual_fps = row[0], float(row[1])
            if actual_fps <= 0:
                actual_fps = fps
            # A non-positive (or NaN) rate would divide by zero or produce
            # negative timestamps below, so refuse it explicitly.
            if not actual_fps > 0:
                raise ValueError(
                    f"No positive frame rate available for {file_uuid} "
                    f"(got {actual_fps!r})"
                )

            # 3. Frame span of every face.
            cur.execute(
                """
                SELECT
                    face_id,
                    MIN(frame_number) AS start_frame,
                    MAX(frame_number) AS end_frame
                FROM face_detections
                WHERE file_uuid = %s AND face_id IS NOT NULL
                GROUP BY face_id
                """,
                (file_uuid,),
            )
            traces = cur.fetchall()

            # 4. Representative detection per face: the highest-confidence
            #    row, ties broken by the lowest frame number. One query for
            #    all faces instead of one query per face, and no float
            #    equality comparison against a previously fetched maximum.
            cur.execute(
                """
                SELECT DISTINCT ON (face_id) face_id, frame_number, bbox
                FROM face_detections
                WHERE file_uuid = %s
                  AND face_id IS NOT NULL
                  AND confidence IS NOT NULL
                ORDER BY face_id, confidence DESC, frame_number ASC
                """,
                (file_uuid,),
            )
            representatives = {
                rep_face_id: (rep_frame_number, rep_bbox_value)
                for rep_face_id, rep_frame_number, rep_bbox_value in cur.fetchall()
            }

            created = 0
            for face_id, start_frame, end_frame in traces:
                # Faces without any non-NULL confidence have no representative
                # row; fall back to the first frame and an empty bbox.
                rep_frame, rep_bbox = representatives.get(
                    face_id, (start_frame, None)
                )
                raw_json = json.dumps({
                    "face_id": face_id,
                    "start_frame": int(start_frame),
                    "end_frame": int(end_frame),
                    "representative_frame": {
                        "frame_number": int(rep_frame),
                        "bbox": rep_bbox or {},
                    },
                })
                start_time = float(start_frame) / actual_fps
                end_time = float(end_frame) / actual_fps

                # 5. Upsert into pre_chunks.
                # NOTE(review): the conflict key does not include face_id, so
                # two faces sharing the same start/end frames would overwrite
                # each other -- confirm against the pre_chunks constraints.
                # start_time is refreshed together with fps/end_time so the
                # row stays consistent when the frame rate changes on re-run.
                cur.execute(
                    """
                    INSERT INTO pre_chunks
                    (file_id, source_type, source_file, chunk_type,
                     start_time, end_time, start_frame, end_frame,
                     fps, raw_json, text_content, processed, chunk_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s, %s)
                    ON CONFLICT (file_id, source_type, start_frame, end_frame)
                    DO UPDATE SET
                        end_frame = EXCLUDED.end_frame,
                        start_time = EXCLUDED.start_time,
                        end_time = EXCLUDED.end_time,
                        raw_json = EXCLUDED.raw_json,
                        fps = EXCLUDED.fps
                    """,
                    (
                        file_id,
                        "trace_face",
                        None,
                        "trace",
                        start_time,
                        end_time,
                        int(start_frame),
                        int(end_frame),
                        actual_fps,
                        raw_json,
                        None,
                        False,
                        None,
                    ),
                )
                created += 1

        conn.commit()
        pub.info(
            "TRACE_FACE",
            f"Created {created} trace_face pre-chunks for {file_uuid}",
        )
        return created
    except Exception as e:
        conn.rollback()
        pub.error(f"Trace aggregation failed: {e}")
        raise
    finally:
        conn.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Aggregate face detections into traces")
parser.add_argument("--file-uuid", required=True, help="File UUID to process")
parser.add_argument("--fps", type=float, default=30.0, help="Frames per second (optional)")
parser.add_argument("--uuid", help="Generic UUID (passed by Rust executor, ignored)")
args = parser.parse_args()
uuid = args.file_uuid
if args.uuid and not args.file_uuid:
uuid = args.uuid
start = time.time()
count = aggregate_traces(uuid, args.fps)
elapsed = time.time() - start
print(
json.dumps({
"success": True,
"file_uuid": uuid,
"traces_generated": count,
"duration_sec": round(elapsed, 2),
})
)