#!/opt/homebrew/bin/python3.11
"""
Voice Embedding Extractor

Responsibility: extract a voiceprint vector (192-dim) for every Speaker ID
found in a video's audio track and store it in the database.
Dependencies: SpeechBrain, Librosa, Psycopg2
"""

import sys
import os
import json
import torch
import librosa
import numpy as np
import psycopg2

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Import SpeechBrain (it must already be installed in the environment).
try:
    from speechbrain.inference.speaker import EncoderClassifier

    HAS_SPEECHBRAIN = True
except ImportError:
    HAS_SPEECHBRAIN = False
    print("[Warning] SpeechBrain not found. Install via: pip install speechbrain")

DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")


def get_db_connection():
    """Open a new psycopg2 connection to the database named by ``DB_URL``."""
    return psycopg2.connect(DB_URL)


def _bandpass_voice(audio, sr):
    """
    Apply a 300Hz-3400Hz band-pass filter to ``audio`` (best effort).

    The filter keeps the speech band and removes low-frequency hum and
    high-frequency noise so they do not disturb the voiceprint. If filtering
    fails (for example because scipy is missing) a warning is printed and the
    unfiltered audio is returned.
    """
    try:
        from scipy import signal

        nyquist = 0.5 * sr
        low = 300.0 / nyquist
        high = 3400.0 / nyquist
        b, a = signal.butter(4, [low, high], btype="band")
        audio = signal.lfilter(b, a, audio)
        print(" [Filter] ✅ 已套用濾波器:去除背景雜訊 (300Hz-3400Hz)")
    except Exception as e:
        print(f" [Warning] ⚠️ 濾波失敗 (可能缺少 scipy): {e}")
    return audio


def _collect_speaker_samples(audio, sr, segments):
    """
    Group audio clips by speaker.

    Returns a dict mapping each ``speaker_id`` to the list of audio slices
    taken from its segments. Segments without a speaker id, and segments whose
    usable audio is shorter than one second, are skipped.
    """
    total_samples = len(audio)
    speaker_samples = {}

    for seg in segments:
        sid = seg.get("speaker_id")
        if not sid:
            continue

        # A key that is present but null would otherwise break the arithmetic.
        start = seg.get("start") or 0.0
        end = seg.get("end") or 0.0

        # Clamp to the real audio range: a negative start would wrap around
        # through negative indexing, and an end beyond the audio would yield a
        # clip shorter than the length check below assumes.
        start_sample = max(0, int(start * sr))
        end_sample = min(total_samples, int(end * sr))

        # Drop clips that are too short (< 1s) to keep embedding quality up.
        if (end_sample - start_sample) < sr:
            continue

        speaker_samples.setdefault(sid, []).append(audio[start_sample:end_sample])

    return speaker_samples


def extract_speaker_embeddings(uuid: str, video_path: str) -> dict:
    """
    Extract the voiceprint vector of every speaker in the given video.

    Args:
        uuid: Video identifier; the segment timeline is read from
            ``<OUTPUT_DIR>/<uuid>.asrx.json``.
        video_path: Path of the media file the audio is loaded from.

    Returns:
        A dict mapping speaker id to its averaged embedding as a list of
        floats. An empty dict is returned when SpeechBrain is unavailable,
        when the ASRX file is missing or has no segments, or when the model
        cannot be loaded.
    """
    if not HAS_SPEECHBRAIN:
        return {}

    # 1. Load the ASRX data to obtain the segment timeline.
    asrx_path = os.path.join(OUTPUT_DIR, f"{uuid}.asrx.json")
    if not os.path.exists(asrx_path):
        print(f" [Skip] No ASRX data for {uuid}")
        return {}

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(asrx_path, "r", encoding="utf-8") as f:
        asrx_data = json.load(f)

    segments = asrx_data.get("segments", [])
    if not segments:
        return {}

    # 2. Load the speaker-embedding model (ECAPA-TDNN).
    # Note: the first run downloads the model (~50MB).
    print(" [Model] Loading SpeechBrain EncoderClassifier...")
    try:
        classifier = EncoderClassifier.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa-voxceleb",
            run_opts={"device": "cpu"},  # Use CPU to avoid device_type bug
        )
    except Exception as e:
        print(f" [Error] Failed to load model: {e}")
        return {}

    # 3. Load the audio as 16 kHz mono and suppress background noise.
    print(f" [Audio] Loading audio for {uuid}...")
    audio, sr = librosa.load(video_path, sr=16000, mono=True)
    audio = _bandpass_voice(audio, sr)

    # Group the clips by Speaker ID.
    speaker_samples = _collect_speaker_samples(audio, sr, segments)

    # 4. Compute one embedding per speaker (mean over its clips).
    speaker_embeddings = {}
    for sid, samples in speaker_samples.items():
        print(f" [Embedding] Processing {sid} ({len(samples)} segments)...")

        embeddings = []
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            for sample in samples:
                # SpeechBrain expects a tensor shaped (1, samples). The
                # filter step may have promoted the audio to float64, so the
                # dtype is set explicitly to float32.
                waveform = (
                    torch.as_tensor(sample, dtype=torch.float32)
                    .unsqueeze(0)
                    .to(classifier.device)
                )

                # Extract the embedding and drop the two leading axes.
                embedding = (
                    classifier.encode_batch(waveform)
                    .squeeze(0)
                    .squeeze(0)
                    .cpu()
                    .numpy()
                )
                embeddings.append(embedding)

        # Mean pooling over all clips of this speaker.
        if embeddings:
            avg_embedding = np.mean(embeddings, axis=0)
            # Convert to a list of floats for JSON/DB use.
            speaker_embeddings[sid] = avg_embedding.tolist()

    return speaker_embeddings


