Files
momentry_core/scripts/face_mediapipe_test.py

201 lines
7.1 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
POC: MediaPipe Face Detection vs Apple Vision Framework vs InsightFace
Tests face detection on video frames and reports:
- Detection count
- Bounding box quality
- Landmarks (468 face mesh)
- Processing speed
"""
import sys
import json
import os
import time
import subprocess
import argparse
# Put the directory containing this script at the front of the module search path.
# NOTE(review): presumably so sibling modules resolve when the script is launched
# from another working directory -- nothing in the visible code imports one; confirm
# this is still needed.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def extract_frames(video_path, sample_interval=30, max_frames=50):
    """Extract every ``sample_interval``-th frame of a video as JPEGs via ffmpeg.

    Args:
        video_path: Path of the input video, passed to ``ffmpeg -i``.
        sample_interval: Keep frames whose index is a multiple of this value.
        max_frames: Upper bound on the number of frame paths returned.

    Returns:
        ``(tmpdir, frame_paths)``: the temporary directory created for the
        frames (the caller is responsible for deleting it) and the sorted
        list of extracted JPEG paths, truncated to ``max_frames``.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: the ``ffmpeg`` executable could not be started.
    """
    import shutil
    import tempfile

    tmpdir = tempfile.mkdtemp(prefix="face_test_")
    pattern = os.path.join(tmpdir, "frame_%05d.jpg")
    cmd = [
        "ffmpeg", "-y", "-v", "quiet", "-i", video_path,
        # The comma is escaped so the filter parser does not read it as a
        # separator between filters.
        "-vf", f"select=not(mod(n\\,{sample_interval}))",
        "-vsync", "vfr", "-q:v", "5",
    ]
    # Stop ffmpeg once enough frames were written instead of decoding the whole
    # video and discarding the surplus afterwards.
    if max_frames is not None and max_frames > 0:
        cmd += ["-frames:v", str(max_frames)]
    cmd.append(pattern)

    try:
        subprocess.run(cmd, check=True)
    except BaseException:
        # Do not leave an orphaned temp directory behind when extraction fails;
        # the caller never receives its path and so could not clean it up.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    # Slice as well: it is the only limit when the -frames:v cap was not added.
    files = sorted(f for f in os.listdir(tmpdir) if f.endswith(".jpg"))[:max_frames]
    return tmpdir, [os.path.join(tmpdir, f) for f in files]
def test_mediapipe(frame_paths, fps):
"""MediaPipe Face Detection + Face Mesh"""
try:
from mediapipe.tasks import vision
from mediapipe.tasks.python.core.base_options import BaseOptions
from mediapipe.tasks.python.vision.face_detector import FaceDetector, FaceDetectorOptions
from mediapipe.tasks.python.vision.face_landmarker import FaceLandmarker, FaceLandmarkerOptions
except ImportError:
print("[MediaPipe] Not available, skipping")
return None
model_dir = os.path.join(os.path.dirname(__file__), "models")
os.makedirs(model_dir, exist_ok=True)
# Check model files - MediaPipe downloads automatically via the API
base_opts_detect = BaseOptions(model_asset_path="")
detect_opts = FaceDetectorOptions(base_options=BaseOptions())
t0 = time.time()
total_faces = 0
frames_with_faces = 0
landmarks_total = 0
# MediaPipe Face Detector
try:
detector = vision.FaceDetector.create_from_options(
FaceDetectorOptions(
base_options=BaseOptions(model_asset_buffer=None),
running_mode=vision.RunningMode.IMAGE
)
)
except:
# Download model first
import urllib.request
model_url = "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/latest/face_detector.task"
model_path = os.path.join(model_dir, "face_detector.task")
if not os.path.exists(model_path):
print(f"[MediaPipe] Downloading model: {model_url}")
urllib.request.urlretrieve(model_url, model_path)
detector = vision.FaceDetector.create_from_options(
FaceDetectorOptions(
base_options=BaseOptions(model_asset_path=model_path),
running_mode=vision.RunningMode.IMAGE
)
)
import cv2
for path in frame_paths:
img = cv2.imread(path)
if img is None:
continue
h, w = img.shape[:2]
mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=img)
result = detector.detect(mp_img)
if result.detections:
frames_with_faces += 1
for det in result.detections:
total_faces += 1
bbox = det.bounding_box
# bbox is [x, y, width, height] in pixels
elapsed = time.time() - t0
print(f"[MediaPipe] Detection: {len(frame_paths)} frames, {frames_with_faces} with faces, {total_faces} faces, {elapsed:.2f}s")
# Face Landmarker (468 points)
landmark_path = os.path.join(model_dir, "face_landmarker.task")
if not os.path.exists(landmark_path):
model_url = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/latest/face_landmarker.task"
print(f"[MediaPipe] Downloading landmark model...")
import urllib.request
urllib.request.urlretrieve(model_url, landmark_path)
landmarker = vision.FaceLandmarker.create_from_options(
FaceLandmarkerOptions(
base_options=BaseOptions(model_asset_path=landmark_path),
running_mode=vision.RunningMode.IMAGE,
output_face_blendshapes=False,
output_facial_transformation_matrixes=False,
)
)
t1 = time.time()
for path in frame_paths[:10]: # Only test 10 frames for landmarks
img = cv2.imread(path)
if img is None:
continue
mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=img)
result = landmarker.detect(mp_img)
if result.face_landmarks:
for face in result.face_landmarks:
landmarks_total += len(face)
elapsed2 = time.time() - t1
print(f"[MediaPipe] Face Mesh (10 frames): {landmarks_total} total landmarks (~{landmarks_total//max(len(result.face_landmarks),1)} per face)")
return {
"frames_processed": len(frame_paths),
"frames_with_faces": frames_with_faces,
"total_faces": total_faces,
"time_sec": elapsed,
"landmarks_per_face": 468,
}
def test_vision_framework(frame_paths, fps):
"""Apple Vision Framework face detection via swift binary"""
# Use the existing swift binary
swift_bin = os.path.join(os.path.dirname(__file__),
"swift_processors/.build/debug/swift_ocr")
# swift_ocr doesn't do face detection, use the face_compare_test
swift_face = os.path.join(os.path.dirname(__file__),
"swift_processors/.build/debug/face_compare_test")
if not os.path.exists(swift_face):
print("[Vision] Binary not found, skipping")
return None
print(f"[Vision] Running face compare test...")
t0 = time.time()
result = subprocess.run(
[swift_face, frame_paths[0].rsplit("/", 2)[0].replace("/frames", ""), # This won't work for single files
"--sample-interval", "1", "--max-frames", str(len(frame_paths))],
capture_output=True, text=True, timeout=120
)
elapsed = time.time() - t0
print(result.stdout[-500:])
return {"time_sec": elapsed}
def main():
    """Command-line entry point: extract frames, run each detector, print a summary.

    Command-line arguments:
        video_path: Video file to sample frames from.
        --sample-interval: Keep every N-th frame (default 30).
        --max-frames: Maximum number of frames to test (default 50).

    The temporary frame directory is removed on exit, including when one of the
    detector tests raises.
    """
    import shutil

    parser = argparse.ArgumentParser()
    parser.add_argument("video_path")
    parser.add_argument("--sample-interval", type=int, default=30)
    parser.add_argument("--max-frames", type=int, default=50)
    args = parser.parse_args()

    print(f"Testing: {args.video_path}")

    # Extract frames
    tmpdir, frames = extract_frames(args.video_path, args.sample_interval, args.max_frames)
    try:
        print(f"Extracted {len(frames)} frames")

        # MediaPipe
        print("\n=== MediaPipe ===")
        mp_result = test_mediapipe(frames, 24)

        # Vision Framework (prints its own output; the result is not summarised)
        print("\n=== Apple Vision Framework ===")
        test_vision_framework(frames, 24)

        # Summary
        print("\n=== Comparison ===")
        if mp_result:
            print(f"MediaPipe: {mp_result['total_faces']} faces in {mp_result['frames_with_faces']} frames, {mp_result['time_sec']:.2f}s")
            print(f" Landmarks: {mp_result['landmarks_per_face']} per face")
        print("Vision Framework: (see above)")
    finally:
        # Cleanup runs even if a detector test raised, so frames never pile up
        # in the temp directory.
        shutil.rmtree(tmpdir, ignore_errors=True)
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()