- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
240 lines
7.1 KiB
Python
240 lines
7.1 KiB
Python
#!/opt/homebrew/bin/python3.11
|
||
"""
|
||
Voice Embedding Extractor
|
||
職責:從視頻音軌提取 Speaker ID 的聲紋向量 (192-dim) 並存入資料庫。
|
||
依賴:SpeechBrain, Librosa, Psycopg2
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import json
|
||
import torch
|
||
import librosa
|
||
import numpy as np
|
||
import psycopg2
|
||
|
||
# Make modules that live next to this script importable when it is run directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# SpeechBrain is treated as optional at import time: a missing package only
# flips the HAS_SPEECHBRAIN flag and prints an installation hint.
try:
    from speechbrain.inference.speaker import EncoderClassifier
except ImportError:
    HAS_SPEECHBRAIN = False
    print("[Warning] SpeechBrain not found. Install via: pip install speechbrain")
else:
    HAS_SPEECHBRAIN = True

# Runtime configuration; each value can be overridden via an environment variable.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "./output")
|
||
|
||
|
||
def get_db_connection():
    """Open and return a new psycopg2 connection to the database at ``DB_URL``."""
    connection = psycopg2.connect(DB_URL)
    return connection
|
||
|
||
|
||
def _collect_speaker_clips(segments, audio, sr):
    """Group audio slices of usable segments by their speaker ID.

    Args:
        segments: Iterable of segment dicts read from the ASRX file; the keys
            ``speaker_id``, ``start`` and ``end`` (seconds) are consulted.
        audio: 1-D array of audio samples.
        sr: Sample rate of ``audio`` in Hz.

    Returns:
        Dict mapping each speaker ID to a list of audio slices, each at least
        one second long.
    """
    total_samples = len(audio)
    clips_by_speaker = {}

    for seg in segments:
        sid = seg.get("speaker_id")
        if not sid:
            continue

        # Treat a missing or null timestamp as 0.0 instead of raising.
        start = seg.get("start") or 0.0
        end = seg.get("end") or 0.0

        # Clamp to the decoded audio: timestamps that overrun the track (or are
        # negative) would otherwise produce empty, short, or wrapped-around
        # slices that still pass the duration check below.
        start_sample = max(0, int(start * sr))
        end_sample = min(total_samples, int(end * sr))

        # Drop clips shorter than one second to keep embedding quality up.
        if (end_sample - start_sample) < sr:
            continue

        clips_by_speaker.setdefault(sid, []).append(audio[start_sample:end_sample])

    return clips_by_speaker


def extract_speaker_embeddings(uuid: str, video_path: str) -> dict:
    """Extract one averaged voice embedding per speaker found in a video.

    Segment timings and speaker IDs are read from
    ``<OUTPUT_DIR>/<uuid>.asrx.json``; the audio is decoded from ``video_path``
    at 16 kHz mono, band-pass filtered on a best-effort basis, and every
    segment of at least one second is encoded with the SpeechBrain ECAPA-TDNN
    speaker model. The embeddings of each speaker are then mean-pooled.

    Args:
        uuid: Video identifier used to locate the ASRX JSON file.
        video_path: Path of the media file whose audio track is decoded.

    Returns:
        Dict mapping each speaker ID to its mean embedding as a list of floats.
        An empty dict is returned when SpeechBrain is unavailable, the ASRX
        file is missing or contains no segments, or the model fails to load.

    Raises:
        Whatever ``json.load`` or ``librosa.load`` raise for unreadable input;
        these errors are not caught here.
    """
    if not HAS_SPEECHBRAIN:
        return {}

    # 1. Load the ASRX data to obtain the segment timeline.
    asrx_path = os.path.join(OUTPUT_DIR, f"{uuid}.asrx.json")
    if not os.path.exists(asrx_path):
        print(f" [Skip] No ASRX data for {uuid}")
        return {}

    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(asrx_path, "r", encoding="utf-8") as f:
        asrx_data = json.load(f)

    segments = asrx_data.get("segments", [])
    if not segments:
        return {}

    # 2. Load the speaker-embedding model (ECAPA-TDNN).
    # Note: the first run downloads the model (~50MB).
    print(" [Model] Loading SpeechBrain EncoderClassifier...")
    try:
        classifier = EncoderClassifier.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa-voxceleb",
            run_opts={"device": "cpu"},  # Use CPU to avoid device_type bug
        )
    except Exception as e:
        print(f" [Error] Failed to load model: {e}")
        return {}

    # 3. Decode the audio track.
    print(f" [Audio] Loading audio for {uuid}...")
    audio, sr = librosa.load(video_path, sr=16000, mono=True)

    # Best-effort noise reduction: a 300Hz-3400Hz band-pass keeps the voice band
    # and removes low-frequency hum and high-frequency noise that could disturb
    # speaker recognition. Any failure here is deliberately non-fatal.
    try:
        from scipy import signal

        nyquist = 0.5 * sr
        low = 300.0 / nyquist
        high = 3400.0 / nyquist
        b, a = signal.butter(4, [low, high], btype="band")
        audio = signal.lfilter(b, a, audio)
        print(" [Filter] ✅ 已套用濾波器:去除背景雜訊 (300Hz-3400Hz)")
    except Exception as e:
        print(f" [Warning] ⚠️ 濾波失敗 (可能缺少 scipy): {e}")

    # Group the usable audio clips by speaker ID.
    speaker_samples = _collect_speaker_clips(segments, audio, sr)

    # 4. Compute one embedding per speaker (mean over that speaker's clips).
    speaker_embeddings = {}

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for sid, samples in speaker_samples.items():
            print(f" [Embedding] Processing {sid} ({len(samples)} segments)...")

            embeddings = []
            for sample in samples:
                # SpeechBrain takes a (1, samples) tensor. Force float32 so the
                # dtype does not depend on whether the filter step ran.
                waveform = (
                    torch.as_tensor(sample, dtype=torch.float32)
                    .unsqueeze(0)
                    .to(classifier.device)
                )
                embedding = (
                    classifier.encode_batch(waveform).squeeze(0).squeeze(0).cpu().numpy()
                )
                embeddings.append(embedding)

            # Mean pooling, converted to a plain list for JSON/DB use.
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                speaker_embeddings[sid] = avg_embedding.tolist()

    return speaker_embeddings
|
||
|
||
|
||
def _store_speaker_embedding(cur, sid, vector):
    """Write one speaker's embedding, creating and binding a talent if needed."""
    # Look for a talent already bound to this speaker ID.
    cur.execute(
        """
        SELECT t.id FROM talents t
        JOIN identity_bindings b ON t.id = b.talent_id
        WHERE b.binding_type = 'speaker' AND b.binding_value = %s
        """,
        (sid,),
    )
    row = cur.fetchone()

    if row:
        talent_id = row[0]
        # Bound speaker: overwrite the existing talent's embedding.
        cur.execute(
            """
            UPDATE talents SET voice_embedding = %s WHERE id = %s
            """,
            (vector, talent_id),
        )
        print(
            f" [DB] Updated embedding for bound Speaker {sid} (Talent #{talent_id})"
        )
        return

    # Unbound speaker: upsert a placeholder talent named "Speaker_<sid>".
    # ON CONFLICT keeps a same-named talent from being created twice.
    cur.execute(
        """
        INSERT INTO talents (real_name, voice_embedding)
        VALUES (%s, %s)
        ON CONFLICT (real_name) DO UPDATE SET voice_embedding = EXCLUDED.voice_embedding
        RETURNING id
        """,
        (f"Speaker_{sid}", vector),
    )
    talent_id = cur.fetchone()[0]

    # Record the speaker -> talent binding.
    cur.execute(
        """
        INSERT INTO identity_bindings (talent_id, binding_type, binding_value, source, confidence)
        VALUES (%s, 'speaker', %s, 'auto_extracted', 0.9)
        ON CONFLICT (binding_type, binding_value) DO NOTHING
        """,
        (talent_id, sid),
    )
    print(
        f" [DB] Created new Talent 'Speaker_{sid}' (#{talent_id}) with embedding"
    )


