Files
momentry_core/scripts/asr_processor.py
2026-05-19 13:23:09 +08:00

417 lines
15 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
ASR Processor - faster-whisper small model (Production)
Version: 2.1
Model: small (int8 quantization, CPU)
Reason: the small model gives the best balance between accuracy and speed.
Experiments showed that at least the small model is needed to handle
multilingual audio and Taiwanese-accented Mandarin reasonably well.
Configuration:
- Model: faster-whisper/small
- Device: CPU (MPS not supported by faster_whisper)
- Compute: int8
- Beam size: 5
- VAD filter: enabled (min_silence=500ms, speech_pad=200ms)
- Audio fallback: ffmpeg extraction for PyAV-incompatible streams (v2.1)
Usage:
    asr_processor.py VIDEO_PATH OUTPUT_PATH [--uuid UUID] [--fps FPS]
When a sibling ``.cut.json`` (CUT scene data) exists for OUTPUT_PATH, the
audio is transcribed scene by scene to keep memory usage low on long videos.
"""
import argparse
import bisect
import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time

from faster_whisper import WhisperModel
# Version string of this processor (the module docstring also states 2.1).
PROCESSOR_VERSION = "2.1"
# faster-whisper configuration: model size, inference device and compute/quantization type.
MODEL_SIZE = "small"
DEVICE = "cpu"
COMPUTE_TYPE = "int8"
# Put this script's own directory first on sys.path so the local
# redis_publisher module can be imported regardless of the working directory
# (presumably redis_publisher.py sits next to this file — verify).
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit status 1.

    Installed by ``run_asr`` for SIGTERM and SIGINT; *frame* is unused.
    """
    notice = "ASR: Received signal {}, exiting...".format(signum)
    print(notice)
    sys.exit(1)
