#!/opt/homebrew/bin/python3.11
"""
Process Swift face detection output + add CoreML FaceNet embeddings.
Replaces face_processor.py Step 2 when Swift already ran.
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path

import coremltools as ct
import cv2
import numpy as np

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
FACENET_PATH = os.path.join(SCRIPT_DIR, "..", "models", "facenet512.mlpackage")

# Side length (pixels) of the square crop handed to the FaceNet model.
_FACENET_INPUT_SIZE = 160
# Boxes whose width or height is at or below this value are skipped.
_MIN_FACE_SIZE = 10
# A progress line is printed every this many processed frames.
_PROGRESS_INTERVAL = 500


def classify_pose(roll, yaw):
    """Bucket a head pose into a coarse label using roll and yaw.

    Args:
        roll: Head roll angle. NOTE(review): the thresholds (15 / 30) suggest
            degrees -- confirm against the Swift detector's output units.
        yaw: Head yaw angle, same units as ``roll``.

    Returns:
        ``"frontal"`` when both ``|yaw|`` and ``|roll|`` are below 15,
        ``"profile_right"`` / ``"profile_left"`` when ``|yaw|`` exceeds 30
        (positive yaw maps to right), and ``"three_quarter"`` otherwise.
    """
    abs_yaw = abs(yaw)
    abs_roll = abs(roll)
    if abs_yaw < 15 and abs_roll < 15:
        return "frontal"
    if abs_yaw > 30:
        return "profile_right" if yaw > 0 else "profile_left"
    return "three_quarter"


def _clip_bbox(bbox, frame_width, frame_height):
    """Return integer crop bounds ``(x1, y1, x2, y2)`` clipped to the frame.

    Returns ``None`` when the box is too small (width or height at or below
    ``_MIN_FACE_SIZE``) or when nothing is left after clipping.
    """
    x, y = bbox.get("x", 0), bbox.get("y", 0)
    w, h = bbox.get("width", 0), bbox.get("height", 0)
    if w <= _MIN_FACE_SIZE or h <= _MIN_FACE_SIZE:
        return None
    # Slice indices must be integers; the detector JSON may carry floats.
    x1, y1 = max(0, int(x)), max(0, int(y))
    x2, y2 = min(frame_width, int(x + w)), min(frame_height, int(y + h))
    if x2 <= x1 or y2 <= y1:
        return None
    return x1, y1, x2, y2


def _embed_face(model, face_img):
    """Run the FaceNet model on a BGR face crop and return a flat float list.

    The crop is resized to 160x160, converted to RGB, scaled to [-1, 1] and
    laid out as a (1, 3, 160, 160) float32 array passed under the model input
    name ``"input"``. The first output of the model is used as the embedding.
    """
    size = (_FACENET_INPUT_SIZE, _FACENET_INPUT_SIZE)
    resized = cv2.resize(face_img, size)
    rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB).astype(np.float32)
    normalized = rgb / 127.5 - 1.0
    batch = np.expand_dims(np.transpose(normalized, (2, 0, 1)), axis=0)
    result = model.predict({"input": batch})
    return next(iter(result.values())).flatten().tolist()


def main():
    """Attach FaceNet embeddings to Swift face detections and write face.json.

    Reads the Swift detection JSON (``--swift-json``), walks the video
    (``--video``) sequentially, crops every detected face from its frame,
    computes an embedding when the FaceNet model is available, and writes the
    combined result to ``--output``. Frames that end up with no usable face
    are omitted from the output.

    ``--fps`` is used only as a fallback when the video container does not
    report a positive frame rate.

    Raises:
        RuntimeError: If the video file cannot be opened.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--swift-json", required=True, help="Swift detection output")
    parser.add_argument("--video", required=True, help="Video file path")
    parser.add_argument("--output", required=True, help="Output face.json path")
    parser.add_argument("--fps", type=float, default=24.0,
                        help="Fallback FPS when the video reports none")
    args = parser.parse_args()

    print(f"[EMBED] Loading Swift output: {args.swift_json}")
    with open(args.swift_json, encoding="utf-8") as f:
        swift = json.load(f)

    swift_frames = swift.get("frames", [])
    print(f"[EMBED] Swift frames: {len(swift_frames)}")

    # Load CoreML FaceNet; without it faces are still emitted, embedding=None.
    facenet = os.path.normpath(FACENET_PATH)
    coreml_model = None
    if os.path.exists(facenet):
        coreml_model = ct.models.MLModel(facenet)
        print("[EMBED] FaceNet loaded")
    else:
        print(f"[EMBED] WARNING: FaceNet not found at {facenet}")

    # Frame number -> Swift record. A later duplicate overrides an earlier one.
    frame_data_map = {
        int(sf.get("frame", sf.get("frame_number", 0))): sf
        for sf in swift_frames
    }
    # Nothing past this frame is needed, so decoding can stop there.
    last_needed = max(frame_data_map, default=-1)

    video = cv2.VideoCapture(args.video)
    if not video.isOpened():
        raise RuntimeError(f"Cannot open {args.video}")

    output_frames = []
    embed_count = 0
    embed_failures = 0
    processed = 0
    try:
        v_fps = video.get(cv2.CAP_PROP_FPS)
        # `not v_fps > 0` also catches NaN, which some backends report.
        if not v_fps > 0:
            print(f"[EMBED] WARNING: video reports no FPS, using --fps {args.fps}")
            v_fps = args.fps
        v_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        v_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"[EMBED] Video: {v_width}x{v_height}, {v_fps:.1f}fps")

        t0 = time.time()
        current_frame = 0
        # Sequential read: seeking is avoided, unneeded frames are only grabbed.
        while current_frame <= last_needed:
            if current_frame not in frame_data_map:
                if not video.grab():
                    break
                current_frame += 1
                continue

            ret, frame = video.read()
            if not ret:
                break

            sf = frame_data_map[current_frame]
            # Compute the fallback lazily so a present timestamp never divides.
            if "timestamp" in sf:
                timestamp = sf["timestamp"]
            else:
                timestamp = current_frame / v_fps

            processed_faces = []
            for face in sf.get("faces") or []:
                bb = face.get("bbox") or {}
                bounds = _clip_bbox(bb, v_width, v_height)
                if bounds is None:
                    continue
                x1, y1, x2, y2 = bounds
                face_img = frame[y1:y2, x1:x2]
                if face_img.size == 0:
                    continue

                emb = None
                if coreml_model is not None:
                    # Best effort: a failed embedding keeps the face with
                    # embedding=None, but the failure is no longer silent.
                    try:
                        emb = _embed_face(coreml_model, face_img)
                    except Exception as exc:
                        embed_failures += 1
                        if embed_failures == 1:
                            print(f"[EMBED] WARNING: embedding failed at frame "
                                  f"{current_frame}: {exc}")
                    else:
                        embed_count += 1

                # Pose
                pose_info = face.get("pose") or {}
                roll = pose_info.get("roll", 0)
                yaw = pose_info.get("yaw", 0)

                processed_faces.append({
                    "x": bb.get("x", 0),
                    "y": bb.get("y", 0),
                    "width": bb.get("width", 0),
                    "height": bb.get("height", 0),
                    "confidence": face.get("confidence", 0.5),
                    "embedding": emb,
                    "pose_angle": {
                        "angle": classify_pose(roll, yaw),
                        "roll": roll,
                        "yaw": yaw,
                        "pitch": pose_info.get("pitch", 0),
                    },
                    "lips": face.get("lips"),
                    "landmarks": face.get("landmarks"),
                    "attributes": None,
                })

            if processed_faces:
                output_frames.append({
                    "frame": current_frame,
                    "timestamp": timestamp,
                    "faces": processed_faces,
                })

            processed += 1
            current_frame += 1

            # Keyed on processed frames so face-less frames do not re-trigger it.
            if processed % _PROGRESS_INTERVAL == 0:
                print(f"[EMBED] {processed}/{len(frame_data_map)} frames, "
                      f"{embed_count} embeddings, {time.time()-t0:.0f}s")
    finally:
        video.release()

    if embed_failures:
        print(f"[EMBED] WARNING: {embed_failures} embeddings failed")

    output = {
        "frame_count": len(output_frames),
        "fps": v_fps,
        "frames": output_frames,
    }

    # Path.parent is "." for a bare filename, so this cannot fail on "".
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False writes raw non-ASCII, so the encoding must be explicit.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    elapsed = time.time() - t0
    print(f"[EMBED] Done: {len(output_frames)} frames, {embed_count} embeddings, {elapsed:.0f}s → {args.output}")


if __name__ == "__main__":
    main()