#!/opt/homebrew/bin/python3.11
"""
ASR Processor with chunked transcription and resource monitoring.

Supports large audio files by splitting into manageable chunks: the audio
track is extracted once to 16 kHz mono PCM WAV, cut into fixed-length chunks
with ffmpeg, and each chunk is transcribed with faster-whisper. Segment
timestamps are shifted back onto the original timeline and written as JSON.
"""

import sys
import json
import os
import argparse
import shutil
import signal
import subprocess
import tempfile
import time
from typing import List, Dict, Any, Optional, Tuple

# Try to import psutil for resource monitoring, but don't fail if not available
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False
    print("WARNING: psutil not available, resource monitoring disabled", file=sys.stderr)

# redis_publisher lives next to this script; make it importable regardless of cwd.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher  # noqa: E402  (needs the sys.path tweak above)


def signal_handler(signum, frame):
    """Exit with status 1 on SIGTERM/SIGINT.

    ``sys.exit`` raises ``SystemExit`` (Python runs signal handlers in the
    main thread), so ``finally`` blocks in ``run_asr_chunked`` still run and
    remove the temporary working directory.
    """
    print(f"ASR: Received signal {signum}, exiting...", file=sys.stderr)
    sys.exit(1)


def has_audio_stream(video_path: str) -> bool:
    """Check if video file has audio stream using ffprobe.

    Returns False if ffprobe reports no audio stream or exits non-zero.
    Returns True (optimistically) if ffprobe is not installed.
    """
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "a",
            "-show_entries", "stream=codec_type",
            "-of", "csv=p=0",
            video_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return bool(result.stdout.strip())
    except subprocess.CalledProcessError:
        return False
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists", file=sys.stderr)
        return True


