300 lines
11 KiB
Python
300 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Cross-validate face detections: InsightFace vs Vision Framework vs MediaPipe
|
|
Identifies false positives by comparing all three detectors.
|
|
"""
|
|
import sys, os, json, time, subprocess, tempfile, shutil
|
|
from pathlib import Path
|
|
|
|
# Directory holding the precomputed InsightFace results ("<uuid>.face.json").
INSIGHTFACE_DIR = "/Users/accusys/momentry/output_dev"

# Demo video that is sampled into frames and handed to the Vision binary.
EXHIBITION_VIDEO = (
    "/Users/accusys/momentry/var/sftpgo/data/demo/"
    "Thunderbolt ExaSAN at CCBN 中国国际广播电视信息网络展览会清.mp4"
)

# Identifier used to build the InsightFace JSON file name for that video.
EXHIBITION_UUID = "477d8fa7bc0e1a70d89cc0022b7ebfd2"
|
|
|
|
|
|
def extract_frames(video_path, sample_interval=30, max_frames=30):
    """Dump every ``sample_interval``-th frame of a video to JPEG files via ffmpeg.

    Args:
        video_path: Path of the video handed to ffmpeg.
        sample_interval: Frames whose index ``n`` satisfies
            ``n % sample_interval == 0`` are kept.
        max_frames: Upper bound on the number of frames returned. When it is a
            positive integer ffmpeg is also told to stop after that many
            output frames instead of decoding the whole video.

    Returns:
        A ``(tmpdir, frame_paths, frame_map)`` tuple: the temporary directory
        created for the JPEGs (the caller must delete it), the sorted list of
        at most ``max_frames`` JPEG paths, and a dict mapping the integer
        embedded in each file name (``frame_00012.jpg`` -> ``12``) to its path.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started (e.g. it is not installed).
    """
    tmpdir = tempfile.mkdtemp(prefix="face_val_")
    pattern = os.path.join(tmpdir, "frame_%05d.jpg")

    cmd = ["ffmpeg", "-y", "-v", "quiet", "-i", video_path,
           "-vf", f"select=not(mod(n\\,{sample_interval}))",
           "-vsync", "vfr", "-q:v", "5"]
    if isinstance(max_frames, int) and max_frames > 0:
        # Only the first max_frames files are ever used, so stop ffmpeg early.
        cmd += ["-frames:v", str(max_frames)]
    cmd.append(pattern)

    try:
        subprocess.run(cmd, check=True)
    except BaseException:
        # Do not leave a half-filled temp directory behind when ffmpeg fails.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    names = sorted(n for n in os.listdir(tmpdir) if n.endswith(".jpg"))[:max_frames]
    frame_paths = [os.path.join(tmpdir, n) for n in names]
    # NOTE(review): the keys are the sequence numbers ffmpeg put in the file
    # names, presumably not source-frame indices -- confirm they line up with
    # the frame numbers used by the other detectors.
    frame_map = {
        int(n.split("_")[1].split(".")[0]): path
        for n, path in zip(names, frame_paths)
    }
    return tmpdir, frame_paths, frame_map
|
|
|
|
|
|
def iou(b1, b2):
    """Return the intersection-over-union of two ``[x, y, w, h]`` boxes.

    The result is ``0`` when the boxes do not overlap or when the union has
    no area.
    """
    # Corners of the overlapping rectangle (may be inverted if disjoint).
    left = max(b1[0], b2[0])
    top = max(b1[1], b2[1])
    right = min(b1[0] + b1[2], b2[0] + b2[2])
    bottom = min(b1[1] + b1[3], b2[1] + b2[3])

    # Clamp each side at zero so disjoint boxes yield an empty overlap.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    overlap = overlap_w * overlap_h

    combined = b1[2] * b1[3] + b2[2] * b2[3] - overlap
    if combined > 0:
        return overlap / combined
    return 0
|
|
|
|
|
|
def load_insightface_data(uuid):
    """Load the stored InsightFace output for one video.

    Reads ``<INSIGHTFACE_DIR>/<uuid>.face.json`` and indexes its faces by the
    ``frame`` value of each entry in the top-level ``frames`` list.

    Args:
        uuid: Identifier used to build the JSON file name.

    Returns:
        ``{frame: [face, ...]}`` where each face is a dict with ``bbox``
        (``[x, y, width, height]``), ``conf``, ``embedding`` and ``attrs``.
        Frames without faces are omitted. An empty dict is returned when the
        file does not exist.
    """
    path = os.path.join(INSIGHTFACE_DIR, f"{uuid}.face.json")
    if not os.path.exists(path):
        print(f"[InsightFace] No data at {path}")
        return {}

    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    # "or []" also covers an explicit null in the file.
    records = data.get("frames") or []

    # Index by frame number
    frames = {}
    for record in records:
        faces = [
            {
                "bbox": [face.get("x", 0), face.get("y", 0),
                         face.get("width", 0), face.get("height", 0)],
                "conf": face.get("confidence", 0),
                "embedding": face.get("embedding"),
                "attrs": face.get("attributes"),
            }
            for face in (record.get("faces") or [])
        ]
        if faces:
            frames[record.get("frame", 0)] = faces

    print(f"[InsightFace] Loaded {len(records)} frames, {sum(len(v) for v in frames.values())} faces")
    return frames
|
|
|
|
|
|
def _parse_vision_output(stdout):
    """Turn the Swift tool's text report into ``{frame_number: [face, ...]}``."""
    frames = {}
    current_frame = None
    for line in stdout.split("\n"):
        if "Frame " in line and "):" in line:
            # The frame number is the first token that parses as an integer.
            frame_num = None
            for token in line.strip().split(" "):
                try:
                    frame_num = int(token)
                except ValueError:
                    continue
                break
            if frame_num is not None:
                current_frame = frame_num
                frames.setdefault(current_frame, [])
        elif "bbox=" in line and current_frame is not None:
            try:
                x, y = line.split("bbox=(")[1].split(")")[0].split(",")
                w, h = line.split("size=")[1].split(" ")[0].split("x")
                conf = line.split("conf=")[1].split(" ")[0]
                face = {
                    "bbox": [float(x), float(y), float(w), float(h)],
                    "conf": float(conf),
                }
            except (IndexError, ValueError):
                # Line did not have the expected bbox/size/conf layout; skip it.
                continue
            frames[current_frame].append(face)
    # Keep only frames that actually contain faces, like the other loaders do.
    return {fn: faces for fn, faces in frames.items() if faces}


