Files
momentry_core/scripts/face_registration.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

372 lines
12 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Face Registration Script
Register new faces to the face database
"""
import sys
import json
import argparse
import os
import time
from typing import Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
class FaceRegistration:
    """Register, list and delete faces in a JSON-backed face database.

    The database is a mapping of ``face_id`` to a face record (name,
    embedding, attributes, metadata, registration time, source image path)
    and is persisted to ``database_path`` after every mutation.
    """

    def __init__(self):
        # InsightFace FaceAnalysis instance; created lazily by load_models().
        self.face_model = None
        # In-memory mapping of face_id -> face record.
        self.face_database = {}
        # Path of the JSON file backing face_database; set by load_database().
        self.database_path = None

    @staticmethod
    def _select_providers(use_mps: bool) -> list:
        """Return the ONNX Runtime execution providers to request, best first."""
        providers = ["CPUExecutionProvider"]
        if not use_mps:
            return providers
        try:
            import onnxruntime as ort
        except ImportError:
            print("[WARNING] ONNX Runtime not available, using CPU")
            return providers

        available_providers = ort.get_available_providers()
        if "CoreMLExecutionProvider" in available_providers:
            print("[INFO] Using CoreMLExecutionProvider for MPS acceleration")
            return ["CoreMLExecutionProvider", "CPUExecutionProvider"]
        if "CUDAExecutionProvider" in available_providers:
            print("[INFO] Using CUDAExecutionProvider")
            return ["CUDAExecutionProvider", "CPUExecutionProvider"]
        print("[INFO] MPS/CUDA not available, using CPU")
        return providers

    @staticmethod
    def _failure(message: str) -> Dict[str, Any]:
        """Build the result returned when registration fails before a face exists."""
        return {
            "success": False,
            "message": message,
            "face_id": None,
            "embedding": None,
            "attributes": None,
        }

    @staticmethod
    def _face_attributes(face) -> Optional[Dict[str, Any]]:
        """Collect age, gender and pose from a detected face, or None if all absent."""
        raw_age = getattr(face, "age", None)
        raw_gender = getattr(face, "gender", None)
        raw_pose = getattr(face, "pose", None)

        age = int(raw_age) if raw_age is not None else None
        gender = None
        if raw_gender is not None:
            gender = "female" if raw_gender == 0 else "male"
        pose = None
        if raw_pose is not None:
            pose = {
                "yaw": float(raw_pose[0]),
                "pitch": float(raw_pose[1]),
                "roll": float(raw_pose[2]),
            }

        # Compare against None explicitly so a legitimate age of 0 is kept.
        if age is None and gender is None and pose is None:
            return None
        return {
            "age": age,
            "gender": gender,
            "emotion": None,
            "glasses": None,
            "mask": None,
            "pose": pose,
        }

    def load_models(self, use_mps: bool = False) -> bool:
        """Load the InsightFace ``buffalo_l`` models.

        Args:
            use_mps: When True, request CoreML (or CUDA) acceleration if
                ONNX Runtime reports it as available; otherwise use CPU.

        Returns:
            True when the models were loaded, False on any failure (the
            reason is printed).
        """
        try:
            from insightface.app import FaceAnalysis

            providers = self._select_providers(use_mps)
            print(f"[INFO] Using execution providers: {providers}")

            # Initialize face analysis app
            self.face_model = FaceAnalysis(
                name="buffalo_l",  # or 'buffalo_s' for smaller model
                providers=providers,
            )

            # InsightFace falls back to CPU-only for a negative ctx_id, so any
            # accelerated provider (CoreML or CUDA) needs device 0.
            ctx_id = 0 if providers[0] != "CPUExecutionProvider" else -1
            self.face_model.prepare(ctx_id=ctx_id, det_size=(640, 640))
            print("[INFO] InsightFace models loaded successfully")
            return True
        except ImportError as e:
            print(f"[ERROR] Failed to import InsightFace: {e}")
            print("[INFO] Install with: pip install insightface")
            return False
        except Exception as e:
            # Boundary handler: model download/initialisation can fail in many
            # library-specific ways and callers only need a boolean.
            print(f"[ERROR] Failed to load models: {e}")
            return False

    def load_database(self, database_path: str):
        """Load the face database from ``database_path``.

        A missing file starts an empty database. An unreadable file, invalid
        JSON, or JSON that is not an object is reported as a warning and also
        results in an empty database.
        """
        self.database_path = database_path
        self.face_database = {}

        if not os.path.exists(database_path):
            print("[INFO] Creating new face database")
            return

        try:
            with open(database_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON and text-decoding errors.
            print(f"[WARNING] Failed to load database: {e}")
            return

        # Every other method treats the database as a dict keyed by face_id.
        if not isinstance(loaded, dict):
            print(
                "[WARNING] Failed to load database: "
                f"expected a JSON object, got {type(loaded).__name__}"
            )
            return

        self.face_database = loaded
        print(f"[INFO] Loaded {len(self.face_database)} faces from database")

    def save_database(self) -> bool:
        """Persist the face database to ``database_path``.

        The data is written to a sibling temporary file and then moved over
        the target, so a failed write leaves the previous database intact.

        Returns:
            True on success, False if no path is set or the write failed.
        """
        if not self.database_path:
            print("[ERROR] No database path specified")
            return False

        tmp_path = f"{self.database_path}.tmp"
        try:
            # A bare filename has an empty dirname, and os.makedirs("") raises.
            directory = os.path.dirname(self.database_path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.face_database, f, indent=2)
            os.replace(tmp_path, self.database_path)
        except (OSError, TypeError, ValueError) as e:
            print(f"[ERROR] Failed to save database: {e}")
            # Best-effort cleanup of the partial temporary file.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as cleanup_error:
                    print(
                        f"[WARNING] Could not remove temporary file {tmp_path}: "
                        f"{cleanup_error}"
                    )
            return False

        print(f"[INFO] Saved {len(self.face_database)} faces to database")
        return True

    def register_face(
        self, image_path: str, name: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Detect a face in ``image_path`` and store it under ``name``.

        Args:
            image_path: Path of the image containing the face.
            name: Display name stored with the face record.
            metadata: Optional extra data stored with the record.

        Returns:
            A dict with ``success``, ``message``, ``face_id``, ``embedding``
            and ``attributes``. On failure ``success`` is False; the last
            three are None unless the failure happened while saving.
        """
        if not os.path.exists(image_path):
            return self._failure(f"Image not found: {image_path}")

        # Load models if not already loaded
        if self.face_model is None and not self.load_models():
            return self._failure("Failed to load face models")

        try:
            import cv2
        except ImportError:
            return self._failure("OpenCV not installed")

        image = cv2.imread(image_path)
        if image is None:
            return self._failure(f"Failed to load image: {image_path}")

        try:
            faces = self.face_model.get(image)
            if len(faces) == 0:
                return self._failure("No faces detected in image")
            if len(faces) > 1:
                print("[WARNING] Multiple faces detected, using the first one")
            face = faces[0]

            # getattr with a default handles both a missing attribute and one
            # that is present but None; the latter would crash on .tolist().
            raw_embedding = getattr(face, "embedding", None)
            if raw_embedding is None:
                return self._failure("Failed to extract face embedding")
            embedding = raw_embedding.tolist()

            import uuid

            face_id = str(uuid.uuid4())
            face_record = {
                "face_id": face_id,
                "name": name,
                "embedding": embedding,
                "attributes": self._face_attributes(face),
                "metadata": metadata or {},
                "registration_time": time.time(),
                "image_path": image_path,
            }

            self.face_database[face_id] = face_record
            saved = self.save_database()
            if not saved:
                # Keep memory consistent with disk, as delete_face does.
                self.face_database.pop(face_id, None)

            return {
                "success": saved,
                "message": (
                    f"Face registered successfully as '{name}'"
                    if saved
                    else "Failed to save face to database"
                ),
                "face_id": face_id,
                "embedding": embedding,
                "attributes": face_record["attributes"],
            }
        except Exception as e:
            # Boundary handler: the caller reports the outcome as JSON, so any
            # detection error is converted into a failure result.
            return self._failure(f"Face registration failed: {str(e)}")

    def list_faces(self) -> Dict[str, Any]:
        """Return a summary (id, name, time, metadata) of every registered face."""
        faces = [
            {
                "face_id": face_id,
                "name": face_data.get("name", "Unknown"),
                "registration_time": face_data.get("registration_time"),
                "metadata": face_data.get("metadata", {}),
            }
            for face_id, face_data in self.face_database.items()
        ]
        return {
            "success": True,
            "message": f"Found {len(faces)} registered faces",
            "faces": faces,
        }

    def delete_face(self, face_id: str) -> Dict[str, Any]:
        """Remove ``face_id`` from the database and persist the change.

        Returns:
            A dict with ``success`` and ``message``. If saving fails the
            record is restored in memory and ``success`` is False.
        """
        if face_id not in self.face_database:
            return {"success": False, "message": f"Face ID not found: {face_id}"}

        deleted_face = self.face_database.pop(face_id)
        if not self.save_database():
            # Restore so memory still matches what is on disk.
            self.face_database[face_id] = deleted_face
            return {
                "success": False,
                "message": "Failed to save database after deletion",
            }

        return {
            "success": True,
            "message": f"Face '{deleted_face.get('name', 'Unknown')}' deleted successfully",
        }
