#!/opt/homebrew/bin/python3.11 """ Face Registration Script Register new faces to the face database """ import sys import json import argparse import os import time from typing import Dict, Any, Optional sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) class FaceRegistration: def __init__(self): self.face_model = None self.face_database = {} self.database_path = None def load_models(self, use_mps: bool = False): """Load InsightFace models with MPS support""" try: import insightface from insightface.app import FaceAnalysis # Determine execution providers based on configuration providers = ["CPUExecutionProvider"] if use_mps: try: # Try to import MPS provider import onnxruntime as ort available_providers = ort.get_available_providers() if "CoreMLExecutionProvider" in available_providers: print( "[INFO] Using CoreMLExecutionProvider for MPS acceleration" ) providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"] elif "CUDAExecutionProvider" in available_providers: print("[INFO] Using CUDAExecutionProvider") providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] else: print("[INFO] MPS/CUDA not available, using CPU") providers = ["CPUExecutionProvider"] except ImportError: print("[WARNING] ONNX Runtime not available, using CPU") providers = ["CPUExecutionProvider"] print(f"[INFO] Using execution providers: {providers}") # Initialize face analysis app self.face_model = FaceAnalysis( name="buffalo_l", # or 'buffalo_s' for smaller model providers=providers, ) # For MPS/CoreML, we need to adjust context ctx_id = -1 # Default for CPU if use_mps and "CoreMLExecutionProvider" in providers: ctx_id = 0 # CoreML uses device 0 self.face_model.prepare(ctx_id=ctx_id, det_size=(640, 640)) print("[INFO] InsightFace models loaded successfully") return True except ImportError as e: print(f"[ERROR] Failed to import InsightFace: {e}") print("[INFO] Install with: pip install insightface") return False except Exception as e: print(f"[ERROR] Failed to load models: {e}") return False def load_database(self, database_path: str): """Load existing face database""" self.database_path = database_path if os.path.exists(database_path): try: with open(database_path, "r") as f: self.face_database = json.load(f) print(f"[INFO] Loaded {len(self.face_database)} faces from database") except Exception as e: print(f"[WARNING] Failed to load database: {e}") self.face_database = {} else: print("[INFO] Creating new face database") self.face_database = {} def save_database(self): """Save face database to file""" if not self.database_path: print("[ERROR] No database path specified") return False try: # Create directory if it doesn't exist os.makedirs(os.path.dirname(self.database_path), exist_ok=True) with open(self.database_path, "w") as f: json.dump(self.face_database, f, indent=2) print(f"[INFO] Saved {len(self.face_database)} faces to database") return True except Exception as e: print(f"[ERROR] Failed to save database: {e}") return False def register_face( self, image_path: str, name: str, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Register a new face from image""" # Check if image exists if not os.path.exists(image_path): return { "success": False, "message": f"Image not found: {image_path}", "face_id": None, "embedding": None, "attributes": None, } # Load models if not already loaded if self.face_model is None: if not self.load_models(): return { "success": False, "message": "Failed to load face models", "face_id": None, "embedding": None, "attributes": None, } # Load image try: import cv2 image = cv2.imread(image_path) if image is None: return { "success": False, "message": f"Failed to load image: {image_path}", "face_id": None, "embedding": None, "attributes": None, } except ImportError: return { "success": False, "message": "OpenCV not installed", "face_id": None, "embedding": None, "attributes": None, } # Detect faces try: faces = self.face_model.get(image) if len(faces) == 0: return { "success": False, "message": "No faces detected in image", "face_id": None, "embedding": None, "attributes": None, } if len(faces) > 1: print("[WARNING] Multiple faces detected, using the first one") # Use the first face face = faces[0] # Get embedding embedding = face.embedding.tolist() if hasattr(face, "embedding") else None if embedding is None: return { "success": False, "message": "Failed to extract face embedding", "face_id": None, "embedding": None, "attributes": None, } # Get attributes attributes = {} if hasattr(face, "age") and face.age is not None: attributes["age"] = int(face.age) if hasattr(face, "gender") and face.gender is not None: attributes["gender"] = "female" if face.gender == 0 else "male" # Get pose if available pose = None if hasattr(face, "pose") and face.pose is not None: pose = { "yaw": float(face.pose[0]), "pitch": float(face.pose[1]), "roll": float(face.pose[2]), } # Generate face ID import uuid face_id = str(uuid.uuid4()) # Create face record face_record = { "face_id": face_id, "name": name, "embedding": embedding, "attributes": { "age": attributes.get("age"), "gender": attributes.get("gender"), "emotion": None, "glasses": None, "mask": None, "pose": pose, } if any([attributes.get("age"), attributes.get("gender"), pose]) else None, "metadata": metadata or {}, "registration_time": time.time(), "image_path": image_path, } # Add to database self.face_database[face_id] = face_record # Save database if not self.save_database(): return { "success": False, "message": "Failed to save face to database", "face_id": face_id, "embedding": embedding, "attributes": face_record["attributes"], } return { "success": True, "message": f"Face registered successfully as '{name}'", "face_id": face_id, "embedding": embedding, "attributes": face_record["attributes"], } except Exception as e: return { "success": False, "message": f"Face registration failed: {str(e)}", "face_id": None, "embedding": None, "attributes": None, } def list_faces(self) -> Dict[str, Any]: """List all registered faces""" faces = [] for face_id, face_data in self.face_database.items(): faces.append( { "face_id": face_id, "name": face_data.get("name", "Unknown"), "registration_time": face_data.get("registration_time"), "metadata": face_data.get("metadata", {}), } ) return { "success": True, "message": f"Found {len(faces)} registered faces", "faces": faces, } def delete_face(self, face_id: str) -> Dict[str, Any]: """Delete a face from database""" if face_id not in self.face_database: return {"success": False, "message": f"Face ID not found: {face_id}"} # Remove from database deleted_face = self.face_database.pop(face_id) # Save database if not self.save_database(): # Try to restore self.face_database[face_id] = deleted_face return { "success": False, "message": "Failed to save database after deletion", } return { "success": True, "message": f"Face '{deleted_face.get('name', 'Unknown')}' deleted successfully", } def main(): parser = argparse.ArgumentParser(description="Face Registration Tool") parser.add_argument("image_path", help="Path to face image") parser.add_argument("output_path", help="Output JSON path") parser.add_argument("name", help="Name for the registered face") parser.add_argument( "--metadata", "-m", help="Path to metadata JSON file", default="" ) parser.add_argument( "--database", "-d", help="Path to face database", default="face_database.json" ) parser.add_argument( "--list", "-l", help="List registered faces", action="store_true" ) parser.add_argument("--delete", help="Delete face by ID", default="") args = parser.parse_args() # Initialize registration registration = FaceRegistration() # Load database registration.load_database(args.database) # Handle list command if args.list: result = registration.list_faces() with open(args.output_path, "w") as f: json.dump(result, f, indent=2) print(result["message"]) return # Handle delete command if args.delete: result = registration.delete_face(args.delete) with open(args.output_path, "w") as f: json.dump(result, f, indent=2) print(result["message"]) return # Load metadata if provided metadata = {} if args.metadata and os.path.exists(args.metadata): try: with open(args.metadata, "r") as f: metadata = json.load(f) except Exception as e: print(f"[WARNING] Failed to load metadata: {e}") # Register face result = registration.register_face( image_path=args.image_path, name=args.name, metadata=metadata ) # Save result with open(args.output_path, "w") as f: json.dump(result, f, indent=2) print(result["message"]) if __name__ == "__main__": main()