#!/opt/homebrew/bin/python3.11
"""
Face Clustering Processor

Responsibility: aggregate short-lived Face IDs into persistent Person IDs and
automatically bind each Speaker to a Person.
"""

import cv2
import json
import numpy as np
import os
import sys
import psycopg2
from collections import Counter, defaultdict
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_distances

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

try:
    from deepface import DeepFace

    HAS_DEEPFACE = True
except ImportError:
    print("❌ DeepFace not found. Run: pip install deepface")
    sys.exit(1)

# Configuration
UUID = os.getenv("UUID", "quick_preview")
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")
VIDEO_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.mp4")
FACE_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.face.json")
OUTPUT_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.face_clustered.json")
ASRX_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.asrx.json")
DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")

# Clustering parameters shared by the direct and the sampled code paths.
CLUSTER_DISTANCE_THRESHOLD = 0.4
CLUSTER_SAMPLE_SIZE = 5000
ASSIGN_BATCH_SIZE = 5000
# Pixels added around each face box before it is cropped.
FACE_CROP_MARGIN = 5


def _agglomerative_labels(embeddings):
    """Return one agglomerative-cluster label per row of *embeddings*.

    Uses average linkage on cosine distance, cut at
    ``CLUSTER_DISTANCE_THRESHOLD``. Inputs with fewer than two rows are
    labelled ``0`` directly, because ``AgglomerativeClustering`` rejects them.
    """
    n_rows = len(embeddings)
    if n_rows < 2:
        return np.zeros(n_rows, dtype=int)

    clustering = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=CLUSTER_DISTANCE_THRESHOLD,
        metric="cosine",
        linkage="average",
    )
    return clustering.fit_predict(embeddings)


def optimized_clustering(embeddings):
    """
    Optimized Clustering for large datasets (e.g. 25k faces).
    Strategy: Sample -> Agglomerative -> Centroid Assignment

    Args:
        embeddings: 2-D array-like with one embedding per row.

    Returns:
        Integer NumPy array with one cluster label per row of *embeddings*.
        The random sample makes the result non-deterministic when the input
        has more than ``CLUSTER_SAMPLE_SIZE`` rows.
    """
    embeddings = np.asarray(embeddings)
    n_faces = len(embeddings)
    print(f" 🚀 Starting optimized clustering for {n_faces} faces...")
    if n_faces == 0:
        return np.zeros(0, dtype=int)

    # 1. Sampling
    sample_size = min(CLUSTER_SAMPLE_SIZE, n_faces)
    if n_faces > sample_size:
        indices = np.random.choice(n_faces, sample_size, replace=False)
        sample_embeddings = embeddings[indices]
    else:
        sample_embeddings = embeddings
    print(f" 📊 Sampling {len(sample_embeddings)} faces for clustering structure...")

    # 2. Agglomerative Clustering on Sample
    sample_labels = _agglomerative_labels(sample_embeddings)
    # np.unique is sorted, so row i of `centroids` always belongs to
    # unique_labels[i]; iterating a set gave no such ordering guarantee.
    unique_labels = np.unique(sample_labels)
    n_clusters = len(unique_labels)
    print(f" 🔍 Found {n_clusters} unique clusters in sample.")

    # 3. Compute the centroid (mean embedding) of each cluster
    centroids = np.array(
        [
            sample_embeddings[sample_labels == label].mean(axis=0)
            for label in unique_labels
        ]
    )

    # 4. Assign all faces to the nearest centroid, in batches to bound the
    #    size of the distance matrix held in memory at once.
    print(f" 🏃 Assigning {n_faces} faces to {n_clusters} clusters...")
    all_labels = np.zeros(n_faces, dtype=int)
    for start in range(0, n_faces, ASSIGN_BATCH_SIZE):
        end = min(start + ASSIGN_BATCH_SIZE, n_faces)
        dists = cosine_distances(embeddings[start:end], centroids)
        # argmin yields a centroid position; map it back to the cluster label.
        all_labels[start:end] = unique_labels[np.argmin(dists, axis=1)]

    return all_labels


