- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
183 lines
5.7 KiB
Python
183 lines
5.7 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
Trace Face Aggregator - standalone Python script.

Scheduled and run by the Rust worker (PythonExecutor).

Purpose: read the face_detections table -> group and aggregate rows by face_id
-> write pre_chunks rows (source_type: trace_face).

Usage (CLI):
    python3 scripts/trace_face_aggregator.py --file-uuid <uuid> [--fps <fps>]

Usage (scheduled from Rust):
    executor.run("trace_face_aggregator.py", &["--file-uuid", uuid], Some(uuid), "TRACE_FACE", timeout)
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
import argparse
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
def get_pg_conn():
    """Return a new psycopg2 connection to the PostgreSQL database.

    The connection string is taken from the ``DATABASE_URL`` environment
    variable. When that variable is unset, a local default pointing at the
    ``momentry`` database with ``search_path=dev`` is used instead.
    """
    # Imported here rather than at module level, so importing this script does
    # not by itself require psycopg2 to be installed.
    import psycopg2

    fallback_dsn = "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev"
    dsn = os.environ.get("DATABASE_URL", fallback_dsn)
    return psycopg2.connect(dsn)
|
|
|
|
|
|
def _representative_frames(cur, file_uuid: str) -> dict:
    """Map each face_id of *file_uuid* to ``(frame_number, bbox)`` of its most confident detection.

    Detections whose confidence is NULL are never chosen, so a face that only
    has NULL confidences is absent from the result. Ties on confidence resolve
    to the earliest frame, which keeps reruns deterministic.
    """
    # Selecting via ORDER BY (instead of re-matching "confidence = MAX(confidence)"
    # per face) needs a single query for all faces and does not depend on a float
    # surviving a round trip through Python bit-for-bit.
    cur.execute(
        """
        SELECT DISTINCT ON (face_id) face_id, frame_number, bbox
        FROM face_detections
        WHERE file_uuid = %s AND face_id IS NOT NULL AND confidence IS NOT NULL
        ORDER BY face_id, confidence DESC, frame_number
        """,
        (file_uuid,),
    )
    return {face_id: (frame_number, bbox) for face_id, frame_number, bbox in cur.fetchall()}


def _upsert_trace_chunk(cur, file_id, face_id, start_frame: int, end_frame: int,
                        rep_frame: int, rep_bbox, fps: float) -> None:
    """Insert, or refresh on conflict, the ``trace_face`` pre-chunk for one face trace."""
    raw_json = json.dumps({
        "face_id": face_id,
        "start_frame": start_frame,
        "end_frame": end_frame,
        "representative_frame": {
            "frame_number": rep_frame,
            "bbox": rep_bbox,
        },
    })
    # NOTE(review): the conflict target has no face_id. Two faces whose traces
    # cover exactly the same frame range therefore map to the same row, and the
    # later upsert overwrites the earlier one's raw_json (the caller still counts
    # both). Distinguishing them needs a different unique key; confirm intent.
    cur.execute(
        """
        INSERT INTO pre_chunks
            (file_id, source_type, source_file, chunk_type,
             start_time, end_time, start_frame, end_frame,
             fps, raw_json, text_content, processed, chunk_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s, %s)
        ON CONFLICT (file_id, source_type, start_frame, end_frame)
        DO UPDATE SET
            end_frame = EXCLUDED.end_frame,
            end_time = EXCLUDED.end_time,
            raw_json = EXCLUDED.raw_json,
            fps = EXCLUDED.fps
        """,
        (
            file_id,
            "trace_face",  # source_type
            None,  # source_file
            "trace",  # chunk_type
            start_frame / fps,  # start_time in seconds
            end_frame / fps,  # end_time in seconds
            start_frame,
            end_frame,
            fps,
            raw_json,
            None,  # text_content
            False,  # processed
            None,  # chunk_id
        ),
    )


