Files
momentry_core/scripts/asr_processor_simplified.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

340 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
ASR Processor - 簡化標準化版本
功能:執行自動語音識別處理
輸入:視頻文件路徑,輸出文件路徑
輸出:JSON 格式的語音識別結果
標準化特性:
1. 移除不必要的監控邏輯
2. 簡化架構(<300 行)
3. 統一的錯誤處理
4. 標準化的輸出格式
5. 配置參數化
"""
import sys
import json
import os
import argparse
import signal
import tempfile
import time
import subprocess
from typing import Dict, Any, Tuple
import traceback
# 環境檢查
def check_environment() -> Tuple[bool, str]:
    """Verify that the runtime dependencies of the ASR pipeline are usable.

    The checks run in this order and stop at the first failure:

    1. the ``whisper`` package can be imported;
    2. ``ffprobe -version`` starts and exits with status 0;
    3. ``ffmpeg -version`` starts and exits with status 0.

    Both executables are checked because this script probes media with
    ``ffprobe`` and extracts audio with ``ffmpeg``.

    Returns:
        ``(True, "Environment OK")`` when every check passes, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable message.
    """
    try:
        # Imported only to prove that the package is available.
        import whisper  # noqa: F401

        # ffprobe first, so its failure message keeps precedence over ffmpeg's.
        for tool in ("ffprobe", "ffmpeg"):
            try:
                probe = subprocess.run(
                    [tool, "-version"], capture_output=True, text=True
                )
            except OSError:
                # The executable is missing from PATH or cannot be started.
                return False, f"{tool} not found or not working"
            if probe.returncode != 0:
                return False, f"{tool} not found or not working"
        return True, "Environment OK"
    except ImportError as e:
        return False, f"Missing dependency: {e}"
    except Exception as e:
        return False, f"Environment check failed: {e}"
# 信號處理
def signal_handler(signum, frame):
    """Announce the received signal on stderr and exit with status 1.

    Written to be registered through ``signal.signal``. ``frame`` is accepted
    only to satisfy the handler signature and is not used. The handler itself
    performs no cleanup beyond printing the message.
    """
    notice = f"[ASR] Received signal {signum}, cleaning up..."
    sys.stderr.write(notice + "\n")
    raise SystemExit(1)
# Whisper 模型緩存
# Loaded Whisper models, keyed by model name, so each one is loaded only once.
_whisper_model_cache = {}


def get_whisper_model(model_name: str = "base"):
    """Return the Whisper model called ``model_name``, loading it on first use.

    The ``whisper`` package is imported only when a model actually has to be
    loaded. Later calls with the same name return the cached object.
    """
    if model_name in _whisper_model_cache:
        return _whisper_model_cache[model_name]

    import whisper

    print(f"[ASR] Loading Whisper model: {model_name}", file=sys.stderr)
    loaded = whisper.load_model(model_name)
    _whisper_model_cache[model_name] = loaded
    return _whisper_model_cache[model_name]
# 主要處理類
class ASRProcessor:
    """Run Whisper speech recognition on the audio track of a media file.

    Typical use is ``validate_input()`` followed by ``process()`` and
    ``save_result()``. Progress and diagnostics are written to stderr.
    """

    def __init__(
        self,
        video_path: str,
        output_path: str,
        model_name: str = "base",
        chunk_size: int = 300,
    ):
        """Store the job parameters and start the processing timer.

        Args:
            video_path: Path of the media file to transcribe.
            output_path: Path of the JSON file written by ``save_result``.
            model_name: Whisper model name passed to ``get_whisper_model``.
            chunk_size: Chunk size in seconds; ``0`` or less disables chunking.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.model_name = model_name
        self.chunk_size = chunk_size  # chunk size in seconds
        self.start_time = time.time()

    def validate_input(self) -> Tuple[bool, str]:
        """Check that the input exists and contains an audio stream.

        Returns:
            ``(True, message)`` when the input is usable, otherwise
            ``(False, reason)``.
        """
        if not os.path.exists(self.video_path):
            return False, f"Video file not found: {self.video_path}"
        if not self._has_audio_stream():
            return False, f"No audio stream found in: {self.video_path}"
        return True, "Input validation passed"

    def _has_audio_stream(self) -> bool:
        """Return True when ffprobe reports at least one audio stream.

        Any failure to run ffprobe is treated as "no audio stream".
        """
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a",
                "-show_entries",
                "stream=codec_type",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return "audio" in result.stdout
        except Exception:
            return False

    def _get_media_duration(self) -> float:
        """Return the media duration in seconds as reported by ffprobe.

        Returns ``0.0`` (after logging a warning) when ffprobe cannot be run
        or its output cannot be parsed as a float.
        """
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return float(result.stdout.strip())
        except Exception as e:
            print(f"[ASR] Warning: Failed to get duration: {e}", file=sys.stderr)
            return 0.0

    def _extract_audio(self, audio_path: str) -> bool:
        """Extract the audio track to ``audio_path`` as 16 kHz mono PCM WAV.

        Returns:
            True when ffmpeg succeeded and produced a non-empty file,
            False otherwise (the reason is logged to stderr).
        """
        try:
            cmd = [
                "ffmpeg",
                "-i",
                self.video_path,
                "-vn",  # disable video
                "-acodec",
                "pcm_s16le",  # 16-bit little-endian PCM
                "-ar",
                "16000",  # 16 kHz sample rate
                "-ac",
                "1",  # mono
                "-y",  # overwrite the output file
                audio_path,
            ]
            print(f"[ASR] Extracting audio to: {audio_path}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(
                    f"[ASR] Audio extraction failed: {result.stderr}", file=sys.stderr
                )
                return False
            return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0
        except Exception as e:
            print(f"[ASR] Audio extraction error: {e}", file=sys.stderr)
            return False

    def process(self) -> Dict[str, Any]:
        """Run the ASR pipeline and return the result dictionary.

        A temporary working directory is created for intermediate files and
        is removed in a ``finally`` clause, so it is cleaned up on success,
        on failure, and when the run is aborted (for example by a
        ``SystemExit`` raised from a signal handler).

        Returns:
            The transcription result with an added ``"metadata"`` entry.

        Raises:
            Exception: Any error raised during processing is logged with its
                traceback and re-raised unchanged.
        """
        work_dir = None
        try:
            # 1. Prepare the working directory.
            work_dir = tempfile.mkdtemp(prefix="asr_")
            print(f"[ASR] Working directory: {work_dir}", file=sys.stderr)

            # 2. Probe the media duration.
            duration = self._get_media_duration()
            print(f"[ASR] Media duration: {duration:.2f} seconds", file=sys.stderr)

            # 3. Choose the strategy: short input or chunking disabled means
            #    a single pass, otherwise chunked processing.
            if duration <= self.chunk_size or self.chunk_size <= 0:
                result = self._process_single_file(work_dir)
            else:
                result = self._process_chunked(work_dir, duration)

            # 4. Attach metadata.
            processing_time = time.time() - self.start_time
            result["metadata"] = {
                "processing_time": processing_time,
                "video_path": self.video_path,
                "duration": duration,
                "model": self.model_name,
                "chunk_size": self.chunk_size,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "module_version": "1.0.0",
            }
            return result
        except Exception as e:
            print(f"[ASR] Processing failed: {e}", file=sys.stderr)
            print(f"[ASR] Traceback: {traceback.format_exc()}", file=sys.stderr)
            raise
        finally:
            # 5. Always remove the working directory, even on failure.
            if work_dir is not None:
                self._cleanup_work_dir(work_dir)

    @staticmethod
    def _cleanup_work_dir(work_dir: str) -> None:
        """Remove ``work_dir`` best-effort; failures are only logged."""
        try:
            import shutil

            shutil.rmtree(work_dir)
            print("[ASR] Cleaned up working directory", file=sys.stderr)
        except Exception as e:
            print(f"[ASR] Warning: Failed to clean up: {e}", file=sys.stderr)

    @staticmethod
    def _segment_confidence(segment: Dict[str, Any]) -> Any:
        """Return a confidence value for one transcription segment.

        An explicit ``"confidence"`` entry is returned as is. Otherwise the
        value is derived from ``"avg_logprob"`` as ``exp(avg_logprob)``
        clamped to ``[0, 1]``; ``0.0`` is used when neither key is present.
        """
        if "confidence" in segment:
            return segment["confidence"]
        avg_logprob = segment.get("avg_logprob")
        if avg_logprob is None:
            return 0.0
        import math

        return max(0.0, min(1.0, math.exp(avg_logprob)))

    @staticmethod
    def _format_result(result: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a raw transcription result into the standard output shape.

        ``total_duration`` uses the result's ``"duration"`` entry when present
        and otherwise falls back to the largest segment end time (``0.0`` when
        there are no segments).
        """
        segments = [
            {
                "start": segment.get("start", 0.0),
                "end": segment.get("end", 0.0),
                "text": segment.get("text", "").strip(),
                "confidence": ASRProcessor._segment_confidence(segment),
            }
            for segment in result.get("segments", [])
        ]
        total_duration = result.get("duration")
        if total_duration is None:
            total_duration = max((seg["end"] for seg in segments), default=0.0)
        return {
            "language": result.get("language"),
            "language_probability": result.get("language_probability"),
            "segments": segments,
            "summary": {
                "segment_count": len(segments),
                "total_duration": total_duration,
            },
        }

    def _process_single_file(self, work_dir: str) -> Dict[str, Any]:
        """Transcribe the whole input in one pass.

        Raises:
            RuntimeError: If the audio track cannot be extracted.
        """
        # 1. Extract the audio.
        audio_path = os.path.join(work_dir, "audio.wav")
        if not self._extract_audio(audio_path):
            raise RuntimeError("Failed to extract audio")

        # 2. Load the model.
        model = get_whisper_model(self.model_name)

        # 3. Transcribe.
        print("[ASR] Transcribing audio...", file=sys.stderr)
        result = model.transcribe(audio_path)

        # 4. Format the result.
        return self._format_result(result)

    def _process_chunked(self, work_dir: str, duration: float) -> Dict[str, Any]:
        """Process a long input.

        Chunked processing is not implemented yet; this currently logs a
        notice and delegates to ``_process_single_file``.
        """
        print(
            f"[ASR] Large file detected ({duration:.2f}s), using single file mode",
            file=sys.stderr,
        )
        return self._process_single_file(work_dir)

    def save_result(self, result: Dict[str, Any]):
        """Write ``result`` to ``self.output_path`` as UTF-8 JSON.

        The output directory is created when needed. The elapsed-time line is
        logged only when ``result["metadata"]["processing_time"]`` exists, so
        a result without metadata no longer fails after the file is written.
        """
        output_dir = os.path.dirname(self.output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f"[ASR] Result saved to: {self.output_path}", file=sys.stderr)
        processing_time = (result.get("metadata") or {}).get("processing_time")
        if processing_time is not None:
            print(
                f"[ASR] Processing completed in {processing_time:.2f} seconds",
                file=sys.stderr,
            )
# 命令行接口
def main():
    """Command-line entry point: parse arguments, run ASR, write the JSON.

    Steps, in order: parse the command line, install the SIGINT/SIGTERM
    handlers, check the environment, validate the input, then process and
    save the result. Environment, validation and processing failures exit
    with status 1; a ``KeyboardInterrupt`` reaching the processing step exits
    with status 130.
    """
    cli = argparse.ArgumentParser(description="ASR 處理器 - 簡化標準化版本")
    cli.add_argument("video_path", help="輸入視頻文件路徑")
    cli.add_argument("output_path", help="輸出 JSON 文件路徑")
    cli.add_argument(
        "--model",
        default="base",
        help="Whisper 模型名稱 (tiny, base, small, medium, large)",
    )
    cli.add_argument(
        "--chunk-size", type=int, default=300, help="分塊大小0 表示不分塊"
    )
    options = cli.parse_args()

    # Install the same handler for both termination signals.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    # Refuse to start when a dependency is missing.
    environment_ready, environment_message = check_environment()
    if not environment_ready:
        print(f"ERROR: {environment_message}", file=sys.stderr)
        sys.exit(1)

    startup_lines = (
        "[ASR] Starting ASR processing",
        f"[ASR] Video: {options.video_path}",
        f"[ASR] Output: {options.output_path}",
        f"[ASR] Model: {options.model}, Chunk size: {options.chunk_size}s",
    )
    for line in startup_lines:
        print(line, file=sys.stderr)

    job = ASRProcessor(
        video_path=options.video_path,
        output_path=options.output_path,
        model_name=options.model,
        chunk_size=options.chunk_size,
    )

    # Validate the input before doing any expensive work.
    input_ok, input_message = job.validate_input()
    if not input_ok:
        print(f"ERROR: {input_message}", file=sys.stderr)
        sys.exit(1)

    try:
        transcription = job.process()
        job.save_result(transcription)
        print("[ASR] Processing completed successfully", file=sys.stderr)
    except KeyboardInterrupt:
        print("[ASR] Processing interrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()