Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index Correction: asr-1.json format, generate/apply scripts API: 37/37 endpoints fixed and tested Docs: HANDOVER_V2.0.md for M4
325 lines
11 KiB
Python
325 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Dense Scan Traces - Re-scan frame-by-frame for traces with < 4 detections.
|
|
|
|
Flow:
|
|
1. Query face_detections for traces with < 4 rows for a file_uuid
|
|
2. For each short trace:
|
|
a. Extract video segment (ffmpeg)
|
|
b. Run face_processor.py with --sample-interval 1
|
|
c. Match new detections to trace by embedding similarity
|
|
d. Insert new rows into face_detections
|
|
|
|
Usage:
|
|
python dense_scan_traces.py --file-uuid <uuid> [--video-path <path>] [--min-detections N] [--dry-run]
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import subprocess
|
|
import time
|
|
import tempfile
|
|
import numpy as np
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from typing import List, Dict, Optional
|
|
|
|
# Connection string for the momentry PostgreSQL database (override with DATABASE_URL).
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
# Schema holding the videos / face_detections tables. It is interpolated directly
# into SQL text (identifiers cannot be bound as query parameters), so it must only
# ever come from trusted configuration.
SCHEMA = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
# NOTE(review): OUTPUT_DIR is not referenced by the code visible in this file — confirm it is still needed.
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# face_processor.py is expected to sit next to this script.
FACE_PROCESSOR = os.path.join(SCRIPT_DIR, "face_processor.py")
# Interpreter used to run face_processor.py. Defaults to the Homebrew Python 3.11
# path this script was written for; set MOMENTRY_PYTHON_BIN on other machines.
PYTHON_BIN = os.environ.get("MOMENTRY_PYTHON_BIN", "/opt/homebrew/bin/python3.11")
# Traces with fewer than this many face_detections rows get re-scanned.
MIN_DETECTIONS = 4
|
|
|
|
|
|
def get_conn():
    """Open a new psycopg2 connection to DB_URL.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = psycopg2.connect(DB_URL)
    return connection
|
|
|
|
|
|
def get_video_path(file_uuid: str) -> Optional[str]:
    """Look up the stored file path of a video.

    Args:
        file_uuid: UUID of the row in ``{SCHEMA}.videos``.

    Returns:
        The ``file_path`` column of the first matching row, or ``None`` when
        no row matches.
    """
    query = f"SELECT file_path FROM {SCHEMA}.videos WHERE file_uuid = %s"
    conn = get_conn()
    try:
        # The cursor's context manager closes it; the connection is closed below.
        with conn.cursor() as cur:
            cur.execute(query, (file_uuid,))
            row = cur.fetchone()
        return None if row is None else row[0]
    finally:
        conn.close()
|
|
|
|
|
|
def get_short_traces(file_uuid: str, min_det: int = MIN_DETECTIONS) -> List[Dict]:
    """List the traces of a video that have fewer than ``min_det`` detections.

    Args:
        file_uuid: video whose ``face_detections`` rows are grouped by trace.
        min_det: a trace is returned when its row count is strictly below this.

    Returns:
        One dict per short trace, ordered by ``trace_id``, with keys
        ``trace_id``, ``cnt`` (row count), ``start_frame`` and ``end_frame``
        (min / max ``frame_number`` of its rows). Rows whose ``trace_id`` is
        NULL are ignored.
    """
    sql = f"""
        SELECT trace_id, COUNT(*) as cnt,
               MIN(frame_number) as start_frame,
               MAX(frame_number) as end_frame
        FROM {SCHEMA}.face_detections
        WHERE file_uuid = %s AND trace_id IS NOT NULL
        GROUP BY trace_id
        HAVING COUNT(*) < %s
        ORDER BY trace_id
    """
    conn = get_conn()
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, (file_uuid, min_det))
            return list(map(dict, cur.fetchall()))
    finally:
        conn.close()
|
|
|
|
|
|
def get_trace_embeddings(file_uuid: str, trace_id: int) -> List[Dict]:
    """Fetch the stored detections of one trace that carry an embedding.

    Args:
        file_uuid: video the trace belongs to.
        trace_id: trace whose rows are fetched.

    Returns:
        Dicts with keys ``frame_number``, ``x``, ``y``, ``width``, ``height``
        and ``embedding``, ordered by ``frame_number``. Rows whose embedding is
        NULL are excluded.

    psycopg2 returns values of types it has no adapter for as strings. If an
    embedding arrives as text (e.g. a pgvector ``vector`` column renders as
    ``'[0.1,0.2,...]'``), it is decoded into a list of numbers so callers can
    compute similarities on it. List values are passed through unchanged.
    """
    conn = get_conn()
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        cur.execute(
            f"""
            SELECT frame_number, x, y, width, height, embedding
            FROM {SCHEMA}.face_detections
            WHERE file_uuid = %s AND trace_id = %s AND embedding IS NOT NULL
            ORDER BY frame_number
            """,
            (file_uuid, trace_id),
        )
        rows = [dict(r) for r in cur.fetchall()]
    finally:
        cur.close()
        conn.close()

    for row in rows:
        # NOTE(review): the column type is not visible here; this only matters
        # if the driver hands the embedding back as text rather than a list.
        if isinstance(row["embedding"], str):
            row["embedding"] = json.loads(row["embedding"])
    return rows
|
|
|
|
|
|
def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Return the cosine similarity of two embedding vectors.

    Accepts any array-like of numbers (list, tuple, numpy array); inputs are
    flattened to 1-D. Returns 0.0 when either input is ``None``, empty, or has
    zero norm, so a missing embedding never counts as a match.

    Raises:
        ValueError: if the vectors have different lengths or contain values
            that cannot be converted to float.
    """
    if a is None or b is None:
        return 0.0
    # asarray + size checks instead of ``not a``: truth-testing a numpy array
    # with more than one element raises "ambiguous truth value".
    v1 = np.asarray(a, dtype=float).ravel()
    v2 = np.asarray(b, dtype=float).ravel()
    if v1.size == 0 or v2.size == 0:
        return 0.0
    n1 = np.linalg.norm(v1)
    n2 = np.linalg.norm(v2)
    if n1 == 0 or n2 == 0:
        return 0.0
    return float(np.dot(v1, v2) / (n1 * n2))