def _write_result(output_path: str, result: Dict[str, Any]) -> None:
    """Write ``result`` as JSON to ``output_path`` and print its message."""
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)
    print(result["message"])


def _load_metadata(metadata_path: str) -> Dict[str, Any]:
    """Read the optional metadata JSON file; return {} when unusable."""
    if not metadata_path:
        return {}
    if not os.path.exists(metadata_path):
        # Previously ignored silently, which hid typos in the path.
        print(f"[WARNING] Metadata file not found: {metadata_path}")
        return {}
    try:
        with open(metadata_path, "r") as f:
            metadata = json.load(f)
    except Exception as e:
        print(f"[WARNING] Failed to load metadata: {e}")
        return {}
    if not isinstance(metadata, dict):
        print("[WARNING] Failed to load metadata: expected a JSON object")
        return {}
    return metadata


def main():
    """Command-line entry point: register, list, or delete faces.

    The outcome of the selected operation is written as JSON to the
    ``output_path`` argument and its message is printed. ``--list`` takes
    precedence over ``--delete``, which takes precedence over registration.
    """
    parser = argparse.ArgumentParser(description="Face Registration Tool")
    parser.add_argument("image_path", help="Path to face image")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("name", help="Name for the registered face")
    parser.add_argument(
        "--metadata", "-m", help="Path to metadata JSON file", default=""
    )
    parser.add_argument(
        "--database", "-d", help="Path to face database", default="face_database.json"
    )
    parser.add_argument(
        "--list", "-l", help="List registered faces", action="store_true"
    )
    parser.add_argument("--delete", help="Delete face by ID", default="")
    args = parser.parse_args()

    registration = FaceRegistration()
    registration.load_database(args.database)

    if args.list:
        result = registration.list_faces()
    elif args.delete:
        result = registration.delete_face(args.delete)
    else:
        result = registration.register_face(
            image_path=args.image_path,
            name=args.name,
            metadata=_load_metadata(args.metadata),
        )

    _write_result(args.output_path, result)


if __name__ == "__main__":
    main()