- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
278 lines
8.0 KiB
Python
278 lines
8.0 KiB
Python
#!/opt/homebrew/bin/python3.11
|
||
"""
|
||
Lip Processor - MediaPipe Tasks API 版本
|
||
使用 MediaPipe Face Landmarker 檢測 468 個人臉關鍵點
|
||
專注於嘴部開合度檢測
|
||
"""
|
||
|
||
import sys
|
||
import json
|
||
import argparse
|
||
import os
|
||
import signal
|
||
import cv2
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
from redis_publisher import RedisPublisher
|
||
|
||
|
||
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame supplied by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    raise SystemExit(1)
|
||
|
||
|
||
# Mouth landmark indices in the MediaPipe Face Mesh topology.
#
# The inner-lip midpoints (13 / 14) bound the mouth opening, and the outer-lip
# midpoints (0 / 17) bound the full lip height. Indices 78 and 308 are the
# inner mouth *corners*; using them for a vertical measurement yields the head
# tilt rather than how far the mouth is open, so they are not used here.
UPPER_LIP_BOTTOM = 13  # inner (lower) edge of the upper lip, midpoint
LOWER_LIP_TOP = 14  # inner (upper) edge of the lower lip, midpoint
LEFT_MOUTH = 61  # outer mouth corner
RIGHT_MOUTH = 291  # opposite outer mouth corner
UPPER_LIP_TOP = 0  # outer (upper) edge of the upper lip, midpoint
LOWER_LIP_BOTTOM = 17  # outer (lower) edge of the lower lip, midpoint
|
||
|
||
|
||
def calculate_lip_metrics(landmarks):
    """Derive mouth-opening metrics from the landmarks of a single face.

    Args:
        landmarks: Indexable sequence of landmark objects exposing ``x`` and
            ``y`` attributes. Sequences shorter than 468 entries are treated
            as "no usable face".

    Returns:
        A tuple ``(openness, width, height)``:
            openness: Gap between the ``UPPER_LIP_BOTTOM`` and
                ``LOWER_LIP_TOP`` landmarks divided by the mouth width,
                clamped to the range 0.0-1.0 (0.0 when the width is zero).
            width: Absolute x-distance between the ``LEFT_MOUTH`` and
                ``RIGHT_MOUTH`` landmarks.
            height: Absolute y-distance between the ``UPPER_LIP_TOP`` and
                ``LOWER_LIP_BOTTOM`` landmarks.
        All three values are 0.0 when fewer than 468 landmarks are supplied.
    """
    if len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    def _span(index_a, index_b, axis):
        # Absolute distance between two landmarks along a single axis.
        return abs(getattr(landmarks[index_a], axis) - getattr(landmarks[index_b], axis))

    inner_gap = _span(UPPER_LIP_BOTTOM, LOWER_LIP_TOP, "y")
    width = _span(LEFT_MOUTH, RIGHT_MOUTH, "x")
    height = _span(UPPER_LIP_TOP, LOWER_LIP_BOTTOM, "y")

    # Express the gap relative to the mouth width so the value does not depend
    # on how large the face appears in the frame.
    ratio = inner_gap / width if width > 0 else 0.0

    # Clamp into the 0-1 range.
    openness = min(1.0, max(0.0, ratio))

    return openness, width, height
|
||
|
||
|
||
def is_speaking(openness, threshold=0.1):
    """Report whether a mouth-openness value counts as speaking.

    Args:
        openness: Mouth openness value to classify.
        threshold: Value that ``openness`` must strictly exceed.

    Returns:
        ``True`` when ``openness`` is strictly greater than ``threshold``.
    """
    exceeds_threshold = openness > threshold
    return exceeds_threshold
|
||
|
||
|
||
_DEFAULT_LIP_MODEL_PATH = "/Users/accusys/momentry_core_0.1/models/face_landmarker.task"


def _abort_lip(output_path, publisher, error_text, publish_text=None):
    """Report a fatal error to Redis, the output JSON and stderr, then exit with status 1."""
    if publisher:
        publisher.error("lip", publish_text if publish_text is not None else error_text)
    with open(output_path, "w") as f:
        json.dump({"error": error_text, "frames": []}, f, indent=2)
    sys.stderr.write(f"LIP Error: {error_text}\n")
    sys.exit(1)