def get_audio_duration(audio_path: str) -> float:
    """Get audio duration in seconds using ffprobe.

    Raises:
        subprocess.CalledProcessError: ffprobe exited non-zero.
        ValueError: ffprobe printed something that is not a number (e.g. "N/A").
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return float(result.stdout.strip())


def extract_audio(video_path: str, audio_path: str) -> bool:
    """Extract audio from video to 16 kHz mono 16-bit PCM WAV.

    Returns True only if ffmpeg succeeded and the output file exists.
    """
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
    return result.returncode == 0 and os.path.exists(audio_path)


def extract_chunk(
    audio_path: str, start: float, duration: float, output_path: str
) -> bool:
    """Extract a chunk of audio using ffmpeg.

    ``-ss``/``-t`` are given *before* ``-i`` (input seeking), so ffmpeg jumps
    straight to *start* instead of decoding and discarding everything before
    it. With output seeking, total extraction cost grows quadratically with
    the number of chunks.

    Returns True only if ffmpeg succeeded and produced a non-empty file.
    """
    cmd = [
        "ffmpeg",
        "-ss", str(start),
        "-t", str(duration),
        "-i", audio_path,
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
    return (
        result.returncode == 0
        and os.path.exists(output_path)
        and os.path.getsize(output_path) > 0
    )


def monitor_resources(pid: int, interval: int = 60) -> Dict[str, Any]:
    """Sample CPU and resident memory usage for a process.

    Args:
        pid: Process to inspect.
        interval: Currently unused; CPU is sampled over a fixed 0.1 s window.
            Kept for backward compatibility.

    Returns:
        Dict with ``cpu_percent``, ``memory_mb`` (RSS in MiB) and
        ``available``. ``available`` is False (values zeroed) when psutil is
        missing or the process cannot be inspected.
    """
    if not PSUTIL_AVAILABLE:
        return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False}
    try:
        process = psutil.Process(pid)
        cpu_percent = process.cpu_percent(interval=0.1)  # blocks for 0.1 s
        memory_mb = process.memory_info().rss / (1024 * 1024)
        return {
            "cpu_percent": cpu_percent,
            "memory_mb": memory_mb,
            "available": True,
            "pid": pid,
        }
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False}


def transcribe_chunk(
    model,
    chunk_path: str,
    chunk_start: float,
    chunk_idx: int,
    total_chunks: int,
    publisher: Optional[RedisPublisher] = None,
) -> Tuple[List[Dict[str, Any]], Any]:
    """Transcribe a single audio chunk.

    Segment times returned by the model are relative to the chunk; they are
    offset by *chunk_start* so they refer to the full audio timeline.

    Returns:
        ``(segments, info)``, where segments are ``{"start", "end", "text"}``
        dicts and info is the model's transcription info object.
    """
    if publisher:
        publisher.info("asr", f"Transcribing chunk {chunk_idx + 1}/{total_chunks}")

    start_time = time.time()
    segments, info = model.transcribe(chunk_path, beam_size=5)

    # Iterating `segments` is what drives the actual decoding work.
    results = [
        {
            "start": segment.start + chunk_start,
            "end": segment.end + chunk_start,
            "text": segment.text.strip(),
        }
        for segment in segments
    ]

    elapsed = time.time() - start_time
    if publisher:
        publisher.info(
            "asr",
            f"Chunk {chunk_idx + 1}/{total_chunks}: {len(results)} segments in {elapsed:.1f}s",
        )

    return results, info


def _plan_chunks(total_duration: float, chunk_duration: float) -> List[Dict[str, Any]]:
    """Split ``[0, total_duration)`` into consecutive windows of at most *chunk_duration* s.

    Raises:
        ValueError: if *chunk_duration* is not positive (it would never advance).
    """
    if chunk_duration <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")
    chunks: List[Dict[str, Any]] = []
    start = 0.0
    while start < total_duration:
        chunk_end = min(start + chunk_duration, total_duration)
        chunks.append(
            {
                "start": start,
                "end": chunk_end,
                "duration": chunk_end - start,
                "idx": len(chunks),
            }
        )
        start = chunk_end
    return chunks


def _fail(publisher: Optional[RedisPublisher], message: str) -> None:
    """Report a fatal error via Redis (if configured) and stderr, then exit with status 1."""
    if publisher:
        publisher.error("asr", message)
    sys.stderr.write(f"ASR: {message}\n")
    sys.stderr.flush()
    sys.exit(1)


def run_asr_chunked(
    video_path: str,
    output_path: str,
    uuid: str = "",
    chunk_duration: int = 600,  # 10 minutes default
    model_size: str = "tiny",
    compute_type: str = "int8",
) -> None:
    """Transcribe *video_path* chunk by chunk and write the result to *output_path*.

    Args:
        video_path: Input media file.
        output_path: Destination JSON file.
        uuid: If non-empty, progress is published through ``RedisPublisher(uuid)``.
        chunk_duration: Chunk length in seconds; must be positive.
        model_size: faster-whisper model name.
        compute_type: faster-whisper compute type (model runs on CPU).

    Never returns normally: exits 0 on success or when there is no audio
    stream, and 1 on fatal errors. Chunks that fail to extract or transcribe
    are reported and skipped.
    """
    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("asr", "ASR_START_CHUNKED")

    # Fail fast: a non-positive chunk length would make chunk planning loop forever.
    if chunk_duration <= 0:
        _fail(publisher, f"Invalid chunk duration: {chunk_duration}s (must be > 0)")

    # Check for audio stream
    if not has_audio_stream(video_path):
        if publisher:
            publisher.info("asr", "No audio stream detected, skipping transcription")
        output = {"language": "", "language_probability": 0.0, "segments": []}
        with open(output_path, "w") as f:
            json.dump(output, f, indent=2)
        if publisher:
            publisher.complete("asr", "0 segments (no audio)")
        sys.stderr.write("ASR: No audio stream, skipping transcription\n")
        sys.stderr.flush()
        sys.exit(0)

    all_segments: List[Dict[str, Any]] = []
    language = None
    language_prob = None

    # The temp dir holds the full extracted WAV, so it must be removed on every
    # exit path (errors, sys.exit, signals), not just on success.
    temp_dir = tempfile.mkdtemp(prefix="asr_")
    try:
        audio_path = os.path.join(temp_dir, "audio.wav")

        if publisher:
            publisher.info("asr", "Extracting audio from video...")

        # Extract audio
        if not extract_audio(video_path, audio_path):
            _fail(publisher, "Failed to extract audio")

        # Get audio duration
        try:
            total_duration = get_audio_duration(audio_path)
        except Exception as e:
            _fail(publisher, f"Failed to get audio duration: {e}")

        if publisher:
            publisher.info(
                "asr",
                f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)",
            )
            publisher.info("asr", f"Chunk duration: {chunk_duration}s")

        chunks = _plan_chunks(total_duration, chunk_duration)

        if publisher:
            publisher.info("asr", f"Split into {len(chunks)} chunks")

        # Load Whisper model (imported lazily: heavy dependency, only needed here)
        if publisher:
            publisher.info(
                "asr", f"Loading Whisper model ({model_size}, {compute_type})..."
            )
        try:
            from faster_whisper import WhisperModel

            model = WhisperModel(model_size, device="cpu", compute_type=compute_type)
        except Exception as e:
            _fail(publisher, f"Failed to load Whisper model: {e}")

        if publisher:
            publisher.info("asr", "Whisper model loaded successfully")

        chunk_temp_dir = os.path.join(temp_dir, "chunks")
        os.makedirs(chunk_temp_dir, exist_ok=True)

        for i, chunk in enumerate(chunks):
            chunk_path = os.path.join(chunk_temp_dir, f"chunk_{i:04d}.wav")

            if publisher:
                publisher.progress(
                    "asr", i, len(chunks), f"Processing chunk {i + 1}/{len(chunks)}"
                )

            # Extract chunk
            if not extract_chunk(audio_path, chunk["start"], chunk["duration"], chunk_path):
                if publisher:
                    publisher.warning("asr", f"Failed to extract chunk {i}, skipping")
                continue

            # Monitor resources
            if PSUTIL_AVAILABLE and publisher:
                resources = monitor_resources(os.getpid())
                if resources["available"]:
                    publisher.info(
                        "asr",
                        f"Resource usage: CPU {resources['cpu_percent']:.1f}%, "
                        f"Memory {resources['memory_mb']:.1f}MB",
                    )

            # Transcribe chunk (no timeout is enforced); a failure skips only this chunk.
            try:
                segments, info = transcribe_chunk(
                    model, chunk_path, chunk["start"], i, len(chunks), publisher
                )
            except Exception as e:
                if publisher:
                    publisher.error("asr", f"Error transcribing chunk {i}: {e}")
                sys.stderr.write(f"ASR: Error transcribing chunk {i}: {e}\n")
                sys.stderr.flush()
            else:
                all_segments.extend(segments)
                # Language is taken from the first chunk that transcribes successfully.
                if language is None:
                    language = info.language
                    language_prob = info.language_probability
                    if publisher:
                        publisher.info(
                            "asr",
                            f"Detected language: {language} (prob {language_prob:.2f})",
                        )

            # Delete each chunk right away so disk usage stays bounded by one chunk.
            try:
                os.unlink(chunk_path)
            except OSError:
                pass
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Sort segments by start time
    all_segments.sort(key=lambda seg: seg["start"])

    # Prepare output
    output = {
        "language": language or "",
        "language_probability": language_prob or 0.0,
        "segments": all_segments,
        "chunk_count": len(chunks),
        "chunk_duration": chunk_duration,
        "total_segments": len(all_segments),
        "processing_mode": "chunked",
    }

    # Write output
    with open(output_path, "w") as f:
        json.dump(output, f, indent=2)

    if publisher:
        publisher.complete(
            "asr", f"{len(all_segments)} segments from {len(chunks)} chunks"
        )

    sys.stderr.write(
        f"ASR: Transcription complete, {len(all_segments)} segments written to {output_path}\n"
    )
    sys.stderr.flush()
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="ASR Transcription (Chunked)")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=600,
        help="Chunk duration in seconds (default: 600 = 10 minutes)",
    )
    parser.add_argument("--model-size", default="tiny", help="Whisper model size")
    parser.add_argument("--compute-type", default="int8", help="Compute type")

    args = parser.parse_args()
    run_asr_chunked(
        args.video_path,
        args.output_path,
        args.uuid,
        args.chunk_duration,
        args.model_size,
        args.compute_type,
    )