#!/opt/homebrew/bin/python3.11
"""
Face Identity Matching with 1-to-many Reference Vectors

Purpose:
1. Implement 1-to-many matching algorithms
2. Support multiple strategies (Best Match, Voting, Weighted, Combined)
3. Match detected face to Identity in database

Usage:
    python3 scripts/match_face_identity.py --identity-name "Preview Test Person" --face-json output/preview.face_new.json
"""

import argparse
import json
import os
import re
from typing import Optional

import numpy as np

try:
    import psycopg2
except ImportError:  # The matching strategies are pure NumPy; only DB access needs the driver.
    psycopg2 = None

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev",
)

# Default blend used by the combined strategy (prioritizes the best match).
DEFAULT_COMBINED_WEIGHTS = {"best_match": 0.7, "vote_ratio": 0.2, "weighted_sim": 0.1}

# Score keys, in order of preference, that the individual strategies produce.
_SCORE_KEYS = ("final_score", "best_similarity", "weighted_similarity", "vote_ratio")

# Identifiers cannot be passed as query parameters, so the schema name is
# validated against a plain-identifier pattern before being put into SQL text.
_SCHEMA_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def cosine_similarity(a, b):
    """Calculate cosine similarity between two vectors.

    Args:
        a: First vector (any 1-D array-like of numbers).
        b: Second vector, same length as ``a``.

    Returns:
        The cosine similarity as a plain ``float``; ``0.0`` when either
        vector has zero norm.
    """
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


def _reference_similarities(detected_embedding, reference_embeddings):
    """Return the cosine similarity of the detected vector to each reference."""
    # Convert once so cosine_similarity does not re-copy it for every reference.
    detected = np.asarray(detected_embedding, dtype=np.float64)
    return [cosine_similarity(detected, ref["embedding"]) for ref in reference_embeddings]


def _quality_weights(reference_embeddings):
    """Return one weight per reference; a missing or null quality_score counts as 1.0."""
    weights = []
    for ref in reference_embeddings:
        quality = ref.get("quality_score")
        weights.append(1.0 if quality is None else float(quality))
    return weights


def _best_match_result(similarities, threshold):
    """Build the best-match result dict from precomputed similarities."""
    if similarities:
        best_idx = int(np.argmax(similarities))
        best_sim = similarities[best_idx]
        is_match = bool(best_sim >= threshold)
    else:
        # No references: nothing can match.
        best_idx, best_sim, is_match = None, 0.0, False
    return {
        "strategy": "best_match",
        "best_similarity": best_sim,
        "best_reference_idx": best_idx,
        "is_match": is_match,
        "threshold": threshold,
    }


def _voting_result(similarities, threshold, min_vote_ratio=0.5):
    """Build the voting result dict from precomputed similarities."""
    total = len(similarities)
    votes = sum(1 for sim in similarities if sim >= threshold)
    vote_ratio = votes / total if total else 0.0
    return {
        "strategy": "voting",
        "votes": votes,
        "total_references": total,
        "vote_ratio": vote_ratio,
        # An empty reference set never matches, whatever min_vote_ratio is.
        "is_match": bool(total) and vote_ratio >= min_vote_ratio,
        "threshold": threshold,
        "similarities": similarities,
    }


def _weighted_result(similarities, weights, threshold):
    """Build the weighted-average result dict from precomputed similarities."""
    total_weight = sum(weights)
    if not similarities:
        weighted_sim = 0.0
    elif total_weight > 0:
        weighted_sim = sum(sim * w for sim, w in zip(similarities, weights)) / total_weight
    else:
        # All quality scores are zero: fall back to the unweighted mean
        # instead of dividing by zero.
        weighted_sim = sum(similarities) / len(similarities)
    return {
        "strategy": "weighted",
        "weighted_similarity": weighted_sim,
        "is_match": bool(similarities) and bool(weighted_sim >= threshold),
        "threshold": threshold,
        "weights": weights,
    }


