#!/opt/homebrew/bin/python3.11 """Face Trace Heatmap + Timeline Visualization for Momentry. Usage: python3 render_face_heatmap.py [output.html] [--identity ID] """ import sys, psycopg2, argparse from collections import defaultdict parser = argparse.ArgumentParser() parser.add_argument("uuid") parser.add_argument("output", nargs="?", default=None) parser.add_argument("--identity", "-i", type=int, default=None, help="Filter by identity_id") args = parser.parse_args() UUID = args.uuid OUT = args.output or f"/tmp/face_report_{UUID[:8]}.html" IDENTITY = args.identity conn = psycopg2.connect("dbname=momentry user=accusys") cur = conn.cursor() cur.execute("SELECT duration, file_name, COALESCE(fps, 25.0) FROM dev.videos WHERE file_uuid=%s", (UUID,)) row = cur.fetchone() if not row: print("UUID not found") sys.exit(1) duration, video_name, fps = float(row[0] or 6785), row[1] or UUID, float(row[2] or 25.0) # Get sample interval from face.json metadata (or default 3 = 8Hz) sample_interval = 3 hz = fps / sample_interval # Build identity filter identity_filter = "" identity_params = [UUID] identity_label = "" identity_info = None # full identity record when filtered top_identities = [] # top identities summary (all view) if IDENTITY is not None: identity_filter = " AND identity_id = %s" identity_params.append(IDENTITY) cur.execute("SELECT id, name, identity_type, source, status FROM dev.identities WHERE id=%s", (IDENTITY,)) id_row = cur.fetchone() if id_row: identity_info = {"id": id_row[0], "name": id_row[1], "type": id_row[2], "source": id_row[3], "status": id_row[4]} identity_label = f" (identity: {id_row[1]})" else: identity_label = f" (identity #{IDENTITY})" identity_params = [UUID, IDENTITY] # Query trace spans cur.execute(f""" SELECT trace_id, MIN(frame_number), MAX(frame_number), COALESCE(MIN(timestamp_secs), MIN(frame_number) / {fps}) as first_t, COALESCE(MAX(timestamp_secs), MAX(frame_number) / {fps}) as last_t, COUNT(*) FROM dev.face_detections WHERE file_uuid=%s AND trace_id IS NOT NULL{identity_filter} GROUP BY trace_id ORDER BY first_t """, identity_params) trace_spans = cur.fetchall() # Query density per time bucket (5s) cur.execute(f""" SELECT FLOOR(COALESCE(timestamp_secs, frame_number / {fps}) / 5)::int as bkt, COUNT(*) as cnt FROM dev.face_detections WHERE file_uuid=%s AND trace_id IS NOT NULL{identity_filter} GROUP BY bkt ORDER BY bkt """, identity_params) density = {b: c for b, c in cur.fetchall()} # Count total detections cur.execute(f"SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=%s{identity_filter}", identity_params) total_detections = cur.fetchone()[0] # Get top identities (for all view) and trace↔identity mapping if IDENTITY is None: cur.execute(""" SELECT fd.identity_id, i.name, COUNT(*) as faces, COUNT(DISTINCT fd.trace_id) as traces FROM dev.face_detections fd LEFT JOIN dev.identities i ON i.id = fd.identity_id WHERE fd.file_uuid=%s AND fd.identity_id IS NOT NULL GROUP BY fd.identity_id, i.name ORDER BY faces DESC LIMIT 10 """, (UUID,)) top_identities = cur.fetchall() # Always get trace→identity mapping for tooltip enrichment cur.execute(""" SELECT DISTINCT fd.trace_id, i.name FROM dev.face_detections fd LEFT JOIN dev.identities i ON i.id = fd.identity_id WHERE fd.file_uuid=%s AND fd.identity_id IS NOT NULL """, (UUID,)) trace_to_identity = {r[0]: r[1] for r in cur.fetchall()} cur.close(); conn.close() BUCKET = 5 num_buckets = int(duration / BUCKET) + 1 max_density = max(density.values()) if density else 1 def build_html(): h = [] h.append('Face Trace Report') h.append('') sub = identity_label if identity_label else "" h.append('

