- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
340 lines
11 KiB
Python
340 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
||
"""
|
||
ASR Processor - 簡化標準化版本
|
||
|
||
功能:執行自動語音識別處理
|
||
輸入:視頻文件路徑,輸出文件路徑
|
||
輸出:JSON 格式的語音識別結果
|
||
|
||
標準化特性:
|
||
1. 移除不必要的監控邏輯
|
||
2. 簡化架構(<300 行)
|
||
3. 統一的錯誤處理
|
||
4. 標準化的輸出格式
|
||
5. 配置參數化
|
||
"""
|
||
|
||
import sys
|
||
import json
|
||
import os
|
||
import argparse
|
||
import signal
|
||
import tempfile
|
||
import time
|
||
import subprocess
|
||
from typing import Dict, Any, Tuple
|
||
import traceback
|
||
|
||
|
||
# 環境檢查
|
||
def check_environment() -> Tuple[bool, str]:
    """Verify that the runtime dependencies of the ASR pipeline are usable.

    The ``whisper`` package must be importable, and both ``ffprobe`` (used to
    inspect the input) and ``ffmpeg`` (used to extract the audio track) must
    run ``-version`` successfully.

    Returns:
        ``(True, "Environment OK")`` when every check passes, otherwise
        ``(False, reason)`` where ``reason`` names the failing dependency.
    """
    try:
        # Imported only to prove the package is installed; the model itself
        # is loaded later, on demand.
        import whisper  # noqa: F401

        # Probe both tools: ffprobe alone is not enough because audio
        # extraction shells out to ffmpeg.
        for tool in ("ffprobe", "ffmpeg"):
            try:
                result = subprocess.run(
                    [tool, "-version"], capture_output=True, text=True
                )
            except OSError:
                # Binary missing from PATH or not executable.
                return False, f"{tool} not found or not working"
            if result.returncode != 0:
                return False, f"{tool} not found or not working"

        return True, "Environment OK"

    except ImportError as e:
        return False, f"Missing dependency: {e}"
    except Exception as e:
        return False, f"Environment check failed: {e}"
|
||
|
||
|
||
# 信號處理
|
||
def signal_handler(signum, frame):
    """Log the received signal number to stderr and exit with status 1."""
    sys.stderr.write(f"[ASR] Received signal {signum}, cleaning up...\n")
    raise SystemExit(1)
|
||
|
||
|
||
# Whisper 模型緩存
|
||
# Loaded Whisper models, keyed by model name, shared for the process lifetime.
_whisper_model_cache = {}


def get_whisper_model(model_name: str = "base"):
    """Return the Whisper model called ``model_name``, loading it at most once.

    The first request for a given name imports ``whisper``, loads the model
    and stores it in ``_whisper_model_cache``; later requests reuse the
    stored instance.
    """
    if model_name in _whisper_model_cache:
        return _whisper_model_cache[model_name]

    import whisper

    print(f"[ASR] Loading Whisper model: {model_name}", file=sys.stderr)
    loaded = whisper.load_model(model_name)
    _whisper_model_cache[model_name] = loaded
    return _whisper_model_cache[model_name]