def save_embeddings_to_db(uuid: str, embeddings: dict):
    """
    Store the extracted voiceprint vectors in the database.

    For every speaker id:
      1. Look for a talent already bound to this speaker id.
      2. If there is one, update that talent's ``voice_embedding``.
      3. Otherwise upsert a talent named ``Speaker_<sid>`` with the vector and
         insert a 'speaker' binding for it.

    All statements run in one transaction: it is committed when every speaker
    has been processed and rolled back if any statement fails. The connection
    is always closed.

    Args:
        uuid: Video identifier (currently not used by the queries).
        embeddings: Mapping of speaker id to embedding vector; nothing is done
            when it is empty.
    """
    if not embeddings:
        return

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            for sid, vector in embeddings.items():
                # Is this speaker id already bound to a talent?
                cur.execute(
                    """
                    SELECT t.id FROM talents t
                    JOIN identity_bindings b ON t.id = b.talent_id
                    WHERE b.binding_type = 'speaker' AND b.binding_value = %s
                    """,
                    (sid,),
                )
                row = cur.fetchone()

                if row:
                    talent_id = row[0]
                    # Update the vector of the bound talent.
                    cur.execute(
                        """
                        UPDATE talents
                        SET voice_embedding = %s
                        WHERE id = %s
                        """,
                        (vector, talent_id),
                    )
                    print(
                        f" [DB] Updated embedding for bound Speaker {sid} (Talent #{talent_id})"
                    )
                else:
                    # Create a new talent; ON CONFLICT prevents a duplicate
                    # talent with the same name.
                    cur.execute(
                        """
                        INSERT INTO talents (real_name, voice_embedding)
                        VALUES (%s, %s)
                        ON CONFLICT (real_name) DO UPDATE
                        SET voice_embedding = EXCLUDED.voice_embedding
                        RETURNING id
                        """,
                        (f"Speaker_{sid}", vector),
                    )
                    talent_id = cur.fetchone()[0]

                    # Bind the speaker id to the talent.
                    cur.execute(
                        """
                        INSERT INTO identity_bindings
                            (talent_id, binding_type, binding_value, source, confidence)
                        VALUES (%s, 'speaker', %s, 'auto_extracted', 0.9)
                        ON CONFLICT (binding_type, binding_value) DO NOTHING
                        """,
                        (talent_id, sid),
                    )
                    print(
                        f" [DB] Created new Talent 'Speaker_{sid}' (#{talent_id}) with embedding"
                    )

        conn.commit()
    except Exception:
        # Leave no half-applied batch behind, then let the caller see the error.
        conn.rollback()
        raise
    finally:
        conn.close()


def main():
    """Command-line entry point: extract the embeddings, then store them."""
    import argparse

    parser = argparse.ArgumentParser(description="Extract Speaker Embeddings")
    parser.add_argument("--uuid", required=True, help="Video UUID")
    parser.add_argument("--video-path", required=True, help="Path to video file")
    args = parser.parse_args()

    if not os.path.exists(args.video_path):
        print(f"Error: Video file not found at {args.video_path}")
        sys.exit(1)

    print(f"Starting Voice Embedding Extraction for {args.uuid}")

    # 1. Extract
    embeddings = extract_speaker_embeddings(args.uuid, args.video_path)

    # 2. Store
    save_embeddings_to_db(args.uuid, embeddings)

    print("Done.")


if __name__ == "__main__":
    main()