def strategy_best_match(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 1: Best Match

    Take the highest similarity among all reference vectors.

    Pros: Fast, simple
    Cons: May miss if detected face is from different angle

    Args:
        detected_embedding: Embedding of the detected face.
        reference_embeddings: List of dicts, each with an ``"embedding"`` key.
        threshold: Minimum best similarity for a match.

    Returns:
        dict with ``strategy``, ``best_similarity``, ``best_reference_idx``
        (``None`` when there are no references), ``is_match`` and
        ``threshold``.
    """
    similarities = _reference_similarities(detected_embedding, reference_embeddings)
    return _best_match_result(similarities, threshold)


def strategy_voting(detected_embedding, reference_embeddings, threshold=0.85, min_vote_ratio=0.5):
    """
    Strategy 2: Voting Mechanism

    Count how many reference vectors exceed threshold.

    Pros: More robust
    Cons: Requires more reference vectors

    Args:
        detected_embedding: Embedding of the detected face.
        reference_embeddings: List of dicts, each with an ``"embedding"`` key.
        threshold: Minimum similarity for a reference to cast a vote.
        min_vote_ratio: Fraction of references that must vote for a match
            (default: at least 50%).

    Returns:
        dict with ``strategy``, ``votes``, ``total_references``,
        ``vote_ratio``, ``is_match``, ``threshold`` and ``similarities``.
        With no references the ratio is ``0.0`` and ``is_match`` is False.
    """
    similarities = _reference_similarities(detected_embedding, reference_embeddings)
    return _voting_result(similarities, threshold, min_vote_ratio)


def strategy_weighted(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 3: Weighted Average

    Weight similarity by quality score.

    Pros: Accounts for reference vector quality
    Cons: Requires quality scores

    Args:
        detected_embedding: Embedding of the detected face.
        reference_embeddings: List of dicts with ``"embedding"`` and an
            optional ``"quality_score"`` (missing or null counts as 1.0).
        threshold: Minimum weighted similarity for a match.

    Returns:
        dict with ``strategy``, ``weighted_similarity``, ``is_match``,
        ``threshold`` and ``weights``. If the weights do not sum to a
        positive number the unweighted mean is used; with no references the
        similarity is ``0.0`` and ``is_match`` is False.
    """
    similarities = _reference_similarities(detected_embedding, reference_embeddings)
    return _weighted_result(similarities, _quality_weights(reference_embeddings), threshold)


def strategy_combined(detected_embedding, reference_embeddings, threshold=0.85, weights=None):
    """
    Strategy 4: Combined Scoring

    Combine Best Match + Voting + Weighted.

    Formula (optimized):
        final_score = best_match * 0.7 + vote_ratio * 0.2 + weighted_sim * 0.1

    Pros: Most robust, prioritizes best_match
    Cons: More computation

    Args:
        detected_embedding: Embedding of the detected face.
        reference_embeddings: List of dicts, each with an ``"embedding"`` key.
        threshold: Minimum final score for a match (also used by the
            sub-strategies).
        weights: dict with keys 'best_match', 'vote_ratio', 'weighted_sim';
            keys that are left out take their value from the default
            {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}.

    Returns:
        dict with ``strategy``, the three component scores, ``final_score``,
        ``is_match``, ``threshold``, the effective ``weights`` and the
        sub-strategy results under ``details``.
    """
    mix = dict(DEFAULT_COMBINED_WEIGHTS)
    if weights:
        mix.update(weights)

    # Compute the similarities once and share them across the sub-strategies.
    similarities = _reference_similarities(detected_embedding, reference_embeddings)
    best_result = _best_match_result(similarities, threshold)
    voting_result = _voting_result(similarities, threshold)
    weighted_result = _weighted_result(
        similarities, _quality_weights(reference_embeddings), threshold
    )

    final_score = (
        best_result["best_similarity"] * mix["best_match"]
        + voting_result["vote_ratio"] * mix["vote_ratio"]
        + weighted_result["weighted_similarity"] * mix["weighted_sim"]
    )

    return {
        "strategy": "combined",
        "best_match": best_result["best_similarity"],
        "vote_ratio": voting_result["vote_ratio"],
        "weighted_sim": weighted_result["weighted_similarity"],
        "final_score": final_score,
        "is_match": bool(similarities) and bool(final_score >= threshold),
        "threshold": threshold,
        "weights": mix,
        "details": {
            "best_match": best_result,
            "voting": voting_result,
            "weighted": weighted_result,
        },
    }


def _identities_table(schema):
    """Return the qualified ``<schema>.identities`` name, rejecting unsafe schema names."""
    if not isinstance(schema, str) or not _SCHEMA_NAME_RE.fullmatch(schema):
        raise ValueError(f"Invalid schema name: {schema!r}")
    return f"{schema}.identities"


def _connect():
    """Open a database connection to DATABASE_URL."""
    if psycopg2 is None:
        raise RuntimeError("psycopg2 is required for database access")
    return psycopg2.connect(DATABASE_URL)


def _fetch_one(conn, query, params):
    """Run a query on ``conn`` and return its first row (or None), closing the cursor."""
    cur = conn.cursor()
    try:
        cur.execute(query, params)
        return cur.fetchone()
    finally:
        cur.close()


def _load_identity(identity_uuid, schema):
    """Fetch an identity and its reference embeddings; print the reason and return None on failure."""
    table = _identities_table(schema)
    conn = _connect()  # connection failures propagate to the caller
    try:
        row = _fetch_one(
            conn,
            f"SELECT name, identity_type, reference_data FROM {table} WHERE uuid = %s;",
            (identity_uuid,),
        )
        if not row:
            print(f"❌ Identity not found: {identity_uuid}")
            return None

        name, identity_type, raw_reference_data = row
        # The column may come back as JSON text or as an already-decoded object.
        if isinstance(raw_reference_data, str):
            reference_data = json.loads(raw_reference_data)
        else:
            reference_data = raw_reference_data
        face_embeddings = (reference_data or {}).get("face_embeddings", [])
        if not face_embeddings:
            print(f"⚠️ No reference embeddings for Identity: {name}")
            return None

        return {
            "uuid": identity_uuid,
            "name": name,
            "identity_type": identity_type,
            "face_embeddings": face_embeddings,
        }
    except Exception as e:  # deliberate best-effort: report and signal "no result"
        print(f"❌ Matching error: {e}")
        return None
    finally:
        conn.close()


def _match_embedding(detected_embedding, identity, strategy, threshold, weights):
    """Match one embedding against an already-loaded identity (no database access)."""
    detected = np.asarray(detected_embedding, dtype=np.float64)
    norm = np.linalg.norm(detected)
    if norm > 0:
        detected = detected / norm

    references = identity["face_embeddings"]
    if strategy == "best_match":
        match_result = strategy_best_match(detected, references, threshold)
    elif strategy == "voting":
        match_result = strategy_voting(detected, references, threshold)
    elif strategy == "weighted":
        match_result = strategy_weighted(detected, references, threshold)
    else:
        match_result = strategy_combined(detected, references, threshold, weights)

    match_result["identity_name"] = identity["name"]
    match_result["identity_uuid"] = identity["uuid"]
    match_result["identity_type"] = identity["identity_type"]
    match_result["reference_count"] = len(references)
    return match_result


def _safe_match(detected_embedding, identity, strategy, threshold, weights):
    """Like _match_embedding, but print the error and return None instead of raising."""
    try:
        return _match_embedding(detected_embedding, identity, strategy, threshold, weights)
    except Exception as e:  # e.g. an embedding whose length differs from the references
        print(f"❌ Matching error: {e}")
        return None


def match_face_to_identity(
    detected_embedding: list,
    identity_uuid: str,
    strategy: str = "combined",
    threshold: float = 0.85,
    schema: str = "dev",
    weights: Optional[dict] = None,
):
    """Match detected face embedding to Identity in database.

    Args:
        detected_embedding: Embedding of the detected face.
        identity_uuid: UUID of the identity to match against.
        strategy: 'best_match', 'voting', 'weighted'; anything else selects
            the combined strategy.
        threshold: Match threshold passed to the strategy.
        schema: Database schema holding the ``identities`` table.
        weights: dict for combined strategy, e.g.,
            {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    Returns:
        The strategy's result dict extended with ``identity_name``,
        ``identity_uuid``, ``identity_type`` and ``reference_count``; or
        ``None`` (after printing the reason) when the identity is missing,
        has no reference embeddings, or matching fails.

    Raises:
        ValueError: If ``schema`` is not a plain SQL identifier.
    """
    identity = _load_identity(identity_uuid, schema)
    if identity is None:
        return None
    return _safe_match(detected_embedding, identity, strategy, threshold, weights)


def batch_match_faces(face_json_path, identity_uuid, strategy="combined", threshold=0.85, schema="dev", weights=None):
    """Batch match all faces in face.json to Identity.

    The identity is loaded from the database once and reused for every face.

    Args:
        face_json_path: Path to a JSON file with a ``"frames"`` mapping whose
            values carry a ``"faces"`` list.
        identity_uuid: UUID of the identity to match against.
        strategy: Matching strategy name (see match_face_to_identity).
        threshold: Match threshold.
        schema: Database schema holding the ``identities`` table.
        weights: dict for combined strategy

    Returns:
        List of result dicts, each extended with ``frame``, ``face_index``
        and ``detected_confidence``. Faces without an embedding, and faces
        whose matching fails, are skipped.

    Raises:
        ValueError: If ``schema`` is not a plain SQL identifier.
    """
    with open(face_json_path, encoding="utf-8") as f:
        data = json.load(f)

    candidates = [
        (frame_key, index, face)
        for frame_key, frame_data in data.get("frames", {}).items()
        for index, face in enumerate(frame_data.get("faces", []))
        if face.get("embedding")
    ]
    if not candidates:
        return []

    identity = _load_identity(identity_uuid, schema)
    if identity is None:
        return []

    results = []
    for frame_key, index, face in candidates:
        match_result = _safe_match(face["embedding"], identity, strategy, threshold, weights)
        if match_result:
            match_result["frame"] = frame_key
            match_result["face_index"] = index
            match_result["detected_confidence"] = face.get("confidence", 0.9)
            results.append(match_result)
    return results


def _result_score(result):
    """Return the headline score of a result, whichever strategy produced it."""
    for key in _SCORE_KEYS:
        if key in result:
            return result[key]
    return 0


def analyze_match_results(results):
    """Analyze batch match results.

    Prints the match ratio, the score distribution and the five
    highest-scoring matches. The score is the strategy's own headline value
    (final score, best similarity, weighted similarity or vote ratio), used
    consistently for the statistics, the ordering and the detail lines.

    Args:
        results: List of result dicts as returned by batch_match_faces.
    """
    print("\n=== Match Results Analysis ===")
    print(f"Total faces matched: {len(results)}")

    if results:
        is_match_count = sum(1 for r in results if r["is_match"])
        match_ratio = is_match_count / len(results)
        print(f"Match ratio: {match_ratio:.2%} ({is_match_count}/{len(results)})")

        final_scores = [_result_score(r) for r in results]
        print(f"Scores: min={min(final_scores):.2f}, max={max(final_scores):.2f}, avg={np.mean(final_scores):.2f}")

    print("\n=== Top 5 Match Details ===")
    sorted_results = sorted(results, key=_result_score, reverse=True)
    for i, r in enumerate(sorted_results[:5]):
        print(f"\nMatch {i+1}: Frame {r['frame']}, Face {r['face_index']}")
        print(f" Strategy: {r['strategy']}")
        print(f" Identity: {r['identity_name']}")
        print(f" Final Score: {_result_score(r):.4f}")
        print(f" Is Match: {r['is_match']}")

        if r['strategy'] == 'combined':
            print(" Details:")
            print(f" Best Match: {r['best_match']:.4f}")
            print(f" Vote Ratio: {r['vote_ratio']:.2%}")
            print(f" Weighted Sim: {r['weighted_sim']:.4f}")


def _parse_weights(text):
    """Parse 'best_match,vote_ratio,weighted_sim' into a weights dict; raise ValueError if malformed."""
    parts = text.split(",")
    if len(parts) != 3:
        raise ValueError(
            f"expected 3 comma-separated weights (best_match,vote_ratio,weighted_sim), got {text!r}"
        )
    try:
        values = [float(part) for part in parts]
    except ValueError:
        raise ValueError(f"weights must be numbers, got {text!r}") from None
    return dict(zip(DEFAULT_COMBINED_WEIGHTS, values))


def _lookup_identity_uuid(identity_name, schema):
    """Return the UUID of the identity with the given name, or None if there is none."""
    table = _identities_table(schema)
    conn = _connect()
    try:
        row = _fetch_one(conn, f"SELECT uuid FROM {table} WHERE name = %s;", (identity_name,))
    finally:
        conn.close()
    return row[0] if row else None


def _first_face(data):
    """Return the first face of the first frame that has any faces, or None."""
    for frame in data.get("frames", {}).values():
        faces = frame.get("faces") or []
        if faces:
            return faces[0]
    return None


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Match Face to Identity")
    parser.add_argument("--identity-uuid", help="Identity UUID to match against")
    parser.add_argument("--identity-name", help="Identity name (will query UUID)")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--strategy", default="combined", choices=["best_match", "voting", "weighted", "combined"])
    parser.add_argument("--threshold", type=float, default=0.85, help="Match threshold")
    parser.add_argument("--schema", default="dev", help="Database schema")
    parser.add_argument("--batch", action="store_true", help="Batch match all faces")
    parser.add_argument("--weights", type=str, default="0.7,0.2,0.1", help="Weights for combined strategy (best_match,vote_ratio,weighted_sim)")

    args = parser.parse_args(argv)

    # Reject bad input up front with a usage error instead of ignoring it
    # (weights) or putting it into SQL text (schema).
    try:
        _identities_table(args.schema)
        weights = _parse_weights(args.weights) if args.strategy == "combined" else None
    except ValueError as exc:
        parser.error(str(exc))

    print("=" * 60)
    print("Face Identity Matching (1-to-many)")
    print("=" * 60)

    # Get Identity UUID
    identity_uuid = args.identity_uuid
    if not identity_uuid and args.identity_name:
        identity_uuid = _lookup_identity_uuid(args.identity_name, args.schema)
        if identity_uuid:
            print(f"✅ Found Identity: {args.identity_name} (UUID: {identity_uuid})")
        else:
            print(f"❌ Identity not found: {args.identity_name}")
            return

    if not identity_uuid:
        print("❌ Please provide --identity-uuid or --identity-name")
        return

    print(f"\nStrategy: {args.strategy}")
    print(f"Threshold: {args.threshold}")
    if weights:
        print(f"Weights: best_match={weights['best_match']}, vote_ratio={weights['vote_ratio']}, weighted_sim={weights['weighted_sim']}")

    if args.batch:
        print(f"\n🔧 Batch matching from: {args.face_json}")
        results = batch_match_faces(
            face_json_path=args.face_json,
            identity_uuid=identity_uuid,
            strategy=args.strategy,
            threshold=args.threshold,
            schema=args.schema,
            weights=weights,
        )
        analyze_match_results(results)
        return

    # Single match (first face in face.json)
    with open(args.face_json, encoding="utf-8") as f:
        data = json.load(f)

    first_face = _first_face(data)
    if first_face is None:
        print("❌ No faces found in face.json")
        return

    embedding = first_face.get("embedding")
    if not embedding:
        print("❌ No embedding in first face")
        return

    print("\n🔧 Matching first face...")
    match_result = match_face_to_identity(
        detected_embedding=embedding,
        identity_uuid=identity_uuid,
        strategy=args.strategy,
        threshold=args.threshold,
        schema=args.schema,
        weights=weights,
    )

    if match_result:
        print("\n✅ Match Result:")
        print(f" Identity: {match_result['identity_name']}")
        print(f" Strategy: {match_result['strategy']}")
        print(f" Is Match: {match_result['is_match']}")

        if match_result['strategy'] == 'combined':
            print(f" Final Score: {match_result['final_score']:.4f}")
            print(f" Best Match: {match_result['best_match']:.4f}")
            print(f" Vote Ratio: {match_result['vote_ratio']:.2%}")
            print(f" Weighted Sim: {match_result['weighted_sim']:.4f}")


if __name__ == "__main__":
    main()