#!/opt/homebrew/bin/python3.11
"""
Lip Processor - MediaPipe Tasks API version.

Runs the MediaPipe Face Landmarker on sampled frames of a video and measures
how far the mouth is open on each of them. The per-frame measurements and
summary statistics are written to a JSON file; when a UUID is supplied,
progress is also reported through RedisPublisher.
"""

import sys
import json
import argparse
import os
import signal

import cv2

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def signal_handler(signum, frame):
    """Log the received signal and terminate the process with exit code 1."""
    print(f"LIP: Received signal {signum}, exiting...")
    sys.exit(1)


# Mouth landmark indices in the MediaPipe Face Mesh topology.
# 13/14 are the centre points of the inner lip contour and 0/17 the centre
# points of the outer lip contour. (78 and 308, used previously for the
# vertical gap, are the inner mouth *corners*, so the gap measured head tilt
# instead of lip separation.)
UPPER_LIP_BOTTOM = 13  # inner edge of the upper lip (centre)
LOWER_LIP_TOP = 14  # inner edge of the lower lip (centre)
LEFT_MOUTH = 61  # left mouth corner
RIGHT_MOUTH = 291  # right mouth corner
UPPER_LIP_TOP = 0  # outer edge of the upper lip (centre)
LOWER_LIP_BOTTOM = 17  # outer edge of the lower lip (centre)

# Smallest landmark list that still contains every index used above.
MIN_LANDMARKS = 468

# Default location of the Face Landmarker model bundle.
DEFAULT_MODEL_PATH = "/Users/accusys/momentry_core_0.1/models/face_landmarker.task"

# Frame rate assumed when the container does not report a usable one.
FALLBACK_FPS = 30.0


def calculate_lip_metrics(landmarks):
    """
    Compute mouth measurements from one face's landmarks.

    Args:
        landmarks: Sequence of landmark objects exposing ``x`` and ``y``
            attributes, indexed by Face Mesh landmark id.

    Returns:
        Tuple ``(openness, width, height)``:
            openness: inner-lip gap divided by mouth width, clamped to
                0.0-1.0 (0 = closed, 1 = wide open).
            width: horizontal distance between the mouth corners.
            height: vertical distance between the outer lip edges.
        ``(0.0, 0.0, 0.0)`` is returned when fewer than 468 landmarks are
        supplied.
    """
    if landmarks is None or len(landmarks) < MIN_LANDMARKS:
        return 0.0, 0.0, 0.0

    inner_upper = landmarks[UPPER_LIP_BOTTOM]
    inner_lower = landmarks[LOWER_LIP_TOP]
    left_corner = landmarks[LEFT_MOUTH]
    right_corner = landmarks[RIGHT_MOUTH]
    outer_upper = landmarks[UPPER_LIP_TOP]
    outer_lower = landmarks[LOWER_LIP_BOTTOM]

    # Vertical gap between the inner lip edges.
    vertical_gap = abs(inner_upper.y - inner_lower.y)
    # Horizontal mouth width.
    width = abs(left_corner.x - right_corner.x)
    # Overall mouth height (outer lip edges).
    height = abs(outer_upper.y - outer_lower.y)

    # Normalise the gap by the mouth width so the value does not depend on
    # how large the face appears in the frame.
    # NOTE(review): MediaPipe normalised landmarks scale x by image width and
    # y by image height, so this ratio presumably varies with the frame
    # aspect ratio - confirm the speaking threshold accounts for that.
    openness = vertical_gap / width if width > 0 else 0.0

    # Clamp to the 0-1 range.
    openness = min(1.0, max(0.0, openness))

    return openness, width, height


def is_speaking(openness, threshold=0.1):
    """Return True when ``openness`` is strictly greater than ``threshold``."""
    return openness > threshold


def _write_json(output_path, payload):
    """Serialise ``payload`` to ``output_path`` as indented JSON."""
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


def _abort(publisher, output_path, log_message, error_text):
    """Report a fatal error (Redis, JSON file, stderr) and exit with code 1."""
    if publisher:
        publisher.error("lip", log_message)
    _write_json(output_path, {"error": error_text, "frames": []})
    sys.stderr.write(f"LIP Error: {error_text}\n")
    sys.exit(1)


