#!/opt/homebrew/bin/python3.11
"""
Cross-validate face detections: InsightFace vs Vision Framework vs MediaPipe.

Identifies likely false positives by comparing all three detectors on a
sample of frames extracted from one video.
"""

import sys
import os
import json
import time
import subprocess
import tempfile
import shutil
from pathlib import Path

INSIGHTFACE_DIR = "/Users/accusys/momentry/output_dev"
EXHIBITION_VIDEO = "/Users/accusys/momentry/var/sftpgo/data/demo/Thunderbolt ExaSAN at CCBN 中国国际广播电视信息网络展览会清.mp4"
EXHIBITION_UUID = "477d8fa7bc0e1a70d89cc0022b7ebfd2"


def _sequence_number(filename):
    """Return the numeric part of a ``frame_NNNNN.jpg`` file name."""
    return int(os.path.basename(filename).split("_")[1].split(".")[0])


def extract_frames(video_path, sample_interval=30, max_frames=30):
    """Extract every ``sample_interval``-th frame of a video as JPEG files.

    Args:
        video_path: Path of the video handed to ``ffmpeg``.
        sample_interval: Keep frames whose index is a multiple of this value.
        max_frames: Upper bound on the number of frames returned.

    Returns:
        A tuple ``(tmpdir, frame_paths, frame_map)``: the temporary directory
        holding the images (the caller removes it), the sorted image paths,
        and a dict mapping source frame index -> image path.

    Raises:
        subprocess.CalledProcessError: If ``ffmpeg`` exits with an error.
        OSError: If ``ffmpeg`` cannot be started.
    """
    tmpdir = tempfile.mkdtemp(prefix="face_val_")
    pattern = os.path.join(tmpdir, "frame_%05d.jpg")
    cmd = [
        "ffmpeg", "-y", "-v", "quiet", "-i", video_path,
        "-vf", f"select=not(mod(n\\,{sample_interval}))",
        "-vsync", "vfr", "-q:v", "5",
    ]
    if max_frames is not None and max_frames > 0:
        # Stop decoding once enough frames were written instead of sampling
        # the whole video and discarding most of the output afterwards.
        cmd += ["-frames:v", str(max_frames)]
    cmd.append(pattern)

    try:
        subprocess.run(cmd, check=True)
    except BaseException:
        # Do not leave the temporary directory behind when extraction fails.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    names = sorted(f for f in os.listdir(tmpdir) if f.endswith(".jpg"))[:max_frames]
    frame_paths = [os.path.join(tmpdir, name) for name in names]
    # ffmpeg numbers its output files 1, 2, 3, ... while the select filter
    # keeps source frames 0, interval, 2*interval, ...; translate the file
    # sequence number back to the source frame index.
    # NOTE(review): assumes the other detectors report 0-based source frame
    # indices -- confirm against the InsightFace JSON and the Swift tool.
    frame_map = {
        (_sequence_number(name) - 1) * sample_interval: os.path.join(tmpdir, name)
        for name in names
    }
    return tmpdir, frame_paths, frame_map


def iou(b1, b2):
    """Return the intersection-over-union of two bboxes ``[x, y, w, h]``.

    Returns 0 when the union area is not positive.
    """
    left = max(b1[0], b2[0])
    top = max(b1[1], b2[1])
    right = min(b1[0] + b1[2], b2[0] + b2[2])
    bottom = min(b1[1] + b1[3], b2[1] + b2[3])
    inter = max(0, right - left) * max(0, bottom - top)
    union = b1[2] * b1[3] + b2[2] * b2[3] - inter
    return inter / union if union > 0 else 0


def load_insightface_data(uuid):
    """Load existing InsightFace output for ``uuid``.

    Reads ``<INSIGHTFACE_DIR>/<uuid>.face.json`` and returns a dict mapping
    frame number -> list of face dicts (``bbox``, ``conf``, ``embedding``,
    ``attrs``). Frames without faces are omitted. Returns ``{}`` when the
    file does not exist.
    """
    path = os.path.join(INSIGHTFACE_DIR, f"{uuid}.face.json")
    if not os.path.exists(path):
        print(f"[InsightFace] No data at {path}")
        return {}
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    # Index by frame number
    frames = {}
    for fr in data.get("frames", []):
        faces = [
            {
                "bbox": [face.get("x", 0), face.get("y", 0),
                         face.get("width", 0), face.get("height", 0)],
                "conf": face.get("confidence", 0),
                "embedding": face.get("embedding"),
                "attrs": face.get("attributes"),
            }
            for face in fr.get("faces", [])
        ]
        if faces:
            frames[fr.get("frame", 0)] = faces
    print(f"[InsightFace] Loaded {len(data.get('frames',[]))} frames, {sum(len(v) for v in frames.values())} faces")
    return frames


