#!/opt/homebrew/bin/python3.11 """ ASR Processor with chunked transcription for large files and resource monitoring. Maintains backward compatibility with existing API. """ import sys import json import os import argparse import signal import subprocess import tempfile import time import shutil from pathlib import Path from typing import List, Dict, Any, Optional, Tuple # Try to import psutil for resource monitoring PSUTIL_AVAILABLE = False psutil = None try: import psutil PSUTIL_AVAILABLE = True except ImportError: sys.stderr.write("WARNING: psutil not available, resource monitoring disabled\n") sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from redis_publisher import RedisPublisher def signal_handler(signum, frame): sys.stderr.write(f"ASR: Received signal {signum}, exiting...\n") sys.exit(1) def has_audio_stream(video_path: str) -> bool: """Check if video file has audio stream using ffprobe.""" try: cmd = [ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "csv=p=0", video_path, ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) return bool(result.stdout.strip()) except subprocess.CalledProcessError: return False except FileNotFoundError: sys.stderr.write("WARNING: ffprobe not found, assuming audio exists\n") return True def get_media_duration(media_path: str) -> float: """Get media duration in seconds using ffprobe.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "csv=p=0", media_path, ] result = subprocess.run(cmd, capture_output=True, text=True) try: return float(result.stdout.strip()) except (ValueError, AttributeError): return 0.0 def extract_audio(video_path: str, audio_path: str) -> bool: """Extract audio from video to WAV format.""" cmd = [ "ffmpeg", "-i", video_path, "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", audio_path, ] result = subprocess.run(cmd, capture_output=True) return result.returncode == 0 and os.path.exists(audio_path) def extract_chunk( audio_path: str, start: float, duration: float, output_path: str ) -> bool: """Extract a chunk of audio using ffmpeg.""" cmd = [ "ffmpeg", "-i", audio_path, "-ss", str(start), "-t", str(duration), "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", output_path, ] result = subprocess.run(cmd, capture_output=True) return os.path.exists(output_path) and os.path.getsize(output_path) > 0 def monitor_resources(pid: int, interval: float = 0.1) -> Dict[str, Any]: """Monitor CPU and memory usage for a process.""" if not PSUTIL_AVAILABLE or psutil is None: return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False} try: process = psutil.Process(pid) cpu_percent = process.cpu_percent(interval=interval) memory_info = process.memory_info() memory_mb = memory_info.rss / (1024 * 1024) return { "cpu_percent": cpu_percent, "memory_mb": memory_mb, "available": True, "pid": pid, } except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False} def transcribe_direct( model, audio_path: str, publisher: Optional[RedisPublisher] = None ) -> Tuple[List[Dict[str, Any]], Any]: """Transcribe audio directly (non-chunked).""" if publisher: publisher.info("asr", "Transcribing audio directly...") start_time = time.time() segments, info = model.transcribe(audio_path, beam_size=5) results = [] total_segments = 0 for segment in segments: results.append( {"start": segment.start, "end": segment.end, "text": segment.text.strip()} ) total_segments += 1 if total_segments % 100 == 0 and publisher: publisher.progress("asr", total_segments, 0, f"Segment {total_segments}") elapsed = time.time() - start_time if publisher: publisher.info( "asr", f"Direct transcription: {len(results)} segments in {elapsed:.1f}s" ) return results, info def transcribe_chunk( model, chunk_path: str, chunk_start: float, chunk_idx: int, total_chunks: int, publisher: Optional[RedisPublisher] = None, ) -> Tuple[List[Dict[str, Any]], Any]: """Transcribe a single audio chunk.""" if publisher: publisher.info("asr", f"Transcribing chunk {chunk_idx + 1}/{total_chunks}") start_time = time.time() segments, info = model.transcribe(chunk_path, beam_size=5) results = [] for segment in segments: results.append( { "start": segment.start + chunk_start, "end": segment.end + chunk_start, "text": segment.text.strip(), } ) elapsed = time.time() - start_time if publisher: publisher.info( "asr", f"Chunk {chunk_idx + 1}/{total_chunks}: {len(results)} segments in {elapsed:.1f}s", ) return results, info def run_asr( video_path: str, output_path: str, uuid: str = "", chunk_duration: int = 600, # 10 minutes default max_direct_duration: int = 1800, # 30 minutes: use direct transcription for shorter files model_size: str = "tiny", compute_type: str = "int8", monitor_interval: int = 60, ) -> None: # Set up signal handlers signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) publisher = RedisPublisher(uuid) if uuid else None if publisher: publisher.info("asr", "ASR_START") # Check for audio stream if not has_audio_stream(video_path): if publisher: publisher.info("asr", "No audio stream detected, skipping transcription") output = {"language": "", "language_probability": 0.0, "segments": []} with open(output_path, "w") as f: json.dump(output, f, indent=2) if publisher: publisher.complete("asr", "0 segments (no audio)") sys.stderr.write("ASR: No audio stream, skipping transcription\n") sys.stderr.flush() sys.exit(0) # Create temporary directory temp_dir = tempfile.mkdtemp(prefix="asr_") audio_path = os.path.join(temp_dir, "audio.wav") if publisher: publisher.info("asr", "Extracting audio from video...") # Extract audio if not extract_audio(video_path, audio_path): if publisher: publisher.error("asr", "Failed to extract audio") sys.stderr.write("ASR: Failed to extract audio\n") sys.stderr.flush() # Clean up shutil.rmtree(temp_dir, ignore_errors=True) sys.exit(1) # Get audio duration try: total_duration = get_media_duration(audio_path) except Exception as e: if publisher: publisher.error("asr", f"Failed to get audio duration: {e}") sys.stderr.write(f"ASR: Failed to get audio duration: {e}\n") sys.stderr.flush() shutil.rmtree(temp_dir, ignore_errors=True) sys.exit(1) if publisher: publisher.info( "asr", f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)", ) # Load Whisper model if publisher: publisher.info( "asr", f"Loading Whisper model ({model_size}, {compute_type})..." ) try: from faster_whisper import WhisperModel model = WhisperModel(model_size, device="cpu", compute_type=compute_type) except Exception as e: if publisher: publisher.error("asr", f"Failed to load Whisper model: {e}") sys.stderr.write(f"ASR: Failed to load Whisper model: {e}\n") sys.stderr.flush() shutil.rmtree(temp_dir, ignore_errors=True) sys.exit(1) if publisher: publisher.info("asr", "Whisper model loaded successfully") # Decide whether to use chunked or direct transcription use_chunked = total_duration > max_direct_duration all_segments = [] language = None language_prob = None chunks = [] # Initialize chunks variable if not use_chunked: # Direct transcription for shorter audio if publisher: publisher.info( "asr", f"Using direct transcription (duration ≤ {max_direct_duration}s)" ) try: segments, info = transcribe_direct(model, audio_path, publisher) all_segments.extend(segments) language = info.language language_prob = info.language_probability except Exception as e: if publisher: publisher.error("asr", f"Direct transcription failed: {e}") sys.stderr.write(f"ASR: Direct transcription failed: {e}\n") sys.stderr.flush() # Fall back to chunked approach use_chunked = True if publisher: publisher.info("asr", "Falling back to chunked transcription") if use_chunked: # Chunked transcription for long audio if publisher: publisher.info( "asr", f"Using chunked transcription ({chunk_duration}s chunks)" ) # Calculate chunks chunks = [] start = 0.0 chunk_idx = 0 while start < total_duration: chunk_end = min(start + chunk_duration, total_duration) chunks.append( { "start": start, "end": chunk_end, "duration": chunk_end - start, "idx": chunk_idx, } ) start = chunk_end chunk_idx += 1 if publisher: publisher.info("asr", f"Split into {len(chunks)} chunks") chunk_temp_dir = os.path.join(temp_dir, "chunks") os.makedirs(chunk_temp_dir, exist_ok=True) last_resource_report = time.time() for i, chunk in enumerate(chunks): chunk_path = os.path.join(chunk_temp_dir, f"chunk_{i:04d}.wav") if publisher: publisher.progress( "asr", i, len(chunks), f"Processing chunk {i + 1}/{len(chunks)}" ) # Extract chunk if not extract_chunk( audio_path, chunk["start"], chunk["duration"], chunk_path ): if publisher: publisher.warning("asr", f"Failed to extract chunk {i}, skipping") continue # Resource monitoring (sample every monitor_interval seconds) current_time = time.time() if ( PSUTIL_AVAILABLE and publisher and (current_time - last_resource_report) >= monitor_interval ): resources = monitor_resources(os.getpid()) if resources["available"]: publisher.info( "asr", f"Resource usage: CPU {resources['cpu_percent']:.1f}%, " f"Memory {resources['memory_mb']:.1f}MB", ) last_resource_report = current_time # Transcribe chunk try: segments, info = transcribe_chunk( model, chunk_path, chunk["start"], i, len(chunks), publisher ) all_segments.extend(segments) if language is None: language = info.language language_prob = info.language_probability if publisher: publisher.info( "asr", f"Detected language: {language} (prob {language_prob:.2f})", ) except Exception as e: if publisher: publisher.error("asr", f"Error transcribing chunk {i}: {e}") sys.stderr.write(f"ASR: Error transcribing chunk {i}: {e}\n") sys.stderr.flush() # Continue with next chunk # Clean up chunk file try: os.unlink(chunk_path) except: pass # Clean up temporary directory try: shutil.rmtree(temp_dir, ignore_errors=True) except: pass # Sort segments by start time all_segments.sort(key=lambda x: x["start"]) # Prepare output (maintain same format as original) output = { "language": language or "", "language_probability": language_prob or 0.0, "segments": all_segments, } # Add metadata for chunked processing (optional) if use_chunked: output["processing_mode"] = "chunked" output["chunk_count"] = len(chunks) if "chunks" in locals() else 0 output["chunk_duration"] = chunk_duration else: output["processing_mode"] = "direct" # Write output with open(output_path, "w") as f: json.dump(output, f, indent=2) if publisher: publisher.complete( "asr", f"{len(all_segments)} segments ({'chunked' if use_chunked else 'direct'} mode)", ) sys.stderr.write( f"ASR: Transcription complete, {len(all_segments)} segments written to {output_path}\n" ) sys.stderr.flush() sys.exit(0) if __name__ == "__main__": parser = argparse.ArgumentParser( description="ASR Transcription with chunked processing" ) parser.add_argument("video_path", help="Path to video file") parser.add_argument("output_path", help="Output JSON path") parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="") parser.add_argument("--version", action="version", version="2.0.0") # Hidden arguments for configuration (can be set via environment variables) parser.add_argument( "--chunk-duration", type=int, default=600, help=argparse.SUPPRESS ) # 10 minutes default parser.add_argument( "--max-direct-duration", type=int, default=1800, help=argparse.SUPPRESS ) # 30 minutes parser.add_argument("--model-size", default="tiny", help=argparse.SUPPRESS) parser.add_argument("--compute-type", default="int8", help=argparse.SUPPRESS) parser.add_argument( "--monitor-interval", type=int, default=60, help=argparse.SUPPRESS ) args = parser.parse_args() # Allow environment variable overrides chunk_duration_str = os.environ.get("MOMENTRY_ASR_CHUNK_DURATION") if chunk_duration_str is not None: chunk_duration = int(chunk_duration_str) else: chunk_duration = args.chunk_duration max_direct_duration_str = os.environ.get("MOMENTRY_ASR_MAX_DIRECT_DURATION") if max_direct_duration_str is not None: max_direct_duration = int(max_direct_duration_str) else: max_direct_duration = args.max_direct_duration model_size = os.environ.get("MOMENTRY_ASR_MODEL_SIZE") if model_size is None: model_size = args.model_size compute_type = os.environ.get("MOMENTRY_ASR_COMPUTE_TYPE") if compute_type is None: compute_type = args.compute_type run_asr( args.video_path, args.output_path, args.uuid, chunk_duration, max_direct_duration, model_size, compute_type, )