#!/opt/homebrew/bin/python3.11
"""
Speaker clustering.

Groups speaker (voiceprint) embeddings into per-speaker clusters, primarily
with spectral clustering.

References:
- Spectral clustering: Shi & Malik (2000), IEEE TPAMI
  https://ieeexplore.ieee.org/document/868688
- Application to speaker diarization: Wooters & Huijbregts (2008), ICASSP
"""

from typing import Optional, Tuple

import numpy as np
from sklearn.cluster import SpectralClustering, AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity


def estimate_n_speakers_eigengap(similarity_matrix, max_speakers: int = 10) -> int:
    """
    Estimate the number of speakers with the eigengap heuristic.

    The eigenvalues of the similarity matrix are sorted in descending order
    and the position of the largest gap between consecutive eigenvalues is
    taken as the number of clusters (eigengap heuristic, cited in the
    original source as Lu et al., 2010).

    Args:
        similarity_matrix: Symmetric similarity matrix [n, n].
        max_speakers: Largest number of speakers that may be returned.

    Returns:
        n_speakers: Estimated speaker count as a plain ``int``; never less
            than 2.
    """
    eigenvalues = np.sort(np.linalg.eigvalsh(similarity_matrix))[::-1]

    # k candidate cluster counts need k gaps, i.e. k + 1 eigenvalues. Keeping
    # only ``max_speakers`` eigenvalues (as the code used to) made it
    # impossible to ever return ``max_speakers`` itself.
    eigenvalues = eigenvalues[: max_speakers + 1]

    gaps = np.abs(np.diff(eigenvalues))

    # gaps[i] sits between eigenvalue i and i + 1 -> i + 1 clusters.
    n_speakers = int(np.argmax(gaps)) + 1 if gaps.size > 0 else 1

    # Clamp to the supported range; the lower bound of 2 wins on conflict.
    return max(2, min(n_speakers, max_speakers))


def estimate_n_speakers_silhouette(embeddings, max_speakers: int = 10) -> int:
    """
    Estimate the number of speakers by maximising the silhouette score.

    Runs agglomerative clustering for every candidate cluster count and keeps
    the count with the highest silhouette score.

    Args:
        embeddings: Embedding matrix [n, d].
        max_speakers: Largest number of speakers to try.

    Returns:
        n_speakers: Estimated speaker count; 2 when no candidate could be
            evaluated (e.g. too few embeddings).
    """
    from sklearn.metrics import silhouette_score

    best_score = -1
    best_n = 2

    # Candidates stop at len(embeddings) - 1 so that every candidate leaves
    # at least one cluster with more than one member.
    upper_bound = min(max_speakers + 1, len(embeddings))
    for n in range(2, upper_bound):
        labels = AgglomerativeClustering(n_clusters=n).fit_predict(embeddings)

        if len(np.unique(labels)) > 1:
            score = silhouette_score(embeddings, labels)
            if score > best_score:
                best_score = score
                best_n = n

    return best_n


