#!/opt/homebrew/bin/python3.11
"""
ASRX Processor - Speaker Diarization
Uses whisperx for speaker diarization (local model)
"""

import sys
import json
import argparse
import os

# Make the sibling redis_publisher module importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher

# Stage name used for every progress message published by this script.
_STAGE = "asrx"
# All whisperx models in this script run on the CPU.
_DEVICE = "cpu"


def _notify(publisher, level: str, message: str) -> None:
    """Send *message* via ``publisher.<level>`` if a publisher is configured."""
    if publisher:
        getattr(publisher, level)(_STAGE, message)


def _empty_result() -> dict:
    """Return the payload written when no transcription could be produced."""
    return {"language": None, "segments": []}


def _finish(publisher, output_path: str, payload: dict) -> dict:
    """Publish the completion message, write *payload* as JSON, return it."""
    _notify(publisher, "complete", f"{len(payload['segments'])} segments")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    return payload


def _allow_omegaconf_checkpoints(torch) -> None:
    """Allow omegaconf types in ``torch.load`` (PyTorch 2.6+ compatibility).

    ``add_safe_globals`` does not exist on older PyTorch releases; there the
    allow-list is not needed, so the call is skipped instead of failing.
    """
    add_safe_globals = getattr(torch.serialization, "add_safe_globals", None)
    if add_safe_globals is None:
        return
    import omegaconf

    add_safe_globals(
        [omegaconf.listconfig.ListConfig, omegaconf.dictconfig.DictConfig]
    )


def _build_segments(result: dict) -> list:
    """Convert whisperx segments to output records, dropping empty text."""
    segments = []
    for seg in result.get("segments", []):
        text = seg.get("text", "").strip()
        if not text:
            continue
        segments.append(
            {
                "start": seg.get("start", 0.0),
                "end": seg.get("end", 0.0),
                "text": text,
                "speaker_id": seg.get("speaker", None),
            }
        )
    return segments


def _run_pipeline(whisperx, torch, video_path: str, publisher) -> dict:
    """Transcribe, align and (best effort) diarize *video_path*."""
    _allow_omegaconf_checkpoints(torch)

    # Load model - using faster-whisper for better performance
    # You can also use: "large-v3", "medium", "small", "base", "tiny"
    model = whisperx.load_model("base", device=_DEVICE, compute_type="int8")

    _notify(publisher, "info", "ASRX_TRANSCRIBING")

    # Transcribe audio
    result = model.transcribe(video_path, language="en")

    # Remember the language now: the aligned result returned below only
    # carries segment data, so reading "language" from it would give None.
    language = result.get("language")

    # Align timestamps. `device` is a required argument of load_align_model.
    model_a, metadata = whisperx.load_align_model(
        language_code=language, device=_DEVICE
    )
    result = whisperx.align(
        result["segments"], model_a, metadata, video_path, device=_DEVICE
    )

    # Diarization (speaker segmentation) is best effort: on any failure the
    # aligned transcript is still emitted, just without speaker labels.
    try:
        from whisperx.diarize import DiarizationPipeline

        # DiarizationPipeline parameters: model_name, token, device, cache_dir
        diarize_model = DiarizationPipeline(
            model_name="pyannote/speaker-diarization",
            token=None,  # HuggingFace token (None for public models)
            device=_DEVICE,
        )
        diarize_segments = diarize_model(video_path)

        # Assign speaker labels
        result = whisperx.assign_word_speakers(diarize_segments, result)
    except Exception as e:
        _notify(publisher, "info", f"Diarization skipped: {e}")

    return {"language": language, "segments": _build_segments(result)}


def process_asrx(video_path: str, output_path: str, uuid: str = ""):
    """Process video for speaker diarization using whisperx.

    Args:
        video_path: Path of the media file to transcribe.
        output_path: Path of the JSON file to write.
        uuid: Identifier passed to RedisPublisher for progress messages;
            when empty, no progress is published.

    Returns:
        The dict written to ``output_path``: ``{"language", "segments"}``,
        each segment having ``start``, ``end``, ``text`` and ``speaker_id``.
        If whisperx/torch cannot be imported or processing fails, the
        result has ``language`` set to ``None`` and no segments.
    """
    publisher = RedisPublisher(uuid) if uuid else None
    _notify(publisher, "info", "ASRX_START")

    try:
        import whisperx
        import torch
    except ImportError:
        _notify(publisher, "error", "whisperx not installed")
        return _finish(publisher, output_path, _empty_result())

    _notify(publisher, "info", "ASRX_LOADING_MODEL")

    try:
        output_result = _run_pipeline(whisperx, torch, video_path, publisher)
        return _finish(publisher, output_path, output_result)
    except Exception as e:
        # Any failure is reported, then an empty result is still written so
        # the output file exists.
        _notify(publisher, "error", f"Error: {e}")
        return _finish(publisher, output_path, _empty_result())


def main() -> None:
    """Parse command-line arguments and run the ASRX processor."""
    parser = argparse.ArgumentParser(description="ASRX Speaker Diarization")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    args = parser.parse_args()

    process_asrx(args.video_path, args.output_path, args.uuid)


if __name__ == "__main__":
    main()