#!/opt/homebrew/bin/python3.11
"""Embed faces from existing detections JSON using CoreML FaceNet."""
import json
import os
import sys
import time
from pathlib import Path

import cv2
import numpy as np

# Make sibling modules importable before pulling in the remaining imports.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import coremltools as ct

# The model package lives in ../models relative to this script.
FACENET_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..",
    "models",
    "facenet512.mlpackage",
)


def classify_pose(roll: float, yaw: float) -> str:
    """Bucket a head pose into a coarse label.

    Args:
        roll: Head roll angle.
        yaw: Head yaw angle; its sign selects the profile side.

    Returns:
        ``"frontal"`` when both |yaw| and |roll| are below 15,
        ``"profile_right"`` / ``"profile_left"`` when |yaw| exceeds 30
        (right for positive yaw), otherwise ``"three_quarter"``.
    """
    # NOTE(review): the 15/30 thresholds look like degrees -- confirm the
    # detections JSON does not store radians.
    abs_yaw, abs_roll = abs(yaw), abs(roll)
    if abs_yaw < 15 and abs_roll < 15:
        return "frontal"
    elif abs_yaw > 30:
        return "profile_right" if yaw > 0 else "profile_left"
    return "three_quarter"


def extract_embedding(coreml_model, face_img) -> list:
    """Run the CoreML model on one face crop and return its embedding.

    The crop is resized to 160x160, scaled with ``x / 127.5 - 1``, moved from
    channels-last to channels-first, and given a leading batch axis before
    being passed to the model under the input name ``"input"``.

    Args:
        coreml_model: Object exposing ``predict(dict) -> dict``.
        face_img: Image array with three axes (height, width, channels).

    Returns:
        The first model output whose name starts with ``"var_"``, flattened
        to a plain Python list.

    Raises:
        IndexError: If the model result has no output named ``var_*``.
    """
    # NOTE(review): frames decoded by cv2 are BGR -- confirm the model was
    # exported to accept that channel order.
    resized = cv2.resize(face_img, (160, 160))
    # Presumably maps uint8 pixel values into [-1, 1]; verify input dtype.
    normalized = (resized.astype(np.float32) / 127.5) - 1.0
    normalized = np.transpose(normalized, (2, 0, 1))
    input_array = np.expand_dims(normalized, axis=0)
    result = coreml_model.predict({"input": input_array})

    emb_keys = [k for k in result if k.startswith("var_")]
    if not emb_keys:
        # Same exception type as indexing an empty list, but with context.
        raise IndexError(
            f"model result has no 'var_*' output; got keys: {sorted(result)}"
        )
    return result[emb_keys[0]].flatten().tolist()


def _read_frame(video, frame_num):
    """Seek to *frame_num* and decode it; return the image or None on failure."""
    video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
    ok, frame = video.read()
    return frame if ok else None


def _crop_face(frame, x, y, w, h, width, height):
    """Return the bbox region of *frame* clamped to the video size, or None if empty."""
    # JSON numbers may be floats; slice indices must be ints.
    x1, y1 = max(0, int(x)), max(0, int(y))
    x2, y2 = min(width, int(x + w)), min(height, int(y + h))
    if x2 <= x1 or y2 <= y1:
        return None
    crop = frame[y1:y2, x1:x2]
    return crop if crop.size else None