def spectral_clustering_speaker(
    similarity_matrix,
    n_speakers: Optional[int] = None,
    auto_estimate: bool = True,
    max_speakers: int = 10,
) -> Tuple[np.ndarray, int]:
    """
    Assign speaker labels to segments with spectral clustering.

    Args:
        similarity_matrix: Pairwise similarity matrix [n, n]. It is not
            modified; a sanitised copy is used internally.
        n_speakers: Number of speakers. When None it is estimated with the
            eigengap heuristic (if ``auto_estimate``) or defaults to 2.
        auto_estimate: Whether to estimate the speaker count when
            ``n_speakers`` is None.
        max_speakers: Upper bound used by the automatic estimate.

    Returns:
        speaker_labels: Speaker label per segment [n,].
        n_speakers: Number of speakers that was used.

    Notes:
        If spectral clustering itself fails, the function falls back to a
        fixed two-way split (first half speaker 0, second half speaker 1)
        instead of raising.
    """
    n_segments = len(similarity_matrix)

    # Degenerate inputs cannot be clustered; answer directly instead of
    # reporting two speakers for zero or one segment.
    if n_segments == 0:
        return np.array([], dtype=int), 0
    if n_segments == 1:
        return np.zeros(1, dtype=int), 1

    # Sanitise: replace NaN/inf (nan_to_num returns a copy), force a unit
    # diagonal and keep every value inside [-1, 1].
    similarity_matrix = np.nan_to_num(
        similarity_matrix, nan=0.5, posinf=1.0, neginf=-1.0
    )
    np.fill_diagonal(similarity_matrix, 1.0)
    similarity_matrix = np.clip(similarity_matrix, -1.0, 1.0)

    # Estimate the speaker count automatically when requested.
    if n_speakers is None and auto_estimate:
        n_speakers = estimate_n_speakers_eigengap(
            similarity_matrix, max_speakers=max_speakers
        )
        print(f"[Clustering] Estimated n_speakers: {n_speakers}")

    if n_speakers is None:
        n_speakers = 2  # default

    # Never ask for more clusters than there are segments.
    n_speakers = min(n_speakers, n_segments)

    print(f"[Clustering] Running spectral clustering with {n_speakers} clusters...")

    # A precomputed affinity must be non-negative, otherwise the graph
    # Laplacian is ill-defined. Cosine similarities can be negative, so treat
    # "negatively similar" pairs as not connected. Matrices without negative
    # entries are left untouched by this step.
    affinity = np.clip(similarity_matrix, 0.0, None)

    try:
        n_speakers = int(n_speakers)
        clustering = SpectralClustering(
            n_clusters=n_speakers,
            affinity="precomputed",
            assign_labels="kmeans",
            random_state=42,
            n_init=10,
        )
        speaker_labels = clustering.fit_predict(affinity)
    except Exception as e:
        # Deliberate best-effort boundary: diarization should still yield a
        # labelling when the solver fails.
        print(f"[Clustering] Spectral clustering failed: {e}")
        print("[Clustering] Using fallback: 2 speakers")

        # Simple split: first half is speaker 0, second half is speaker 1.
        n_first = n_segments // 2
        speaker_labels = np.array([0] * n_first + [1] * (n_segments - n_first))
        return speaker_labels, 2

    print("[Clustering] Spectral clustering completed")
    print(f"[Clustering]   n_speakers: {n_speakers}")
    print(f"[Clustering]   n_segments: {n_segments}")

    return speaker_labels, n_speakers


def agglomerative_clustering_speaker(
    embeddings,
    n_speakers: Optional[int] = None,
    threshold: float = 0.5,
    max_speakers: int = 10,
) -> Tuple[np.ndarray, int]:
    """
    Assign speaker labels to segments with agglomerative clustering.

    Uses average linkage on cosine distance.

    Args:
        embeddings: Embedding matrix [n, d].
        n_speakers: Number of speakers (optional). When None it is derived
            from the mean nearest-neighbour cosine distance divided by
            ``threshold``.
        threshold: Distance threshold used for the automatic estimate.
        max_speakers: Upper bound for the automatic estimate.

    Returns:
        speaker_labels: Speaker label per segment [n,].
        n_speakers: Number of speakers that was used.
    """
    n_segments = len(embeddings)

    # Fewer than two samples cannot be clustered; answer directly.
    if n_segments == 0:
        return np.array([], dtype=int), 0
    if n_segments == 1:
        return np.zeros(1, dtype=int), 1

    if n_speakers is None:
        from sklearn.metrics.pairwise import cosine_distances

        distances = cosine_distances(embeddings)

        # Nearest-neighbour distance for (at most) the first 100 segments.
        # After sorting each row, column 0 is the segment itself and column 1
        # is its nearest neighbour.
        nearest = np.sort(distances[:100], axis=1)[:, 1]
        avg_dist = np.mean(nearest)

        # Heuristic: larger average distance -> more clusters.
        n_speakers = max(2, int(avg_dist / threshold))
        n_speakers = min(n_speakers, max_speakers)

    # Never ask for more clusters than there are segments.
    n_speakers = int(min(n_speakers, n_segments))

    clustering = AgglomerativeClustering(
        n_clusters=n_speakers, metric="cosine", linkage="average"
    )
    speaker_labels = clustering.fit_predict(embeddings)

    print("[Clustering] Agglomerative clustering completed")
    print(f"[Clustering]   n_speakers: {n_speakers}")

    return speaker_labels, n_speakers


