Files
momentry_core/scripts/asrx_processor_v2.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

213 lines
6.7 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
ASRX 處理器 v2 - 說話人分離
使用 whisperx 進行轉錄和說話人分離
需要 PyTorch 2.5.0 + torchvision 0.20.0 + torchaudio 2.5.0
"""
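# Fix for PyTorch 2.5 compatibility: the TORCH_FORCE_WEIGHTS_ONLY_LOAD variable set below
# is exported at module import time, ahead of the lazy whisperx/torch imports in process_asrx().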
import os
os.environ["TORCH_FORCE_WEIGHTS_ONLY_LOAD"] = "0"
import sys
import json
import argparse
import signal
import subprocess
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame passed in by the signal machinery (not used).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    notice = f"ASRX: Received signal {signum}, exiting..."
    print(notice)
    # Equivalent to sys.exit(1), which raises SystemExit under the hood.
    raise SystemExit(1)
def has_audio_stream(video_path):
    """Tell whether ffprobe reports at least one audio stream for *video_path*.

    Args:
        video_path: Path of the media file handed to ffprobe.

    Returns:
        True when ffprobe prints any output for the audio stream selector,
        False when it prints nothing or exits with a non-zero status.
        If the ffprobe executable cannot be found, a warning is printed and
        True is returned so that processing carries on.
    """
    probe_args = ["ffprobe", "-v", "error"]
    probe_args += ["-select_streams", "a"]
    probe_args += ["-show_entries", "stream=codec_type"]
    probe_args += ["-of", "csv=p=0"]
    probe_args.append(video_path)

    try:
        probe = subprocess.run(probe_args, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        # ffprobe ran but failed (non-zero exit status): treat as "no audio".
        return False
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists")
        return True

    # Any non-whitespace output means at least one audio stream was listed.
    return len(probe.stdout.strip()) > 0
def _publish(publisher, level, message):
    """Call ``publisher.<level>("asrx", message)`` when a publisher is configured."""
    if publisher:
        getattr(publisher, level)("asrx", message)


def _write_asrx_json(output_path, payload):
    """Write *payload* to *output_path* as UTF-8 JSON, keeping non-ASCII text unescaped."""
    # An explicit encoding keeps non-ASCII transcripts from depending on the
    # platform's locale encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def process_asrx(video_path: str, output_path: str, uuid: str = "", skip_diarization: bool = False):
    """
    Process video for speaker diarization using whisperx.

    Transcribes the file, aligns timestamps, optionally assigns speaker
    labels, and writes the result as JSON to ``output_path``. On every path
    the output file is written before the "complete" event is published, so
    a listener reacting to completion finds the file in place.

    This function does not return normally: it installs SIGTERM/SIGINT
    handlers and always finishes through ``sys.exit``.

    Args:
        video_path: Path to video file
        output_path: Path to output JSON
        uuid: UUID for Redis progress; when empty no progress is published
        skip_diarization: Skip speaker diarization (only transcription)

    Raises:
        SystemExit: code 0 after a successful run or when the file has no
            audio stream; code 1 when a dependency is missing or processing
            fails (an ``error`` field is then written to the output JSON).
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    _publish(publisher, "info", "ASRX_START")

    # Check for audio stream
    if not has_audio_stream(video_path):
        _publish(publisher, "info", "No audio stream detected, skipping transcription")
        _write_asrx_json(
            output_path,
            {"language": "", "language_probability": 0.0, "segments": []},
        )
        _publish(publisher, "complete", "0 segments (no audio)")
        sys.stderr.write("ASRX: No audio stream, skipping transcription\n")
        sys.stderr.flush()
        sys.exit(0)

    _publish(publisher, "info", "ASRX_LOADING_MODEL")

    try:
        import whisperx
        import torch  # noqa: F401  (not referenced below; kept as an availability check)
    except ImportError as e:
        _publish(publisher, "error", f"Missing dependency: {e}")
        _write_asrx_json(output_path, {"language": None, "segments": [], "error": str(e)})
        _publish(publisher, "complete", "0 segments")
        sys.exit(1)

    try:
        # Load model
        _publish(publisher, "info", "Loading whisperx base model (this may take a while)...")
        model = whisperx.load_model("base", device="cpu", compute_type="int8")

        # Transcribe with language detection
        _publish(publisher, "info", "ASRX_TRANSCRIBING")
        transcription = model.transcribe(video_path)

        # Capture the detection metadata now: the dict returned by
        # whisperx.align() replaces the transcription result and does not
        # carry the detected language, so reading it later would yield None.
        language = transcription.get("language")
        language_probability = transcription.get("language_probability", 0)
        _publish(publisher, "info", f"ASRX_LANGUAGE:{transcription.get('language', 'unknown')}")

        # Align timestamps
        _publish(publisher, "info", "ASRX_ALIGNING_TIMESTAMPS")
        model_a, metadata = whisperx.load_align_model(
            language_code=transcription["language"],
            device="cpu",
        )
        result = whisperx.align(
            transcription["segments"],
            model_a,
            metadata,
            video_path,
            device="cpu",
        )

        # Diarization (speaker segmentation) is best-effort: a failure is
        # reported and the aligned transcript is still written.
        if not skip_diarization:
            _publish(publisher, "info", "ASRX_DIARIZATION")
            try:
                diarize_model = whisperx.DiarizationPipeline(use_auth_token=None)
                diarize_segments = diarize_model(video_path)
                # Assign speaker labels
                result = whisperx.assign_word_speakers(diarize_segments, result)
                _publish(publisher, "info", "Diarization completed")
            except Exception as e:
                _publish(publisher, "info", f"Diarization skipped: {e}")
                sys.stderr.write(f"ASRX: Diarization failed: {e}\n")

        # Build output, dropping segments whose text is empty after stripping
        segments = []
        for seg in result.get("segments", []):
            text = seg.get("text", "").strip()
            if not text:
                continue
            segments.append(
                {
                    "start": seg.get("start", 0.0),
                    "end": seg.get("end", 0.0),
                    "text": text,
                    "speaker_id": seg.get("speaker", None),
                }
            )

        output_result = {
            "language": language,
            "language_probability": language_probability,
            "segments": segments,
            "diarization_enabled": not skip_diarization,
        }

        # Write the file first, then announce completion.
        _write_asrx_json(output_path, output_result)
        _publish(publisher, "complete", f"{len(segments)} segments")
        sys.stderr.write(
            f"ASRX: Transcription complete, {len(segments)} segments written to {output_path}\n"
        )
        sys.stderr.flush()
        # SystemExit is not an Exception subclass, so the handler below does not intercept it.
        sys.exit(0)
    except Exception as e:
        _publish(publisher, "error", f"Error: {e}")
        import traceback

        traceback.print_exc()
        _write_asrx_json(output_path, {"language": None, "segments": [], "error": str(e)})
        _publish(publisher, "complete", "0 segments (error)")
        sys.exit(1)
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus optional progress
    # UUID and a switch that turns speaker diarization off.
    cli = argparse.ArgumentParser(description="ASRX Speaker Diarization v2")
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--skip-diarization",
        help="Skip speaker diarization (only transcription)",
        action="store_true",
    )
    options = cli.parse_args()

    # process_asrx terminates the interpreter itself via sys.exit.
    process_asrx(
        video_path=options.video_path,
        output_path=options.output_path,
        uuid=options.uuid,
        skip_diarization=options.skip_diarization,
    )