#!/opt/homebrew/bin/python3.11
"""
Lip Processor - mouth movement detection (simplified version).

Uses MediaPipe Face Mesh to measure how far the mouth is open on sampled
video frames and writes the per-frame results plus summary statistics to a
JSON file. Progress is optionally reported through RedisPublisher.
"""

import sys
import json
import argparse
import os
import signal
import cv2

# Make the sibling redis_publisher module importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def signal_handler(signum, frame):
    """Log the received signal and terminate the process with exit code 1."""
    print(f"LIP: Received signal {signum}, exiting...")
    sys.exit(1)


# Mouth landmark indices (MediaPipe Face Mesh, 468-point topology).
# 13 / 14 are the midpoints of the upper / lower inner lip; their vertical
# distance is the mouth opening. 61 / 291 are the outer mouth corners.
UPPER_LIP_TOP = 13
LOWER_LIP_BOTTOM = 14
# NOTE: 78 / 308 are the left / right *inner mouth corners*, not lip
# midpoints, so they must not be used to measure the vertical opening.
# The names are kept unchanged for backward compatibility.
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
LEFT_MOUTH = 61
RIGHT_MOUTH = 291

# A frame counts as "speaking" when openness / mouth width exceeds this ratio.
SPEAKING_THRESHOLD = 0.1


def _write_json(path, payload):
    """Serialize *payload* as indented JSON to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


def _fail(output_path, publisher, publish_message, error_message):
    """Report an error, write an error result file and exit with code 1."""
    if publisher:
        publisher.error("lip", publish_message)
    _write_json(output_path, {"error": error_message, "frames": []})
    sys.exit(1)


def _mouth_openness(landmarks):
    """Return the mouth opening divided by the mouth width.

    *landmarks* is an indexable sequence of points with ``x`` / ``y``
    attributes. Returns 0.0 when the mouth width is zero.
    """
    gap = abs(landmarks[UPPER_LIP_TOP].y - landmarks[LOWER_LIP_BOTTOM].y)
    width = abs(landmarks[LEFT_MOUTH].x - landmarks[RIGHT_MOUTH].x)
    return gap / width if width > 0 else 0.0


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Process video for lip movement detection.

    Every ``sample_interval``-th frame is run through MediaPipe Face Mesh.
    For frames where a face is found, the normalized mouth openness and a
    speaking flag are recorded. The result is written to ``output_path`` as
    JSON and the process then exits (code 0 on success, 1 on failure).

    Args:
        video_path: Path of the video file to analyze.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress is published via ``RedisPublisher(uuid)``.
        sample_interval: Process one frame out of this many (must be >= 1).

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        SystemExit: Always, once processing finished or failed.
    """
    if sample_interval < 1:
        # Guards the modulo / floor-division below against zero and
        # against meaningless negative sampling.
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    # Use the legacy MediaPipe "solutions" API (if available).
    # ``except Exception`` (not a bare except) so that SystemExit raised by
    # the signal handler and KeyboardInterrupt are not misreported.
    try:
        import mediapipe as mp

        face_mesh = mp.solutions.face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
    except Exception as exc:
        sys.stderr.write(f"LIP: MediaPipe init failed: {exc}\n")
        _fail(
            output_path,
            publisher,
            "MediaPipe legacy API not available",
            "MediaPipe API not available",
        )

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Without this check an unreadable file would be reported as a
            # successful run with zero frames.
            sys.stderr.write(f"LIP: Cannot open video: {video_path}\n")
            _fail(
                output_path,
                publisher,
                "Cannot open video",
                "Cannot open video",
            )

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")

        frames = []
        frame_count = 0
        processed = 0
        speaking_frames = 0
        total_openness = 0.0
        # Last measured openness, used only for progress messages; it has a
        # defined value even before the first face is detected.
        last_openness = 0.0

        if publisher:
            publisher.info("lip", "LIP_PROCESSING")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb)

            # The legacy API exposes a list of faces in
            # ``multi_face_landmarks``; each face holds its points in
            # ``.landmark``. max_num_faces=1, so use the first face.
            if results.multi_face_landmarks:
                lm = results.multi_face_landmarks[0].landmark

                # Compute mouth openness, normalized by mouth width.
                normalized = _mouth_openness(lm)
                last_openness = normalized

                speaking = normalized > SPEAKING_THRESHOLD
                if speaking:
                    speaking_frames += 1
                total_openness += normalized

                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(normalized, 4),
                        "is_speaking": speaking,
                    }
                )

            if publisher and processed % 50 == 0:
                publisher.progress(
                    "lip",
                    processed,
                    total_frames // sample_interval,
                    f"openness={last_openness:.3f}",
                )
    finally:
        # Release native resources on every path, including sys.exit().
        cap.release()
        face_mesh.close()

    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
        },
    }

    # Write the file before announcing completion so that a consumer reacting
    # to the completion message always finds the result on disk.
    _write_json(output_path, result)

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("video_path")
    parser.add_argument("output_path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", "-s", type=int, default=30)
    args = parser.parse_args()
    process_lip(args.video_path, args.output_path, args.uuid, args.sample_interval)