def _parse_vision_output(stdout):
    """Parse the Swift tool's text output into ``{frame: [face, ...]}``.

    A line containing ``"Frame "`` and ``"):"`` starts a frame; its first
    integer token is the frame number. Following lines containing ``bbox=``
    are expected to carry ``bbox=(x,y)``, ``size=WxH`` and ``conf=C``.
    Lines that do not parse are skipped.
    """
    frames = {}
    current_frame = None
    for line in (stdout or "").splitlines():
        if "Frame " in line and "):" in line:
            for token in line.strip().split(" "):
                try:
                    current_frame = int(token)
                except ValueError:
                    continue
                frames.setdefault(current_frame, [])
                break
        elif "bbox=" in line and current_frame is not None:
            try:
                x, y = line.split("bbox=(")[1].split(")")[0].split(",")
                w, h = line.split("size=")[1].split(" ")[0].split("x")
                conf = line.split("conf=")[1].split(" ")[0]
                face = {
                    "bbox": [float(x), float(y), float(w), float(h)],
                    "conf": float(conf),
                }
            except (IndexError, ValueError):
                # Malformed detection line: ignore it, keep the rest.
                continue
            frames[current_frame].append(face)
    return frames


def detect_vision(frame_paths, sample_interval=30):
    """Vision Framework detection - call the Swift binary on the video.

    Args:
        frame_paths: Extracted frame paths; only their count is used
            (passed as ``--max-frames``).
        sample_interval: Value passed as ``--sample-interval``.

    Returns:
        Dict mapping frame number -> list of ``{"bbox", "conf"}`` dicts, or
        ``{}`` when the binary is missing, cannot be run, or times out.
    """
    swift_bin = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             "swift_processors/.build/debug/face_compare_test")
    if not os.path.exists(swift_bin):
        print("[Vision] Binary not found at", swift_bin)
        return {}

    print("[Vision] Running detection...")
    try:
        result = subprocess.run(
            [swift_bin, EXHIBITION_VIDEO,
             "--sample-interval", str(sample_interval),
             "--max-frames", str(len(frame_paths)),
             "--json-output", "/tmp/vision_faces.json"],
            capture_output=True, text=True, timeout=120)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # Treat a hung or unrunnable binary like a missing detector instead
        # of aborting the whole comparison.
        print("[Vision] Detection failed:", exc)
        return {}
    if result.returncode != 0:
        print(f"[Vision] Binary exited with status {result.returncode}")
    print(result.stdout[-300:] if result.stdout else "")

    # Parse output to get per-frame results
    frames = _parse_vision_output(result.stdout)
    print(f"[Vision] Detected faces in {len(frames)} frames")
    return frames


def _detection_score(det):
    """Return the confidence of a MediaPipe detection.

    MediaPipe Tasks stores the score on ``det.categories[0]``; fall back to
    a ``score`` attribute (or 0.0) for objects shaped differently.
    """
    categories = getattr(det, "categories", None)
    if categories:
        return categories[0].score
    return getattr(det, "score", 0.0)


