#!/opt/homebrew/bin/python3.11
"""
Face Recognition Processor

Integrates InsightFace for face detection, recognition, and tracking.
Supports: face detection, face recognition, face tracking, face clustering.
"""

import sys
import json
import argparse
import os
import numpy as np
from typing import List, Dict, Any, Optional, Tuple

# Make sibling modules (redis_publisher) importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


class FaceRecognitionProcessor:
    """Detect, recognize, track and cluster faces in a video.

    ``process_video`` drives the pipeline: sample frames, detect faces with
    InsightFace, optionally match embeddings against ``face_database``, link
    detections across sampled frames with a greedy IoU tracker, and
    optionally cluster embeddings with DBSCAN.
    """

    def __init__(
        self,
        enable_recognition: bool = True,
        enable_tracking: bool = True,
        enable_clustering: bool = True,
    ):
        self.enable_recognition = enable_recognition
        self.enable_tracking = enable_tracking
        self.enable_clustering = enable_clustering
        self.face_model = None
        # database_id -> entry with "embedding" and optional "name"/"metadata"
        # (the keys recognize_faces reads).
        self.face_database: Dict[str, Any] = {}
        self.face_tracker = None  # not used by the methods in this class
        self.face_clusters: Dict[str, Any] = {}  # not used by the methods in this class
        self.embedding_dim = 512  # InsightFace default embedding dimension

    # ------------------------------------------------------------------ models

    def load_models(self, use_mps: bool = False) -> bool:
        """Load the InsightFace ``buffalo_l`` model pack.

        Args:
            use_mps: Try hardware acceleration: CoreML is preferred, then
                CUDA; falls back to CPU when neither ONNX Runtime provider
                is available.

        Returns:
            True on success; False (after printing the error) if InsightFace
            cannot be imported or the models fail to load.
        """
        try:
            from insightface.app import FaceAnalysis
        except ImportError as e:
            print(f"[ERROR] Failed to import InsightFace: {e}")
            print("[INFO] Install with: pip install insightface")
            return False

        try:
            providers = self._select_providers(use_mps)
            print(f"[INFO] Using execution providers: {providers}")

            self.face_model = FaceAnalysis(
                name="buffalo_l",  # or 'buffalo_s' for smaller model
                providers=providers,
            )
            # InsightFace's model prepare() switches sessions back to
            # CPUExecutionProvider when ctx_id < 0, so every accelerated
            # provider (CUDA as well as CoreML) needs ctx_id >= 0.
            ctx_id = -1 if providers[0] == "CPUExecutionProvider" else 0
            self.face_model.prepare(ctx_id=ctx_id, det_size=(640, 640))
            print("[INFO] InsightFace models loaded successfully")
            return True
        except Exception as e:
            # Don't leave a half-prepared model around for detect_faces.
            self.face_model = None
            print(f"[ERROR] Failed to load models: {e}")
            return False

    @staticmethod
    def _select_providers(use_mps: bool) -> List[str]:
        """Return ONNX Runtime execution providers, most preferred first."""
        if not use_mps:
            return ["CPUExecutionProvider"]
        try:
            import onnxruntime as ort
        except ImportError:
            print("[WARNING] ONNX Runtime not available, using CPU")
            return ["CPUExecutionProvider"]

        available = ort.get_available_providers()
        if "CoreMLExecutionProvider" in available:
            print("[INFO] Using CoreMLExecutionProvider for MPS acceleration")
            return ["CoreMLExecutionProvider", "CPUExecutionProvider"]
        if "CUDAExecutionProvider" in available:
            print("[INFO] Using CUDAExecutionProvider")
            return ["CUDAExecutionProvider", "CPUExecutionProvider"]
        print("[INFO] MPS/CUDA not available, using CPU")
        return ["CPUExecutionProvider"]

    def load_face_database(self, database_path: Optional[str] = None):
        """Load the known-faces database from a JSON file.

        The file must contain a JSON object mapping database ids to entries
        that ``recognize_faces`` reads (``"embedding"``, optional ``"name"``
        and ``"metadata"``). On any problem the database is reset to empty
        and a message is printed; this never raises.
        """
        self.face_database = {}
        if not database_path or not os.path.exists(database_path):
            print("[INFO] No face database provided, starting with empty database")
            return
        try:
            with open(database_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # unreadable file / invalid JSON
            print(f"[WARNING] Failed to load face database: {e}")
            return
        if not isinstance(data, dict):
            # recognize_faces iterates .items(); a list here would crash later.
            print(
                "[WARNING] Failed to load face database: expected a JSON object, "
                f"got {type(data).__name__}"
            )
            return
        self.face_database = data
        print(f"[INFO] Loaded {len(self.face_database)} faces from database")

    # --------------------------------------------------------------- detection

    def detect_faces(self, image: np.ndarray) -> List[Dict[str, Any]]:
        """Detect faces in one image with InsightFace.

        Args:
            image: Frame passed straight to ``FaceAnalysis.get`` (in this
                module, frames returned by ``cv2.VideoCapture.read``).

        Returns:
            One dict per face: pixel bbox (``x``, ``y``, ``width``,
            ``height``), ``confidence``, ``embedding`` (list or None),
            ``attributes`` (dict or None) and ``identity`` (None until
            ``recognize_faces`` fills it). Empty if no model is loaded or
            detection fails.
        """
        if self.face_model is None:
            return []
        try:
            return [self._face_to_dict(face) for face in self.face_model.get(image)]
        except Exception as e:
            print(f"[ERROR] Face detection failed: {e}")
            return []

    @staticmethod
    def _face_to_dict(face: Any) -> Dict[str, Any]:
        """Convert one InsightFace ``Face`` into this module's detection dict."""
        x, y, x2, y2 = (int(v) for v in face.bbox.astype(int))

        # InsightFace's Face (a dict subclass) returns None from __getattr__
        # for unset keys, so hasattr() is always True; test values for None.
        raw_embedding = getattr(face, "embedding", None)
        det_score = getattr(face, "det_score", None)
        age = getattr(face, "age", None)
        gender = getattr(face, "gender", None)
        raw_pose = getattr(face, "pose", None)

        pose = None
        if raw_pose is not None:
            # InsightFace's 3D landmark model stores pose as (pitch, yaw, roll).
            pose = {
                "yaw": float(raw_pose[1]),
                "pitch": float(raw_pose[0]),
                "roll": float(raw_pose[2]),
            }

        attributes = None
        if age is not None or gender is not None or pose is not None:
            attributes = {
                "age": int(age) if age is not None else None,
                # This module maps InsightFace gender 0 -> female, else male.
                "gender": (
                    ("female" if gender == 0 else "male") if gender is not None else None
                ),
                "emotion": None,  # InsightFace doesn't provide emotion
                "glasses": None,
                "mask": None,
                "pose": pose,
            }

        return {
            "x": x,
            "y": y,
            "width": x2 - x,
            "height": y2 - y,
            "confidence": float(det_score) if det_score is not None else 0.8,
            "embedding": raw_embedding.tolist() if raw_embedding is not None else None,
            "attributes": attributes,
            "identity": None,  # Will be filled by recognition step
        }

    # ------------------------------------------------------------- recognition

    def recognize_faces(
        self, faces: List[Dict[str, Any]], threshold: float = 0.6
    ) -> List[Dict[str, Any]]:
        """Attach the best database match to each face.

        Each face's ``"identity"`` is set in place to the database entry with
        the highest cosine similarity that is also >= ``threshold``, or None.
        Faces without an embedding get None. When recognition is disabled or
        ``faces`` is empty, ``faces`` is returned untouched.
        """
        if not self.enable_recognition or not faces:
            return faces
        for face in faces:
            face["identity"] = self._match_identity(face.get("embedding"), threshold)
        return list(faces)

    def _match_identity(
        self, embedding: Optional[List[float]], threshold: float
    ) -> Optional[Dict[str, Any]]:
        """Return the best-scoring database identity for ``embedding``, or None."""
        if embedding is None:
            return None
        query = np.asarray(embedding, dtype=np.float64)

        best_match = None
        best_similarity = 0.0
        for face_id, db_face in self.face_database.items():
            if "embedding" not in db_face:
                continue
            db_embedding = np.asarray(db_face["embedding"], dtype=np.float64)
            if db_embedding.shape != query.shape:
                # e.g. an entry produced by a different model; np.dot would
                # raise here and abort the whole video.
                continue
            similarity = self.cosine_similarity(query, db_embedding)
            if similarity > best_similarity and similarity >= threshold:
                best_similarity = similarity
                best_match = {
                    "name": db_face.get("name", "Unknown"),
                    "confidence": float(similarity),
                    "database_id": face_id,
                    "metadata": db_face.get("metadata", {}),
                }
        return best_match

    # ---------------------------------------------------------------- tracking

    def track_faces(
        self,
        frames: List[Dict[str, Any]],
        iou_threshold: float = 0.3,
        max_gap: int = 10,
    ) -> List[Dict[str, Any]]:
        """Assign a stable ``face_id`` to faces across frames via greedy IoU.

        Each face is matched to the unclaimed live track whose last box has
        the highest IoU strictly above ``iou_threshold``; otherwise a new
        track is started. A track is considered live while at most
        ``max_gap`` entries of ``frames`` have passed since it was last seen
        (``frames`` holds sampled frames, so this counts samples, not raw
        video frames). A track can be matched by at most one face per frame.

        Face dicts are annotated in place; each frame dict is shallow-copied.
        """
        if not self.enable_tracking or not frames:
            return frames

        tracked_frames = []
        face_tracks: Dict[int, Dict[str, Any]] = {}  # track id -> last bbox/frame
        next_face_id = 1

        for frame_idx, frame in enumerate(frames):
            tracked_faces = []
            claimed = set()  # track ids already used by a face in this frame
            for face in frame.get("faces", []):
                best_track_id = None
                best_iou = iou_threshold
                for track_id, track in face_tracks.items():
                    if track_id in claimed:
                        continue
                    if frame_idx - track["last_frame"] > max_gap:  # stale track
                        continue
                    iou = self.calculate_iou(face, track["last_bbox"])
                    if iou > best_iou:
                        best_iou = iou
                        best_track_id = track_id

                if best_track_id is None:  # no match: start a new track
                    best_track_id = next_face_id
                    next_face_id += 1

                face_tracks[best_track_id] = {
                    "last_bbox": (face["x"], face["y"], face["width"], face["height"]),
                    "last_frame": frame_idx,
                }
                claimed.add(best_track_id)
                face["face_id"] = f"face_{best_track_id}"
                tracked_faces.append(face)

            tracked_frame = frame.copy()
            tracked_frame["faces"] = tracked_faces
            tracked_frames.append(tracked_frame)

        return tracked_frames

    # -------------------------------------------------------------- clustering

    def cluster_faces(
        self,
        frames: List[Dict[str, Any]],
        eps: float = 0.5,
        min_samples: int = 2,
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Cluster tracked face embeddings with DBSCAN on cosine distance.

        Only faces that have both an embedding and a ``face_id`` take part.
        ``eps`` is a cosine *distance* (1 - cosine similarity). Raw 512-d
        embeddings are not standardized first: after per-feature scaling,
        euclidean distances are far larger than any small ``eps``, so every
        point would be noise.

        Returns:
            ``(frames, cluster_info)``. ``frames`` is returned unchanged.
            ``cluster_info`` maps ``"cluster_<n>"`` to ``face_ids`` (one
            entry per detection), ``embeddings``, ``size``, ``centroid`` (mean
            embedding) and ``representative_face_id`` (the detection closest
            to the centroid). It is empty when clustering is disabled, fewer
            than two embeddings exist, scikit-learn is missing, or
            clustering fails.
        """
        if not self.enable_clustering:
            return frames, {}
        try:
            from sklearn.cluster import DBSCAN
        except ImportError:
            print("[WARNING] scikit-learn not installed, skipping clustering")
            return frames, {}

        embeddings = []
        face_ids = []
        for frame in frames:
            for face in frame.get("faces", []):
                if face.get("embedding") and face.get("face_id"):
                    embeddings.append(face["embedding"])
                    face_ids.append(face["face_id"])

        if len(embeddings) < 2:
            return frames, {}

        try:
            matrix = np.asarray(embeddings, dtype=np.float64)
            labels = DBSCAN(
                eps=eps, min_samples=min_samples, metric="cosine"
            ).fit_predict(matrix)

            cluster_info: Dict[str, Any] = {}
            for label, face_id, embedding in zip(labels, face_ids, embeddings):
                if label == -1:  # DBSCAN noise
                    continue
                info = cluster_info.setdefault(
                    f"cluster_{label}", {"face_ids": [], "embeddings": [], "size": 0}
                )
                info["face_ids"].append(face_id)
                info["embeddings"].append(embedding)
                info["size"] += 1

            for info in cluster_info.values():
                members = np.asarray(info["embeddings"], dtype=np.float64)
                centroid = members.mean(axis=0)
                info["centroid"] = centroid.tolist()
                rep_idx = int(np.argmin(np.linalg.norm(members - centroid, axis=1)))
                info["representative_face_id"] = info["face_ids"][rep_idx]

            return frames, cluster_info
        except Exception as e:
            print(f"[ERROR] Clustering failed: {e}")
            return frames, {}

    # ---------------------------------------------------------------- pipeline

    def process_video(
        self,
        video_path: str,
        output_path: str,
        uuid: str = "",
        use_mps: bool = False,
        sample_interval: int = 30,
    ) -> Dict[str, Any]:
        """Run the full pipeline on a video and write the result as JSON.

        Args:
            video_path: Video file readable by OpenCV.
            output_path: Where the JSON result is written (success path only).
            uuid: When non-empty, progress is published via RedisPublisher.
            use_mps: Forwarded to ``load_models``.
            sample_interval: Analyze every N-th decoded frame, starting with
                the first (0-based frame indices 0, N, 2N, ...), so clips
                shorter than N frames still get one analyzed frame.

        Returns:
            The result dict that was written, or ``create_empty_result()``
            (nothing written) if OpenCV is missing, the models fail to load,
            or the video cannot be opened.
        """
        publisher = RedisPublisher(uuid) if uuid else None
        if publisher:
            publisher.info("face_recognition", "FACE_RECOGNITION_START")

        try:
            import cv2
        except ImportError:
            if publisher:
                publisher.error("face_recognition", "opencv-python not installed")
            return self.create_empty_result()

        if publisher:
            publisher.info("face_recognition", "LOADING_MODELS")
        if not self.load_models(use_mps=use_mps):
            if publisher:
                publisher.error("face_recognition", "Failed to load InsightFace models")
            return self.create_empty_result()
        if publisher:
            publisher.info("face_recognition", "MODELS_LOADED")

        sample_interval = max(1, int(sample_interval))
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print(f"[ERROR] Could not open video: {video_path}")
                if publisher:
                    publisher.error(
                        "face_recognition", f"Could not open video: {video_path}"
                    )
                return self.create_empty_result()

            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Expected number of sampled frames (ceiling division), at least 1
            # so progress never reports a zero total.
            expected_samples = max(1, -(-total_frames // sample_interval))
            if publisher:
                publisher.info("face_recognition", f"fps={fps}, frames={total_frames}")
                publisher.progress("face_recognition", 0, expected_samples, "Starting")

            frames, decoded = self._analyze_frames(
                cap, fps, sample_interval, expected_samples, publisher
            )
        finally:
            cap.release()

        if total_frames <= 0:
            # The container did not report a frame count; use what was decoded.
            total_frames = decoded

        if self.enable_tracking:
            frames = self.track_faces(frames)

        cluster_info: Dict[str, Any] = {}
        if self.enable_clustering:
            frames, cluster_info = self.cluster_faces(frames)

        recognized_faces = self.extract_recognized_faces(frames)

        result = {
            "frame_count": total_frames,
            "fps": fps,
            "frames": frames,
            "recognized_faces": recognized_faces,
            "face_clusters": self.format_clusters(cluster_info),
        }

        # Write before announcing completion so listeners can read the file.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)

        if publisher:
            publisher.complete(
                "face_recognition",
                f"{len(frames)} frames, {len(recognized_faces)} recognized faces",
            )

        return result

    def _analyze_frames(
        self,
        cap: Any,
        fps: float,
        sample_interval: int,
        expected_samples: int,
        publisher: Any,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Decode ``cap`` to the end, analyzing every ``sample_interval``-th frame.

        Returns the per-frame results and the total number of decoded frames.
        """
        frames: List[Dict[str, Any]] = []
        decoded = 0
        while True:
            ok, image = cap.read()
            if not ok:
                break
            frame_index = decoded
            decoded += 1
            if frame_index % sample_interval:
                continue

            faces = self.detect_faces(image)
            if self.enable_recognition:
                faces = self.recognize_faces(faces)

            timestamp = frame_index / fps if fps > 0 else 0
            frames.append(
                {
                    "frame": frame_index,
                    "timestamp": round(timestamp, 3),
                    "faces": faces,
                }
            )
            if publisher:
                publisher.progress(
                    "face_recognition",
                    len(frames),
                    # The reported frame count can be low; never exceed 100%.
                    max(expected_samples, len(frames)),
                    f"Frame {frame_index + 1}",
                )
        return frames, decoded

    # ------------------------------------------------------------------ output

    def extract_recognized_faces(
        self, frames: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Summarize each tracked face (by ``face_id``) across all frames.

        Faces without a ``face_id`` (e.g. when tracking is disabled) are
        skipped. Identities are de-duplicated by ``database_id``.
        ``cluster_id`` is always None here.
        """
        face_info: Dict[str, Dict[str, Any]] = {}
        for frame in frames:
            for face in frame.get("faces", []):
                face_id = face.get("face_id")
                if not face_id:
                    continue

                entry = face_info.get(face_id)
                if entry is None:
                    entry = face_info[face_id] = {
                        "face_id": face_id,
                        "embedding": face.get("embedding"),
                        "first_seen": frame["timestamp"],
                        "last_seen": frame["timestamp"],
                        "total_appearances": 1,
                        "attributes": face.get("attributes"),
                        "identities": [],
                        "cluster_id": None,
                    }
                else:
                    entry["last_seen"] = frame["timestamp"]
                    entry["total_appearances"] += 1

                identity = face.get("identity")
                if identity and not any(
                    known.get("database_id") == identity.get("database_id")
                    for known in entry["identities"]
                ):
                    entry["identities"].append(identity)

        return list(face_info.values())

    def format_clusters(self, cluster_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert ``cluster_faces`` output into the list-of-dicts JSON shape."""
        return [
            {
                "cluster_id": cluster_id,
                "face_ids": info.get("face_ids", []),
                "centroid": info.get("centroid", []),
                "size": info.get("size", 0),
                "representative_face_id": info.get("representative_face_id"),
                "metadata": {},
            }
            for cluster_id, info in cluster_info.items()
        ]

    def create_empty_result(self) -> Dict[str, Any]:
        """Return the result structure used when processing cannot run."""
        return {
            "frame_count": 0,
            "fps": 0.0,
            "frames": [],
            "recognized_faces": [],
            "face_clusters": [],
        }

    # ----------------------------------------------------------------- helpers

    @staticmethod
    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Return the cosine similarity of ``a`` and ``b`` (0.0 if either is zero)."""
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(np.dot(a, b) / (norm_a * norm_b))

    @staticmethod
    def calculate_iou(face1: Dict[str, Any], bbox2: Tuple[int, int, int, int]) -> float:
        """Return Intersection over Union of a face dict's box and an (x, y, w, h) box."""
        x1, y1, w1, h1 = face1["x"], face1["y"], face1["width"], face1["height"]
        x2, y2, w2, h2 = bbox2

        # Intersection rectangle
        x_left = max(x1, x2)
        y_top = max(y1, y2)
        x_right = min(x1 + w1, x2 + w2)
        y_bottom = min(y1 + h1, y2 + h2)
        if x_right < x_left or y_bottom < y_top:
            return 0.0

        intersection_area = (x_right - x_left) * (y_bottom - y_top)
        union_area = w1 * h1 + w2 * h2 - intersection_area
        return intersection_area / union_area if union_area > 0 else 0.0


def main():
    """Command-line entry point: process one video and print a summary."""
    parser = argparse.ArgumentParser(
        description="Face Recognition Processor with MPS support"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    # nargs="?" makes these positionals optional so their defaults apply;
    # without it argparse requires them and never uses ``default``.
    parser.add_argument(
        "enable_recognition", nargs="?", help="Enable face recognition (0/1)", default="1"
    )
    parser.add_argument(
        "enable_tracking", nargs="?", help="Enable face tracking (0/1)", default="1"
    )
    parser.add_argument(
        "enable_clustering", nargs="?", help="Enable face clustering (0/1)", default="1"
    )
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--database", "-d", help="Path to face database JSON file", default=""
    )
    parser.add_argument(
        "--use-mps",
        "-m",
        help="Use MPS acceleration (Apple Silicon)",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()

    processor = FaceRecognitionProcessor(
        enable_recognition=args.enable_recognition == "1",
        enable_tracking=args.enable_tracking == "1",
        enable_clustering=args.enable_clustering == "1",
    )

    if args.database:
        processor.load_face_database(args.database)

    result = processor.process_video(
        video_path=args.video_path,
        output_path=args.output_path,
        uuid=args.uuid,
        use_mps=args.use_mps,
    )

    print(f"[INFO] Processing complete: {len(result['frames'])} frames processed")
    print(f"[INFO] Recognized faces: {len(result['recognized_faces'])}")
    print(f"[INFO] Face clusters: {len(result['face_clusters'])}")


if __name__ == "__main__":
    main()