#!/opt/homebrew/bin/python3.11
"""
Self-implemented ASRX - Fixed Version.

Speaker diarization pipeline that uses a robust clustering algorithm.
"""

import sys
import json
import time
import numpy as np
from pathlib import Path

# Project-local modules
from vad import load_vad_model, extract_speech_segments
from speaker_encoder import (
    load_speaker_encoder,
    extract_speaker_embeddings_batch,
    normalize_embeddings
)
from speaker_cluster_fixed import robust_speaker_clustering


class SelfASRXFixed:
    """Self-implemented speaker diarization system (fixed version).

    Loads a VAD model and a speaker encoder once at construction time and
    exposes two entry points: ``process`` (segments come from VAD) and
    ``process_with_segments`` (segments come from an external ASR pass).
    """

    def __init__(self):
        """Load the VAD model and the speaker encoder."""
        print("[SelfASRX-Fixed] Initializing models...")

        # Load the VAD model
        print("[SelfASRX-Fixed] Loading VAD model (Silero)...")
        self.vad_model, self.vad_utils = load_vad_model()

        # Load the speaker (voiceprint) encoder
        print("[SelfASRX-Fixed] Loading speaker encoder (ECAPA-TDNN)...")
        self.speaker_encoder = load_speaker_encoder()

        print("[SelfASRX-Fixed] Models loaded successfully")

    # ------------------------------------------------------------------
    # Private helpers shared by both public entry points
    # ------------------------------------------------------------------

    def _embed_and_cluster(self, audio_segments, sample_rate,
                           n_speakers, max_speakers):
        """Extract normalized embeddings for each clip and cluster them.

        Returns:
            Tuple ``(embeddings, speaker_labels, estimated_n_speakers)`` as
            produced by the project's encoder and clustering helpers.
        """
        print("\n[Step 2] Speaker embedding extraction...")
        step2_start = time.time()
        embeddings = extract_speaker_embeddings_batch(
            self.speaker_encoder, audio_segments, sample_rate
        )
        embeddings = normalize_embeddings(embeddings)
        step2_time = time.time() - step2_start
        print(f" Embedding shape: {embeddings.shape}")
        print(f" Embedding time: {step2_time:.2f}s")

        print("\n[Step 3] Robust speaker clustering...")
        step3_start = time.time()
        speaker_labels, estimated_n_speakers = robust_speaker_clustering(
            embeddings, n_speakers=n_speakers, max_speakers=max_speakers
        )
        step3_time = time.time() - step3_start
        print(f" Clustering time: {step3_time:.2f}s")

        return embeddings, speaker_labels, estimated_n_speakers

    @staticmethod
    def _build_result(audio_path, total_duration, speech_segments,
                      speaker_labels, estimated_n_speakers,
                      segment_indices=None, embeddings=None):
        """Assemble the result dict (segments, optional embeddings, stats).

        Args:
            audio_path: Path of the processed audio (stored as a string).
            total_duration: Audio length in seconds.
            speech_segments: ``(start, end)`` pairs in seconds, one per label.
            speaker_labels: Cluster label per segment.
            estimated_n_speakers: Speaker count reported by the clustering.
            segment_indices: Value for each segment's ``"index"`` field;
                defaults to the running position 0..n-1.
            embeddings: When given, stored under ``"embeddings"`` as nested
                lists, one entry per segment in the same order.
        """
        if segment_indices is None:
            segment_indices = range(len(speech_segments))

        result = {
            "audio_path": str(audio_path),
            "total_duration": total_duration,
            "n_speech_segments": len(speech_segments),
            "n_speakers": int(estimated_n_speakers),
            "segments": []
        }

        for index, (start, end), label in zip(
            segment_indices, speech_segments, speaker_labels
        ):
            # Cast to plain floats so NumPy scalars never reach json.dump.
            start = float(start)
            end = float(end)
            result["segments"].append({
                "index": int(index),
                "start": round(start, 3),
                "end": round(end, 3),
                "duration": round(end - start, 3),
                "speaker": f"SPEAKER_{int(label)}"
            })

        if embeddings is not None:
            # One embedding per entry in result["segments"], same order.
            result["embeddings"] = [emb.tolist() for emb in embeddings]

        # Per-speaker totals: number of segments and summed duration.
        speaker_stats = {}
        for seg in result["segments"]:
            stats = speaker_stats.setdefault(
                seg["speaker"], {"count": 0, "duration": 0}
            )
            stats["count"] += 1
            stats["duration"] += seg["duration"]
        result["speaker_stats"] = speaker_stats

        return result

    @staticmethod
    def _finalize(result, start_time, estimated_n_speakers, output_path):
        """Add timing fields, print the summary and optionally save JSON."""
        total_time = time.time() - start_time
        result["processing_time"] = round(total_time, 2)
        # Guard against a zero elapsed time on coarse clocks.
        if total_time > 0:
            result["realtime_factor"] = round(
                result["total_duration"] / total_time, 2
            )
        else:
            result["realtime_factor"] = 0.0

        print("\n[SelfASRX-Fixed] Processing completed!")
        print(f" Total time: {total_time:.2f}s")
        print(f" Realtime factor: {result['realtime_factor']:.2f}x")
        print(f" Detected speakers: {estimated_n_speakers}")

        # Save the result
        if output_path:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            print(f" Results saved to: {output_path}")

        print("=" * 60)
        return result

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def process(self, audio_path, output_path=None,
                min_speech_duration_ms=500, n_speakers=None,
                max_speakers=10):
        """Diarize an audio file using VAD-derived speech segments.

        Args:
            audio_path: Path to the audio file.
            output_path: Optional JSON path; parent directories are created.
            min_speech_duration_ms: Forwarded to the VAD segment extractor.
            n_speakers: Forwarded to the clustering (``None`` = estimate).
            max_speakers: Forwarded to the clustering.

        Returns:
            Result dict with ``segments``, ``speaker_stats`` and timing
            fields, or ``{"error": ..., "segments": []}`` when the VAD finds
            no speech.
        """
        start_time = time.time()

        print(f"\n[SelfASRX-Fixed] Processing: {audio_path}")
        print("=" * 60)

        # Step 1: VAD
        print("\n[Step 1] Voice Activity Detection...")
        step1_start = time.time()
        speech_segments, wav, sample_rate = extract_speech_segments(
            audio_path,
            self.vad_model,
            self.vad_utils,
            min_speech_duration_ms=min_speech_duration_ms
        )
        step1_time = time.time() - step1_start
        print(f" Speech segments: {len(speech_segments)}")
        print(f" Total duration: {len(wav)/sample_rate:.2f}s")
        print(f" VAD time: {step1_time:.2f}s")

        if len(speech_segments) == 0:
            print("[SelfASRX-Fixed] No speech detected!")
            return {"error": "No speech detected", "segments": []}

        # Steps 2-3: cut out each speech segment, embed, then cluster.
        audio_segments = [
            wav[int(start_sec * sample_rate):int(end_sec * sample_rate)]
            for start_sec, end_sec in speech_segments
        ]
        _, speaker_labels, estimated_n_speakers = self._embed_and_cluster(
            audio_segments, sample_rate, n_speakers, max_speakers
        )

        # Step 4: build the output
        print("\n[Step 4] Building output...")
        result = self._build_result(
            audio_path,
            len(wav) / sample_rate,
            speech_segments,
            speaker_labels,
            estimated_n_speakers,
        )

        return self._finalize(
            result, start_time, estimated_n_speakers, output_path
        )

    def process_with_segments(self, audio_path, asr_segments,
                              output_path=None, n_speakers=None,
                              max_speakers=10):
        """Diarize using ASR segment boundaries instead of the VAD step.

        Segments that start at or beyond the end of the audio, or that
        contain no samples, are skipped. Skipped segments are left out of
        ``segments`` and ``embeddings`` so that labels and embeddings stay
        aligned with the segments they were computed from; each kept
        segment's ``"index"`` is its position in ``asr_segments``.

        Args:
            audio_path: Path to the audio file (read with ``soundfile``).
            asr_segments: List of dicts, each with ``"start_time"`` and
                ``"end_time"`` in seconds.
            output_path: Optional JSON path; parent directories are created.
            n_speakers: Forwarded to the clustering (``None`` = estimate).
            max_speakers: Forwarded to the clustering.

        Returns:
            Result dict like ``process`` plus ``"embeddings"`` (one vector
            per kept segment), or ``{"error": ..., "segments": []}`` when no
            usable segment is available.
        """
        start_time = time.time()

        print(f"\n[SelfASRX-Fixed] Processing with {len(asr_segments)} ASR segments: {audio_path}")
        print("=" * 60)

        # Load the full audio; imported lazily so the VAD path does not
        # require soundfile.
        import soundfile as sf
        wav, sample_rate = sf.read(audio_path)
        if wav.ndim > 1:
            wav = np.mean(wav, axis=1)  # down-mix to mono
        print(f" Audio loaded: {len(wav)/sample_rate:.2f}s, {sample_rate}Hz")

        # ASR segments replace the VAD output.
        speech_segments = [(s["start_time"], s["end_time"]) for s in asr_segments]
        print(f" Speech segments from ASR: {len(speech_segments)}")

        if not speech_segments:
            print("[SelfASRX-Fixed] No ASR segments provided!")
            return {"error": "No ASR segments", "segments": []}

        # Cut out each segment, tracking which ones were kept so labels and
        # embeddings can be mapped back to the correct ASR segment.
        n_samples = len(wav)
        kept_indices = []
        kept_segments = []
        audio_segments = []
        for index, (start_sec, end_sec) in enumerate(speech_segments):
            # Clamp to the audio range; a negative start would otherwise
            # wrap around via negative slicing.
            start_sample = max(int(start_sec * sample_rate), 0)
            end_sample = min(int(end_sec * sample_rate), n_samples)
            if start_sample >= n_samples or end_sample <= start_sample:
                continue
            kept_indices.append(index)
            kept_segments.append((start_sec, end_sec))
            audio_segments.append(wav[start_sample:end_sample])
        print(f" Audio segments extracted: {len(audio_segments)}")

        if not audio_segments:
            print("[SelfASRX-Fixed] No valid ASR segments within audio!")
            return {"error": "No valid ASR segments within audio", "segments": []}

        embeddings, speaker_labels, estimated_n_speakers = self._embed_and_cluster(
            audio_segments, sample_rate, n_speakers, max_speakers
        )

        # Build the output, including the per-segment speaker embeddings.
        result = self._build_result(
            audio_path,
            len(wav) / sample_rate,
            kept_segments,
            speaker_labels,
            estimated_n_speakers,
            segment_indices=kept_indices,
            embeddings=embeddings,
        )

        return self._finalize(
            result, start_time, estimated_n_speakers, output_path
        )


