#!/opt/homebrew/bin/python3.11
"""
Face Processor - Face Detection & Demographics with Resume Support

Uses InsightFace for detection, age, gender and landmarks, and a CoreML
FaceNet model for embedding extraction.

IMPORTANT: InsightFace and the CoreML FaceNet model are REQUIRED. No Haar fallback.
- The identity embedding is produced by the CoreML FaceNet model
  (models/facenet512.mlpackage) from the InsightFace bounding box
- Haar Cascade cannot generate embedding, only detection
- If InsightFace or the CoreML model fails to load, the processor reports
  an error result and stops

Resume Feature:
- Auto-detect existing results and resume from last frame
- Auto-save at configurable intervals (default: 30 seconds)
- Ctrl+C gracefully saves and exits
"""

import sys
import json
import argparse
import os
import time

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from resume_framework import ResumeFramework, format_time, print_progress
from utils.pose_analyzer import calculate_pose_angle_v2


def _write_error_result(framework, output_path: str, error_msg: str) -> dict:
    """Publish *error_msg*, write an error result to *output_path* and return it."""
    framework.publish_error(error_msg)
    result = {
        "metadata": {"status": "error", "error": error_msg},
        "frames": {},
    }
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    return result


def _extract_embedding(embedder, frame, bbox):
    """Return the CoreML FaceNet embedding for one face as a list, or None.

    Args:
        embedder: Loaded CoreML model; called as ``predict({"input": ...})``.
        frame: BGR image array as returned by ``cv2.VideoCapture.read``.
        bbox: Integer ``[x1, y1, x2, y2]`` box from InsightFace.

    Returns:
        The flattened embedding, or None when the clamped crop is smaller
        than 20x20 pixels or the model call fails (the failure is printed).
    """
    # cv2/numpy are imported lazily, mirroring process_face, so that a missing
    # dependency is reported there as an error result instead of failing at
    # module import time. By the time this runs they are already loaded.
    import cv2
    import numpy as np

    # InsightFace's bbox is [x1, y1, x2, y2] in the resolution of the frame it
    # was given; clamp it to the frame so the crop is never empty.
    # Computed outside the try so the error message below can always use it.
    frame_h, frame_w = frame.shape[:2]
    x1 = max(0, min(int(bbox[0]), frame_w - 1))
    y1 = max(0, min(int(bbox[1]), frame_h - 1))
    x2 = max(x1 + 10, min(int(bbox[2]), frame_w))
    y2 = max(y1 + 10, min(int(bbox[3]), frame_h))
    if x2 - x1 < 20 or y2 - y1 < 20:
        return None

    # Use CoreML FaceNet (MIT license, ANE-accelerated).
    try:
        crop = frame[y1:y2, x1:x2]
        crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
        crop_resized = cv2.resize(crop_rgb, (160, 160))
        # Scale pixels to [-1, 1] and reorder HWC -> 1xCxHxW.
        crop_float = crop_resized.astype(np.float32) / 255.0
        crop_std = (crop_float - 0.5) / 0.5
        crop_input = np.transpose(crop_std, (2, 0, 1))[np.newaxis, ...]

        outputs = embedder.predict({"input": crop_input})
        emb_key = next((k for k in outputs if k.startswith("var_")), None)
        if emb_key is None:
            raise LookupError(
                f"no 'var_*' output in model result (keys: {sorted(outputs)})"
            )
        return outputs[emb_key].flatten().tolist()
    except Exception as e:
        print(f"[FACE] CoreML embedding error for face at ({x1},{y1}): {e}")
        return None


def _describe_face(face, frame, embedder) -> dict:
    """Build the per-face record (box, confidence, embedding, landmarks, pose, attributes).

    Attributes are read with ``getattr(..., None)`` so that an attribute which
    is absent *or* present-but-None is treated as "not available" instead of
    raising and discarding the remaining faces of the frame.
    """
    bbox = face.bbox.astype(int)
    bx, by = bbox[0], bbox[1]
    bw, bh = bbox[2] - bbox[0], bbox[3] - bbox[1]

    age_val = getattr(face, "age", None)
    age = int(age_val) if age_val is not None else None

    gender_val = getattr(face, "gender", None)
    if gender_val == 0:
        gender = "female"
    elif gender_val == 1:
        gender = "male"
    else:
        gender = None

    embedding = _extract_embedding(embedder, frame, bbox)

    # Prefer the 5-point keypoints; fall back to the 68-point 3D landmarks.
    landmarks = None
    kps = getattr(face, "kps", None)
    if kps is not None:
        landmarks = kps.tolist()
    else:
        landmark_68 = getattr(face, "landmark_3d_68", None)
        if landmark_68 is not None:
            landmarks = landmark_68.tolist()

    pose_angle = None
    if landmarks and len(landmarks) >= 5:
        try:
            pose_result = calculate_pose_angle_v2(landmarks)
            pose_angle = {
                "angle": pose_result.get("angle", "unknown"),
                "confidence": pose_result.get("confidence", 0.0),
                "pitch": pose_result.get("pitch", "neutral"),
                "features": pose_result.get("features", {}),
            }
        except Exception as e:
            # Pose is best-effort: keep the face, but do not hide the failure.
            print(f"[FACE] Pose estimation failed: {e}")

    det_score = getattr(face, "det_score", None)

    return {
        "x": int(bx),
        "y": int(by),
        "width": int(bw),
        "height": int(bh),
        "confidence": float(det_score) if det_score is not None else 0.9,
        "embedding": embedding,
        "landmarks": landmarks,
        "pose_angle": pose_angle,
        "attributes": {"age": age, "gender": gender},
    }


