Files
momentry_core/scripts/lip_processor_media.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

278 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Lip Processor - MediaPipe Tasks API 版本
使用 MediaPipe Face Landmarker 檢測 468 個人臉關鍵點
專注於嘴部開合度檢測
"""
import sys
import json
import argparse
import os
import signal
import cv2
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate the process.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the signal machinery (unused).

    Raises:
        SystemExit: Always, with exit status 1.
    """
    print(f"LIP: Received signal {signum}, exiting...")
    raise SystemExit(1)
# Mouth landmark indices in the MediaPipe Face Mesh topology.
# The inner-lip contour runs 78 ... 13 ... 308 (upper) and 78 ... 14 ... 308
# (lower); the outer-lip contour runs 61 ... 0 ... 291 (upper) and
# 61 ... 17 ... 291 (lower). Indices 78 and 308 are therefore the inner mouth
# *corners* and must not be used to measure how far the lips are parted.
UPPER_LIP_BOTTOM = 13  # centre of the upper lip, inner edge
LOWER_LIP_TOP = 14  # centre of the lower lip, inner edge
LEFT_MOUTH = 61  # outer mouth corner (one side)
RIGHT_MOUTH = 291  # outer mouth corner (other side)
UPPER_LIP_TOP = 0  # centre of the upper lip, outer edge
LOWER_LIP_BOTTOM = 17  # centre of the lower lip, outer edge


def calculate_lip_metrics(landmarks):
    """Compute mouth openness, width and height from face landmarks.

    Args:
        landmarks: Indexable sequence of landmark objects exposing ``x`` and
            ``y`` attributes, in Face Mesh index order. At least 468 entries
            are required (longer sequences are accepted).

    Returns:
        A tuple ``(openness, width, height)``:
            openness: Inner lip gap divided by mouth width, clamped to
                0.0-1.0 (0 = closed, 1 = wide open).
            width: Horizontal distance between the two mouth corners.
            height: Vertical distance between the outer edges of the upper
                and lower lips.
        ``(0.0, 0.0, 0.0)`` is returned when ``landmarks`` is ``None`` or has
        fewer than 468 entries.
    """
    if landmarks is None or len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    upper_inner = landmarks[UPPER_LIP_BOTTOM]
    lower_inner = landmarks[LOWER_LIP_TOP]
    left_corner = landmarks[LEFT_MOUTH]
    right_corner = landmarks[RIGHT_MOUTH]
    upper_outer = landmarks[UPPER_LIP_TOP]
    lower_outer = landmarks[LOWER_LIP_BOTTOM]

    # Gap between the inner edges of the two lips.
    vertical_openness = abs(upper_inner.y - lower_inner.y)
    # Corner-to-corner mouth width.
    width = abs(left_corner.x - right_corner.x)
    # Full lip height, outer edge to outer edge.
    height = abs(upper_outer.y - lower_outer.y)

    # Normalise by the mouth width so the value does not depend on how large
    # the face appears in the frame.
    openness = vertical_openness / width if width > 0 else 0.0

    # Clamp to the documented 0-1 range.
    openness = min(1.0, max(0.0, openness))
    return openness, width, height
def is_speaking(openness, threshold=0.1):
    """Return whether the mouth openness is strictly above the threshold.

    Args:
        openness: Mouth openness value to classify.
        threshold: Value that ``openness`` must exceed to count as speaking.

    Returns:
        The result of ``openness > threshold``.
    """
    exceeds_threshold = openness > threshold
    return exceeds_threshold
# Default location of the Face Landmarker model bundle. This is the path the
# script has always used; callers may override it through ``model_path``.
_DEFAULT_MODEL_PATH = "/Users/accusys/momentry_core_0.1/models/face_landmarker.task"


def _abort_lip(publisher, output_path, detail, publish_message=None):
    """Publish an error, write an error-only result file, and exit with status 1."""
    if publisher:
        publisher.error("lip", publish_message or detail)
    with open(output_path, "w") as f:
        json.dump({"error": detail, "frames": []}, f, indent=2)
    sys.stderr.write(f"LIP Error: {detail}\n")
    sys.exit(1)


