#!/opt/homebrew/bin/python3.11
"""Lip Processor - mouth movement detection.

Detects the 468 MediaPipe Face Mesh landmarks on sampled video frames and
derives a mouth-openness measure from the lip landmarks.
MediaPipe 0.10+ uses the new Tasks API; the legacy ``solutions`` API is used
as a fallback.
"""

import sys
import json
import argparse
import os
import signal

import cv2
import numpy as np

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def signal_handler(signum, frame):
    """Report the received signal and terminate the process with status 1."""
    print(f"LIP: Received signal {signum}, exiting...")
    sys.exit(1)


# MediaPipe Face Mesh landmark indices around the mouth (canonical face model).
# NOTE: 78 and 308 are the inner mouth *corners*, so they cannot be used to
# measure the vertical lip gap; the inner lip centres are 13 and 14.
UPPER_LIP_TOP = 0  # outer edge of the upper lip, centre
LOWER_LIP_BOTTOM = 17  # outer edge of the lower lip, centre
UPPER_LIP_BOTTOM = 13  # inner edge of the upper lip, centre
LOWER_LIP_TOP = 14  # inner edge of the lower lip, centre
LEFT_MOUTH_CORNER = 61  # left mouth corner
RIGHT_MOUTH_CORNER = 291  # right mouth corner

# Minimum number of landmarks a detection must contain to be usable.
MIN_FACE_LANDMARKS = 468


def calculate_lip_openness(landmarks):
    """Compute how far the mouth is open.

    Args:
        landmarks: Face Mesh landmarks, an array-like of shape [>=468, 3]
            holding (x, y, z) per landmark.

    Returns:
        A tuple ``(openness, width, height)`` of built-in floats:
            openness: inner lip gap divided by mouth width, clamped to
                0.0-1.0 (0 = closed, 1 = wide open).
            width: horizontal distance between the mouth corners.
            height: vertical distance between the outer lip edges.
        ``(0.0, 0.0, 0.0)`` is returned when fewer than 468 landmarks are
        supplied.
    """
    if landmarks is None or len(landmarks) < MIN_FACE_LANDMARKS:
        return 0.0, 0.0, 0.0

    outer_top = landmarks[UPPER_LIP_TOP]
    outer_bottom = landmarks[LOWER_LIP_BOTTOM]
    inner_top = landmarks[UPPER_LIP_BOTTOM]
    inner_bottom = landmarks[LOWER_LIP_TOP]
    left_corner = landmarks[LEFT_MOUTH_CORNER]
    right_corner = landmarks[RIGHT_MOUTH_CORNER]

    # Vertical gap between the inner lip edges.
    inner_gap = abs(inner_top[1] - inner_bottom[1])
    width = abs(left_corner[0] - right_corner[0])
    height = abs(outer_top[1] - outer_bottom[1])

    # Normalise by mouth width so the value does not depend on face size.
    openness = inner_gap / width if width > 0 else 0.0
    openness = min(1.0, max(0.0, openness))

    # Convert NumPy scalars to built-in floats so callers get plain,
    # JSON-friendly values.
    return float(openness), float(width), float(height)


def is_speaking(openness, threshold=0.1):
    """Decide whether the mouth is open enough to count as speaking.

    Args:
        openness: Mouth openness as returned by ``calculate_lip_openness``.
        threshold: Openness above which the frame counts as speaking
            (default 0.1).

    Returns:
        bool: True when ``openness`` exceeds ``threshold``. Always a built-in
        ``bool`` (a NumPy comparison would yield ``numpy.bool_``, which
        ``json`` cannot serialise).
    """
    return bool(openness > threshold)


def _landmarks_to_array(points):
    """Convert an iterable of landmarks with x/y/z attributes to an [N, 3] array."""
    return np.array([[kp.x, kp.y, kp.z] for kp in points])


def _create_face_detector(publisher):
    """Build a face landmark detector, preferring the MediaPipe Tasks API.

    Args:
        publisher: Optional progress publisher; when given, a failure of the
            new API is reported through ``publisher.info``.

    Returns:
        A tuple ``(detect, close)``. ``detect(rgb_frame, timestamp_ms)``
        returns an [N, 3] landmark array for the first face or None when no
        face was found; ``close()`` releases the detector.

    Raises:
        Exception: Whatever MediaPipe raises when it cannot be imported or
            when neither API can be initialised.
    """
    import mediapipe as mp

    try:
        base_options = mp.tasks.BaseOptions(
            model_asset_path="face_landmarker.task",
            delegate=mp.tasks.BaseOptions.Delegate.CPU,
        )
        options = mp.tasks.vision.FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=mp.tasks.vision.RunningMode.VIDEO,
            num_faces=1,
            min_face_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        face_landmarker = mp.tasks.vision.FaceLandmarker.create_from_options(options)
    except Exception as e:
        # Fall through to the legacy API below.
        if publisher:
            publisher.info("lip", f"New API failed, trying old API: {e}")
    else:

        def detect_with_tasks(rgb_frame, timestamp_ms):
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
            detection = face_landmarker.detect_for_video(mp_image, timestamp_ms)
            faces = detection.face_landmarks
            return _landmarks_to_array(faces[0]) if faces else None

        return detect_with_tasks, face_landmarker.close

    face_mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    )

    def detect_with_solutions(rgb_frame, timestamp_ms):
        # The legacy API takes no timestamp; the parameter only keeps both
        # detect callables interchangeable.
        results = face_mesh.process(rgb_frame)
        # Legacy results expose `multi_face_landmarks`, each entry holding
        # its points in `.landmark`.
        faces = results.multi_face_landmarks
        return _landmarks_to_array(faces[0].landmark) if faces else None

    return detect_with_solutions, face_mesh.close