def process_face(
    video_path: str,
    output_path: str,
    uuid: str = "",
    auto_save_interval: int = 30,
    auto_save_frames: int = 300,
    force_restart: bool = False,
    sample_interval: int = 30,
) -> dict:
    """Process video for face detection and demographics analysis with resume support.

    Args:
        video_path: Path of the video to analyse.
        output_path: JSON path handed to the ResumeFramework and used for
            error results.
        uuid: Identifier passed to the ResumeFramework.
        auto_save_interval: Passed to the ResumeFramework (seconds).
        auto_save_frames: Passed to the ResumeFramework (frames).
        force_restart: Ignore existing data and start from the first frame.
        sample_interval: Only frames whose 1-based number is a multiple of
            this value are analysed. ``0`` is treated as ``1``.

    Returns:
        ``{"metadata": ..., "frames": {frame_number_str: {...}}}``. On a
        setup failure ``metadata`` is ``{"status": "error", "error": msg}``
        and ``frames`` is empty.
    """
    # A zero interval would raise ZeroDivisionError in the sampling test
    # below; treat it as "analyse every frame".
    if sample_interval == 0:
        sample_interval = 1

    framework = ResumeFramework(
        output_path=output_path,
        processor_name="face",
        uuid=uuid,
        auto_save_interval=auto_save_interval,
        auto_save_frames=auto_save_frames,
        force_restart=force_restart,
    )

    framework.publish_info("FACE_START")

    try:
        import cv2
        import numpy as np  # noqa: F401  (checked here, used by _extract_embedding)
        import insightface
    except ImportError as e:
        return _write_error_result(
            framework, output_path, f"Missing dependency: {e.name}"
        )

    # Load CoreML FaceNet (required, no fallback).
    try:
        import coremltools as ct

        coreml_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "../models/facenet512.mlpackage",
        )
        if not os.path.exists(coreml_path):
            raise FileNotFoundError(f"CoreML model not found at {coreml_path}")
        coreml_embedder = ct.models.MLModel(coreml_path)
        framework.publish_info("COREML_FACENET_LOADED")
    except Exception as e:
        error_msg = f"CoreML FaceNet512 load failed: {e}"
        print(f"[FACE] {error_msg}")
        return _write_error_result(framework, output_path, error_msg)

    try:
        framework.publish_info("LOADING_INSIGHTFACE")
        app = insightface.app.FaceAnalysis(
            name="buffalo_l", providers=["CPUExecutionProvider"]
        )
        app.prepare(ctx_id=0, det_size=(320, 320))
        framework.publish_info("INSIGHTFACE_LOADED")
    except Exception as e:
        print(f"[FACE] InsightFace failed to load (REQUIRED): {e}")
        return _write_error_result(
            framework,
            output_path,
            f"InsightFace failed to load (REQUIRED): {e}",
        )

    framework.publish_info("PROCESSING_VIDEO")

    # First open: probe the video properties only.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        error_msg = f"Cannot open video: {video_path}"
        print(f"Error: {error_msg}")
        framework.publish_error(error_msg)
        # The output file is deliberately left untouched here so an existing
        # checkpoint survives a video that is only temporarily unreadable.
        return {"metadata": {"status": "error", "error": error_msg}, "frames": {}}

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps if fps > 0 else 0
    cap.release()

    framework.publish_info(f"fps={fps}, frames={total_frames}")

    existing_data, last_checkpoint = framework.load_existing_data()
    resume_mode = (
        existing_data is not None and last_checkpoint > 0 and not force_restart
    )

    if resume_mode:
        print(f"\nFound existing data: {output_path}")
        print(f"Last processed frame: {last_checkpoint}")
        print(f"Will resume from frame {last_checkpoint + 1}")

    if resume_mode and existing_data:
        face_data = existing_data
        frame_count = last_checkpoint
        processed_frames = {int(k) for k in existing_data.get("frames", {})}
        cap = cv2.VideoCapture(video_path)
        # Frame numbers are 1-based, so 0-based position `frame_count` is the
        # first frame that has not been read yet.
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
    else:
        face_data = {
            "metadata": framework.init_metadata(
                video_path=video_path,
                fps=fps,
                width=width,
                height=height,
                total_frames=total_frames,
                total_duration=total_duration,
                extra={
                    "sample_interval": sample_interval,
                    "detection_method": "insightface",
                },
            ),
            "frames": {},
        }
        frame_count = 0
        processed_frames = set()
        cap = cv2.VideoCapture(video_path)

    framework.set_data(face_data)

    start_time = time.time()
    framework.last_save_time = start_time

    print(f"\nProcessing video: {total_frames} frames @ {fps:.2f} fps")
    print(f"Auto-save every {auto_save_interval}s or {auto_save_frames} frames")
    print(f"Resume from frame {frame_count + 1 if resume_mode else 1}")
    print("Detection method: InsightFace (REQUIRED)")
    print()

    # try/finally so the capture is released even when the loop is left by an
    # exception or an interrupt.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            if frame_count in processed_frames:
                continue

            if frame_count % sample_interval != 0:
                continue

            face_list = []
            try:
                for face in app.get(frame):
                    face_list.append(_describe_face(face, frame, coreml_embedder))
            except Exception as e:
                # Faces collected before the failure are kept.
                print(f"[ERROR] Frame processing error: {e}")

            if face_list:
                current_time = (frame_count - 1) / fps if fps > 0 else 0
                face_data["frames"][str(frame_count)] = {
                    "frame_number": frame_count,
                    "time_seconds": round(current_time, 3),
                    "time_formatted": format_time(current_time),
                    "faces": face_list,
                }

            processed_frames.add(frame_count)

            # NOTE(review): this point is reached only for sampled frames, so
            # progress is reported only when frame_count is a multiple of both
            # 500 and sample_interval.
            if frame_count % 500 == 0:
                elapsed = time.time() - start_time
                print_progress(
                    frame_count, total_frames, elapsed, f"{len(face_list)} faces"
                )
                framework.publish_progress(
                    frame_count, total_frames, f"frame {frame_count}"
                )

            if framework.should_auto_save(frame_count):
                framework.save_progress(frame_count, silent=True)
    finally:
        cap.release()

    total_processed = len(processed_frames)

    framework.finalize(
        total_processed=total_processed,
        extra_metadata={
            "sample_interval": sample_interval,
            "detection_method": "insightface",
            # The CoreML embedder is mandatory (load failure returns above),
            # so it is always the embedding source.
            "embedding_method": "coreml_facenet",
        },
    )

    print(f"\nFace detection completed: {total_processed} frames processed")
    print(f"Frames with faces: {len(face_data['frames'])}")

    return face_data


