fix: restore identity_id after face_dedup, rebuild package v20260512
- Re-ran identity_bind.py to restore identity_id on face_detections - Dedup cleanup had removed rows with identity_id, kept NULL rows - 70691 face_detections now have identity_id, 428 identities - Full package rebuild: 169MB sqlite, 1358MB tar.gz - identities.json: 428 identities + 5483 bindings + 5483 trace maps - TMDB matching complete: Audrey Hepburn 843 traces, Cary Grant 482
This commit is contained in:
511
scripts/asr_processor.py.backup
Executable file
511
scripts/asr_processor.py.backup
Executable file
@@ -0,0 +1,511 @@
|
||||
#!/opt/homebrew/bin/python3.11
|
||||
"""
|
||||
ASR Processor with chunked transcription for large files and resource monitoring.
|
||||
Maintains backward compatibility with existing API.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import os
|
||||
import argparse
|
||||
import signal
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
|
||||
# Try to import psutil for resource monitoring
|
||||
PSUTIL_AVAILABLE = False
|
||||
psutil = None
|
||||
try:
|
||||
import psutil
|
||||
|
||||
PSUTIL_AVAILABLE = True
|
||||
except ImportError:
|
||||
sys.stderr.write("WARNING: psutil not available, resource monitoring disabled\n")
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from redis_publisher import RedisPublisher
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
sys.stderr.write(f"ASR: Received signal {signum}, exiting...\n")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def has_audio_stream(video_path: str) -> bool:
|
||||
"""Check if video file has audio stream using ffprobe."""
|
||||
try:
|
||||
cmd = [
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"error",
|
||||
"-select_streams",
|
||||
"a",
|
||||
"-show_entries",
|
||||
"stream=codec_type",
|
||||
"-of",
|
||||
"csv=p=0",
|
||||
video_path,
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
return bool(result.stdout.strip())
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
sys.stderr.write("WARNING: ffprobe not found, assuming audio exists\n")
|
||||
return True
|
||||
|
||||
|
||||
def get_media_duration(media_path: str) -> float:
|
||||
"""Get media duration in seconds using ffprobe."""
|
||||
cmd = [
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"error",
|
||||
"-show_entries",
|
||||
"format=duration",
|
||||
"-of",
|
||||
"csv=p=0",
|
||||
media_path,
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
try:
|
||||
return float(result.stdout.strip())
|
||||
except (ValueError, AttributeError):
|
||||
return 0.0
|
||||
|
||||
|
||||
def extract_audio(video_path: str, audio_path: str) -> bool:
|
||||
"""Extract audio from video to WAV format."""
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
video_path,
|
||||
"-acodec",
|
||||
"pcm_s16le",
|
||||
"-ar",
|
||||
"16000",
|
||||
"-ac",
|
||||
"1",
|
||||
"-y",
|
||||
audio_path,
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True)
|
||||
return result.returncode == 0 and os.path.exists(audio_path)
|
||||
|
||||
|
||||
def extract_chunk(
|
||||
audio_path: str, start: float, duration: float, output_path: str
|
||||
) -> bool:
|
||||
"""Extract a chunk of audio using ffmpeg."""
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
audio_path,
|
||||
"-ss",
|
||||
str(start),
|
||||
"-t",
|
||||
str(duration),
|
||||
"-acodec",
|
||||
"pcm_s16le",
|
||||
"-ar",
|
||||
"16000",
|
||||
"-ac",
|
||||
"1",
|
||||
"-y",
|
||||
output_path,
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True)
|
||||
return os.path.exists(output_path) and os.path.getsize(output_path) > 0
|
||||
|
||||
|
||||
def monitor_resources(pid: int, interval: float = 0.1) -> Dict[str, Any]:
|
||||
"""Monitor CPU and memory usage for a process."""
|
||||
if not PSUTIL_AVAILABLE or psutil is None:
|
||||
return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False}
|
||||
|
||||
try:
|
||||
process = psutil.Process(pid)
|
||||
cpu_percent = process.cpu_percent(interval=interval)
|
||||
memory_info = process.memory_info()
|
||||
memory_mb = memory_info.rss / (1024 * 1024)
|
||||
return {
|
||||
"cpu_percent": cpu_percent,
|
||||
"memory_mb": memory_mb,
|
||||
"available": True,
|
||||
"pid": pid,
|
||||
}
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
return {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False}
|
||||
|
||||
|
||||
def transcribe_direct(
|
||||
model, audio_path: str, publisher: Optional[RedisPublisher] = None
|
||||
) -> Tuple[List[Dict[str, Any]], Any]:
|
||||
"""Transcribe audio directly (non-chunked)."""
|
||||
if publisher:
|
||||
publisher.info("asr", "Transcribing audio directly...")
|
||||
|
||||
start_time = time.time()
|
||||
segments, info = model.transcribe(audio_path, beam_size=5)
|
||||
|
||||
results = []
|
||||
total_segments = 0
|
||||
for segment in segments:
|
||||
results.append(
|
||||
{"start": segment.start, "end": segment.end, "text": segment.text.strip()}
|
||||
)
|
||||
total_segments += 1
|
||||
if total_segments % 100 == 0 and publisher:
|
||||
publisher.progress("asr", total_segments, 0, f"Segment {total_segments}")
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr", f"Direct transcription: {len(results)} segments in {elapsed:.1f}s"
|
||||
)
|
||||
|
||||
return results, info
|
||||
|
||||
|
||||
def transcribe_chunk(
|
||||
model,
|
||||
chunk_path: str,
|
||||
chunk_start: float,
|
||||
chunk_idx: int,
|
||||
total_chunks: int,
|
||||
publisher: Optional[RedisPublisher] = None,
|
||||
) -> Tuple[List[Dict[str, Any]], Any]:
|
||||
"""Transcribe a single audio chunk."""
|
||||
if publisher:
|
||||
publisher.info("asr", f"Transcribing chunk {chunk_idx + 1}/{total_chunks}")
|
||||
|
||||
start_time = time.time()
|
||||
segments, info = model.transcribe(chunk_path, beam_size=5)
|
||||
|
||||
results = []
|
||||
for segment in segments:
|
||||
results.append(
|
||||
{
|
||||
"start": segment.start + chunk_start,
|
||||
"end": segment.end + chunk_start,
|
||||
"text": segment.text.strip(),
|
||||
}
|
||||
)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr",
|
||||
f"Chunk {chunk_idx + 1}/{total_chunks}: {len(results)} segments in {elapsed:.1f}s",
|
||||
)
|
||||
|
||||
return results, info
|
||||
|
||||
|
||||
def run_asr(
|
||||
video_path: str,
|
||||
output_path: str,
|
||||
uuid: str = "",
|
||||
chunk_duration: int = 600, # 10 minutes default
|
||||
max_direct_duration: int = 1800, # 30 minutes: use direct transcription for shorter files
|
||||
model_size: str = "tiny",
|
||||
compute_type: str = "int8",
|
||||
monitor_interval: int = 60,
|
||||
) -> None:
|
||||
# Set up signal handlers
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
publisher = RedisPublisher(uuid) if uuid else None
|
||||
if publisher:
|
||||
publisher.info("asr", "ASR_START")
|
||||
|
||||
# Check for audio stream
|
||||
if not has_audio_stream(video_path):
|
||||
if publisher:
|
||||
publisher.info("asr", "No audio stream detected, skipping transcription")
|
||||
output = {"language": "", "language_probability": 0.0, "segments": []}
|
||||
with open(output_path, "w") as f:
|
||||
json.dump(output, f, indent=2)
|
||||
if publisher:
|
||||
publisher.complete("asr", "0 segments (no audio)")
|
||||
sys.stderr.write("ASR: No audio stream, skipping transcription\n")
|
||||
sys.stderr.flush()
|
||||
sys.exit(0)
|
||||
|
||||
# Create temporary directory
|
||||
temp_dir = tempfile.mkdtemp(prefix="asr_")
|
||||
audio_path = os.path.join(temp_dir, "audio.wav")
|
||||
|
||||
if publisher:
|
||||
publisher.info("asr", "Extracting audio from video...")
|
||||
|
||||
# Extract audio
|
||||
if not extract_audio(video_path, audio_path):
|
||||
if publisher:
|
||||
publisher.error("asr", "Failed to extract audio")
|
||||
sys.stderr.write("ASR: Failed to extract audio\n")
|
||||
sys.stderr.flush()
|
||||
# Clean up
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
sys.exit(1)
|
||||
|
||||
# Get audio duration
|
||||
try:
|
||||
total_duration = get_media_duration(audio_path)
|
||||
except Exception as e:
|
||||
if publisher:
|
||||
publisher.error("asr", f"Failed to get audio duration: {e}")
|
||||
sys.stderr.write(f"ASR: Failed to get audio duration: {e}\n")
|
||||
sys.stderr.flush()
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
sys.exit(1)
|
||||
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr",
|
||||
f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)",
|
||||
)
|
||||
|
||||
# Load Whisper model
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr", f"Loading Whisper model ({model_size}, {compute_type})..."
|
||||
)
|
||||
|
||||
try:
|
||||
from faster_whisper import WhisperModel
|
||||
|
||||
model = WhisperModel(model_size, device="cpu", compute_type=compute_type)
|
||||
except Exception as e:
|
||||
if publisher:
|
||||
publisher.error("asr", f"Failed to load Whisper model: {e}")
|
||||
sys.stderr.write(f"ASR: Failed to load Whisper model: {e}\n")
|
||||
sys.stderr.flush()
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
sys.exit(1)
|
||||
|
||||
if publisher:
|
||||
publisher.info("asr", "Whisper model loaded successfully")
|
||||
|
||||
# Decide whether to use chunked or direct transcription
|
||||
use_chunked = total_duration > max_direct_duration
|
||||
|
||||
all_segments = []
|
||||
language = None
|
||||
language_prob = None
|
||||
chunks = [] # Initialize chunks variable
|
||||
|
||||
if not use_chunked:
|
||||
# Direct transcription for shorter audio
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr", f"Using direct transcription (duration ≤ {max_direct_duration}s)"
|
||||
)
|
||||
|
||||
try:
|
||||
segments, info = transcribe_direct(model, audio_path, publisher)
|
||||
all_segments.extend(segments)
|
||||
language = info.language
|
||||
language_prob = info.language_probability
|
||||
except Exception as e:
|
||||
if publisher:
|
||||
publisher.error("asr", f"Direct transcription failed: {e}")
|
||||
sys.stderr.write(f"ASR: Direct transcription failed: {e}\n")
|
||||
sys.stderr.flush()
|
||||
# Fall back to chunked approach
|
||||
use_chunked = True
|
||||
if publisher:
|
||||
publisher.info("asr", "Falling back to chunked transcription")
|
||||
|
||||
if use_chunked:
|
||||
# Chunked transcription for long audio
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr", f"Using chunked transcription ({chunk_duration}s chunks)"
|
||||
)
|
||||
|
||||
# Calculate chunks
|
||||
chunks = []
|
||||
start = 0.0
|
||||
chunk_idx = 0
|
||||
while start < total_duration:
|
||||
chunk_end = min(start + chunk_duration, total_duration)
|
||||
chunks.append(
|
||||
{
|
||||
"start": start,
|
||||
"end": chunk_end,
|
||||
"duration": chunk_end - start,
|
||||
"idx": chunk_idx,
|
||||
}
|
||||
)
|
||||
start = chunk_end
|
||||
chunk_idx += 1
|
||||
|
||||
if publisher:
|
||||
publisher.info("asr", f"Split into {len(chunks)} chunks")
|
||||
|
||||
chunk_temp_dir = os.path.join(temp_dir, "chunks")
|
||||
os.makedirs(chunk_temp_dir, exist_ok=True)
|
||||
|
||||
last_resource_report = time.time()
|
||||
|
||||
for i, chunk in enumerate(chunks):
|
||||
chunk_path = os.path.join(chunk_temp_dir, f"chunk_{i:04d}.wav")
|
||||
|
||||
if publisher:
|
||||
publisher.progress(
|
||||
"asr", i, len(chunks), f"Processing chunk {i + 1}/{len(chunks)}"
|
||||
)
|
||||
|
||||
# Extract chunk
|
||||
if not extract_chunk(
|
||||
audio_path, chunk["start"], chunk["duration"], chunk_path
|
||||
):
|
||||
if publisher:
|
||||
publisher.warning("asr", f"Failed to extract chunk {i}, skipping")
|
||||
continue
|
||||
|
||||
# Resource monitoring (sample every monitor_interval seconds)
|
||||
current_time = time.time()
|
||||
if (
|
||||
PSUTIL_AVAILABLE
|
||||
and publisher
|
||||
and (current_time - last_resource_report) >= monitor_interval
|
||||
):
|
||||
resources = monitor_resources(os.getpid())
|
||||
if resources["available"]:
|
||||
publisher.info(
|
||||
"asr",
|
||||
f"Resource usage: CPU {resources['cpu_percent']:.1f}%, "
|
||||
f"Memory {resources['memory_mb']:.1f}MB",
|
||||
)
|
||||
last_resource_report = current_time
|
||||
|
||||
# Transcribe chunk
|
||||
try:
|
||||
segments, info = transcribe_chunk(
|
||||
model, chunk_path, chunk["start"], i, len(chunks), publisher
|
||||
)
|
||||
all_segments.extend(segments)
|
||||
|
||||
if language is None:
|
||||
language = info.language
|
||||
language_prob = info.language_probability
|
||||
if publisher:
|
||||
publisher.info(
|
||||
"asr",
|
||||
f"Detected language: {language} (prob {language_prob:.2f})",
|
||||
)
|
||||
except Exception as e:
|
||||
if publisher:
|
||||
publisher.error("asr", f"Error transcribing chunk {i}: {e}")
|
||||
sys.stderr.write(f"ASR: Error transcribing chunk {i}: {e}\n")
|
||||
sys.stderr.flush()
|
||||
# Continue with next chunk
|
||||
|
||||
# Clean up chunk file
|
||||
try:
|
||||
os.unlink(chunk_path)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Clean up temporary directory
|
||||
try:
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Sort segments by start time
|
||||
all_segments.sort(key=lambda x: x["start"])
|
||||
|
||||
# Prepare output (maintain same format as original)
|
||||
output = {
|
||||
"language": language or "",
|
||||
"language_probability": language_prob or 0.0,
|
||||
"segments": all_segments,
|
||||
}
|
||||
|
||||
# Add metadata for chunked processing (optional)
|
||||
if use_chunked:
|
||||
output["processing_mode"] = "chunked"
|
||||
output["chunk_count"] = len(chunks) if "chunks" in locals() else 0
|
||||
output["chunk_duration"] = chunk_duration
|
||||
else:
|
||||
output["processing_mode"] = "direct"
|
||||
|
||||
# Write output
|
||||
with open(output_path, "w") as f:
|
||||
json.dump(output, f, indent=2)
|
||||
|
||||
if publisher:
|
||||
publisher.complete(
|
||||
"asr",
|
||||
f"{len(all_segments)} segments ({'chunked' if use_chunked else 'direct'} mode)",
|
||||
)
|
||||
|
||||
sys.stderr.write(
|
||||
f"ASR: Transcription complete, {len(all_segments)} segments written to {output_path}\n"
|
||||
)
|
||||
sys.stderr.flush()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="ASR Transcription with chunked processing"
|
||||
)
|
||||
parser.add_argument("video_path", help="Path to video file")
|
||||
parser.add_argument("output_path", help="Output JSON path")
|
||||
parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
|
||||
parser.add_argument("--version", action="version", version="2.0.0")
|
||||
|
||||
# Hidden arguments for configuration (can be set via environment variables)
|
||||
parser.add_argument(
|
||||
"--chunk-duration", type=int, default=600, help=argparse.SUPPRESS
|
||||
) # 10 minutes default
|
||||
parser.add_argument(
|
||||
"--max-direct-duration", type=int, default=1800, help=argparse.SUPPRESS
|
||||
) # 30 minutes
|
||||
parser.add_argument("--model-size", default="tiny", help=argparse.SUPPRESS)
|
||||
parser.add_argument("--compute-type", default="int8", help=argparse.SUPPRESS)
|
||||
parser.add_argument(
|
||||
"--monitor-interval", type=int, default=60, help=argparse.SUPPRESS
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Allow environment variable overrides
|
||||
chunk_duration_str = os.environ.get("MOMENTRY_ASR_CHUNK_DURATION")
|
||||
if chunk_duration_str is not None:
|
||||
chunk_duration = int(chunk_duration_str)
|
||||
else:
|
||||
chunk_duration = args.chunk_duration
|
||||
|
||||
max_direct_duration_str = os.environ.get("MOMENTRY_ASR_MAX_DIRECT_DURATION")
|
||||
if max_direct_duration_str is not None:
|
||||
max_direct_duration = int(max_direct_duration_str)
|
||||
else:
|
||||
max_direct_duration = args.max_direct_duration
|
||||
|
||||
model_size = os.environ.get("MOMENTRY_ASR_MODEL_SIZE")
|
||||
if model_size is None:
|
||||
model_size = args.model_size
|
||||
|
||||
compute_type = os.environ.get("MOMENTRY_ASR_COMPUTE_TYPE")
|
||||
if compute_type is None:
|
||||
compute_type = args.compute_type
|
||||
|
||||
run_asr(
|
||||
args.video_path,
|
||||
args.output_path,
|
||||
args.uuid,
|
||||
chunk_duration,
|
||||
max_direct_duration,
|
||||
model_size,
|
||||
compute_type,
|
||||
)
|
||||
Reference in New Issue
Block a user