def _write_json(output_path, payload):
    """Write ``payload`` to ``output_path`` as indented JSON."""
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 1
):
    """Detect mouth movement in a video and write the result as JSON.

    The function installs SIGTERM/SIGINT handlers and always terminates the
    process via ``sys.exit``: status 0 after the result file is written,
    status 1 when MediaPipe cannot be loaded (an error result is written
    first).

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the JSON file to write.
        uuid: UUID used for Redis progress reporting; empty disables it.
        sample_interval: Analyse every N-th frame. Values below 1 are
            treated as 1.
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # A non-positive interval would divide by zero in the sampling check.
    sample_interval = max(1, int(sample_interval))

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    try:
        detect, close_detector = _create_face_detector(publisher)
    except Exception as e2:
        if publisher:
            publisher.error("lip", f"Failed to load MediaPipe: {e2}")
        _write_json(
            output_path,
            {
                "frame_count": 0,
                "fps": 0.0,
                "frames": [],
                "stats": {},
                "error": str(e2),
            },
        )
        sys.exit(1)

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0

    # Running mouth-movement statistics.
    frames_with_face = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0
    detection_errors = 0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            sys.stderr.write(f"LIP: could not open video {video_path}\n")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info("lip", f"fps={fps}, frames={total_frames}")
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info(
                "lip", f"LIP_PROCESSING (sample_interval={sample_interval})"
            )

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Sampling: only every `sample_interval`-th frame is analysed.
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            frame_index = frame_count - 1

            if fps > 0:
                timestamp = frame_index / fps
                timestamp_ms = int(timestamp * 1000)
            else:
                # Video mode requires strictly increasing timestamps, so fall
                # back to the frame index when the FPS is unknown.
                timestamp = 0.0
                timestamp_ms = frame_index

            # MediaPipe expects RGB; OpenCV decodes to BGR.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            try:
                landmarks = detect(rgb_frame, timestamp_ms)
            except Exception as exc:
                # Best effort: a failing frame counts as "no face", but the
                # first failure is reported so it is not completely silent.
                landmarks = None
                detection_errors += 1
                if detection_errors == 1:
                    sys.stderr.write(
                        f"LIP: landmark detection failed on frame {frame_index}: {exc}\n"
                    )

            if landmarks is not None and len(landmarks) >= MIN_FACE_LANDMARKS:
                openness, width, height = calculate_lip_openness(landmarks)
                speaking = is_speaking(openness)

                frames_with_face += 1
                if speaking:
                    speaking_frames += 1
                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": frame_index,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(width, 4),
                        "lip_height": round(height, 4),
                        "is_speaking": speaking,
                    }
                )
            else:
                openness = 0.0
                # Record face-less frames only on every 10th frame number to
                # keep the output small.
                if frame_count % 10 == 0:
                    frames.append(
                        {
                            "frame": frame_index,
                            "timestamp": round(timestamp, 3),
                            "face_detected": False,
                            "lip_openness": 0.0,
                            "lip_width": 0.0,
                            "lip_height": 0.0,
                            "is_speaking": False,
                        }
                    )

            if publisher and processed % 100 == 0:
                publisher.progress(
                    "lip",
                    processed,
                    total_frames // sample_interval,
                    f"Frame {frame_count}, openness={openness:.3f}",
                )
    finally:
        # Runs on normal completion, on errors and on the SystemExit raised
        # by the signal handler.
        cap.release()
        close_detector()

    # Both averages are taken over all processed frames; frames without a
    # face contribute an openness of 0.
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "total_frames": total_frames,
            "processed_frames": processed,
            "frames_with_face": frames_with_face,
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_lip_openness": round(avg_openness, 4),
            "max_lip_openness": round(max_openness, 4),
        },
    }

    if publisher:
        publisher.complete(
            "lip",
            f"{len(frames)} frames, {speaking_frames} speaking ({speaking_rate * 100:.1f}%)",
        )

    _write_json(output_path, result)

    sys.stderr.write(
        f"LIP: Processing complete, {len(frames)} frames written to {output_path}\n"
    )
    sys.stderr.flush()
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Lip Movement Detection (MediaPipe Face Mesh)"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--sample-interval",
        "-s",
        type=int,
        default=1,
        help="Process every N frames (default: 1, set higher for faster processing)",
    )

    args = parser.parse_args()

    process_lip(args.video_path, args.output_path, args.uuid, args.sample_interval)