def save_embeddings_to_db(uuid: str, embeddings: dict):
    """Store the extracted per-speaker voice embeddings in the database.

    For every speaker ID:
      1. If a talent is already bound to it through an ``identity_bindings``
         row of type ``'speaker'``, that talent's ``voice_embedding`` is updated.
      2. Otherwise a talent named ``Speaker_<sid>`` is inserted (or updated if
         the name already exists) and bound to the speaker ID.

    All writes share one transaction: it is committed after the last speaker,
    rolled back if any statement fails, and the connection is always closed.

    Args:
        uuid: Video identifier. Not used by this function; kept so existing
            callers keep working.
        embeddings: Mapping of speaker ID to embedding vector. Nothing is done
            when it is empty or ``None``.

    Raises:
        Any database error raised while executing the statements, after the
        transaction has been rolled back.
    """
    if not embeddings:
        return

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            for sid, vector in embeddings.items():
                _store_speaker_embedding(cur, sid, vector)
        conn.commit()
    except Exception:
        # Do not leave a half-applied transaction behind.
        conn.rollback()
        raise
    finally:
        conn.close()
|
||
|
||
|
||
def main():
    """Command-line entry point: extract speaker embeddings for one video and store them.

    Expects ``--uuid`` and ``--video-path``; exits with status 1 when the
    video file does not exist.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Extract Speaker Embeddings")
    cli.add_argument("--uuid", required=True, help="Video UUID")
    cli.add_argument("--video-path", required=True, help="Path to video file")
    opts = cli.parse_args()

    video_uuid = opts.uuid
    video_path = opts.video_path

    # Guard clause: nothing can be extracted without the media file.
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        sys.exit(1)

    print(f"Starting Voice Embedding Extraction for {video_uuid}")

    # Step 1: extract the per-speaker embeddings.
    extracted = extract_speaker_embeddings(video_uuid, video_path)

    # Step 2: persist them.
    save_embeddings_to_db(video_uuid, extracted)

    print("Done.")


if __name__ == "__main__":
    main()
|