def smooth_speaker_labels(speaker_labels, window_size: int = 5) -> np.ndarray:
    """
    Smooth a label sequence with a sliding majority vote (noise removal).

    Each label is replaced by the most frequent label in a window centred on
    it. The window holds ``2 * (window_size // 2) + 1`` labels and is
    truncated at both ends of the sequence. Ties go to the smallest label.

    Args:
        speaker_labels: Original speaker labels (1-D sequence).
        window_size: Smoothing window size; values <= 1 disable smoothing.

    Returns:
        smoothed_labels: New array with the smoothed labels; the input is
            left unmodified.
    """
    labels = np.asarray(speaker_labels)
    smoothed = np.copy(labels)

    half_window = window_size // 2
    if half_window <= 0:
        # A window of one label (or an invalid negative size) is a no-op.
        return smoothed

    n_labels = len(labels)
    for i in range(n_labels):
        start = max(0, i - half_window)
        end = min(n_labels, i + half_window + 1)

        # np.unique returns sorted values, so argmax picks the smallest label
        # among equally frequent ones. It also handles non-numeric labels.
        values, counts = np.unique(labels[start:end], return_counts=True)
        smoothed[i] = values[np.argmax(counts)]

    return smoothed


def compute_diarization_purity(speaker_labels, ground_truth_labels=None) -> float:
    """
    Compute the purity of a diarization result against a ground truth.

    Purity is the fraction of segments whose ground-truth speaker is the
    majority ground-truth speaker of the predicted cluster they were assigned
    to. It lies in (0, 1]; 1 means every predicted cluster contains a single
    true speaker.

    Args:
        speaker_labels: Predicted speaker labels.
        ground_truth_labels: True speaker labels (optional).

    Returns:
        purity: Purity score. Without a ground truth the placeholder value
            0.5 is returned, because nothing can be measured.

    Raises:
        ValueError: If both label sequences are given but differ in length.
    """
    if ground_truth_labels is None:
        # No ground truth available: neutral placeholder, not a measurement.
        return 0.5

    predicted = np.asarray(speaker_labels).ravel()
    truth = np.asarray(ground_truth_labels).ravel()

    if predicted.shape != truth.shape:
        raise ValueError(
            "speaker_labels and ground_truth_labels must have the same length: "
            f"{predicted.size} != {truth.size}"
        )
    if predicted.size == 0:
        # Nothing is mislabelled in an empty labelling.
        return 1.0

    # For each predicted cluster, count the members that belong to its most
    # common true speaker.
    matched = 0
    for cluster in np.unique(predicted):
        _, counts = np.unique(truth[predicted == cluster], return_counts=True)
        matched += int(counts.max())

    return matched / predicted.size


def main() -> None:
    """Run a self-test of the clustering functions on synthetic embeddings."""
    print("[Test] Testing speaker clustering algorithms")

    # Generate synthetic data.
    np.random.seed(42)

    n_speakers = 3
    n_segments_per_speaker = 20

    # Embeddings for 3 speakers: each speaker has its own centre, and every
    # segment is that centre plus noise.
    embeddings = []
    for i in range(n_speakers):
        center = np.random.randn(192) * 2 + i * 3
        for _ in range(n_segments_per_speaker):
            emb = center + np.random.randn(192) * 0.5
            embeddings.append(emb)

    embeddings = np.array(embeddings)

    print(f"[Test] Generated {len(embeddings)} embeddings for {n_speakers} speakers")

    # Similarity matrix.
    similarity = cosine_similarity(embeddings)
    print(f"[Test] Similarity matrix shape: {similarity.shape}")

    # Estimate the number of speakers.
    estimated_n = estimate_n_speakers_eigengap(similarity, max_speakers=10)
    print(f"[Test] Estimated n_speakers (eigengap): {estimated_n}")

    estimated_n_silhouette = estimate_n_speakers_silhouette(embeddings, max_speakers=10)
    print(f"[Test] Estimated n_speakers (silhouette): {estimated_n_silhouette}")

    # Spectral clustering.
    labels, n_clusters = spectral_clustering_speaker(
        similarity, n_speakers=None, auto_estimate=True
    )

    print("\n[Test] Clustering results:")
    print(f"  True n_speakers: {n_speakers}")
    print(f"  Estimated n_speakers: {n_clusters}")
    print(f"  Unique labels: {np.unique(labels)}")

    # Size of each cluster.
    for label in np.unique(labels):
        count = np.sum(labels == label)
        print(f"  Cluster {label}: {count} segments")


if __name__ == "__main__":
    main()