def has_audio_stream(video_path):
    """Report whether *video_path* contains at least one audio stream.

    ffprobe is asked to print the codec type of every audio stream; any
    non-blank output means an audio stream exists.

    Returns:
        True if ffprobe lists an audio stream, False if it lists none or exits
        with an error. If ffprobe is not installed, a warning is printed to
        stdout and True is returned so transcription is still attempted.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=codec_type",
        "-of", "csv=p=0",
        video_path,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists")
        return True
    except subprocess.CalledProcessError:
        return False
    return completed.stdout.strip() != ""
def extract_audio_with_ffmpeg(video_path):
    """Extract the audio of *video_path* into a temporary 16 kHz mono PCM WAV.

    Returns:
        The path of the WAV file, or None if ffmpeg exits with an error (its
        stderr is echoed to our stderr). The caller owns the returned file and
        is responsible for deleting it. On failure no temporary file is left
        behind.

    Raises:
        FileNotFoundError: if the ``ffmpeg`` executable cannot be found.
    """
    # mkstemp creates the file atomically; tempfile.mktemp is deprecated because
    # it only reserves a name, leaving a window for another process to claim it.
    fd, wav_path = tempfile.mkstemp(suffix=".wav", prefix="asr_audio_")
    os.close(fd)
    cmd = [
        "ffmpeg",
        "-y",  # overwrite the empty placeholder created by mkstemp
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        wav_path,
    ]
    succeeded = False
    try:
        # errors="replace": ffmpeg's stderr may contain bytes that are invalid in
        # the locale encoding (e.g. file names); that must not crash the worker.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
        if result.returncode != 0:
            sys.stderr.write(f"ASR: ffmpeg extraction failed: {result.stderr}\n")
            sys.stderr.flush()
            return None
        succeeded = True
        return wav_path
    finally:
        # Covers the ffmpeg-error return, a missing ffmpeg binary, and signals.
        if not succeeded:
            try:
                os.remove(wav_path)
            except OSError:
                pass
def _is_pyav_error(exc):
    """Return True if *exc* looks like a PyAV/FFmpeg audio decoding failure.

    The exception's defining module is checked first: PyAV's exception classes
    live in the ``av`` package (e.g. ``av.error.InvalidDataError``) and their
    messages need not mention decoding at all. Otherwise the message text is
    scanned for decoder-related keywords.
    """
    module = type(exc).__module__ or ""
    if module == "av" or module.startswith("av."):
        return True
    message = str(exc).lower()
    # "decod" matches "decode", "decoder" and "decoding".
    keywords = ("av.error", "avcodec", "decod", "packet", "invalid data")
    return any(keyword in message for keyword in keywords)


def transcribe_with_fallback(model, video_path, publisher=None):
    """Transcribe *video_path*, retrying from an ffmpeg-extracted WAV on decode errors.

    First lets faster-whisper read the file directly (through PyAV). If that
    raises an error that looks like a PyAV decoding failure, the audio is
    extracted to a temporary WAV with ffmpeg and transcribed from there; the
    WAV is deleted afterwards.

    Args:
        model: a loaded ``WhisperModel``.
        video_path: media file to transcribe.
        publisher: optional ``RedisPublisher`` that receives progress messages.

    Returns:
        The ``(segments, info)`` pair returned by ``model.transcribe``.

    Raises:
        RuntimeError: if the fallback ffmpeg extraction fails (chained to the
            original decoding error).
        Exception: any non-decoding error from the direct attempt is re-raised.
    """
    transcribe_kwargs = dict(
        beam_size=5,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
    )
    if publisher:
        publisher.info("asr", "Direct transcription attempt...")
    try:
        return model.transcribe(video_path, **transcribe_kwargs)
    except Exception as e:
        if not _is_pyav_error(e):
            raise  # not a decoding problem: let the caller see it unchanged
        decode_error = e

    if publisher:
        publisher.info("asr", "PyAV decode failed, falling back to ffmpeg extraction...")
    sys.stderr.write("ASR: PyAV decode error detected, falling back to ffmpeg extraction\n")
    sys.stderr.flush()
    wav_path = extract_audio_with_ffmpeg(video_path)
    if wav_path is None:
        raise RuntimeError("Failed to extract audio with ffmpeg") from decode_error
    try:
        if publisher:
            publisher.info("asr", "Transcribing extracted WAV audio...")
        # NOTE(review): deleting the WAV in `finally` relies on faster-whisper
        # having decoded the whole file inside transcribe(); the returned
        # segments are generated lazily afterwards — re-check on upgrades.
        return model.transcribe(wav_path, **transcribe_kwargs)
    finally:
        try:
            os.remove(wav_path)
        except OSError:
            pass
def get_fps_from_cut(cut_path):
    """Read the frame rate recorded in a CUT JSON file.

    Returns:
        The file's ``fps`` value when it is present and positive, otherwise
        None. A missing file yields None silently; an unreadable or malformed
        file logs a message to stderr and yields None.
    """
    if not os.path.exists(cut_path):
        return None
    try:
        with open(cut_path) as handle:
            rate = json.load(handle).get("fps")
        if rate and rate > 0:
            return rate
    except Exception as exc:
        print(f"[ASR] Failed to load CUT FPS: {exc}", file=sys.stderr)
    return None
def _parse_frame_rate(text):
    """Parse ffprobe ``r_frame_rate`` output into frames per second.

    Accepts a fraction such as ``"30000/1001"`` or a plain number such as
    ``"25"``, tolerating surrounding whitespace and a trailing CSV comma; only
    the first non-blank line is used.

    Returns:
        A positive, finite float, or None when the value is missing, malformed,
        zero (e.g. ``"0/0"`` or ``"0/1"``), negative, NaN or infinite.
    """
    lines = [line.strip().rstrip(",") for line in text.splitlines() if line.strip()]
    if not lines:
        return None
    rate_str = lines[0]
    try:
        if "/" in rate_str:
            num, den = rate_str.split("/", 1)
            rate = float(num) / float(den)
        else:
            rate = float(rate_str)
    except (ValueError, ZeroDivisionError):
        return None
    # Callers multiply timestamps by this value to get frame numbers, so a
    # zero/negative/non-finite rate must be reported as "unknown", not used.
    return rate if 0 < rate < float("inf") else None


def get_fps_from_ffprobe(video_path):
    """Return the frame rate of the first video stream of *video_path* via ffprobe.

    Returns:
        A positive float, or None if ffprobe is unavailable, fails, or reports
        an unusable rate.
    """
    cmd = ["ffprobe", "-v", "error",
           "-select_streams", "v:0",
           "-show_entries", "stream=r_frame_rate",
           "-of", "csv=p=0", video_path]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: ffprobe missing; SubprocessError: non-zero exit;
        # ValueError: undecodable output.
        return None
    return _parse_frame_rate(result.stdout)
def _scene_index(scene_starts, t):
    """Return the index of the CUT scene containing time *t* (seconds).

    *scene_starts* must be sorted ascending; times before the first scene map
    to scene 0.
    """
    return max(0, bisect.bisect_right(scene_starts, t) - 1)


def _write_json_atomic(path, payload):
    """Write *payload* as JSON to *path* without ever leaving a truncated file.

    The data goes to ``<path>.part`` first and is then moved over *path*, so an
    interruption (e.g. the SIGTERM handler's sys.exit) mid-dump keeps the
    previous contents of *path* intact.
    """
    part_path = path + ".part"
    with open(part_path, "w") as f:
        json.dump(payload, f)
    os.replace(part_path, path)


def _resolve_fps(video_path, cut_path, fps):
    """Pick the frame rate: *fps* if given, else the CUT file's, else ffprobe's; None if all fail."""
    if fps is not None:
        print(f"[ASR] Using CLI-provided FPS: {fps}", file=sys.stderr)
        return fps
    fps = get_fps_from_cut(cut_path)
    if fps:
        print(f"[ASR] FPS from CUT: {fps}", file=sys.stderr)
        return fps
    fps = get_fps_from_ffprobe(video_path)
    # A zero/negative rate would turn every frame number into garbage.
    if fps and fps > 0:
        print(f"[ASR] FPS from ffprobe: {fps}", file=sys.stderr)
        return fps
    return None


def _load_cut_scenes(cut_path):
    """Return ``[(start_time, end_time), ...]`` from the CUT JSON, or [] if unavailable."""
    if not os.path.exists(cut_path):
        return []
    try:
        with open(cut_path) as f:
            cut_data = json.load(f)
        cut_scenes = [(s["start_time"], s["end_time"]) for s in cut_data.get("scenes", [])]
    except Exception as e:
        print(f"[ASR] Failed to load cut scenes: {e}", file=sys.stderr)
        return []
    if cut_scenes:
        print(f"[ASR] Loaded {len(cut_scenes)} cut scenes for segmented transcription", file=sys.stderr)
    return cut_scenes


def _load_resume_state(tmp_path, err_path, cut_scenes, scene_starts):
    """Recover progress left by an interrupted segmented run.

    Returns:
        ``(segments, language, resume_from_scene)``: the segments already
        checkpointed, their recorded language (or None), and the index of the
        first scene that still needs transcribing. A fresh run yields
        ``([], None, 0)``.
    """
    # The executor may have renamed a previous run's .tmp to .err; reclaim it.
    if (not os.path.exists(tmp_path) and os.path.exists(err_path)
            and os.path.getsize(err_path) > 10):
        try:
            os.rename(err_path, tmp_path)
            sys.stderr.write(f"[ASR] Recovered .err → .tmp for resume ({os.path.getsize(tmp_path)} bytes)\n")
            sys.stderr.flush()
        except Exception as e:
            sys.stderr.write(f"[ASR] Failed to recover .err: {e}\n")
            sys.stderr.flush()

    if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) <= 10:
        return [], None, 0

    try:
        with open(tmp_path) as f:
            existing = json.load(f)
        segments = existing.get("segments", [])
        completed = existing.get("completed_scenes")
        if isinstance(completed, int) and 0 <= completed <= len(cut_scenes):
            resume_from_scene = completed
        elif segments:
            # Legacy checkpoint without a scene counter: resume after the scene
            # holding the latest segment start (times are absolute seconds).
            last_start = max(s.get("start_time", 0) for s in segments)
            resume_from_scene = _scene_index(scene_starts, last_start) + 1
        else:
            resume_from_scene = 0
        last_end = max((s.get("end_time", 0) for s in segments), default=0)
        language = existing.get("language") or None
    except Exception as e:
        sys.stderr.write(f"[ASR] Failed to load tmp for resume: {e}, starting fresh\n")
        sys.stderr.flush()
        return [], None, 0

    if segments or resume_from_scene:
        sys.stderr.write(f"[ASR] Resume from scene {resume_from_scene}/{len(cut_scenes)} "
                         f"(last segment end={last_end:.1f}s, {len(segments)} existing segments)\n")
        sys.stderr.flush()
    return segments, language, resume_from_scene


def _transcribe_scenes(model, video_path, cut_scenes, fps, tmp_path, err_path, publisher):
    """Transcribe *video_path* one CUT scene at a time (bounds memory on long videos).

    Each scene's audio is cut out with ffmpeg into a temporary WAV and
    transcribed on its own; segment times are shifted back to absolute video
    time. After every scene a checkpoint (segments so far plus
    ``completed_scenes``) is written to *tmp_path* so an interrupted run can
    resume without duplicating segments.

    Returns:
        ``(segments, language)``; *language* is the language of the first scene
        that produced speech (or the one inherited from a checkpoint), or None.
    """
    sys.stderr.write(f"[ASR] Starting segmented transcription for {len(cut_scenes)} scenes\n")
    sys.stderr.flush()
    scene_starts = [start for start, _end in cut_scenes]
    all_segments, transcript_language, resume_from_scene = _load_resume_state(
        tmp_path, err_path, cut_scenes, scene_starts
    )
    temp_dir = tempfile.mkdtemp(prefix="asr_cut_")
    sys.stderr.write(f"[ASR] Temp dir: {temp_dir}\n")
    sys.stderr.flush()
    try:
        for idx in range(resume_from_scene, len(cut_scenes)):
            start_t, end_t = cut_scenes[idx]
            seg_wav = os.path.join(temp_dir, f"seg_{idx:04d}.wav")
            sys.stderr.write(f"[ASR] Scene {idx}: {start_t:.1f}-{end_t:.1f}s\n")
            sys.stderr.flush()

            t0 = time.time()
            # -ss before -i seeks in the input instead of decoding the file from
            # 0 for every scene (quadratic on long videos); output timestamps then
            # restart at 0, so the scene length is passed with -t.
            cmd = ["ffmpeg", "-y", "-v", "quiet",
                   "-ss", str(start_t), "-i", video_path,
                   "-t", str(max(0.0, end_t - start_t)),
                   "-ar", "16000", "-ac", "1", seg_wav]
            subprocess.run(cmd, check=False, capture_output=True)
            sys.stderr.write(f"[ASR] Scene {idx}: ffmpeg took {time.time()-t0:.1f}s\n")
            sys.stderr.flush()

            try:
                if not os.path.exists(seg_wav) or os.path.getsize(seg_wav) < 100:
                    sys.stderr.write(f"[ASR] Scene {idx}: empty audio, skipping\n")
                    sys.stderr.flush()
                else:
                    try:
                        t1 = time.time()
                        seg_result, seg_info = model.transcribe(
                            seg_wav, beam_size=5,
                            vad_filter=True,
                            vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
                        )
                        sys.stderr.write(f"[ASR] Scene {idx}: transcribe took {time.time()-t1:.1f}s, language={seg_info.language}\n")
                        sys.stderr.flush()
                        seg_language = seg_info.language if seg_info else transcript_language
                        scene_segments = []
                        for segment in seg_result:
                            seg_start = start_t + segment.start
                            seg_end = start_t + segment.end
                            scene_idx = _scene_index(scene_starts, (seg_start + seg_end) / 2)
                            scene_segments.append({
                                "start_time": seg_start,
                                "end_time": seg_end,
                                "start_frame": int(round(seg_start * fps)),
                                "end_frame": int(round(seg_end * fps)),
                                "text": segment.text.strip(),
                                "scene_number": scene_idx + 1,
                                "language": seg_language,
                            })
                            produced = len(all_segments) + len(scene_segments)
                            if publisher and produced % 100 == 0:
                                publisher.progress("asr", produced, 0, f"Segment {produced}")
                        all_segments.extend(scene_segments)
                        # Language detection on a silent scene is meaningless, so the
                        # transcript language comes from the first scene with speech.
                        if transcript_language is None and scene_segments:
                            transcript_language = seg_language
                    except Exception as e:
                        print(f"[ASR] Segment {idx} failed: {e}", file=sys.stderr)
            finally:
                try:
                    os.remove(seg_wav)
                except OSError:
                    pass

            _write_json_atomic(tmp_path, {
                "language": transcript_language or "",
                "segments": all_segments,
                "completed_scenes": idx + 1,
            })
    finally:
        # rmtree also removes any WAV left behind by an interrupted scene.
        shutil.rmtree(temp_dir, ignore_errors=True)

    print(f"[ASR] Segmented transcription complete: {len(all_segments)} segments", file=sys.stderr)
    return all_segments, transcript_language


def _transcribe_whole(model, video_path, fps, publisher):
    """Transcribe the whole video in one pass; return ``(segments, language)``."""
    segments, info = transcribe_with_fallback(model, video_path, publisher)
    all_segments = []
    for segment in segments:
        all_segments.append({
            "start_time": segment.start,
            "end_time": segment.end,
            "start_frame": int(round(segment.start * fps)),
            "end_frame": int(round(segment.end * fps)),
            "text": segment.text.strip(),
        })
        if publisher and len(all_segments) % 100 == 0:
            publisher.progress("asr", len(all_segments), 0, f"Segment {len(all_segments)}")
    return all_segments, info.language


def run_asr(video_path, output_path, uuid: str = "", fps: float = None):
    """Transcribe *video_path* and write the transcript JSON to *output_path*.

    Steps:
    1. Resolve the FPS from the CLI value, then the sibling ``.cut.json``, then
       ffprobe. Exit 1 if none of them gives a usable value.
    2. If the file has no audio stream, write an empty transcript and exit 0.
    3. Otherwise load the Whisper model and transcribe. When CUT scenes exist,
       work scene by scene with resumable checkpoints in
       ``<output_path>.tmp``; otherwise transcribe in one pass.
    4. Write the result to ``<output_path>.tmp`` and rename it onto
       *output_path*.

    The process always terminates via ``sys.exit``.

    Args:
        video_path: media file to transcribe.
        output_path: destination ``*.asr.json`` path. The ``.cut.json``,
            ``.tmp`` and ``.err`` companion paths are derived from it.
        uuid: job id for Redis progress messages; empty disables publishing.
        fps: optional frame-rate override used to convert seconds to frames.
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    cut_path = output_path.replace(".asr.json", ".cut.json")
    # FPS detection chain: CLI → CUT → ffprobe → FAIL
    fps = _resolve_fps(video_path, cut_path, fps)
    if fps is None:
        print("[ASR] ERROR: Cannot determine FPS (no CUT data, ffprobe failed). Aborting.", file=sys.stderr)
        sys.exit(1)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("asr", "ASR_START")

    if not has_audio_stream(video_path):
        if publisher:
            publisher.info("asr", "No audio stream detected, skipping transcription")
        output = {"language": "", "language_probability": 0.0, "segments": []}
        with open(output_path, "w") as f:
            json.dump(output, f, indent=2)
        if publisher:
            publisher.complete("asr", "0 segments (no audio)")
        sys.stderr.write("ASR: No audio stream, skipping transcription\n")
        sys.stderr.flush()
        sys.exit(0)

    # Scene-by-scene transcription keeps memory bounded on long videos.
    cut_scenes = _load_cut_scenes(cut_path)

    if publisher:
        publisher.info("asr", "Loading Whisper model...")
    sys.stderr.write(f"[ASR] Loading Whisper model {MODEL_SIZE}...\n")
    sys.stderr.flush()
    model = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)
    sys.stderr.write("[ASR] Model loaded\n")
    sys.stderr.flush()
    if publisher:
        publisher.info("asr", f"Transcribing: {video_path}")

    tmp_path = output_path + ".tmp"
    if cut_scenes:
        all_segments, language = _transcribe_scenes(
            model, video_path, cut_scenes, fps, tmp_path, output_path + ".err", publisher
        )
        json_language = language or ""
        info_language = language or "unknown"
    else:
        all_segments, info_language = _transcribe_whole(model, video_path, fps, publisher)
        json_language = info_language

    # Final transcript (without the resume counter), then move it into place so
    # readers never observe a partially written .asr.json.
    _write_json_atomic(tmp_path, {"language": json_language, "segments": all_segments})
    if publisher:
        publisher.info("asr", f"ASR_LANGUAGE:{info_language}")
    os.replace(tmp_path, output_path)
    if publisher:
        publisher.complete("asr", f"{len(all_segments)} segments")
    sys.stderr.write(
        f"ASR: Transcription complete, {len(all_segments)} segments written to {output_path}\n"
    )
    sys.stderr.flush()
    sys.exit(0)
if __name__ == "__main__":
    # CLI entry point: parse arguments and hand off to run_asr (which exits the process).
    parser = argparse.ArgumentParser(description="ASR Transcription")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument("--fps", type=float, help="Override FPS (default: auto-detect)")
    args = parser.parse_args()
    # Frame numbers are seconds * fps. A zero or negative override would give
    # meaningless frame numbers, and inf would raise OverflowError, so reject
    # such values up front. NaN fails the comparison and is rejected as well.
    if args.fps is not None and not (0 < args.fps < float("inf")):
        parser.error("--fps must be a positive, finite number")
    run_asr(args.video_path, args.output_path, args.uuid, fps=args.fps)