def process_lip(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 30,
    model_path: str = DEFAULT_MODEL_PATH,
):
    """
    Process a video for lip movement detection using the MediaPipe Tasks API.

    Every ``sample_interval``-th frame is passed to the Face Landmarker; the
    resulting mouth measurements and summary statistics are written to
    ``output_path`` as JSON. This function does not return: it ends the
    process with exit code 0 on success and 1 on failure.

    Args:
        video_path: Path of the video file to analyse.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress is published through RedisPublisher.
        sample_interval: Analyse one frame out of this many. Values below 1
            are treated as 1.
        model_path: Path of the Face Landmarker ``.task`` model bundle.
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # A zero interval would raise ZeroDivisionError in the modulo below and a
    # negative one would produce negative progress totals.
    sample_interval = max(1, int(sample_interval))

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    try:
        # Imported lazily so that a missing/broken MediaPipe install is
        # reported through the error JSON instead of a bare traceback.
        import mediapipe as mp
        from mediapipe.tasks import python
        from mediapipe.tasks.python import vision

        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model not found: {model_path}")

        # Create the Face Landmarker.
        base_options = python.BaseOptions(
            model_asset_path=model_path, delegate=python.BaseOptions.Delegate.CPU
        )
        options = vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=vision.RunningMode.VIDEO,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        detector = vision.FaceLandmarker.create_from_options(options)

        if publisher:
            publisher.info("lip", "MediaPipe model loaded successfully")

    except Exception as e:
        _abort(publisher, output_path, f"Failed to load MediaPipe: {e}", str(e))

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    # The finally clause releases the capture and the detector on every exit
    # path, including sys.exit() raised by _abort or by the signal handler.
    try:
        if not cap.isOpened():
            message = f"Failed to open video: {video_path}"
            _abort(publisher, output_path, message, message)

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report 0 (or NaN); timestamps are derived by
            # dividing by fps, so substitute a sane value instead of crashing.
            sys.stderr.write(
                f"LIP: invalid fps {fps}, falling back to {FALLBACK_FPS}\n"
            )
            if publisher:
                publisher.info(
                    "lip", f"invalid fps {fps}, falling back to {FALLBACK_FPS}"
                )
            fps = FALLBACK_FPS
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            frame_index = frame_count - 1
            timestamp = frame_index / fps
            timestamp_ms = int(timestamp * 1000)

            # Convert the BGR frame into a MediaPipe image.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

            detection = detector.detect_for_video(mp_image, timestamp_ms)

            if detection.face_landmarks:
                openness, width, height = calculate_lip_metrics(
                    detection.face_landmarks[0]
                )

                speaking = is_speaking(openness)
                if speaking:
                    speaking_frames += 1

                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(width, 4),
                        "lip_height": round(height, 4),
                        "is_speaking": speaking,
                    }
                )

                if publisher and processed % 50 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        total_frames // sample_interval,
                        f"openness={openness:.3f}",
                    )
            else:
                # No face detected.
                # NOTE(review): this only records faceless frames whose
                # 1-based number is also a multiple of 10, so with sample
                # intervals that are not multiples of 10 most of them are
                # dropped - confirm this thinning is intended.
                if frame_count % 10 == 0:
                    frames.append(
                        {
                            "frame": frame_index,
                            "timestamp": round(timestamp, 3),
                            "face_detected": False,
                            "lip_openness": 0.0,
                            "lip_width": 0.0,
                            "lip_height": 0.0,
                            "is_speaking": False,
                        }
                    )
    finally:
        cap.release()
        detector.close()

    # Summary statistics (averaged over every sampled frame).
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(1 for f in frames if f.get("face_detected", False))

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    _write_json(output_path, result)

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Lip Movement Detection (MediaPipe Tasks API)"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--sample-interval",
        "-s",
        type=int,
        default=30,
        help="Process every N frames (default: 30)",
    )
    parser.add_argument(
        "--model-path",
        "-m",
        default=DEFAULT_MODEL_PATH,
        help="Path to the Face Landmarker .task model",
    )

    args = parser.parse_args()
    process_lip(
        args.video_path,
        args.output_path,
        args.uuid,
        args.sample_interval,
        args.model_path,
    )