#!/opt/homebrew/bin/python3.11 """ ASRX 處理器 v2 - 說話人分離 使用 whisperx 進行轉錄和說話人分離 需要 PyTorch 2.5.0 + torchvision 0.20.0 + torchaudio 2.5.0 """ # Fix for PyTorch 2.5 compatibility import os os.environ["TORCH_FORCE_WEIGHTS_ONLY_LOAD"] = "0" import sys import json import argparse import signal import subprocess sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from redis_publisher import RedisPublisher def signal_handler(signum, frame): print(f"ASRX: Received signal {signum}, exiting...") sys.exit(1) def has_audio_stream(video_path): """Check if video file has audio stream using ffprobe.""" try: cmd = [ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "csv=p=0", video_path, ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) return bool(result.stdout.strip()) except subprocess.CalledProcessError: return False except FileNotFoundError: print("WARNING: ffprobe not found, assuming audio exists") return True def process_asrx(video_path: str, output_path: str, uuid: str = "", skip_diarization: bool = False): """ Process video for speaker diarization using whisperx Args: video_path: Path to video file output_path: Path to output JSON uuid: UUID for Redis progress skip_diarization: Skip speaker diarization (only transcription) """ signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) publisher = RedisPublisher(uuid) if uuid else None if publisher: publisher.info("asrx", "ASRX_START") # Check for audio stream if not has_audio_stream(video_path): if publisher: publisher.info("asrx", "No audio stream detected, skipping transcription") output = {"language": "", "language_probability": 0.0, "segments": []} with open(output_path, "w") as f: json.dump(output, f, indent=2) if publisher: publisher.complete("asrx", "0 segments (no audio)") sys.stderr.write("ASRX: No audio stream, skipping transcription\n") sys.stderr.flush() sys.exit(0) if publisher: publisher.info("asrx", "ASRX_LOADING_MODEL") try: import whisperx import torch except ImportError as e: if publisher: publisher.error("asrx", f"Missing dependency: {e}") result = {"language": None, "segments": [], "error": str(e)} if publisher: publisher.complete("asrx", "0 segments") with open(output_path, "w") as f: json.dump(result, f, indent=2) sys.exit(1) try: # Load model if publisher: publisher.info("asrx", "Loading whisperx base model (this may take a while)...") model = whisperx.load_model("base", device="cpu", compute_type="int8") if publisher: publisher.info("asrx", "ASRX_TRANSCRIBING") # Transcribe with language detection result = model.transcribe(video_path) if publisher: publisher.info("asrx", f"ASRX_LANGUAGE:{result.get('language', 'unknown')}") # Align timestamps if publisher: publisher.info("asrx", "ASRX_ALIGNING_TIMESTAMPS") model_a, metadata = whisperx.load_align_model( language_code=result["language"], device="cpu" ) result = whisperx.align( result["segments"], model_a, metadata, video_path, device="cpu" ) # Diarization (speaker segmentation) if not skip_diarization: if publisher: publisher.info("asrx", "ASRX_DIARIZATION") try: diarize_model = whisperx.DiarizationPipeline(use_auth_token=None) diarize_segments = diarize_model(video_path) # Assign speaker labels result = whisperx.assign_word_speakers(diarize_segments, result) if publisher: publisher.info("asrx", "Diarization completed") except Exception as e: if publisher: publisher.info("asrx", f"Diarization skipped: {e}") sys.stderr.write(f"ASRX: Diarization failed: {e}\n") # Build output segments = [] for seg in result.get("segments", []): text = seg.get("text", "").strip() if text: segments.append( { "start": seg.get("start", 0.0), "end": seg.get("end", 0.0), "text": text, "speaker_id": seg.get("speaker", None), } ) output_result = { "language": result.get("language"), "language_probability": result.get("language_probability", 0), "segments": segments, "diarization_enabled": not skip_diarization } if publisher: publisher.complete("asrx", f"{len(segments)} segments") with open(output_path, "w") as f: json.dump(output_result, f, indent=2, ensure_ascii=False) sys.stderr.write( f"ASRX: Transcription complete, {len(segments)} segments written to {output_path}\n" ) sys.stderr.flush() sys.exit(0) except Exception as e: if publisher: publisher.error("asrx", f"Error: {e}") import traceback traceback.print_exc() result = {"language": None, "segments": [], "error": str(e)} if publisher: publisher.complete("asrx", "0 segments (error)") with open(output_path, "w") as f: json.dump(result, f, indent=2) sys.exit(1) if __name__ == "__main__": parser = argparse.ArgumentParser(description="ASRX Speaker Diarization v2") parser.add_argument("video_path", help="Path to video file") parser.add_argument("output_path", help="Output JSON path") parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="") parser.add_argument( "--skip-diarization", action="store_true", help="Skip speaker diarization (only transcription)" ) args = parser.parse_args() process_asrx( args.video_path, args.output_path, args.uuid, args.skip_diarization )