def detect_vision(frame_paths):
    """Run the Vision Framework Swift binary and parse its printed detections.

    The binary is looked up next to this file under
    ``swift_processors/.build/debug/face_compare_test`` and is run on
    ``EXHIBITION_VIDEO`` with a sample interval of 30 and
    ``len(frame_paths)`` as the frame limit. Detections are parsed from its
    standard output.

    Args:
        frame_paths: Extracted frame files; only their count is used.

    Returns:
        ``{frame_number: [{"bbox": [x, y, w, h], "conf": float}, ...]}`` for
        frames with at least one parsed face. An empty dict is returned when
        the binary is missing, cannot be started, or times out.
    """
    swift_bin = os.path.join(os.path.dirname(__file__),
                             "swift_processors/.build/debug/face_compare_test")
    if not os.path.exists(swift_bin):
        print("[Vision] Binary not found at", swift_bin)
        return {}

    print("[Vision] Running detection...")
    try:
        result = subprocess.run(
            [swift_bin, EXHIBITION_VIDEO,
             "--sample-interval", "30", "--max-frames", str(len(frame_paths)),
             "--json-output", "/tmp/vision_faces.json"],
            capture_output=True, text=True, timeout=120)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # Treat a hung or unlaunchable binary like a missing one.
        print(f"[Vision] Detection failed: {exc}")
        return {}

    print(result.stdout[-300:] if result.stdout else "")
    if result.returncode != 0:
        # Still parse whatever was printed, but make the failure visible.
        print(f"[Vision] Binary exited with status {result.returncode}: "
              f"{(result.stderr or '')[-300:]}")

    frames = _parse_vision_output(result.stdout or "")
    print(f"[Vision] Detected faces in {len(frames)} frames")
    return frames
|
|
|
|
|
|
def _detection_score(det):
    """Return the confidence of one MediaPipe detection."""
    # The Tasks API Detection container carries its confidence in
    # categories[0].score; fall back to a top-level score if one exists.
    categories = getattr(det, "categories", None)
    if categories:
        return categories[0].score
    return getattr(det, "score", 0.0)