def _convert_to_face_result(face_data: dict) -> dict:
    """Convert ResumeFramework output to FaceResult format expected by Rust.

    Frames are emitted in ascending numeric frame order. ``pose_angle`` is
    not part of the converted record. ``fps`` falls back to 30.0 when the
    metadata does not carry it.
    """
    metadata = face_data.get("metadata", {})
    raw_frames = face_data.get("frames", {})

    frames = []
    for frame_key in sorted(raw_frames, key=int):
        raw_frame = raw_frames[frame_key]
        faces = []
        for raw_face in raw_frame.get("faces", []):
            # "attributes" may be missing or stored as None/empty.
            attributes = raw_face.get("attributes") or {}
            faces.append(
                {
                    "face_id": None,
                    "x": raw_face["x"],
                    "y": raw_face["y"],
                    "width": raw_face["width"],
                    "height": raw_face["height"],
                    "confidence": raw_face.get("confidence", 0.0),
                    "embedding": raw_face.get("embedding"),
                    "landmarks": raw_face.get("landmarks"),
                    "attributes": {
                        "age": attributes.get("age"),
                        "gender": attributes.get("gender"),
                    },
                }
            )
        frames.append(
            {
                "frame": raw_frame["frame_number"],
                "timestamp": raw_frame["time_seconds"],
                "faces": faces,
            }
        )

    return {
        "frame_count": len(frames),
        "fps": metadata.get("fps", 30.0),
        "frames": frames,
    }


def main() -> None:
    """Parse the command line, run the processor and write the FaceResult JSON."""
    parser = argparse.ArgumentParser(
        description="Face Detection & Demographics with Resume Support"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--auto-save-interval",
        "-a",
        help="Auto-save interval in seconds",
        type=int,
        default=30,
    )
    parser.add_argument(
        "--auto-save-frames",
        "-f",
        help="Auto-save interval in frames",
        type=int,
        default=300,
    )
    parser.add_argument(
        "--force-restart",
        "-r",
        help="Force restart (ignore existing data)",
        action="store_true",
    )
    parser.add_argument(
        "--sample-interval",
        "-s",
        help="Frame sample interval",
        type=int,
        default=5,
    )

    args = parser.parse_args()

    result = process_face(
        args.video_path,
        args.output_path,
        args.uuid,
        args.auto_save_interval,
        args.auto_save_frames,
        args.force_restart,
        args.sample_interval,
    )

    face_result = _convert_to_face_result(result)
    with open(args.output_path, "w", encoding="utf-8") as f:
        json.dump(face_result, f, indent=2)


if __name__ == "__main__":
    main()