#!/opt/homebrew/bin/python3.11
"""
Speaker Binding with Lip Verification

Reads face.json (8Hz outer_lips) + asrx.json + identity_bindings.
For each ASR segment with face data + lip motion, create a
speaker→identity binding.
"""
import json
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

UUID = "aeed71342a899fe4b4c57b7d41bcb692"
OUTPUT_DIR = Path("/Users/accusys/momentry/output_dev")
PSQL = [
    "/Users/accusys/pgsql/18.3/bin/psql",
    "-U", "accusys",
    "-d", "momentry",
    "-t", "-A",
]

# Frame rate used to convert a face.json frame number into seconds.
# NOTE(review): the module docstring mentions 8Hz lip sampling; this assumes
# the "frame" field is still a 25 fps video frame index — confirm.
FPS = 25.0
# Baseline samples must precede the segment start by more than this many seconds.
BASELINE_GAP_SECS = 1.0
# Relative lip-height increase is multiplied by this gain, so 20% motion → 1.0.
MOTION_GAIN = 5
# Minimum average score required before a binding is written.
MIN_BINDING_SCORE = 0.3


def psql(sql: str) -> str:
    """Run *sql* through the psql CLI and return its stripped stdout.

    Raises:
        RuntimeError: if psql exits with a non-zero status. Previously the
            exit status was ignored, so a failed statement looked like an
            empty result (and a failed INSERT was still counted as a binding).
        subprocess.TimeoutExpired: if psql runs longer than 30 seconds.
    """
    r = subprocess.run(PSQL + ["-c", sql], capture_output=True, text=True, timeout=30)
    if r.returncode != 0:
        raise RuntimeError(f"psql failed (exit {r.returncode}): {r.stderr.strip()}")
    return r.stdout.strip()


def sql_literal(value) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled. psql does not interpolate variables
    in ``-c`` commands, so values are escaped here instead.
    Assumes the server runs with standard_conforming_strings=on (the
    PostgreSQL default), where backslashes need no escaping.
    """
    return "'" + str(value).replace("'", "''") + "'"


def parse_rows(output: str) -> list[list[str]]:
    """Split unaligned psql output (``-t -A``) into rows of ``|``-separated fields.

    Blank lines and lines without a ``|`` separator are skipped.
    """
    return [
        line.split("|")
        for line in output.strip().split("\n")
        if line.strip() and "|" in line
    ]


def load_json(path):
    """Load a JSON document from *path*, closing the file afterwards."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def calc_lip_height(face_data):
    """Calculate lip height from outer_lips (14 [x,y] points).

    ``face_data["lips"]`` may be a dict carrying an ``"outer_lips"`` list or
    the point list itself. Returns ``max(y) - min(y)`` over the points, or
    ``None`` when fewer than three points are available.
    """
    lips = face_data.get("lips", {})
    outer = lips.get("outer_lips", []) if isinstance(lips, dict) else lips
    if not outer or len(outer) < 3:
        return None
    ys = [pt[1] for pt in outer]
    return max(ys) - min(ys)


def lip_motion_score(frame_faces, trace_range, seg_start, seg_end, fps=FPS):
    """Score lip motion during a segment against a pre-segment baseline.

    Args:
        frame_faces: mapping of frame number → list of ``{"height": h}`` dicts.
        trace_range: dict with ``min_ts`` / ``max_ts`` (seconds) of the trace.
        seg_start: segment start time in seconds.
        seg_end: segment end time in seconds.
        fps: frames per second used to turn a frame number into a timestamp.

    Returns:
        A score clamped to [0, 1], or ``None`` when either the baseline window
        or the in-segment window has no lip samples.

    NOTE(review): heights of *all* faces in a frame are pooled; the trace is
    only used as a time window, not to select which face is measured.
    """
    baseline = []
    during = []
    for frame_no, faces in frame_faces.items():
        ts = frame_no / fps
        if not (trace_range["min_ts"] <= ts <= trace_range["max_ts"]):
            continue
        if ts < seg_start - BASELINE_GAP_SECS:
            # Before the segment: resting lip height.
            baseline.extend(fd["height"] for fd in faces)
        elif seg_start <= ts <= seg_end:
            # During the segment: speaking lip height.
            during.extend(fd["height"] for fd in faces)

    if not baseline or not during:
        return None

    baseline_avg = sum(baseline) / len(baseline)
    during_avg = sum(during) / len(during)
    # Floor the denominator so a near-zero baseline cannot blow up the ratio.
    motion = (during_avg - baseline_avg) / max(baseline_avg, 0.1)
    return max(0.0, min(1.0, motion * MOTION_GAIN))  # Normalize: 20% motion → 1.0