def aggregate_traces(file_uuid: str, fps: float = 30.0) -> int:
    """Aggregate per-frame face detections into one ``trace_face`` pre-chunk per face_id.

    Reads every ``face_detections`` row for *file_uuid* that has a face_id and
    groups the rows by face_id. For each group it upserts one ``pre_chunks`` row
    spanning the first to the last frame in which that face was detected,
    together with the frame and bbox of its highest-confidence detection.
    Progress and failures are reported through ``RedisPublisher``.

    Args:
        file_uuid: UUID of the file whose detections are aggregated.
        fps: Fallback frame rate, used when the ``videos`` row has a NULL or
            non-positive fps.

    Returns:
        The number of pre-chunk upserts executed. Returns 0 when there are no
        detections, or when the file has no ``videos`` row.

    Raises:
        ValueError: If neither the video row nor *fps* gives a positive frame rate.
        Exception: Any database error is re-raised after the transaction is
            rolled back and the failure is published.
    """
    pub = RedisPublisher(file_uuid)
    pub.info("TRACE_FACE", f"Starting trace aggregation for {file_uuid}")

    conn = get_pg_conn()
    try:
        # The cursor is created inside the try so the connection is always
        # closed. The with-block closes the cursor, including on early return.
        with conn.cursor() as cur:
            # 1. One row per face_id: first and last frame in which it appears.
            cur.execute(
                """
                SELECT face_id,
                       MIN(frame_number) AS start_frame,
                       MAX(frame_number) AS end_frame
                FROM face_detections
                WHERE file_uuid = %s AND face_id IS NOT NULL
                GROUP BY face_id
                ORDER BY start_frame, face_id
                """,
                (file_uuid,),
            )
            traces = cur.fetchall()
            if not traces:
                pub.info("TRACE_FACE", "No face detections found, skipping")
                return 0

            # 2. Resolve the video's row id and frame rate.
            cur.execute(
                "SELECT id, COALESCE(fps, %s) FROM videos WHERE file_uuid = %s",
                (fps, file_uuid),
            )
            row = cur.fetchone()
            if not row:
                pub.error(f"File {file_uuid} not found in videos table")
                return 0
            file_id = row[0]
            actual_fps = float(row[1])
            if not actual_fps > 0:  # written this way so NaN also falls back
                actual_fps = float(fps)
            if not actual_fps > 0:
                # The frame rate is used as a divisor below; fail clearly instead
                # of with a ZeroDivisionError.
                raise ValueError(
                    f"No positive frame rate for {file_uuid} (fallback fps={fps!r})"
                )

            # 3. Highest-confidence detection per face, fetched in one query.
            representatives = _representative_frames(cur, file_uuid)

            # 4. Upsert one pre-chunk per trace. Faces without a usable
            #    confidence fall back to their first frame with an empty bbox.
            created = 0
            for face_id, start_frame, end_frame in traces:
                rep_frame, rep_bbox = representatives.get(face_id, (start_frame, None))
                _upsert_trace_chunk(
                    cur,
                    file_id,
                    face_id,
                    int(start_frame),
                    int(end_frame),
                    int(rep_frame),
                    rep_bbox or {},
                    actual_fps,
                )
                created += 1

        conn.commit()
        pub.info(
            "TRACE_FACE",
            f"Created {created} trace_face pre-chunks for {file_uuid}",
        )
        return created

    except Exception as e:
        conn.rollback()
        pub.error(f"Trace aggregation failed: {e}")
        raise
    finally:
        conn.close()
|
|
|
|
|
|
def main(argv=None) -> None:
    """Parse CLI arguments, run ``aggregate_traces`` and print a JSON summary to stdout.

    Args:
        argv: Arguments to parse. ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Aggregate face detections into traces")
    parser.add_argument("--file-uuid", help="File UUID to process")
    parser.add_argument("--fps", type=float, default=30.0, help="Frames per second (optional)")
    parser.add_argument(
        "--uuid",
        help="Generic UUID (passed by Rust executor); used only when --file-uuid is absent",
    )
    args = parser.parse_args(argv)

    # --file-uuid is deliberately not required=True at the parser level, because
    # that would make the --uuid fallback unreachable. Require one of them here.
    file_uuid = args.file_uuid or args.uuid
    if not file_uuid:
        parser.error("one of --file-uuid or --uuid is required")
    # Reject 0, negatives and NaN: aggregate_traces divides by this value when
    # the video row has no positive fps of its own.
    if not args.fps > 0:
        parser.error("--fps must be a positive number")

    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    count = aggregate_traces(file_uuid, args.fps)
    elapsed = time.perf_counter() - start

    print(
        json.dumps({
            "success": True,
            "file_uuid": file_uuid,
            "traces_generated": count,
            "duration_sec": round(elapsed, 2),
        })
    )


if __name__ == "__main__":
    main()
|