def _extract_embeddings(frames_list):
    """Compute an ArcFace embedding for every face box listed in *frames_list*.

    Returns:
        ``(embeddings, face_refs)``: a list of embedding vectors and a
        parallel list of ``{"frame_idx", "face_idx"}`` dicts locating each
        embedding's face inside *frames_list*.
    """
    embeddings = []
    face_refs = []
    n_failed = 0
    last_error = None

    cap = cv2.VideoCapture(VIDEO_PATH)
    try:
        if not cap.isOpened():
            print(f"❌ Cannot open video: {VIDEO_PATH}")
            return embeddings, face_refs

        for frame_idx, frame_obj in enumerate(frames_list):
            faces = frame_obj.get("faces", [])
            if not faces:
                continue

            # Without a timestamp the capture is not repositioned and the
            # next sequential frame is read instead.
            ts = frame_obj.get("timestamp")
            if ts is not None:
                cap.set(cv2.CAP_PROP_POS_MSEC, ts * 1000)

            ret, frame = cap.read()
            if not ret:
                continue

            for face_idx, face in enumerate(faces):
                # Slice bounds must be integers; the cast is a no-op for ints.
                # NOTE(review): assumes the detector may emit float boxes — confirm.
                x, y, w, h = (
                    int(face[key]) for key in ("x", "y", "width", "height")
                )
                crop = frame[
                    max(0, y - FACE_CROP_MARGIN) : y + h + FACE_CROP_MARGIN,
                    max(0, x - FACE_CROP_MARGIN) : x + w + FACE_CROP_MARGIN,
                ]
                if crop is None or crop.size == 0:
                    continue

                # Best effort: one bad crop must not abort the whole run, but
                # failures are counted so they are visible afterwards.
                try:
                    res = DeepFace.represent(
                        img_path=crop, model_name="ArcFace", enforce_detection=False
                    )
                except Exception as e:
                    n_failed += 1
                    last_error = e
                    continue

                if res and "embedding" in res[0]:
                    embeddings.append(res[0]["embedding"])
                    face_refs.append({"frame_idx": frame_idx, "face_idx": face_idx})
    finally:
        # Release the capture even if a frame or face raised unexpectedly.
        cap.release()

    if n_failed:
        print(f"⚠️ Skipped {n_failed} faces (last error: {last_error})")

    return embeddings, face_refs


def main():
    """Cluster the detected faces of ``UUID`` into persons and bind speakers.

    Reads ``FACE_JSON_PATH``, tags every face that yields an embedding with a
    ``person_id``, writes the result to ``OUTPUT_JSON_PATH`` and then calls
    ``auto_bind_speakers()``. Returns early, after printing a message, when
    the input is missing or produces no embeddings.
    """
    if not os.path.exists(FACE_JSON_PATH):
        print("❌ Face JSON not found.")
        return

    with open(FACE_JSON_PATH, encoding="utf-8") as f:
        face_data = json.load(f)

    frames_list = face_data.get("frames", [])
    if not frames_list:
        print("❌ No frames in JSON.")
        return

    # 1. Extract embeddings
    print(f"🔍 Extracting face embeddings from {UUID}...")
    embeddings, face_refs = _extract_embeddings(frames_list)
    if not embeddings:
        print("❌ No embeddings extracted.")
        return

    embeddings = np.array(embeddings)
    print(f"✅ Extracted {len(embeddings)} face embeddings.")

    # 2. Clustering
    print(f"🧠 Clustering {len(embeddings)} faces...")
    if len(embeddings) > CLUSTER_SAMPLE_SIZE:
        # Full agglomerative clustering needs all pairwise distances, so
        # large inputs go through the sampled approximation instead.
        labels = optimized_clustering(embeddings)
    else:
        labels = _agglomerative_labels(embeddings)

    # Plain ints in sorted order give stable, contiguous Person_<n> names.
    labels = [int(label) for label in labels]
    unique_labels = sorted(set(labels))
    label_to_person = {label: f"Person_{i}" for i, label in enumerate(unique_labels)}
    print(
        f"👥 Detected {len(unique_labels)} unique persons: {[label_to_person[label] for label in unique_labels]}"
    )

    # 3. Update JSON (face_refs index straight back into frames_list)
    for ref, label in zip(face_refs, labels):
        face = frames_list[ref["frame_idx"]]["faces"][ref["face_idx"]]
        face["person_id"] = label_to_person[label]

    # Save
    with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as f:
        json.dump(face_data, f, indent=2, ensure_ascii=False)
    print(f"✅ Saved clustered data to {OUTPUT_JSON_PATH}")

    # 4. Auto-bind speakers
    auto_bind_speakers()