|
|
|
|
|
|
def extract_video_segment(video_path: str, start_frame: int, end_frame: int, output_path: str, fps: float = 59.94):
    """Extract frames ``start_frame..end_frame`` (inclusive) of a video with ffmpeg.

    For speed, ffmpeg input-seeks (``-ss`` before ``-i``) to about one second
    before ``start_frame``, then the ``select`` filter keeps the wanted frames.
    After an input seek the filter's frame counter ``n`` restarts at 0 at the
    seek point, so the indices given to ``select`` must be relative to the
    frame the seek lands on. Passing absolute frame numbers would select
    frames ``start_frame`` frames *after* the seek point, i.e. the wrong range.

    Args:
        video_path: source video.
        start_frame: first frame to keep, counted like the select filter's
            ``n`` (0-based, in decode order from the start of the file).
        end_frame: last frame to keep (inclusive).
        output_path: file ffmpeg writes (overwritten if it exists).
        fps: frame rate used to turn frame indices into a seek time. It must
            match the source's real rate, or the seek lands on the wrong
            frame. NOTE(review): callers in this file rely on the 59.94 default.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited non-zero; its ``stderr``
            attribute holds ffmpeg's error output as bytes.
        subprocess.TimeoutExpired: ffmpeg ran longer than 120 s.
    """
    # Seek to a whole frame roughly one second (round(fps) frames) before the range.
    seek_frame = max(0, start_frame - int(round(fps)))
    # Aim half a frame early so rounding of the formatted timestamp can never
    # make ffmpeg discard seek_frame itself; seek_frame then becomes n == 0.
    seek_time = max(0.0, (seek_frame - 0.5) / fps)
    rel_start = start_frame - seek_frame
    rel_end = end_frame - seek_frame
    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-ss", f"{seek_time:.6f}",
        "-i", video_path,
        "-vf", f"select=between(n\\,{rel_start}\\,{rel_end}),setpts=PTS-STARTPTS",
        "-vsync", "0",
        "-an", output_path,
    ]
    # stderr is captured (rather than discarded) so callers can report why
    # ffmpeg failed; -loglevel error keeps it down to actual error lines.
    subprocess.run(cmd, check=True, timeout=120,
                   stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
|
|
|
|
|
|
def match_new_detections(new_face_json: str, ref_embeddings: List[Dict],
                         similarity_threshold: float = 0.7) -> List[Dict]:
    """Select the dense-scan detections that belong to a trace.

    Every face in ``new_face_json`` that has an embedding is scored by its best
    cosine similarity (never below 0.0) against the trace's reference
    embeddings, and kept when that score is at least ``similarity_threshold``.
    A trace follows a single face, so at most one face per frame is returned:
    the highest-scoring one (ties keep the face listed first).

    Args:
        new_face_json: path of the JSON written by face_processor.py. Shape as
            read here: ``{"frames": [{"frame": int, "faces": [{"x", "y",
            "width", "height", "embedding", optional "confidence"}, ...]}]}``.
        ref_embeddings: rows carrying an ``"embedding"`` key.
        similarity_threshold: minimum similarity for a face to be accepted.

    Returns:
        Dicts with keys ``frame_number``, ``x``, ``y``, ``width``, ``height``,
        ``confidence`` (default 0.5), ``embedding`` and ``similarity``, in the
        order frames first appear in the JSON. Frame numbers are those of the
        scanned clip, not of the source video. Empty when there are no
        reference embeddings.
    """
    if not ref_embeddings:
        return []

    with open(new_face_json, encoding="utf-8") as f:
        data = json.load(f)

    frames = data.get("frames")
    if not isinstance(frames, list):
        frames = []

    # frame number -> best match in that frame; dict keeps first-seen frame order.
    best_by_frame: Dict[int, Dict] = {}
    for frame_data in frames:
        frame_num = frame_data.get("frame", 0)
        for face in frame_data.get("faces") or []:
            emb = face.get("embedding")
            if not emb:
                continue

            best_sim = 0.0
            for ref in ref_embeddings:
                best_sim = max(best_sim, cosine_similarity(emb, ref["embedding"]))
            if best_sim < similarity_threshold:
                continue

            current = best_by_frame.get(frame_num)
            if current is not None and current["similarity"] >= best_sim:
                continue  # an earlier face in this frame matches at least as well
            best_by_frame[frame_num] = {
                "frame_number": frame_num,
                "x": face["x"],
                "y": face["y"],
                "width": face["width"],
                "height": face["height"],
                "confidence": face.get("confidence", 0.5),
                "embedding": emb,
                "similarity": best_sim,
            }

    return list(best_by_frame.values())
|
|
|
|
|
|
def insert_detections(file_uuid: str, trace_id: int, detections: List[Dict]):
    """Write dense-scan detections for one trace into face_detections.

    A detection is skipped when the trace already has a row at its frame.
    The existence check runs in the same transaction as the inserts, so rows
    added earlier in this call are also seen. New rows get ``face_id = NULL``,
    and everything is committed in one transaction.

    Args:
        file_uuid: video the rows belong to.
        trace_id: trace the rows are attached to.
        detections: dicts with ``frame_number``, ``x``, ``y``, ``width``,
            ``height``, optional ``confidence`` (default 0.5) and optional
            ``embedding`` (a falsy value is stored as NULL).

    Returns:
        Number of rows inserted. Returns 0 when ``detections`` is empty, or on
        any error inside the transaction; in that case the transaction is
        rolled back and the error is printed.
    """
    if not detections:
        return 0

    exists_sql = (
        f"SELECT 1 FROM {SCHEMA}.face_detections "
        "WHERE file_uuid=%s AND frame_number=%s AND trace_id=%s"
    )
    insert_sql = f"""
        INSERT INTO {SCHEMA}.face_detections
            (file_uuid, frame_number, face_id, trace_id,
             x, y, width, height, confidence, embedding)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    conn = get_conn()
    cur = conn.cursor()
    try:
        written = 0
        for det in detections:
            frame = det["frame_number"]
            cur.execute(exists_sql, (file_uuid, frame, trace_id))
            if cur.fetchone() is not None:
                continue
            embedding = det.get("embedding") or None
            params = (
                file_uuid, frame, None, trace_id,
                det["x"], det["y"], det["width"], det["height"],
                det.get("confidence", 0.5), embedding,
            )
            cur.execute(insert_sql, params)
            written += 1
        conn.commit()
        return written
    except Exception as e:
        conn.rollback()
        print(f" [DENSE] DB error: {e}")
        return 0
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
def dense_scan_trace(file_uuid: str, trace_id: int, video_path: str,
                     start_frame: int, end_frame: int,
                     fps: float = 59.94, pad: int = 15):
    """Re-scan a trace's frame range frame-by-frame and store the new matches.

    Steps:
      1. Extract frames ``start_frame - pad .. end_frame + pad`` into a
         temporary clip.
      2. Run face_processor.py on the clip with ``--sample-interval 1``.
      3. Keep the faces whose embedding matches the trace's stored embeddings.
      4. Map clip frame numbers back to source-video frame numbers.
      5. Insert the detections the trace does not already have.

    Args:
        file_uuid: video being processed.
        trace_id: trace to supplement.
        video_path: path of the source video.
        start_frame: first frame currently in the trace.
        end_frame: last frame currently in the trace.
        fps: source frame rate, forwarded to extract_video_segment.
        pad: frames scanned before and after the trace's range.

    Returns:
        Number of rows inserted. Returns 0 when the trace has no stored
        embeddings, ffmpeg or face_processor fails, nothing matches, or every
        match already exists.
    """
    seg_start = max(0, start_frame - pad)
    seg_end = end_frame + pad

    # Without reference embeddings nothing can be matched, so bail out before
    # spending time on ffmpeg / face_processor.
    refs = get_trace_embeddings(file_uuid, trace_id)
    if not refs:
        return 0

    with tempfile.TemporaryDirectory() as tmpdir:
        segment_path = os.path.join(tmpdir, f"seg_{trace_id}.mp4")
        try:
            extract_video_segment(video_path, seg_start, seg_end, segment_path, fps=fps)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            stderr = getattr(e, "stderr", None)
            err = stderr.decode(errors="replace") if stderr else str(e)
            print(f" [DENSE] ffmpeg failed: {err[:200]}")
            return 0

        # Run face_processor on every frame of the clip.
        face_out = os.path.join(tmpdir, f"face_{trace_id}.json")
        try:
            subprocess.run(
                [PYTHON_BIN, FACE_PROCESSOR, segment_path, face_out,
                 "--sample-interval", "1", "--uuid", file_uuid],
                check=True, timeout=120,
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
            print(f" [DENSE] face_processor failed for trace {trace_id}: {e}")
            return 0

        if not os.path.exists(face_out):
            return 0

        # Parse the JSON now: face_out is deleted with the temporary directory.
        new_detections = match_new_detections(face_out, refs)

    if not new_detections:
        return 0

    # Frames this trace already has (among rows with embeddings); other existing
    # rows are still caught by insert_detections' own per-frame check.
    known_frames = {r["frame_number"] for r in refs}
    adjusted = []
    for d in new_detections:
        # NOTE(review): "- 1" assumes face_processor numbers clip frames from 1,
        # so clip frame 1 == seg_start — confirm against face_processor.py.
        frame = seg_start + d["frame_number"] - 1
        if frame < 0 or frame in known_frames:
            continue
        d["frame_number"] = frame
        adjusted.append(d)

    if not adjusted:
        return 0

    count = insert_detections(file_uuid, trace_id, adjusted)
    print(f" [DENSE] Trace {trace_id}: added {count} new detections (range {seg_start}-{seg_end})")
    return count
|
|
|
|
|
|
def main():
    """Command-line entry point: dense-scan every short trace of one video.

    Resolves the video path (``--video-path``, otherwise the videos table) and
    exits with status 1 if that file does not exist. It then lists the traces
    with fewer than ``--min-detections`` rows. With ``--dry-run`` it only
    prints them; otherwise it re-scans each one and prints a summary.
    """
    ap = argparse.ArgumentParser(description="Dense re-scan for short face traces")
    ap.add_argument("--file-uuid", required=True, help="Video file UUID")
    ap.add_argument("--video-path", help="Video file path (auto-detect if omitted)")
    ap.add_argument(
        "--min-detections",
        type=int,
        default=MIN_DETECTIONS,
        help=f"Minimum detections per trace (default: {MIN_DETECTIONS})",
    )
    ap.add_argument("--dry-run", action="store_true", help="Only list short traces")
    opts = ap.parse_args()

    # argparse always sets this attribute (it has a default).
    threshold = opts.min_detections
    file_uuid = opts.file_uuid

    video_path = opts.video_path or get_video_path(file_uuid)
    if not (video_path and os.path.exists(video_path)):
        print(f"[DENSE] Video not found: {video_path}", file=sys.stderr)
        sys.exit(1)
    print(f"[DENSE] Video: {video_path}")

    short_traces = get_short_traces(file_uuid, threshold)
    print(f"[DENSE] Traces with < {threshold} detections: {len(short_traces)}")

    if opts.dry_run:
        for trace in short_traces:
            print(f" Trace {trace['trace_id']}: {trace['cnt']} detections "
                  f"(frames {trace['start_frame']}-{trace['end_frame']})")
        return

    started = time.time()
    counts = [
        dense_scan_trace(file_uuid, trace["trace_id"], video_path,
                         trace["start_frame"], trace["end_frame"])
        for trace in short_traces
    ]
    # Only traces that actually gained rows count as "supplemented".
    gained = [c for c in counts if c > 0]
    elapsed = time.time() - started
    print(f"\n[DENSE] Done: {len(gained)} traces supplemented, "
          f"{sum(gained)} new detections added, {elapsed:.1f}s")


if __name__ == "__main__":
    main()
|