def detect_mediapipe(frame_paths, frame_map):
    """MediaPipe BlazeFace detection on the extracted frame images.

    Args:
        frame_paths: Image paths to run the detector on.
        frame_map: Dict frame number -> image path, used to label results
            with the same frame numbers as ``extract_frames``. Paths missing
            from it fall back to the number embedded in the file name.

    Returns:
        Dict mapping frame number -> list of ``{"bbox", "conf"}`` dicts for
        frames with at least one detection; ``{}`` when MediaPipe, OpenCV or
        the model file is unavailable.
    """
    try:
        # Try to import from system python
        site_dir = "/Users/accusys/Library/Python/3.9/lib/python/site-packages"
        if site_dir not in sys.path:
            sys.path.insert(0, site_dir)
        from mediapipe.tasks.python.vision.face_detector import FaceDetector, FaceDetectorOptions
        from mediapipe.tasks.python.core.base_options import BaseOptions
        import mediapipe as mp
    except ImportError:
        print("[MediaPipe] Package not available via system Python")
        return {}
    try:
        import cv2
    except ImportError:
        print("[MediaPipe] OpenCV (cv2) not available, skipping")
        return {}

    model_path = "/tmp/mp_models/face_detector.task"
    if not os.path.exists(model_path):
        print("[MediaPipe] Model not found, skipping")
        return {}

    try:
        detector = FaceDetector.create_from_options(
            FaceDetectorOptions(base_options=BaseOptions(model_asset_path=model_path)))
    except Exception as exc:
        print("[MediaPipe] Failed to create detector:", exc)
        return {}

    path_to_frame = {path: fn for fn, path in (frame_map or {}).items()}
    frames = {}
    try:
        for fname in frame_paths:
            fn = path_to_frame.get(fname)
            if fn is None:
                fn = _sequence_number(fname)
            img = cv2.imread(fname)
            if img is None:
                continue
            rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
            result = detector.detect(mp_img)
            faces = [
                {
                    "bbox": [det.bounding_box.origin_x, det.bounding_box.origin_y,
                             det.bounding_box.width, det.bounding_box.height],
                    "conf": _detection_score(det),
                }
                for det in (result.detections or [])
            ]
            if faces:
                frames[fn] = faces
    finally:
        # Release the native detector resources.
        detector.close()

    print(f"[MediaPipe] Detected faces in {len(frames)} frames")
    return frames


def _first_matches(a_faces, b_faces, iou_thresh):
    """Pair each face in ``a_faces`` with the first overlapping ``b_faces`` entry.

    Returns a list of ``(a_index, b_index)``; a face in ``b_faces`` may be
    paired with more than one face in ``a_faces``.
    """
    pairs = []
    for ai, a in enumerate(a_faces):
        for bi, b in enumerate(b_faces):
            if iou(a["bbox"], b["bbox"]) > iou_thresh:
                pairs.append((ai, bi))
                break
    return pairs


def match_faces(ifaces, vfaces, mpfaces, iou_thresh=0.3):
    """Match faces across detectors and categorize them.

    Args:
        ifaces: InsightFace results, ``{frame: [face, ...]}``.
        vfaces: Vision results, same shape.
        mpfaces: MediaPipe results, same shape.
        iou_thresh: Two boxes match when their IoU is strictly greater.

    Returns:
        ``(stats, matched_if, matched_vf, matched_mp)``. ``stats`` counts
        InsightFace faces as ``consensus`` / ``if_vf`` / ``if_mp`` /
        ``if_only``, Vision faces without an InsightFace match as ``vf_mp``
        or ``vf_only``, and unmatched MediaPipe faces as ``mp_only``. The
        three sets hold ``(frame, index)`` of every face that matched at
        least one other detector.
    """
    matched_if = set()
    matched_vf = set()
    matched_mp = set()
    stats = {"consensus": 0, "if_only": 0, "vf_only": 0, "mp_only": 0,
             "if_vf": 0, "if_mp": 0, "vf_mp": 0}

    for fn in sorted(set(ifaces) | set(vfaces) | set(mpfaces)):
        if_faces = ifaces.get(fn, [])
        vf_faces = vfaces.get(fn, [])
        mp_faces = mpfaces.get(fn, [])

        if_vf = _first_matches(if_faces, vf_faces, iou_thresh)
        if_mp = _first_matches(if_faces, mp_faces, iou_thresh)
        vf_mp = _first_matches(vf_faces, mp_faces, iou_thresh)

        # Track agreement per detector pair so a face matched by one
        # detector is never credited with a match from another one.
        if_seen_by_vf = {ii for ii, _ in if_vf}
        if_seen_by_mp = {ii for ii, _ in if_mp}
        vf_seen_by_if = {vi for _, vi in if_vf}
        vf_seen_by_mp = {vi for vi, _ in vf_mp}
        mp_seen = {mi for _, mi in if_mp} | {mi for _, mi in vf_mp}

        matched_if.update((fn, ii) for ii in if_seen_by_vf | if_seen_by_mp)
        matched_vf.update((fn, vi) for vi in vf_seen_by_if | vf_seen_by_mp)
        matched_mp.update((fn, mi) for mi in mp_seen)

        # Categorize
        for ii in range(len(if_faces)):
            by_vision = ii in if_seen_by_vf
            by_mediapipe = ii in if_seen_by_mp
            if by_vision and by_mediapipe:
                stats["consensus"] += 1
            elif by_vision:
                stats["if_vf"] += 1
            elif by_mediapipe:
                stats["if_mp"] += 1
            else:
                stats["if_only"] += 1
        for vi in range(len(vf_faces)):
            if vi in vf_seen_by_if:
                continue  # already counted together with its InsightFace face
            if vi in vf_seen_by_mp:
                stats["vf_mp"] += 1
            else:
                stats["vf_only"] += 1
        stats["mp_only"] += sum(1 for mi in range(len(mp_faces)) if mi not in mp_seen)

    return stats, matched_if, matched_vf, matched_mp