def _count_speaker_votes(face_spans, segments):
    """Tally, per speaker, how many segments each person dominated.

    For every segment the person seen most often between its ``start`` and
    ``end`` wins one vote for that segment's ``speaker_id``.
    """
    speaker_person_counts = defaultdict(Counter)
    for seg in segments:
        start = seg.get("start")
        end = seg.get("end")
        speaker = seg.get("speaker_id")
        # A segment without bounds cannot be compared against timestamps.
        if not speaker or start is None or end is None:
            continue

        # Faces whose timestamp falls inside the segment
        person_counts = Counter(
            span["person_id"] for span in face_spans if start <= span["ts"] <= end
        )
        if not person_counts:
            continue

        # Vote: ties go to the person seen first, as max() keeps the first hit.
        best_person = max(person_counts, key=person_counts.get)
        speaker_person_counts[speaker][best_person] += 1

    return speaker_person_counts


def _store_bindings(speaker_person_counts):
    """Upsert a talent and a speaker binding for each speaker's top person.

    Database errors are reported and swallowed (best effort). Nothing is
    committed unless every speaker was processed.
    """
    conn = None
    try:
        conn = psycopg2.connect(DB_URL)
        with conn.cursor() as cur:
            for speaker, persons in speaker_person_counts.items():
                if not persons:
                    continue

                best_person = max(persons, key=persons.get)
                print(
                    f" 🎤 {speaker} is likely {best_person} ({persons[best_person]} votes)"
                )

                # 1. Find or create the Talent
                cur.execute(
                    "SELECT id FROM talents WHERE real_name = %s", (best_person,)
                )
                row = cur.fetchone()
                if row:
                    talent_id = row[0]
                else:
                    cur.execute(
                        "INSERT INTO talents (real_name) VALUES (%s) RETURNING id",
                        (best_person,),
                    )
                    talent_id = cur.fetchone()[0]
                    print(f" ✨ Created Talent #{talent_id} ({best_person})")

                # 2. Bind the Speaker
                cur.execute(
                    """
                    INSERT INTO identity_bindings (talent_id, binding_type, binding_value, source, confidence)
                    VALUES (%s, 'speaker', %s, 'auto_cluster', 0.8)
                    ON CONFLICT (binding_type, binding_value)
                    DO UPDATE SET talent_id = EXCLUDED.talent_id
                    """,
                    (talent_id, speaker),
                )
                print(f" ✅ Bound {speaker} -> {best_person}")

        conn.commit()
    except Exception as e:
        print(f" ❌ DB Error: {e}")
    finally:
        # Closing without a commit discards the pending transaction.
        if conn is not None:
            conn.close()


def auto_bind_speakers():
    """Bind each ASR speaker to the person most often on screen while speaking.

    Reads ``OUTPUT_JSON_PATH`` and ``ASRX_JSON_PATH``, votes per speech
    segment, and writes the winning person for each speaker to the database
    at ``DB_URL``. Returns early when either input file is missing or when no
    segment overlaps a clustered face.
    """
    if not os.path.exists(OUTPUT_JSON_PATH) or not os.path.exists(ASRX_JSON_PATH):
        print("⚠️ Missing data for speaker binding.")
        return

    with open(OUTPUT_JSON_PATH, encoding="utf-8") as f:
        face_clustered = json.load(f)
    with open(ASRX_JSON_PATH, encoding="utf-8") as f:
        asrx_data = json.load(f)

    print("🔗 Auto-binding Speakers to Persons...")

    # Build the list of (timestamp, person) face sightings
    face_spans = []
    for frame_obj in face_clustered.get("frames", []):
        ts = frame_obj.get("timestamp")
        if ts is None:
            continue
        for face in frame_obj.get("faces", []):
            person_id = face.get("person_id")
            if person_id:
                face_spans.append({"ts": ts, "person_id": person_id})

    # For every speech segment, find who is on screen
    speaker_person_counts = _count_speaker_votes(
        face_spans, asrx_data.get("segments", [])
    )
    if not speaker_person_counts:
        # Avoid opening a database connection when there is nothing to write.
        print("⚠️ No speaker/person overlap found; nothing to bind.")
        return

    # Write to the database
    _store_bindings(speaker_person_counts)


if __name__ == "__main__":
    main()