Files
momentry_core/scripts/lip_processor.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

352 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Lip Processor - 嘴部動作檢測
使用 MediaPipe Face Mesh 檢測 468 個人臉關鍵點
專注於嘴部開合度檢測
MediaPipe 0.10+ 使用新 API
"""
import sys
import json
import argparse
import os
import signal
import cv2
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, with code 1.
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    raise SystemExit(1)
# MediaPipe Face Mesh landmark indices for the mouth region.
# NOTE(review): in the canonical Face Mesh topology, indices 78 and 308 appear
# to be the inner left/right mouth corners rather than lip-centre points —
# confirm these are the landmarks intended for the vertical-gap measurement.
UPPER_LIP_TOP = 13 # top of the upper lip
LOWER_LIP_BOTTOM = 14 # bottom of the lower lip
UPPER_LIP_BOTTOM = 78 # bottom of the upper lip
LOWER_LIP_TOP = 308 # top of the lower lip
LEFT_MOUTH_CORNER = 61 # left mouth corner
RIGHT_MOUTH_CORNER = 291 # right mouth corner
def calculate_lip_openness(landmarks):
    """
    Measure how far the mouth is open from a set of face landmarks.

    Args:
        landmarks: Indexable collection of landmark points; element [0] of a
            point is read as x and element [1] as y. At least 468 points are
            required.

    Returns:
        Tuple ``(openness, width, height)``:
            openness: inner-lip vertical gap divided by mouth width,
                clamped to the range 0.0-1.0 (0.0 when width is not positive)
            width: absolute x distance between the two mouth corners
            height: absolute y distance between upper-lip top and
                lower-lip bottom
        ``(0.0, 0.0, 0.0)`` is returned when fewer than 468 points are given.
    """
    # Not enough points to index the mouth landmarks.
    if len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    # Vertical gap between the two inner-lip reference points (y axis).
    inner_gap = abs(landmarks[UPPER_LIP_BOTTOM][1] - landmarks[LOWER_LIP_TOP][1])
    # Horizontal extent of the mouth (x axis).
    width = abs(landmarks[LEFT_MOUTH_CORNER][0] - landmarks[RIGHT_MOUTH_CORNER][0])
    # Overall vertical extent of the lips (y axis).
    height = abs(landmarks[UPPER_LIP_TOP][1] - landmarks[LOWER_LIP_BOTTOM][1])

    # Normalise the gap by the mouth width; a degenerate width yields 0.
    ratio = inner_gap / width if width > 0 else 0.0

    # Clamp into [0, 1] before returning.
    return min(1.0, max(0.0, ratio)), width, height
def is_speaking(openness, threshold=0.1):
    """
    Decide whether the mouth is open enough to count as speaking.

    Args:
        openness: Lip openness value (may be a Python float or a NumPy scalar).
        threshold: Openness must be strictly greater than this (default 0.1).

    Returns:
        bool: True when ``openness`` exceeds ``threshold``.
    """
    # Coerce to a built-in bool: comparing a NumPy scalar yields numpy.bool_,
    # which json.dump cannot serialize.
    return bool(openness > threshold)
def _fail_lip(output_path, publisher, log_message, error_text):
    """Publish a fatal error, write an empty result carrying it, and exit(1)."""
    if publisher:
        publisher.error("lip", log_message)
    result = {
        "frame_count": 0,
        "fps": 0.0,
        "frames": [],
        "stats": {},
        "error": error_text,
    }
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)
    sys.exit(1)


def _create_face_detector(publisher):
    """Build a face-landmark detector, preferring the Tasks API over legacy FaceMesh.

    Returns:
        Tuple ``(mp, detector, use_new_api)`` where ``mp`` is the imported
        mediapipe module and ``use_new_api`` tells which API ``detector`` uses.

    Raises:
        Exception: If mediapipe cannot be imported or neither API can be set up.
    """
    # Imported outside the new-API try block so that a missing package is
    # reported as such instead of surfacing as an unbound ``mp`` in the fallback.
    import mediapipe as mp

    try:
        # NOTE: the model path is relative, so it is resolved against the
        # current working directory of the process.
        base_options = mp.tasks.BaseOptions(
            model_asset_path="face_landmarker.task",
            delegate=mp.tasks.BaseOptions.Delegate.CPU,
        )
        options = mp.tasks.vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=mp.tasks.vision.RunningMode.VIDEO,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        detector = mp.tasks.vision.FaceLandmarker.create_from_options(options)
        return mp, detector, True
    except Exception as e:
        # Fall back to the legacy solutions API.
        if publisher:
            publisher.info("lip", f"New API failed, trying old API: {e}")

    face_mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    )
    return mp, face_mesh, False


def _detect_landmarks(mp, detector, use_new_api, rgb_frame, timestamp_ms):
    """Return the first face's landmarks as an (N, 3) array, or None if no face."""
    if use_new_api:
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
        detection = detector.detect_for_video(mp_image, timestamp_ms)
        faces = detection.face_landmarks
        if not faces:
            return None
        points = faces[0]
    else:
        # The legacy FaceMesh result exposes ``multi_face_landmarks``; each
        # entry holds its points in the ``landmark`` field.
        results = detector.process(rgb_frame)
        faces = results.multi_face_landmarks
        if not faces:
            return None
        points = faces[0].landmark
    return np.array([[kp.x, kp.y, kp.z] for kp in points])


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 1
):
    """
    Detect lip movement in a video and write per-frame results as JSON.

    The function terminates the process: exit status 0 after the result file
    has been written, exit status 1 if MediaPipe cannot be loaded or the video
    cannot be opened (an error result is written to ``output_path`` first).

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the JSON file to write.
        uuid: UUID used for Redis progress reporting; empty disables reporting.
        sample_interval: Analyse every N-th frame. Values below 1 are treated
            as 1.
    """
    # Exit promptly on SIGTERM / SIGINT.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    # A zero interval would divide by zero in the sampling check below.
    if sample_interval < 1:
        sys.stderr.write(
            f"LIP: invalid sample_interval={sample_interval}, using 1\n"
        )
        sample_interval = 1

    try:
        mp, detector, use_new_api = _create_face_detector(publisher)
    except Exception as e:
        _fail_lip(output_path, publisher, f"Failed to load MediaPipe: {e}", str(e))

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    processed = 0
    # Running statistics over the sampled frames.
    faces_seen = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0
    detect_failures = 0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            message = f"Failed to open video: {video_path}"
            _fail_lip(output_path, publisher, message, message)

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if publisher:
            publisher.info("lip", f"fps={fps}, frames={total_frames}")
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info(
                "lip", f"LIP_PROCESSING (sample_interval={sample_interval})"
            )

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            # Sampling: only every ``sample_interval``-th frame is analysed.
            if frame_count % sample_interval != 0:
                continue
            processed += 1

            frame_index = frame_count - 1
            if fps > 0:
                timestamp = frame_index / fps
                timestamp_ms = int(timestamp * 1000)
            else:
                timestamp = 0
                # VIDEO running mode expects increasing timestamps; without a
                # usable fps fall back to the frame index as a surrogate.
                timestamp_ms = frame_index

            # Convert BGR (OpenCV) to RGB (MediaPipe).
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Detection is best-effort: a failing frame counts as "no face",
            # but the first failure is reported so it is not lost silently.
            try:
                landmarks = _detect_landmarks(
                    mp, detector, use_new_api, rgb_frame, timestamp_ms
                )
            except Exception as e:
                landmarks = None
                detect_failures += 1
                if detect_failures == 1:
                    sys.stderr.write(
                        f"LIP: landmark detection failed on frame {frame_index}: {e}\n"
                    )

            if landmarks is not None and len(landmarks) >= 468:
                openness, width, height = calculate_lip_openness(landmarks)
                # Convert NumPy scalars to built-in types so the record is
                # JSON-serializable.
                openness = float(openness)
                width = float(width)
                height = float(height)
                speaking = bool(is_speaking(openness))

                faces_seen += 1
                if speaking:
                    speaking_frames += 1
                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(width, 4),
                        "lip_height": round(height, 4),
                        "is_speaking": speaking,
                    }
                )
                if publisher and processed % 100 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        total_frames // sample_interval,
                        f"Frame {frame_count}, openness={openness:.3f}",
                    )
            elif frame_count % 10 == 0:
                # No face: record only every 10th frame to keep the output small.
                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": False,
                        "lip_openness": 0.0,
                        "lip_width": 0.0,
                        "lip_height": 0.0,
                        "is_speaking": False,
                    }
                )
    finally:
        # Release native resources on every exit path, including signal exits.
        cap.release()
        detector.close()

    # Aggregate statistics (rates are relative to all sampled frames).
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "total_frames": total_frames,
            "processed_frames": processed,
            "frames_with_face": faces_seen,
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_lip_openness": round(avg_openness, 4),
            "max_lip_openness": round(max_openness, 4),
        },
    }

    # Write the file before announcing completion so a listener reacting to
    # the completion message finds the result on disk.
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)
    if publisher:
        publisher.complete(
            "lip",
            f"{len(frames)} frames, {speaking_frames} speaking ({speaking_rate * 100:.1f}%)",
        )
    sys.stderr.write(
        f"LIP: Processing complete, {len(frames)} frames written to {output_path}\n"
    )
    sys.stderr.flush()
    sys.exit(0)
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus an optional Redis
    # UUID and a frame-sampling stride.
    cli = argparse.ArgumentParser(
        description="Lip Movement Detection (MediaPipe Face Mesh)"
    )
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    cli.add_argument(
        "--sample-interval",
        "-s",
        type=int,
        default=1,
        help="Process every N frames (default: 1, set higher for faster processing)",
    )
    opts = cli.parse_args()
    process_lip(
        opts.video_path,
        opts.output_path,
        opts.uuid,
        opts.sample_interval,
    )