def detect_mediapipe(frame_paths, frame_map):
    """Run the MediaPipe face detector on already extracted frame images.

    Args:
        frame_paths: Paths of the JPEG frames to analyse.
        frame_map: ``{frame_number: path}`` as returned by ``extract_frames``;
            used to look up the frame number of a path. Paths missing from it
            fall back to the integer embedded in the file name
            (``frame_00012.jpg`` -> ``12``).

    Returns:
        ``{frame_number: [{"bbox": [x, y, w, h], "conf": score}, ...]}`` for
        frames with at least one detection. An empty dict is returned when
        MediaPipe/OpenCV cannot be imported, the model file is missing, or the
        detector cannot be created.
    """
    try:
        # Try to import from system python
        site_dir = "/Users/accusys/Library/Python/3.9/lib/python/site-packages"
        if site_dir not in sys.path:
            sys.path.insert(0, site_dir)
        from mediapipe.tasks.python.vision.face_detector import FaceDetector, FaceDetectorOptions
        from mediapipe.tasks.python.core.base_options import BaseOptions
        import mediapipe as mp
        import cv2
    except ImportError:
        print("[MediaPipe] Package not available via system Python")
        return {}

    model_path = "/tmp/mp_models/face_detector.task"
    if not os.path.exists(model_path):
        print("[MediaPipe] Model not found, skipping")
        return {}

    try:
        detector = FaceDetector.create_from_options(
            FaceDetectorOptions(base_options=BaseOptions(model_asset_path=model_path)))
    except Exception as exc:
        print(f"[MediaPipe] Failed to create detector: {exc}")
        return {}

    path_to_frame = {path: num for num, path in (frame_map or {}).items()}
    frames = {}
    try:
        for fname in frame_paths:
            fn = path_to_frame.get(fname)
            if fn is None:
                fn = int(os.path.basename(fname).split("_")[1].split(".")[0])
            img = cv2.imread(fname)
            if img is None:
                continue
            rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
            faces = [
                {
                    "bbox": [det.bounding_box.origin_x, det.bounding_box.origin_y,
                             det.bounding_box.width, det.bounding_box.height],
                    "conf": _detection_score(det),
                }
                for det in (detector.detect(mp_img).detections or [])
            ]
            if faces:
                frames[fn] = faces
    finally:
        # Release the native detector even if a frame fails to process.
        detector.close()

    print(f"[MediaPipe] Detected faces in {len(frames)} frames")
    return frames
|
|
|
|
|
|
def _first_hit_matches(faces_a, faces_b, iou_thresh):
    """Return the index sets of ``faces_a`` / ``faces_b`` that overlap.

    Each face in ``faces_a`` is paired with the first face in ``faces_b``
    whose IoU exceeds ``iou_thresh``; one ``faces_b`` entry may serve several
    ``faces_a`` entries.
    """
    hits_a, hits_b = set(), set()
    for ai, face_a in enumerate(faces_a):
        for bi, face_b in enumerate(faces_b):
            if iou(face_a["bbox"], face_b["bbox"]) > iou_thresh:
                hits_a.add(ai)
                hits_b.add(bi)
                break
    return hits_a, hits_b


def match_faces(ifaces, vfaces, mpfaces, iou_thresh=0.3):
    """Match faces across the three detectors and count agreement categories.

    Args:
        ifaces: InsightFace results, ``{frame: [face, ...]}``.
        vfaces: Vision Framework results in the same layout.
        mpfaces: MediaPipe results in the same layout.
        iou_thresh: Two faces of the same frame match when their bbox IoU is
            strictly greater than this value.

    Returns:
        ``(stats, matched_if, matched_vf, matched_mp)``. ``stats`` counts each
        face exactly once:

        * ``consensus`` / ``if_vf`` / ``if_mp`` / ``if_only`` -- InsightFace
          faces confirmed by both, only Vision, only MediaPipe, or neither;
        * ``vf_mp`` / ``vf_only`` -- Vision faces with no InsightFace match
          that are, respectively are not, confirmed by MediaPipe;
        * ``mp_only`` -- MediaPipe faces matched by neither other detector.

        The three ``matched_*`` sets hold ``(frame, index)`` pairs of faces
        that matched at least one face of another detector.
    """
    matched_if, matched_vf, matched_mp = set(), set(), set()
    stats = {"consensus": 0, "if_only": 0, "vf_only": 0, "mp_only": 0,
             "if_vf": 0, "if_mp": 0, "vf_mp": 0}

    for fn in sorted(set(ifaces) | set(vfaces) | set(mpfaces)):
        if_faces = ifaces.get(fn, [])
        vf_faces = vfaces.get(fn, [])
        mp_faces = mpfaces.get(fn, [])

        # Keep the pairings separate: which detector confirmed a face decides
        # its category, so one merged "matched" flag is not enough.
        if_by_vf, vf_by_if = _first_hit_matches(if_faces, vf_faces, iou_thresh)
        if_by_mp, mp_by_if = _first_hit_matches(if_faces, mp_faces, iou_thresh)
        vf_by_mp, mp_by_vf = _first_hit_matches(vf_faces, mp_faces, iou_thresh)

        matched_if.update((fn, i) for i in if_by_vf | if_by_mp)
        matched_vf.update((fn, i) for i in vf_by_if | vf_by_mp)
        matched_mp.update((fn, i) for i in mp_by_if | mp_by_vf)

        for ii in range(len(if_faces)):
            in_vf = ii in if_by_vf
            in_mp = ii in if_by_mp
            if in_vf and in_mp:
                stats["consensus"] += 1
            elif in_vf:
                stats["if_vf"] += 1
            elif in_mp:
                stats["if_mp"] += 1
            else:
                stats["if_only"] += 1

        for vi in range(len(vf_faces)):
            if vi in vf_by_if:
                continue  # already counted through its InsightFace partner
            if vi in vf_by_mp:
                stats["vf_mp"] += 1
            else:
                stats["vf_only"] += 1

        for mi in range(len(mp_faces)):
            if mi not in mp_by_if and mi not in mp_by_vf:
                stats["mp_only"] += 1

    return stats, matched_if, matched_vf, matched_mp