def main() -> None:
    """Run the full binding pipeline for the file identified by ``UUID``."""
    print("=== Speaker Binding with Lip Verification ===")

    # Step 1: Load face traces with identity_id
    traces = psql(f"""
        SELECT trace_id, identity_id
        FROM dev.face_detections
        WHERE file_uuid='{UUID}' AND trace_id IS NOT NULL AND identity_id IS NOT NULL
        GROUP BY trace_id, identity_id
    """)
    trace_identity = {int(p[0]): int(p[1]) for p in parse_rows(traces)}
    print(f"Traces with identity: {len(trace_identity)}")

    # Step 2: Load trace frame ranges
    tf = psql(f"""
        SELECT trace_id, MIN(frame_number), MAX(frame_number),
               MIN(timestamp_secs), MAX(timestamp_secs)
        FROM dev.face_detections
        WHERE file_uuid='{UUID}' AND trace_id IS NOT NULL
        GROUP BY trace_id
    """)
    trace_ranges = {
        int(p[0]): {
            'min_frame': int(p[1]),
            'max_frame': int(p[2]),
            'min_ts': float(p[3]),
            'max_ts': float(p[4]),
        }
        for p in parse_rows(tf)
    }

    # Step 3: Load lip analysis per frame from face.json
    print("Loading face.json lips data...")
    face = load_json(OUTPUT_DIR / f"{UUID}.face.json")
    frame_faces = {}
    for fr in face.get("frames", []):
        heights = [calc_lip_height(fd) for fd in fr.get("faces", [])]
        faces_data = [{"height": h} for h in heights if h is not None]
        if faces_data:
            frame_faces[fr["frame"]] = faces_data
    print(f"Frames with lip data: {len(frame_faces)}")

    # Step 4: Load ASRX segments
    asrx = load_json(OUTPUT_DIR / f"{UUID}.asrx.json")
    segments = asrx.get("segments", [])

    # Step 5: For each ASR segment with face overlap, compute lip motion
    speaker_trace_scores = defaultdict(list)
    for seg in segments:
        st = seg.get("start_time", 0)
        et = seg.get("end_time", 0)
        speaker = seg.get("speaker_id", "")
        if not speaker:
            continue
        for tid, tr in trace_ranges.items():
            # Only traces whose time span overlaps the segment are scored.
            if not (tr['min_ts'] <= et and tr['max_ts'] >= st):
                continue
            score = lip_motion_score(frame_faces, tr, st, et)
            if score is not None:
                speaker_trace_scores[(speaker, tid)].append(score)

    # Step 6: Create speaker bindings
    existing = psql(
        "SELECT identity_value FROM dev.identity_bindings "
        "WHERE identity_type='speaker' AND identity_id IN ("
        f"SELECT identity_id FROM dev.face_detections WHERE file_uuid='{UUID}' "
        "AND identity_id IS NOT NULL GROUP BY identity_id)"
    )
    existing_speakers = set(existing.strip().split('\n')) if existing.strip() else set()

    # NOTE(review): existing_speakers is not updated inside the loop, so one
    # speaker can be bound to several identities (or upsert the same row more
    # than once) within a single run — confirm that this is intended.
    new_bindings = 0
    for (speaker, tid), scores in speaker_trace_scores.items():
        if tid not in trace_identity:
            continue
        if speaker in existing_speakers:
            continue
        identity_id = trace_identity[tid]
        avg_score = sum(scores) / len(scores) if scores else 0
        if avg_score < MIN_BINDING_SCORE:  # Threshold: need meaningful lip motion
            continue
        # speaker comes from asrx.json, so it is quoted via sql_literal; the
        # remaining interpolated values are numbers produced by this script.
        psql(f"""
            INSERT INTO dev.identity_bindings
                (identity_id, identity_type, identity_value, confidence, metadata)
            VALUES ({identity_id}, 'speaker', {sql_literal(speaker)}, {avg_score:.3f},
                '{{"source":"lip_analysis","trace_id":{tid},"segments":{len(scores)},"avg_score":{avg_score:.3f}}}'::jsonb)
            ON CONFLICT (identity_id, identity_type, identity_value)
            DO UPDATE SET confidence=EXCLUDED.confidence
        """)
        new_bindings += 1

    print("\n=== Done ===")
    print(f"ASR segments analyzed: {len(segments)}")
    # NOTE(review): this is the number of (speaker, trace) pairs that received
    # at least one score, not a count of segments.
    print(f"Segments with face+lip data: {len(speaker_trace_scores)}")
    print(f"New speaker bindings: {new_bindings}")

    # Verify
    binds = psql(
        "SELECT ib.identity_value, i.name FROM dev.identity_bindings ib "
        "JOIN dev.identities i ON i.id=ib.identity_id "
        "WHERE ib.identity_type='speaker' AND i.id IN "
        f"(SELECT identity_id FROM dev.face_detections WHERE file_uuid='{UUID}') "
        "ORDER BY ib.identity_value"
    )
    print("\nSpeaker bindings:")
    for p in parse_rows(binds):
        print(f" {p[0]:15s} → {p[1]}")


if __name__ == "__main__":
    main()