def main():
    """Run the three detectors on sampled frames and print the comparison."""
    sample_interval = 30
    max_frames = 30

    print("=" * 60)
    print("Face Detection Cross-Validation")
    print("=" * 60)

    # 1. Extract frames
    tmpdir, frame_paths, frame_map = extract_frames(EXHIBITION_VIDEO, sample_interval, max_frames)
    try:
        print(f"Extracted {len(frame_paths)} frames")

        # 2. Load InsightFace data
        ifaces = load_insightface_data(EXHIBITION_UUID)
        # Filter to only frames we extracted
        ifaces = {k: v for k, v in ifaces.items() if k in frame_map}

        # 3. Vision Framework
        vfaces = detect_vision(frame_paths, sample_interval)

        # 4. MediaPipe
        mpfaces = detect_mediapipe(frame_paths, frame_map)

        # 5. Cross-validate
        print("\n" + "=" * 60)
        print("Cross-Validation Results")
        print("=" * 60)

        stats, matched_if, matched_vf, matched_mp = match_faces(ifaces, vfaces, mpfaces)

        total_if = sum(len(v) for v in ifaces.values())
        total_vf = sum(len(v) for v in vfaces.values())
        total_mp = sum(len(v) for v in mpfaces.values())

        print(f"\nDetected faces (sample frames):")
        print(f" InsightFace: {total_if}")
        print(f" Vision: {total_vf}")
        print(f" MediaPipe: {total_mp}")

        print(f"\nMatch categories:")
        print(f" All 3 consensus: {stats['consensus']} ✅ likely real")
        print(f" IF + Vision: {stats['if_vf']} ✅ likely real")
        print(f" IF + MediaPipe: {stats['if_mp']} ✅ likely real")
        print(f" Vision + MediaPipe: {stats['vf_mp']}")
        print(f" InsightFace ONLY: {stats['if_only']} ⚠️ potential false positives")
        print(f" Vision ONLY: {stats['vf_only']} ⚠️")
        print(f" MediaPipe ONLY: {stats['mp_only']} ⚠️")

        if_total = stats["consensus"] + stats["if_vf"] + stats["if_mp"] + stats["if_only"]
        fp_rate = stats["if_only"] / if_total * 100 if if_total > 0 else 0
        print(f"\nEstimated InsightFace false positive rate: {fp_rate:.1f}%")
        print(f" ({stats['if_only']} IF-only out of {if_total} total IF faces)")

        if stats["if_only"] > 0:
            print(f"\nSample IF-only faces (potential false positives):")
            unmatched = [
                (fn, face)
                for fn in sorted(ifaces)
                for ii, face in enumerate(ifaces[fn])
                if (fn, ii) not in matched_if
            ]
            for fn, face in unmatched[:10]:
                print(f" Frame {fn}: bbox={face['bbox']}, conf={face['conf']:.3f}, attrs={face.get('attrs',{})}")
    finally:
        # Always remove the extracted frames, even when a detector fails.
        shutil.rmtree(tmpdir, ignore_errors=True)

    print("\nDone.")


if __name__ == "__main__":
    main()