|
||
|
||
|
||
# 主要處理類
|
||
class ASRProcessor:
    """Transcribe the audio track of a media file with Whisper.

    Intended call sequence: construct the processor, call
    ``validate_input()``, then ``process()`` and finally ``save_result()``.
    """

    def __init__(
        self,
        video_path: str,
        output_path: str,
        model_name: str = "base",
        chunk_size: int = 300,
    ):
        """Store the job parameters and start the processing-time clock.

        Args:
            video_path: Path of the media file to transcribe.
            output_path: Path of the JSON file ``save_result`` writes.
            model_name: Whisper model name passed to ``get_whisper_model``.
            chunk_size: Chunk length in seconds; ``<= 0`` disables chunking.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.model_name = model_name
        self.chunk_size = chunk_size  # chunk length in seconds
        self.start_time = time.time()

    def validate_input(self) -> Tuple[bool, str]:
        """Check that the input exists and contains an audio stream.

        Returns:
            ``(True, message)`` when the input is usable, otherwise
            ``(False, reason)``.
        """
        if not os.path.exists(self.video_path):
            return False, f"Video file not found: {self.video_path}"

        if not self._has_audio_stream():
            return False, f"No audio stream found in: {self.video_path}"

        return True, "Input validation passed"

    def _has_audio_stream(self) -> bool:
        """Return True when ffprobe reports at least one audio stream.

        Any failure to run ffprobe is treated as "no audio stream".
        """
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a",
                "-show_entries",
                "stream=codec_type",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return "audio" in result.stdout
        except Exception:
            return False

    def _get_media_duration(self) -> float:
        """Return the container duration in seconds as reported by ffprobe.

        Returns 0.0 (after logging a warning) when ffprobe cannot be run or
        its output is not a number.
        """
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return float(result.stdout.strip())
        except Exception as e:
            print(f"[ASR] Warning: Failed to get duration: {e}", file=sys.stderr)
            return 0.0

    def _extract_audio(self, audio_path: str) -> bool:
        """Extract the audio track to ``audio_path`` as 16 kHz mono PCM WAV.

        Returns:
            True when ffmpeg succeeded and produced a non-empty file.
        """
        try:
            cmd = [
                "ffmpeg",
                "-i",
                self.video_path,
                "-vn",  # drop the video stream
                "-acodec",
                "pcm_s16le",  # 16-bit little-endian PCM
                "-ar",
                "16000",  # 16 kHz sample rate
                "-ac",
                "1",  # mono
                "-y",  # overwrite the output file
                audio_path,
            ]

            print(f"[ASR] Extracting audio to: {audio_path}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                print(
                    f"[ASR] Audio extraction failed: {result.stderr}", file=sys.stderr
                )
                return False

            return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0

        except Exception as e:
            print(f"[ASR] Audio extraction error: {e}", file=sys.stderr)
            return False

    def process(self) -> Dict[str, Any]:
        """Run the full ASR pipeline and return the result dictionary.

        A temporary working directory is created for intermediate files and
        is removed again whether processing succeeds, fails, or is aborted
        by ``SystemExit``.

        Returns:
            The transcription result with an added ``"metadata"`` entry.

        Raises:
            Exception: Any processing error is logged and re-raised.
        """
        work_dir = None
        try:
            # 1. Prepare the working directory.
            work_dir = tempfile.mkdtemp(prefix="asr_")
            print(f"[ASR] Working directory: {work_dir}", file=sys.stderr)

            # 2. Determine the media duration.
            duration = self._get_media_duration()
            print(f"[ASR] Media duration: {duration:.2f} seconds", file=sys.stderr)

            # 3. Choose the strategy: short input or chunking disabled means
            #    a single pass, otherwise the chunked path.
            if duration <= self.chunk_size or self.chunk_size <= 0:
                result = self._process_single_file(work_dir)
            else:
                result = self._process_chunked(work_dir, duration)

            # 4. Attach metadata.
            processing_time = time.time() - self.start_time
            result["metadata"] = {
                "processing_time": processing_time,
                "video_path": self.video_path,
                "duration": duration,
                "model": self.model_name,
                "chunk_size": self.chunk_size,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "module_version": "1.0.0",
            }

            return result

        except Exception as e:
            print(f"[ASR] Processing failed: {e}", file=sys.stderr)
            print(f"[ASR] Traceback: {traceback.format_exc()}", file=sys.stderr)
            raise

        finally:
            # 5. Always remove the working directory so failed runs do not
            #    leave extracted audio behind in the temp folder.
            if work_dir is not None:
                self._remove_work_dir(work_dir)

    @staticmethod
    def _remove_work_dir(work_dir: str) -> None:
        """Delete ``work_dir`` best-effort; a failure is only logged."""
        try:
            import shutil

            shutil.rmtree(work_dir)
            print("[ASR] Cleaned up working directory", file=sys.stderr)
        except Exception as e:
            print(f"[ASR] Warning: Failed to clean up: {e}", file=sys.stderr)

    def _process_single_file(self, work_dir: str) -> Dict[str, Any]:
        """Transcribe the whole input in one pass (no chunking).

        Raises:
            RuntimeError: If the audio track cannot be extracted.
        """
        # 1. Extract the audio.
        audio_path = os.path.join(work_dir, "audio.wav")
        if not self._extract_audio(audio_path):
            raise RuntimeError("Failed to extract audio")

        # 2. Load the model.
        model = get_whisper_model(self.model_name)

        # 3. Transcribe.
        print("[ASR] Transcribing audio...", file=sys.stderr)
        result = model.transcribe(audio_path)

        # 4. Normalize the output.
        return self._format_transcription(result)

    @staticmethod
    def _format_transcription(result: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a raw transcription dict into the standardized output shape.

        Each segment's ``confidence`` uses the segment's own ``confidence``
        value when present; otherwise it is derived from ``avg_logprob`` as
        ``exp(avg_logprob)``, and is 0.0 when neither is available.
        ``total_duration`` uses ``result["duration"]`` when present and
        otherwise falls back to the latest segment end time.
        """
        import math

        segments = []
        for segment in result.get("segments", []):
            confidence = segment.get("confidence")
            if confidence is None:
                avg_logprob = segment.get("avg_logprob")
                if avg_logprob is None:
                    confidence = 0.0
                else:
                    # Clamp to <= 0 so the derived probability stays in (0, 1].
                    confidence = math.exp(min(avg_logprob, 0.0))
            segments.append(
                {
                    "start": segment.get("start", 0.0),
                    "end": segment.get("end", 0.0),
                    "text": segment.get("text", "").strip(),
                    "confidence": confidence,
                }
            )

        total_duration = result.get("duration")
        if total_duration is None:
            # No duration reported: use the end of the last spoken segment.
            total_duration = max((seg["end"] for seg in segments), default=0.0)

        return {
            "language": result.get("language"),
            "language_probability": result.get("language_probability"),
            "segments": segments,
            "summary": {
                "segment_count": len(segments),
                "total_duration": total_duration,
            },
        }

    def _process_chunked(self, work_dir: str, duration: float) -> Dict[str, Any]:
        """Handle inputs longer than ``chunk_size``.

        Chunked transcription is not implemented yet; this logs the fact and
        delegates to the single-pass path.
        """
        print(
            f"[ASR] Large file detected ({duration:.2f}s), using single file mode",
            file=sys.stderr,
        )
        return self._process_single_file(work_dir)

    def save_result(self, result: Dict[str, Any]):
        """Write ``result`` as UTF-8 JSON to ``self.output_path``.

        The parent directory is created when missing. The elapsed-time log
        line is emitted only when ``result`` carries
        ``metadata.processing_time``.
        """
        output_dir = os.path.dirname(self.output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        print(f"[ASR] Result saved to: {self.output_path}", file=sys.stderr)

        # The file is already written at this point, so a result without
        # metadata must not turn a successful save into a KeyError.
        processing_time = (result.get("metadata") or {}).get("processing_time")
        if processing_time is not None:
            print(
                f"[ASR] Processing completed in {processing_time:.2f} seconds",
                file=sys.stderr,
            )
|
||
|
||
|
||
# 命令行接口
|
||
def main():
    """Command-line entry point: parse arguments, validate, transcribe, save.

    Exits with status 1 when the environment check, input validation or
    processing fails, and with status 130 on ``KeyboardInterrupt``.
    """
    cli = argparse.ArgumentParser(description="ASR 處理器 - 簡化標準化版本")
    cli.add_argument("video_path", help="輸入視頻文件路徑")
    cli.add_argument("output_path", help="輸出 JSON 文件路徑")
    cli.add_argument(
        "--model",
        default="base",
        help="Whisper 模型名稱 (tiny, base, small, medium, large)",
    )
    cli.add_argument(
        "--chunk-size", type=int, default=300, help="分塊大小(秒),0 表示不分塊"
    )
    options = cli.parse_args()

    # Route interrupt and termination signals through the shared handler.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    # Refuse to start when a required dependency is unavailable.
    environment_ready, environment_message = check_environment()
    if not environment_ready:
        print(f"ERROR: {environment_message}", file=sys.stderr)
        sys.exit(1)

    banner = (
        "[ASR] Starting ASR processing",
        f"[ASR] Video: {options.video_path}",
        f"[ASR] Output: {options.output_path}",
        f"[ASR] Model: {options.model}, Chunk size: {options.chunk_size}s",
    )
    for line in banner:
        print(line, file=sys.stderr)

    processor = ASRProcessor(
        video_path=options.video_path,
        output_path=options.output_path,
        model_name=options.model,
        chunk_size=options.chunk_size,
    )

    # Validate the input before doing any heavy work.
    input_ok, input_message = processor.validate_input()
    if not input_ok:
        print(f"ERROR: {input_message}", file=sys.stderr)
        sys.exit(1)

    try:
        processor.save_result(processor.process())
        print("[ASR] Processing completed successfully", file=sys.stderr)
    except KeyboardInterrupt:
        print("[ASR] Processing interrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as error:
        print(f"ERROR: {error}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
|