Face Trace Report — ' + video_name[:60] + sub + '

') # Identity card (when filtering by identity) if identity_info: h.append('
') h.append('

Identity Details

') h.append(f'') h.append(f'') h.append(f'') h.append(f'') h.append(f'') h.append(f'') h.append('
ID{identity_info["id"]}
Name{identity_info["name"]}
Type{identity_info["type"]}
Source{identity_info["source"]}
Status{identity_info["status"]}
') # Top identities table (all view) if top_identities: h.append('

Top Identities

') h.append('
') h.append('') h.append('') for iid, name, fc, tc in top_identities: short_name = (name or f"#{iid}")[:60] h.append(f'') h.append('
IdentityNameFacesTraces
{iid}{short_name}{fc:,}{tc}
') # Stats row h.append('
') h.append(f'
{len(trace_spans):,}
traces
') h.append(f'
{total_detections:,}
detections
') h.append(f'
{duration:.0f}s
duration
') h.append(f'
{max_density}
max per {BUCKET}s
') h.append(f'
{fps:.0f}fps
video fps
') h.append(f'
{hz:.0f}Hz
sample rate (every {sample_interval}frames)
') h.append(f'
{num_buckets}
{BUCKET}s buckets
') h.append('
') # 1. Density histogram h.append('

Face Density Over Time

') h.append('
Number of face detections per 5-second interval
') w_px = num_buckets * 2 + 20 h.append(f'
') for b in range(num_buckets): v = density.get(b, 0) h_px = max(2, int(60 * v / max(1, max_density * 0.6))) if v > 0 else 0 if v == 0: color = "#0d1117" else: i = min(v / (max(1, max_density * 0.5)), 1.0) r = int(233 * i + 13 * (1 - i)) g = int(69 * i + 13 * (1 - i)) bv = int(96 * i + 23 * (1 - i)) color = f"rgb({r},{g},{bv})" title = f"{b * BUCKET:.0f}s: {v} faces" h.append(f'') h.append('
') h.append(f'
0s{duration:.0f}s
') # 2. Trace timeline (Gantt) h.append('

Trace Timeline

') h.append('
First → last appearance for each trace. Hover for details.
') show_traces = min(len(trace_spans), 2000) bar_h = 2 chart_height = show_traces * (bar_h + 1) + 10 h.append(f'
') for i, (tid, fn0, fn1, t0, t1, cnt) in enumerate(trace_spans[:show_traces]): left = int(t0 / duration * (w_px - 20)) + 10 width = max(3, int((t1 - t0) / duration * (w_px - 20))) top = i * (bar_h + 1) + 5 opacity = 1.0 if cnt > 5 else 0.3 identity_note = "" if tid in trace_to_identity: identity_note = f", identity: {trace_to_identity[tid]}" title = f"T{tid}: {t0:.0f}s–{t1:.0f}s, {cnt} faces, f{fn0}–f{fn1}{identity_note}" h.append(f'') h.append('
') h.append(f'
0s (showing {show_traces}/{len(trace_spans)} traces){duration:.0f}s
') # 3. Per-trace heatmap h.append('

Per-Trace Heatmap (top 500, every 10th trace)

') h.append(f'
') step = max(1, num_buckets // 120) for i, (tid, fn0, fn1, t0, t1, cnt) in enumerate(trace_spans[:500]): if i % 10 != 0: continue start_bkt = int(t0 / BUCKET) end_bkt = int(t1 / BUCKET) + 1 row = f'
T{tid}' for b in range(0, num_buckets, step): active = start_bkt <= b <= end_bkt color = "#e94560" if active else "#161b22" row += f'' row += '
' h.append(row) h.append('
') h.append('') return '\n'.join(h) html = build_html() with open(OUT, 'w') as f: f.write(html) print(f"Saved: {OUT}") print(f"Traces: {len(trace_spans)}, Detections: {total_detections}, Density max: {max_density}, Duration: {duration:.0f}s, Sample: {hz:.0f}Hz") print(f"Size: {len(html) / 1024:.0f}KB")