def main() -> None:
    """Command-line entry point.

    Reads a detections JSON (positional ``detections_json``), crops every
    detected face wider and taller than 10 from the video given by
    ``--video``, embeds each crop with the CoreML model at ``FACENET_PATH``,
    and writes the result to the positional ``output_json``.

    The output holds a ``metadata`` dict and a ``frames`` dict mapping the
    frame number (as a string) to the list of embedded faces for that frame.
    Exits with an error message if the video cannot be opened.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Embed faces only")
    parser.add_argument("detections_json")
    parser.add_argument("output_json")
    parser.add_argument("--video", required=True)
    args = parser.parse_args()

    print(f"[EMBED] Loading detections: {args.detections_json}")
    with open(args.detections_json, encoding="utf-8") as f:
        detection_data = json.load(f)

    print(f"[EMBED] Loading CoreML FaceNet: {FACENET_PATH}")
    coreml_model = ct.models.MLModel(FACENET_PATH)

    print(f"[EMBED] Opening video: {args.video}")
    video = cv2.VideoCapture(args.video)
    if not video.isOpened():
        # Without this check every read fails silently and an empty result
        # would still be written with status "completed".
        video.release()
        sys.exit(f"[EMBED] ERROR: cannot open video: {args.video}")

    frames = detection_data.get("frames", [])
    t0 = time.time()
    embed_count, total_face_count = 0, 0
    # Report progress roughly every 5% of the frames.
    batch_size = max(1, len(frames) // 20)

    try:
        fps = video.get(cv2.CAP_PROP_FPS)
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

        face_data = {
            "metadata": {
                "video_path": os.path.abspath(args.video),
                "fps": fps,
                "width": width,
                "height": height,
                "sample_interval": detection_data.get("sample_interval", 3),
                "detection_method": "apple_vision",
                "embedding_method": "coreml_facenet",
                "total_frames": total_frames,
            },
            "frames": {},
        }

        for idx, frame_info in enumerate(frames):
            frame_num = frame_info["frame"]
            faces = []
            # Decode the frame lazily and at most once, however many faces
            # it contains; frames with no usable face are never decoded.
            frame = None
            frame_unreadable = False

            for face in frame_info.get("faces", []):
                total_face_count += 1
                # The bbox may be nested under "bbox" or stored on the face.
                bb = face.get("bbox", face)
                x, y, w, h = bb["x"], bb["y"], bb["width"], bb["height"]
                if w <= 10 or h <= 10:
                    continue

                if frame is None and not frame_unreadable:
                    frame = _read_frame(video, frame_num)
                    frame_unreadable = frame is None
                if frame is None:
                    continue

                face_img = _crop_face(frame, x, y, w, h, width, height)
                if face_img is None:
                    continue

                emb = extract_embedding(coreml_model, face_img)
                if emb is not None:
                    embed_count += 1
                    pose_info = face.get("pose", {})
                    pose_angle = classify_pose(
                        pose_info.get("roll", 0), pose_info.get("yaw", 0)
                    )
                    faces.append({
                        "x": x,
                        "y": y,
                        "width": w,
                        "height": h,
                        "confidence": face.get("confidence", 0.5),
                        "embedding": emb,
                        "pose_angle": {
                            "angle": pose_angle,
                            "roll": pose_info.get("roll", 0),
                            "yaw": pose_info.get("yaw", 0),
                            "pitch": pose_info.get("pitch", 0),
                        },
                        "landmarks": face.get("landmarks", []),
                    })

            face_data["frames"][str(frame_num)] = faces

            if (idx + 1) % batch_size == 0:
                pct = (idx + 1) / len(frames) * 100
                elapsed = time.time() - t0
                eta = (elapsed / (idx + 1)) * (len(frames) - idx - 1) if idx > 0 else 0
                print(f"[EMBED] {pct:.0f}% | {idx+1}/{len(frames)} frames | "
                      f"{embed_count} embeddings | {elapsed:.0f}s elapsed | "
                      f"{eta:.0f}s ETA", flush=True)
    finally:
        # Release the capture handle even if embedding fails part-way.
        video.release()

    face_data["metadata"]["status"] = "completed"

    print(f"[EMBED] Writing output: {args.output_json}")
    with open(args.output_json, "w", encoding="utf-8") as f:
        json.dump(face_data, f, indent=2)

    elapsed = time.time() - t0
    print(f"[EMBED] Done: {len(frames)} frames, {embed_count}/{total_face_count} embeddings, {elapsed:.0f}s")


if __name__ == "__main__":
    main()