#!/opt/homebrew/bin/python3.11
"""
Face Processor V2 - Apple Vision detection + CoreML FaceNet embedding

Flow:
  1. swift_face (Vision/ANE) → bbox + pose per frame
  2. cv2 opens video, crops faces from bbox
  3. CoreML FaceNet → 512D embedding per face
  4. Output face.json in standard format

Replaces face_processor.py (no more InsightFace CPU detection).
Detection cost: near-zero CPU (Vision ANE)
Embedding cost: near-zero CPU (CoreML ANE)
"""

import re
import sys
import os
import json
import argparse
import subprocess
import time
from typing import Optional, Dict

import cv2
import numpy as np
from pathlib import Path

# CoreML
import coremltools as ct

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SWIFT_BIN = os.path.join(SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face")
FACENET_PATH = os.path.join(SCRIPT_DIR, "..", "models", "facenet512.mlpackage")

# Frame rate assumed when the video container does not report a usable one.
DEFAULT_FPS = 30.0

# Pattern searched for in the swift_face log to extract a progress percentage.
_PROGRESS_RE = re.compile(r'(\d+)% complete')

# Seconds to wait between polls of the swift_face log while it is running.
_POLL_SECONDS = 10

# Bounding boxes whose width or height is at or below this value are skipped.
_MIN_FACE_SIZE = 10

# Number of trailing log lines echoed to stderr when swift_face fails.
_LOG_TAIL_LINES = 20


def detection_output_path(output_path: str) -> str:
    """Return the path of the intermediate swift_face detection JSON.

    Only the final extension of *output_path* is replaced, so
    ``face.json`` becomes ``face_detect.json``.  Unlike a plain
    ``str.replace(".json", ...)`` this never rewrites directory names and
    never returns *output_path* itself, which would make the temporary
    file collide with (and later be deleted as) the final result.
    """
    root, _ext = os.path.splitext(output_path)
    return f"{root}_detect.json"


# Pose angle classification from roll/yaw
def classify_pose(roll: float, yaw: float) -> str:
    """Convert roll/yaw to a pose angle label.

    Returns ``"frontal"`` when both |yaw| and |roll| are below 15,
    ``"profile_right"`` / ``"profile_left"`` when |yaw| exceeds 30
    (sign of yaw picks the side), and ``"three_quarter"`` otherwise.
    """
    abs_yaw = abs(yaw)
    abs_roll = abs(roll)

    if abs_yaw < 15 and abs_roll < 15:
        return "frontal"
    elif abs_yaw > 30:
        return "profile_right" if yaw > 0 else "profile_left"
    else:
        return "three_quarter"


class FaceProcessorVision:
    """Two-step face pipeline: swift_face detection, then CoreML embedding."""

    def __init__(self, video_path: str, output_path: str, uuid: str = "",
                 sample_interval: int = 3,
                 publisher: Optional[RedisPublisher] = None):
        """Store the job parameters and try to load the CoreML FaceNet model.

        A missing or unloadable model is not fatal: ``coreml_model`` stays
        ``None`` and every face is written with ``"embedding": None``.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.uuid = uuid
        self.sample_interval = sample_interval
        self.publisher = publisher

        # Load CoreML FaceNet
        self.coreml_model = None
        facenet = os.path.normpath(FACENET_PATH)
        if os.path.exists(facenet):
            try:
                self.coreml_model = ct.models.MLModel(facenet)
                print(f"[FACE_V2] CoreML FaceNet loaded: {facenet}")
            except Exception as e:
                print(f"[FACE_V2] CoreML load failed: {e}")
        else:
            print(f"[FACE_V2] CoreML FaceNet not found: {facenet} (embeddings disabled)")

        self.video = None
        self.fps = DEFAULT_FPS
        self.total_frames = 0
        self.width = 0
        self.height = 0

    def open_video(self):
        """Open ``video_path`` with OpenCV and cache its basic properties.

        Raises:
            RuntimeError: if the video cannot be opened.
        """
        self.video = cv2.VideoCapture(self.video_path)
        if not self.video.isOpened():
            raise RuntimeError(f"Cannot open: {self.video_path}")

        reported_fps = self.video.get(cv2.CAP_PROP_FPS)
        # A zero/NaN frame rate would make every timestamp computation divide
        # by zero, so fall back to the default in that case.
        self.fps = reported_fps if reported_fps > 0 else DEFAULT_FPS
        self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"[FACE_V2] Video: {self.width}x{self.height}, {self.fps:.1f}fps, {self.total_frames}f")

    def extract_face_embedding(self, face_img: np.ndarray) -> Optional[list]:
        """Run CoreML FaceNet on a cropped face.

        Returns the flattened embedding as a list of floats, or ``None`` when
        no model is loaded or inference fails (the error is printed).
        """
        if self.coreml_model is None:
            return None

        try:
            # Resize to 160x160
            resized = cv2.resize(face_img, (160, 160))
            # NOTE(review): the crop is passed in OpenCV's channel order
            # without conversion — confirm the model expects that order.
            # Normalize to [-1, 1] and convert HWC to CHW
            normalized = (resized.astype(np.float32) / 127.5) - 1.0
            normalized = np.transpose(normalized, (2, 0, 1))  # HWC -> CHW
            # Add batch dim: (1, C, 160, 160)
            input_array = np.expand_dims(normalized, axis=0)

            result = self.coreml_model.predict({"input": input_array})
            # Find output key (var_xxx)
            emb_key = next((k for k in result if k.startswith("var_")), None)
            if emb_key is None:
                print("[FACE_V2] Embedding error: no var_* output in model result")
                return None
            return result[emb_key].flatten().tolist()
        except Exception as e:
            print(f"[FACE_V2] Embedding error: {e}")
            return None

    def _report_detect_progress(self, log_path: str, last_pct: int) -> int:
        """Publish the highest percentage found in the swift_face log.

        Best-effort: any failure while reading the log or publishing is
        printed and otherwise ignored so it cannot abort detection.
        Returns the updated ``last_pct``.
        """
        try:
            latest = last_pct
            with open(log_path, errors="replace") as lf:
                for line in lf:
                    m = _PROGRESS_RE.search(line)
                    if m:
                        latest = max(latest, int(m.group(1)))
            if latest > last_pct:
                last_pct = latest
                if self.publisher:
                    self.publisher.progress("face", latest, 100, f"swift detect {latest}%")
        except Exception as e:
            print(f"[FACE_V2] Progress update skipped: {e}", file=sys.stderr)
        return last_pct

    @staticmethod
    def _read_log_tail(log_path: str, max_lines: int = _LOG_TAIL_LINES) -> str:
        """Return the last ``max_lines`` lines of the log, or "" if unreadable."""
        try:
            with open(log_path, errors="replace") as lf:
                return "".join(lf.readlines()[-max_lines:]).strip()
        except OSError:
            return ""

    def process_with_swift(self) -> Dict:
        """Step 1: Run swift_face to get bbox + pose.

        Builds the swift_face binary if it is missing, runs it with its
        combined stdout/stderr redirected to ``<detect json>.log``, forwards
        progress to the publisher while it runs, and returns the parsed
        detection JSON.

        Raises:
            subprocess.CalledProcessError: if building swift_face fails.
            RuntimeError: if swift_face exits with a non-zero code.
        """
        print("[FACE_V2] Step 1: Vision detection...")

        # Build swift_face if needed
        if not os.path.exists(SWIFT_BIN):
            build_dir = os.path.join(SCRIPT_DIR, "swift_processors")
            print(f"[FACE_V2] Building swift_face in {build_dir}...")
            subprocess.run(
                ["swift", "build", "-c", "debug", "--product", "swift_face"],
                cwd=build_dir, check=True
            )

        swift_out = detection_output_path(self.output_path)

        cmd = [
            SWIFT_BIN, self.video_path, swift_out,
            "--sample-interval", str(self.sample_interval),
        ]
        if self.uuid:
            cmd.extend(["--uuid", self.uuid])

        print(f"[FACE_V2] Running: {' '.join(cmd)}")
        t0 = time.time()

        log_path = swift_out + ".log"
        last_pct = -1
        with open(log_path, "w") as log_f:
            proc = subprocess.Popen(cmd, stdout=log_f, stderr=subprocess.STDOUT, text=True)
            try:
                while True:
                    try:
                        # Returns as soon as the child exits instead of
                        # always sleeping for the full poll interval.
                        proc.wait(timeout=_POLL_SECONDS)
                        finished = True
                    except subprocess.TimeoutExpired:
                        finished = False
                    last_pct = self._report_detect_progress(log_path, last_pct)
                    if finished:
                        break
            finally:
                # Only reached with a live child when something raised
                # (e.g. KeyboardInterrupt); do not leave it running.
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()

        if proc.returncode != 0:
            # stderr was merged into the log file, so proc.stderr is None;
            # the diagnostics have to be read back from the log.
            tail = self._read_log_tail(log_path)
            if tail:
                print(tail, file=sys.stderr)
            raise RuntimeError(f"swift_face exited with code {proc.returncode}")

        elapsed = time.time() - t0
        print(f"[FACE_V2] Detection done in {elapsed:.1f}s")

        with open(swift_out) as f:
            return json.load(f)

    def _read_frame(self, frame_num: int):
        """Seek to ``frame_num`` and return the decoded frame, or ``None``."""
        self.video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        ret, frame = self.video.read()
        return frame if ret else None

    def embed_and_save(self, detection_data: Dict):
        """Step 2: Crop faces + CoreML embedding + save face.json.

        ``detection_data`` is the dict returned by ``process_with_swift``;
        its ``"frames"`` entries are read for ``"frame"``, optional
        ``"timestamp"`` and ``"faces"`` (each with ``"bbox"`` and optional
        ``"pose"``, ``"confidence"``, ``"lips"``, ``"landmarks"``).
        Only frames with at least one usable face are written.

        Raises:
            RuntimeError: if the video cannot be opened.
        """
        print("[FACE_V2] Step 2: CoreML embedding...")

        frames = detection_data.get("frames", [])
        self.open_video()

        # NOTE: "metadata" is assembled here but is not part of the JSON
        # written below; only the frame entries are serialised.
        face_data = {
            "metadata": {
                "video_path": os.path.abspath(self.video_path),
                "fps": self.fps,
                "width": self.width,
                "height": self.height,
                "sample_interval": self.sample_interval,
                "detection_method": "apple_vision",
                "embedding_method": "coreml_facenet",
                "status": "in_progress",
                "total_frames": self.total_frames,
            },
            "frames": {}
        }

        t0 = time.time()
        embed_count = 0
        last_pct = -1

        try:
            for frame_info in frames:
                frame_num = frame_info["frame"]
                faces = []

                # The frame is decoded lazily and at most once, no matter
                # how many faces were detected in it.
                frame = None
                frame_loaded = False

                for face in frame_info.get("faces", []):
                    bb = face["bbox"]
                    x, y, w, h = bb["x"], bb["y"], bb["width"], bb["height"]

                    if w <= _MIN_FACE_SIZE or h <= _MIN_FACE_SIZE:
                        continue  # skip tiny faces

                    if not frame_loaded:
                        frame = self._read_frame(frame_num)
                        frame_loaded = True
                    if frame is None:
                        continue

                    # Crop face, clamped to the video bounds. JSON numbers
                    # may be floats while slice indices must be ints.
                    x1, y1 = max(0, int(x)), max(0, int(y))
                    x2 = min(self.width, int(x + w))
                    y2 = min(self.height, int(y + h))
                    if x2 <= x1 or y2 <= y1:
                        continue

                    face_img = frame[y1:y2, x1:x2]
                    if face_img.size == 0:
                        continue

                    # CoreML embedding
                    emb = self.extract_face_embedding(face_img)
                    if emb is not None:
                        embed_count += 1

                    # Pose classification
                    pose_info = face.get("pose", {})
                    pose_angle = classify_pose(
                        pose_info.get("roll", 0),
                        pose_info.get("yaw", 0)
                    )

                    faces.append({
                        "x": x, "y": y, "width": w, "height": h,
                        "confidence": face.get("confidence", 0.5),
                        "embedding": emb,
                        "pose_angle": {
                            "angle": pose_angle,
                            "roll": pose_info.get("roll", 0),
                            "yaw": pose_info.get("yaw", 0),
                            "pitch": pose_info.get("pitch", 0),
                        },
                        "lips": face.get("lips"),
                        "landmarks": face.get("landmarks"),
                        "attributes": None,
                    })

                if not faces:
                    continue

                face_data["frames"][str(frame_num)] = {
                    "frame_number": frame_num,
                    "time_seconds": frame_info.get("timestamp", frame_num / self.fps),
                    "time_formatted": f"{frame_num / self.fps:.1f}s",
                    "faces": faces,
                }

                recorded = len(face_data["frames"])
                if recorded % 100 == 0:
                    elapsed = time.time() - t0
                    print(f"[FACE_V2] {recorded} frames, {embed_count} embeddings, {elapsed:.0f}s")

                if self.publisher:
                    # Publish only when the integer percentage advances.
                    pct = int(recorded * 100 / max(len(frames), 1))
                    if pct > last_pct:
                        last_pct = pct
                        self.publisher.progress("face", recorded, len(frames),
                                                f"{embed_count} faces", embed_count, "faces")
        finally:
            # Release the capture even when a frame entry is malformed.
            self.video.release()

        # Finalize
        face_data["metadata"]["status"] = "completed"
        face_data["metadata"]["total_embeddings"] = embed_count
        face_data["metadata"]["embedder"] = "coreml_facenet"

        # Convert dict frames to list for Rust FaceResult format
        frames_list = [
            {
                "frame": int(fnum_str),
                "timestamp": fdata["time_seconds"],
                "faces": fdata["faces"],
            }
            for fnum_str, fdata in sorted(face_data["frames"].items(), key=lambda kv: int(kv[0]))
        ]

        output = {
            "frame_count": len(frames_list),
            "fps": self.fps,
            "frames": frames_list,
        }

        with open(self.output_path, "w") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)

        elapsed = time.time() - t0
        print(f"[FACE_V2] Done: {len(frames_list)} frames, {embed_count} embeddings, {elapsed:.0f}s")


def main():
    """Command-line entry point: detect faces, embed them, write the JSON."""
    parser = argparse.ArgumentParser(description="Apple Vision Face Processor V2")
    parser.add_argument("video_path", help="Video file path")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", type=int, default=3)
    parser.add_argument("--force", action="store_true")
    args = parser.parse_args()

    publisher = RedisPublisher(args.uuid) if args.uuid else None
    if publisher:
        publisher.info("face", "FACE_START")

    if args.force and os.path.exists(args.output_path):
        os.remove(args.output_path)

    processor = FaceProcessorVision(
        args.video_path, args.output_path,
        args.uuid, args.sample_interval, publisher
    )

    # Step 1: Vision detection (bbox + pose via ANE)
    try:
        detection = processor.process_with_swift()
    except Exception as e:
        if publisher:
            publisher.error("face", f"Detection failed: {e}")
        raise

    # Step 2: CoreML embedding + save
    try:
        processor.embed_and_save(detection)
    except Exception as e:
        if publisher:
            publisher.error("face", f"Embedding failed: {e}")
        raise

    if publisher:
        publisher.complete("face", f"{len(detection.get('frames',[]))} frames")

    # Clean up temp detection file (same helper as process_with_swift, so
    # this can never resolve to the final output file).
    swift_out = detection_output_path(args.output_path)
    if os.path.exists(swift_out):
        os.remove(swift_out)


if __name__ == "__main__":
    main()