|
|
|
|
|
|
def main():
    """Run all three detectors on the demo video and print a comparison report.

    Frames are extracted into a temporary directory, the stored InsightFace
    results are restricted to the extracted frame numbers, the Vision and
    MediaPipe detectors are run, and the match statistics plus up to ten
    InsightFace-only faces are printed. The temporary directory is removed
    even when one of the steps raises.
    """
    banner = "=" * 60
    print(banner)
    print("Face Detection Cross-Validation")
    print(banner)

    # 1. Extract frames
    tmpdir, frame_paths, frame_map = extract_frames(EXHIBITION_VIDEO, 30, 30)
    try:
        print(f"Extracted {len(frame_paths)} frames")

        # 2. Load InsightFace data
        ifaces = load_insightface_data(EXHIBITION_UUID)
        # Filter to only frames we extracted
        ifaces = {k: v for k, v in ifaces.items() if k in frame_map}

        # 3. Vision Framework
        vfaces = detect_vision(frame_paths)

        # 4. MediaPipe
        mpfaces = detect_mediapipe(frame_paths, frame_map)

        # 5. Cross-validate
        print("\n" + banner)
        print("Cross-Validation Results")
        print(banner)
        stats, matched_if, matched_vf, matched_mp = match_faces(ifaces, vfaces, mpfaces)

        total_if = sum(len(v) for v in ifaces.values())
        total_vf = sum(len(v) for v in vfaces.values())
        total_mp = sum(len(v) for v in mpfaces.values())

        print("\nDetected faces (sample frames):")
        print(f" InsightFace: {total_if}")
        print(f" Vision: {total_vf}")
        print(f" MediaPipe: {total_mp}")

        print("\nMatch categories:")
        print(f" All 3 consensus: {stats['consensus']} ✅ likely real")
        print(f" IF + Vision: {stats['if_vf']} ✅ likely real")
        print(f" IF + MediaPipe: {stats['if_mp']} ✅ likely real")
        print(f" InsightFace ONLY: {stats['if_only']} ⚠️ potential false positives")
        print(f" Vision ONLY: {stats['vf_only']} ⚠️")
        print(f" MediaPipe ONLY: {stats['mp_only']} ⚠️")

        if_total = stats["consensus"] + stats["if_vf"] + stats["if_mp"] + stats["if_only"]
        fp_rate = stats["if_only"] / if_total * 100 if if_total > 0 else 0
        print(f"\nEstimated InsightFace false positive rate: {fp_rate:.1f}%")
        print(f" ({stats['if_only']} IF-only out of {if_total} total IF faces)")

        if stats["if_only"] > 0:
            print("\nSample IF-only faces (potential false positives):")
            unconfirmed = [
                (fn, face)
                for fn in sorted(ifaces)
                for ii, face in enumerate(ifaces[fn])
                if (fn, ii) not in matched_if
            ]
            # Show at most ten examples.
            for fn, face in unconfirmed[:10]:
                print(f" Frame {fn}: bbox={face['bbox']}, conf={face['conf']:.3f}, attrs={face.get('attrs',{})}")
    finally:
        # Always remove the extracted frames, also when a detector raised.
        shutil.rmtree(tmpdir, ignore_errors=True)

    print("\nDone.")
|
|
|
|
|
|
# Run the cross-validation only when this file is executed as a script.
if __name__ == "__main__":
    main()
|