def process_lip(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 30,
    model_path: str = _DEFAULT_LIP_MODEL_PATH,
):
    """Process video for lip movement detection using MediaPipe Tasks API.

    Every ``sample_interval``-th frame is run through a MediaPipe Face
    Landmarker (VIDEO running mode, one face, CPU delegate). Per-frame lip
    metrics plus summary statistics are written to ``output_path`` as JSON.

    This function never returns: it ends the process with ``sys.exit(0)`` on
    success, or writes ``{"error": ..., "frames": []}`` to ``output_path`` and
    ends the process with ``sys.exit(1)`` on failure.

    Args:
        video_path: Path of the video file to analyse.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress is published through
            ``RedisPublisher(uuid)``; otherwise nothing is published.
        sample_interval: Analyse every N-th frame; must be >= 1.
        model_path: Location of the ``face_landmarker.task`` model file.

    Raises:
        SystemExit: Always (exit code 0 on success, 1 on failure).
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None

    def info(message):
        # Publishing is optional; stay silent when no UUID was supplied.
        if publisher:
            publisher.info("lip", message)

    info("LIP_START")

    # A zero interval would divide by zero in the sampling check below, and a
    # negative one makes the progress total meaningless.
    if sample_interval < 1:
        _abort_lip(
            output_path,
            publisher,
            f"sample_interval must be >= 1, got {sample_interval}",
        )

    info("LIP_LOADING_MEDIAPIPE")

    try:
        # Imported lazily so that a missing/broken MediaPipe install is
        # reported through the normal error JSON instead of an import crash.
        import mediapipe as mp
        from mediapipe.tasks import python
        from mediapipe.tasks.python import vision

        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found: {model_path}")

        base_options = python.BaseOptions(
            model_asset_path=model_path, delegate=python.BaseOptions.Delegate.CPU
        )
        options = vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=vision.RunningMode.VIDEO,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        detector = vision.FaceLandmarker.create_from_options(options)

        info("MediaPipe model loaded successfully")
    except Exception as e:
        _abort_lip(output_path, publisher, str(e), f"Failed to load MediaPipe: {e}")

    info("LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            _abort_lip(output_path, publisher, f"Cannot open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Timestamps are derived from the frame rate; without a positive fps
        # they cannot be computed (``not fps > 0`` also rejects NaN).
        if not fps > 0:
            _abort_lip(output_path, publisher, f"Invalid video frame rate: {fps}")

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")

        info(f"LIP_PROCESSING (sample={sample_interval})")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            frame_index = frame_count - 1
            timestamp = frame_index / fps
            timestamp_ms = int(timestamp * 1000)

            # OpenCV decodes to BGR; MediaPipe expects an SRGB image.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

            detection = detector.detect_for_video(mp_image, timestamp_ms)

            if detection.face_landmarks:
                openness, width, height = calculate_lip_metrics(
                    detection.face_landmarks[0]
                )

                speaking = is_speaking(openness)
                if speaking:
                    speaking_frames += 1

                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(width, 4),
                        "lip_height": round(height, 4),
                        "is_speaking": speaking,
                    }
                )

                if publisher and processed % 50 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        total_frames // sample_interval,
                        f"openness={openness:.3f}",
                    )
            elif frame_count % 10 == 0:
                # No face found. NOTE(review): the extra "every 10th frame"
                # filter is kept from the original; with sample intervals
                # that are not multiples of 10 it drops some no-face samples
                # from the output - confirm whether that is intended.
                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": False,
                        "lip_openness": 0.0,
                        "lip_width": 0.0,
                        "lip_height": 0.0,
                        "is_speaking": False,
                    }
                )
    finally:
        # Runs on normal completion, on errors, and on the SystemExit raised
        # by the signal handler, so native resources are always released.
        cap.release()
        detector.close()

    # Summary statistics over all sampled frames.
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(
        1 for entry in frames if entry.get("face_detected", False)
    )

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
|
||
|
||
|
||
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and hand them to process_lip.
    cli = argparse.ArgumentParser(
        description="Lip Movement Detection (MediaPipe Tasks API)"
    )
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--sample-interval",
        "-s",
        default=30,
        type=int,
        help="Process every N frames (default: 30)",
    )
    opts = cli.parse_args()

    process_lip(
        opts.video_path,
        opts.output_path,
        opts.uuid,
        opts.sample_interval,
    )
|