def main():
    """Command-line entry point: diarize one audio file and print a summary."""
    import argparse

    parser = argparse.ArgumentParser(description="Self-implemented ASRX (Fixed)")
    parser.add_argument("audio_path", help="Path to audio file")
    parser.add_argument("-o", "--output", help="Output JSON path")
    parser.add_argument("--min-speech-duration", type=int, default=500)
    parser.add_argument("--n-speakers", type=int, default=None)
    parser.add_argument("--max-speakers", type=int, default=10)
    args = parser.parse_args()

    if not Path(args.audio_path).exists():
        print(f"Error: Audio file not found: {args.audio_path}")
        sys.exit(1)

    asrx = SelfASRXFixed()
    result = asrx.process(
        args.audio_path,
        args.output,
        min_speech_duration_ms=args.min_speech_duration,
        n_speakers=args.n_speakers,
        max_speakers=args.max_speakers
    )

    if "error" not in result:
        print("\n[Summary]")
        print(f" Audio duration: {result['total_duration']:.2f}s")
        print(f" Speech segments: {result['n_speech_segments']}")
        print(f" Detected speakers: {result['n_speakers']}")
        print(f" Processing time: {result['processing_time']:.2f}s")
        print(f" Realtime factor: {result['realtime_factor']:.2f}x")

        print("\n[Speaker Statistics]")
        for speaker, stats in result['speaker_stats'].items():
            pct = stats['duration'] / result['total_duration'] * 100
            print(f" {speaker}: {stats['count']} segments, "
                  f"{stats['duration']:.2f}s ({pct:.1f}%)")


if __name__ == "__main__":
    main()