def process_lip(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 30,
    model_path: str = _DEFAULT_MODEL_PATH,
):
    """Detect lip movement in a video and write the result as JSON.

    Every ``sample_interval``-th frame is run through a MediaPipe Face
    Landmarker in VIDEO mode; mouth metrics are computed for the first
    detected face and collected together with summary statistics.

    Args:
        video_path: Path of the video file to analyse.
        output_path: Path of the JSON file to write (result or error).
        uuid: When non-empty, progress is published through
            ``RedisPublisher(uuid)``; when empty nothing is published.
        sample_interval: Process every N-th frame; must be >= 1.
        model_path: Path of the ``face_landmarker.task`` model bundle.

    Raises:
        SystemExit: Always. Status 0 after the result file has been written,
            status 1 when the arguments are invalid, the model cannot be
            loaded, or the video cannot be read (an error-only JSON file is
            written in those cases).
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("lip", "LIP_START")

    # A non-positive interval would raise ZeroDivisionError (or never sample)
    # in the modulo test below, so reject it up front.
    if sample_interval < 1:
        _abort_lip(
            publisher,
            output_path,
            f"sample_interval must be >= 1, got {sample_interval}",
        )

    if publisher:
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    try:
        import mediapipe as mp
        from mediapipe.tasks import python
        from mediapipe.tasks.python import vision

        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found: {model_path}")

        # Build the Face Landmarker (CPU delegate, single face, VIDEO mode).
        base_options = python.BaseOptions(
            model_asset_path=model_path, delegate=python.BaseOptions.Delegate.CPU
        )
        options = vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=vision.RunningMode.VIDEO,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        detector = vision.FaceLandmarker.create_from_options(options)
        if publisher:
            publisher.info("lip", "MediaPipe model loaded successfully")
    except Exception as e:
        _abort_lip(publisher, output_path, str(e), f"Failed to load MediaPipe: {e}")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = None
    try:
        if publisher:
            publisher.info("lip", "LIP_OPENING_VIDEO")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            # Without this check an unreadable file would silently produce an
            # empty "successful" result.
            _abort_lip(publisher, output_path, f"Cannot open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Timestamps are derived from the frame index and fps; a missing or
        # invalid fps (0, negative, NaN) would make them undefined.
        if not fps > 0:
            _abort_lip(
                publisher, output_path, f"Invalid frame rate reported: {fps}"
            )

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % sample_interval != 0:
                continue
            processed += 1

            frame_index = frame_count - 1
            timestamp = frame_index / fps
            # VIDEO mode takes a millisecond timestamp per frame; deriving it
            # from the frame index keeps it increasing from one call to the next.
            timestamp_ms = int(timestamp * 1000)

            # Wrap the frame as a MediaPipe image (OpenCV delivers BGR).
            # mp.Image / mp.ImageFormat is the form shown in the MediaPipe
            # Tasks Python guide.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

            detection = detector.detect_for_video(mp_image, timestamp_ms)

            if detection.face_landmarks and len(detection.face_landmarks) > 0:
                lm = detection.face_landmarks[0]
                openness, width, height = calculate_lip_metrics(lm)
                speaking = is_speaking(openness)
                if speaking:
                    speaking_frames += 1
                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(width, 4),
                        "lip_height": round(height, 4),
                        "is_speaking": speaking,
                    }
                )

                if publisher and processed % 50 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        total_frames // sample_interval,
                        f"openness={openness:.3f}",
                    )
            else:
                # No face found in this sample.
                # NOTE(review): only frame counts divisible by 10 are recorded
                # here, so with a sample interval that is not a multiple of 10
                # some no-face samples are omitted from "frames". Behaviour
                # kept as-is; confirm whether this filter is still wanted.
                if frame_count % 10 == 0:
                    frames.append(
                        {
                            "frame": frame_index,
                            "timestamp": round(timestamp, 3),
                            "face_detected": False,
                            "lip_openness": 0.0,
                            "lip_width": 0.0,
                            "lip_height": 0.0,
                            "is_speaking": False,
                        }
                    )
    finally:
        # Release native resources on every exit path, including the
        # SystemExit raised by the signal handler or by _abort_lip.
        if cap is not None:
            cap.release()
        detector.close()

    # Summary statistics. NOTE(review): averages are taken over all sampled
    # frames, including those in which no face was detected.
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(1 for f in frames if f.get("face_detected", False))

    summary = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    with open(output_path, "w") as f:
        json.dump(summary, f, indent=2)

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and run the processor.
    cli = argparse.ArgumentParser(
        description="Lip Movement Detection (MediaPipe Tasks API)"
    )
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--sample-interval",
        "-s",
        default=30,
        type=int,
        help="Process every N frames (default: 30)",
    )
    opts = cli.parse_args()
    process_lip(
        opts.video_path,
        opts.output_path,
        opts.uuid,
        opts.sample_interval,
    )