417 lines
15 KiB
Python
Executable File
417 lines
15 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
||
"""
|
||
ASR Processor - faster-whisper small model (Production)
|
||
|
||
Version: 2.1
|
||
Model: small (int8 quantization, CPU)
|
||
Reason: small 模型在準確率和速度間取得最佳平衡
|
||
經實驗驗證,最少要使用 small 才可以較好的處理多語種及台灣腔國語
|
||
|
||
Configuration:
|
||
- Model: faster-whisper/small
|
||
- Device: CPU (MPS not supported by faster_whisper)
|
||
- Compute: int8
|
||
- Beam size: 5
|
||
- VAD filter: enabled (min_silence=500ms, speech_pad=200ms)
|
||
- Audio fallback: ffmpeg extraction for PyAV-incompatible streams (v2.1)
|
||
"""
|
||
import sys
|
||
import json
|
||
import os
|
||
import time
|
||
import argparse
|
||
import signal
|
||
import subprocess
|
||
import tempfile
|
||
from faster_whisper import WhisperModel
|
||
|
||
PROCESSOR_VERSION = "2.1"
|
||
MODEL_SIZE = "small"
|
||
DEVICE = "cpu"
|
||
COMPUTE_TYPE = "int8"
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
from redis_publisher import RedisPublisher
|
||
|
||
|
||
def signal_handler(signum, frame):
|
||
print(f"ASR: Received signal {signum}, exiting...")
|
||
sys.exit(1)
|
||
|
||
|
||
def has_audio_stream(video_path):
|
||
"""Check if video file has audio stream using ffprobe."""
|
||
try:
|
||
cmd = [
|
||
"ffprobe",
|
||
"-v",
|
||
"error",
|
||
"-select_streams",
|
||
"a",
|
||
"-show_entries",
|
||
"stream=codec_type",
|
||
"-of",
|
||
"csv=p=0",
|
||
video_path,
|
||
]
|
||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||
return bool(result.stdout.strip())
|
||
except subprocess.CalledProcessError:
|
||
return False
|
||
except FileNotFoundError:
|
||
print("WARNING: ffprobe not found, assuming audio exists")
|
||
return True
|
||
|
||
|
||
def extract_audio_with_ffmpeg(video_path):
|
||
"""Extract audio from video to WAV using ffmpeg.
|
||
|
||
Returns path to temporary WAV file. Caller is responsible for cleanup.
|
||
"""
|
||
wav_path = tempfile.mktemp(suffix=".wav", prefix="asr_audio_")
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-y",
|
||
"-i", video_path,
|
||
"-vn",
|
||
"-acodec", "pcm_s16le",
|
||
"-ar", "16000",
|
||
"-ac", "1",
|
||
wav_path,
|
||
]
|
||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
if result.returncode != 0:
|
||
sys.stderr.write(f"ASR: ffmpeg extraction failed: {result.stderr}\n")
|
||
sys.stderr.flush()
|
||
return None
|
||
return wav_path
|
||
|
||
|
||
def transcribe_with_fallback(model, video_path, publisher=None):
|
||
"""Transcribe video with fallback to ffmpeg-extracted WAV.
|
||
|
||
First tries direct transcription (PyAV). If PyAV fails to decode,
|
||
falls back to ffmpeg audio extraction then transcription.
|
||
"""
|
||
# Try direct transcription first
|
||
try:
|
||
if publisher:
|
||
publisher.info("asr", "Direct transcription attempt...")
|
||
return model.transcribe(
|
||
video_path,
|
||
beam_size=5,
|
||
vad_filter=True,
|
||
vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
|
||
)
|
||
except Exception as e:
|
||
error_str = str(e)
|
||
# Check if it's a PyAV/av decoding error
|
||
is_pyav_error = any(
|
||
keyword in error_str.lower()
|
||
for keyword in ["av.error", "avcodec", "decode", "packet"]
|
||
)
|
||
|
||
if not is_pyav_error:
|
||
raise # Re-raise non-PyAV errors
|
||
|
||
if publisher:
|
||
publisher.info("asr", "PyAV decode failed, falling back to ffmpeg extraction...")
|
||
sys.stderr.write("ASR: PyAV decode error detected, falling back to ffmpeg extraction\n")
|
||
sys.stderr.flush()
|
||
|
||
wav_path = extract_audio_with_ffmpeg(video_path)
|
||
if wav_path is None:
|
||
raise RuntimeError("Failed to extract audio with ffmpeg")
|
||
|
||
try:
|
||
if publisher:
|
||
publisher.info("asr", "Transcribing extracted WAV audio...")
|
||
segments, info = model.transcribe(
|
||
wav_path,
|
||
beam_size=5,
|
||
vad_filter=True,
|
||
vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
|
||
)
|
||
return segments, info
|
||
finally:
|
||
# Clean up temporary WAV file
|
||
try:
|
||
os.remove(wav_path)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def get_fps_from_cut(cut_path):
|
||
"""從 CUT 資料獲取 FPS"""
|
||
if os.path.exists(cut_path):
|
||
try:
|
||
with open(cut_path) as f:
|
||
cut_data = json.load(f)
|
||
fps = cut_data.get("fps")
|
||
if fps and fps > 0:
|
||
return fps
|
||
except Exception as e:
|
||
print(f"[ASR] Failed to load CUT FPS: {e}", file=sys.stderr)
|
||
return None
|
||
|
||
|
||
def get_fps_from_ffprobe(video_path):
|
||
"""從影片獲取 FPS (ffprobe)"""
|
||
try:
|
||
cmd = ["ffprobe", "-v", "error",
|
||
"-select_streams", "v:0",
|
||
"-show_entries", "stream=r_frame_rate",
|
||
"-of", "csv=p=0", video_path]
|
||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||
fps_str = result.stdout.strip()
|
||
if "/" in fps_str:
|
||
num, den = fps_str.split("/")
|
||
return float(num) / float(den)
|
||
return float(fps_str)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def run_asr(video_path, output_path, uuid: str = "", fps: float = None):
|
||
# Set up signal handlers
|
||
signal.signal(signal.SIGTERM, signal_handler)
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
|
||
# FPS detection chain: CLI → CUT → ffprobe → FAIL
|
||
if fps is not None:
|
||
print(f"[ASR] Using CLI-provided FPS: {fps}", file=sys.stderr)
|
||
else:
|
||
cut_path_check = output_path.replace(".asr.json", ".cut.json")
|
||
fps = get_fps_from_cut(cut_path_check)
|
||
if fps:
|
||
print(f"[ASR] FPS from CUT: {fps}", file=sys.stderr)
|
||
if fps is None:
|
||
fps = get_fps_from_ffprobe(video_path)
|
||
if fps:
|
||
print(f"[ASR] FPS from ffprobe: {fps}", file=sys.stderr)
|
||
if fps is None:
|
||
print("[ASR] ERROR: Cannot determine FPS (no CUT data, ffprobe failed). Aborting.", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
publisher = RedisPublisher(uuid) if uuid else None
|
||
if publisher:
|
||
publisher.info("asr", "ASR_START")
|
||
|
||
# Check for audio stream
|
||
if not has_audio_stream(video_path):
|
||
if publisher:
|
||
publisher.info("asr", "No audio stream detected, skipping transcription")
|
||
output = {"language": "", "language_probability": 0.0, "segments": []}
|
||
with open(output_path, "w") as f:
|
||
json.dump(output, f, indent=2)
|
||
if publisher:
|
||
publisher.complete("asr", "0 segments (no audio)")
|
||
sys.stderr.write("ASR: No audio stream, skipping transcription\n")
|
||
sys.stderr.flush()
|
||
sys.exit(0)
|
||
|
||
# 嘗試以 CUT 場景分段處理(降低長片記憶體使用)
|
||
cut_scenes = []
|
||
cut_path = output_path.replace(".asr.json", ".cut.json")
|
||
if os.path.exists(cut_path):
|
||
try:
|
||
with open(cut_path) as f:
|
||
cut_data = json.load(f)
|
||
scenes = cut_data.get("scenes", [])
|
||
if scenes:
|
||
cut_scenes = [(s["start_time"], s["end_time"]) for s in scenes]
|
||
print(f"[ASR] Loaded {len(cut_scenes)} cut scenes for segmented transcription", file=sys.stderr)
|
||
except Exception as e:
|
||
print(f"[ASR] Failed to load cut scenes: {e}", file=sys.stderr)
|
||
|
||
if publisher:
|
||
publisher.info("asr", "Loading Whisper model...")
|
||
|
||
sys.stderr.write(f"[ASR] Loading Whisper model {MODEL_SIZE}...\n")
|
||
sys.stderr.flush()
|
||
model = WhisperModel(MODEL_SIZE, device="cpu", compute_type="int8")
|
||
sys.stderr.write(f"[ASR] Model loaded\n")
|
||
sys.stderr.flush()
|
||
|
||
if publisher:
|
||
publisher.info("asr", f"Transcribing: {video_path}")
|
||
|
||
results = []
|
||
total_segments = 0
|
||
|
||
if cut_scenes:
|
||
# 分段處理:對每個場景萃取音訊並轉錄
|
||
sys.stderr.write(f"[ASR] Starting segmented transcription for {len(cut_scenes)} scenes\n")
|
||
sys.stderr.flush()
|
||
import subprocess
|
||
import tempfile
|
||
temp_dir = tempfile.mkdtemp(prefix="asr_cut_")
|
||
sys.stderr.write(f"[ASR] Temp dir: {temp_dir}\n")
|
||
sys.stderr.flush()
|
||
transcript_language = None
|
||
|
||
# 建立 scene lookup: 給定時間點,找是哪個 scene
|
||
import bisect
|
||
scene_starts = [s[0] for s in cut_scenes]
|
||
def find_scene_idx(t):
|
||
i = bisect.bisect_right(scene_starts, t) - 1
|
||
return max(0, i)
|
||
|
||
# 逐段處理,每段結果即時寫入 .asr.tmp
|
||
tmp_path = output_path + ".tmp"
|
||
err_path = output_path + ".err"
|
||
all_segments = []
|
||
|
||
# Resume: 若 executor 將 .tmp rename 成 .err,先救回
|
||
if not os.path.exists(tmp_path) and os.path.exists(err_path) and os.path.getsize(err_path) > 10:
|
||
try:
|
||
os.rename(err_path, tmp_path)
|
||
sys.stderr.write(f"[ASR] Recovered .err → .tmp for resume ({os.path.getsize(tmp_path)} bytes)\n")
|
||
sys.stderr.flush()
|
||
except Exception as e:
|
||
sys.stderr.write(f"[ASR] Failed to recover .err: {e}\n")
|
||
sys.stderr.flush()
|
||
|
||
# Resume: 若已有 .asr.tmp,載入已完成的 segments 並跳過已處理的 scenes
|
||
resume_from_scene = 0
|
||
if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 10:
|
||
try:
|
||
with open(tmp_path) as f:
|
||
existing = json.load(f)
|
||
all_segments = existing.get("segments", [])
|
||
if all_segments:
|
||
# 找出最後一個 segment 的 end_time,決定 resume 起點
|
||
last_end = max(s.get("end", 0) for s in all_segments)
|
||
# 找出最後完成的 scene_idx(場景 end_time > last_end)
|
||
for i, (st, et) in enumerate(cut_scenes):
|
||
if et > last_end:
|
||
resume_from_scene = i
|
||
break
|
||
else:
|
||
resume_from_scene = len(cut_scenes) # 全部完成
|
||
# 繼承 language
|
||
if existing.get("language"):
|
||
transcript_language = existing["language"]
|
||
sys.stderr.write(f"[ASR] Resume from scene {resume_from_scene}/{len(cut_scenes)} "
|
||
f"(last segment end={last_end:.1f}s, {len(all_segments)} existing segments)\n")
|
||
sys.stderr.flush()
|
||
except Exception as e:
|
||
sys.stderr.write(f"[ASR] Failed to load tmp for resume: {e}, starting fresh\n")
|
||
sys.stderr.flush()
|
||
all_segments = []
|
||
|
||
for idx, (start_t, end_t) in enumerate(cut_scenes):
|
||
if idx < resume_from_scene:
|
||
continue # 跳過已處理的 scenes
|
||
seg_wav = os.path.join(temp_dir, f"seg_{idx:04d}.wav")
|
||
sys.stderr.write(f"[ASR] Scene {idx}: {start_t:.1f}-{end_t:.1f}s\n")
|
||
sys.stderr.flush()
|
||
# 用 ffmpeg 萃取出該段音訊
|
||
t0 = time.time()
|
||
cmd = ["ffmpeg", "-y", "-v", "quiet", "-i", video_path,
|
||
"-ss", str(start_t), "-to", str(end_t),
|
||
"-ar", "16000", "-ac", "1", seg_wav]
|
||
subprocess.run(cmd, check=False, capture_output=True)
|
||
sys.stderr.write(f"[ASR] Scene {idx}: ffmpeg took {time.time()-t0:.1f}s\n")
|
||
sys.stderr.flush()
|
||
|
||
if not os.path.exists(seg_wav) or os.path.getsize(seg_wav) < 100:
|
||
sys.stderr.write(f"[ASR] Scene {idx}: empty audio, skipping\n")
|
||
sys.stderr.flush()
|
||
continue
|
||
|
||
try:
|
||
t1 = time.time()
|
||
seg_result, seg_info = model.transcribe(
|
||
seg_wav, beam_size=5,
|
||
vad_filter=True,
|
||
vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
|
||
)
|
||
sys.stderr.write(f"[ASR] Scene {idx}: transcribe took {time.time()-t1:.1f}s, language={seg_info.language}\n")
|
||
sys.stderr.flush()
|
||
|
||
scene_segments = []
|
||
seg_language = seg_info.language if seg_info else transcript_language
|
||
for segment in seg_result:
|
||
seg_start = start_t + segment.start
|
||
seg_end = start_t + segment.end
|
||
scene_idx = find_scene_idx((seg_start + seg_end) / 2)
|
||
scene_segments.append({
|
||
"start_time": seg_start,
|
||
"end_time": seg_end,
|
||
"start_frame": int(round(seg_start * fps)),
|
||
"end_frame": int(round(seg_end * fps)),
|
||
"text": segment.text.strip(),
|
||
"scene_number": scene_idx + 1,
|
||
"language": seg_language,
|
||
})
|
||
total_segments += 1
|
||
|
||
# 當前 scene 結果寫入 .asr.tmp
|
||
all_segments.extend(scene_segments)
|
||
with open(tmp_path, "w") as f:
|
||
json.dump({"language": transcript_language or "", "segments": all_segments}, f)
|
||
|
||
if total_segments % 100 == 0:
|
||
if publisher:
|
||
publisher.progress("asr", total_segments, 0, f"Segment {total_segments}")
|
||
except Exception as e:
|
||
print(f"[ASR] Segment {idx} failed: {e}", file=sys.stderr)
|
||
|
||
# 清理暫存 WAV
|
||
try: os.remove(seg_wav)
|
||
except: pass
|
||
|
||
try: os.rmdir(temp_dir)
|
||
except: pass
|
||
|
||
info_language = transcript_language or "unknown"
|
||
print(f"[ASR] Segmented transcription complete: {total_segments} segments", file=sys.stderr)
|
||
else:
|
||
# 無 CUT 資料,直接轉錄(原有流程)
|
||
segments, info = transcribe_with_fallback(model, video_path, publisher)
|
||
info_language = info.language
|
||
|
||
tmp_path = output_path + ".tmp"
|
||
all_segments = []
|
||
for segment in segments:
|
||
all_segments.append({
|
||
"start_time": segment.start,
|
||
"end_time": segment.end,
|
||
"start_frame": int(round(segment.start * fps)),
|
||
"end_frame": int(round(segment.end * fps)),
|
||
"text": segment.text.strip(),
|
||
})
|
||
total_segments += 1
|
||
if total_segments % 100 == 0:
|
||
if publisher:
|
||
publisher.progress("asr", total_segments, 0, f"Segment {total_segments}")
|
||
with open(tmp_path, "w") as f:
|
||
json.dump({"language": info_language, "segments": all_segments}, f)
|
||
|
||
if publisher:
|
||
publisher.info("asr", f"ASR_LANGUAGE:{info_language}")
|
||
|
||
# rename .tmp → .json
|
||
os.rename(tmp_path, output_path)
|
||
|
||
if publisher:
|
||
publisher.complete("asr", f"{len(results)} segments")
|
||
|
||
sys.stderr.write(
|
||
f"ASR: Transcription complete, {len(results)} segments written to {output_path}\n"
|
||
)
|
||
sys.stderr.flush()
|
||
sys.exit(0)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description="ASR Transcription")
|
||
parser.add_argument("video_path", help="Path to video file")
|
||
parser.add_argument("output_path", help="Output JSON path")
|
||
parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
|
||
parser.add_argument("--fps", type=float, help="Override FPS (default: auto-detect)")
|
||
args = parser.parse_args()
|
||
|
||
run_asr(args.video_path, args.output_path, args.uuid, fps=args.fps)
|