Files
momentry_core/scripts/lip_processor_mp.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

189 lines
5.2 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Lip Processor - 嘴部動作檢測
使用 MediaPipe Tasks API (v0.10+)
"""
import sys
import json
import argparse
import os
import signal
import cv2
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
print(f"LIP: Received signal {signum}, exiting...")
sys.exit(1)
# 嘴部關鍵點索引
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
LEFT_MOUTH = 61
RIGHT_MOUTH = 291
def process_lip(
video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
"""Process video for lip movement detection using MediaPipe Tasks API"""
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
publisher = RedisPublisher(uuid) if uuid else None
if publisher:
publisher.info("lip", "LIP_START")
if publisher:
publisher.info("lip", "LIP_LOADING_MEDIAPIPE")
try:
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# 創建 Face Landmarker
base_options = python.BaseOptions(
model_asset_path="face_landmarker.task",
delegate=python.BaseOptions.Delegate.CPU,
)
options = vision.FaceLandmarkerOptions(
base_options=base_options,
running_mode=vision.RunningMode.VIDEO,
num_faces=1,
min_face_detection_confidence=0.5,
min_tracking_confidence=0.5,
)
detector = vision.FaceLandmarker.create_from_options(options)
except Exception as e:
if publisher:
publisher.error("lip", f"Failed to load MediaPipe: {e}")
result = {"error": str(e), "frames": []}
with open(output_path, "w") as f:
json.dump(result, f, indent=2)
sys.exit(1)
if publisher:
publisher.info("lip", "LIP_OPENING_VIDEO")
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if publisher:
publisher.info("lip", f"fps={fps}, frames={total_frames}")
publisher.progress("lip", 0, total_frames, "Starting")
frames = []
frame_count = 0
processed = 0
speaking_frames = 0
total_openness = 0.0
timestamp_ms = 0
if publisher:
publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
timestamp_ms = int(((frame_count - 1) / fps) * 1000)
if frame_count % sample_interval != 0:
continue
processed += 1
timestamp = (frame_count - 1) / fps
# 轉換為 MediaPipe Image
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
mp_image = vision.Image(image_format=vision.ImageFormat.SRGB, data=rgb)
# 檢測
result = detector.detect_for_video(mp_image, timestamp_ms)
if result.face_landmarks and len(result.face_landmarks) > 0:
lm = result.face_landmarks[0]
# 計算嘴部開合度
openness = abs(lm[UPPER_LIP_BOTTOM].y - lm[LOWER_LIP_TOP].y)
width = abs(lm[LEFT_MOUTH].x - lm[RIGHT_MOUTH].x)
if width > 0:
normalized = openness / width
else:
normalized = 0.0
speaking = normalized > 0.1
if speaking:
speaking_frames += 1
total_openness += normalized
frames.append(
{
"frame": frame_count - 1,
"timestamp": round(timestamp, 3),
"face_detected": True,
"lip_openness": round(normalized, 4),
"is_speaking": speaking,
}
)
if publisher and processed % 50 == 0:
publisher.progress(
"lip",
processed,
total_frames // sample_interval,
f"openness={normalized:.3f}",
)
cap.release()
detector.close()
avg_openness = total_openness / processed if processed > 0 else 0.0
speaking_rate = speaking_frames / processed if processed > 0 else 0.0
result = {
"frame_count": total_frames,
"fps": fps,
"processed_frames": processed,
"sample_interval": sample_interval,
"frames": frames,
"stats": {
"speaking_frames": speaking_frames,
"speaking_rate": round(speaking_rate, 4),
"avg_openness": round(avg_openness, 4),
},
}
if publisher:
publisher.complete("lip", f"{len(frames)} frames")
with open(output_path, "w") as f:
json.dump(result, f, indent=2)
sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("video_path")
parser.add_argument("output_path")
parser.add_argument("--uuid", "-u", default="")
parser.add_argument("--sample-interval", "-s", type=int, default=30)
args = parser.parse_args()
process_lip(args.video_path, args.output_path, args.uuid, args.sample_interval)