Files
momentry_core/scripts/lip_processor_simple.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

180 lines
4.8 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Lip Processor - 嘴部動作檢測 (簡化版)
使用 MediaPipe Face Mesh 檢測嘴部開合度
"""
import sys
import json
import argparse
import os
import signal
import cv2
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    # Equivalent to sys.exit(1): unwinds the stack so cleanup handlers run.
    raise SystemExit(1)
# Mouth landmark indices (MediaPipe Face Mesh, 468 points)
# NOTE(review): in MediaPipe's published landmark map, 13/14 look like the
# inner-lip mid-points (upper/lower) and 78/308 like the inner mouth corners,
# so the names below may be misleading — confirm against the canonical face
# model before relying on them.
UPPER_LIP_TOP = 13
LOWER_LIP_BOTTOM = 14
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
# Horizontal extent of the mouth; used as the width for normalisation.
LEFT_MOUTH = 61
RIGHT_MOUTH = 291
def _write_json(output_path: str, payload: dict) -> None:
    """Serialize *payload* to *output_path* as indented JSON."""
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)


def _abort(publisher, output_path: str, log_message: str, error: str) -> None:
    """Publish a fatal error, write an error result file, and exit with status 1."""
    if publisher:
        publisher.error("lip", log_message)
    _write_json(output_path, {"error": error, "frames": []})
    sys.exit(1)


def _mouth_openness_ratio(landmarks) -> float:
    """Return the vertical lip gap divided by the mouth width (0.0 if width is 0).

    The gap is measured between landmarks 13 and 14 (``UPPER_LIP_TOP`` /
    ``LOWER_LIP_BOTTOM`` in this file), the mid-points of the inner lip
    contour. Landmarks 78 and 308 are the inner mouth *corners*; their
    y-difference only reflects mouth tilt, not how far the lips are apart.

    NOTE(review): landmark coordinates are normalised by image width (x) and
    height (y) separately, so this ratio is scaled by the frame aspect ratio.
    The speaking threshold was presumably chosen with that in mind — confirm.
    """
    gap = abs(landmarks[UPPER_LIP_TOP].y - landmarks[LOWER_LIP_BOTTOM].y)
    width = abs(landmarks[LEFT_MOUTH].x - landmarks[RIGHT_MOUTH].x)
    return gap / width if width > 0 else 0.0


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Detect lip movement in a video and write per-frame results as JSON.

    Every ``sample_interval``-th frame is run through MediaPipe Face Mesh.
    For frames where a face is found, the mouth-openness ratio is recorded
    and the frame is flagged as "speaking" when the ratio exceeds 0.1.

    Args:
        video_path: Path of the video file to analyse.
        output_path: Path of the JSON file to write (result or error).
        uuid: Job identifier for ``RedisPublisher``; when empty, no progress
            messages are published.
        sample_interval: Process one frame out of this many (must be >= 1).

    This function never returns: it installs SIGTERM/SIGINT handlers and
    always ends the process with ``sys.exit`` — status 0 on success, 1 when
    MediaPipe cannot be initialised, the sample interval is invalid, or the
    video cannot be opened (an error JSON is written in those cases).
    """
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("lip", "LIP_START")

    # A zero interval would divide by zero in the sampling test below.
    if sample_interval < 1:
        _abort(
            publisher,
            output_path,
            f"Invalid sample interval: {sample_interval}",
            "sample_interval must be >= 1",
        )

    if publisher:
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    # Use the legacy MediaPipe "solutions" API (if available).
    try:
        import mediapipe as mp

        face_mesh = mp.solutions.face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
    except Exception as exc:
        # `Exception` (not a bare except) so a SystemExit raised by the signal
        # handler during the import is not misreported as a MediaPipe failure.
        sys.stderr.write(f"LIP: MediaPipe init failed: {exc!r}\n")
        _abort(
            publisher,
            output_path,
            "MediaPipe legacy API not available",
            "MediaPipe API not available",
        )

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            _abort(
                publisher,
                output_path,
                f"Cannot open video: {video_path}",
                "Cannot open video",
            )

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        expected_samples = total_frames // sample_interval

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", "LIP_PROCESSING")

        frames = []
        frame_count = 0
        processed = 0
        speaking_frames = 0
        total_openness = 0.0
        # Most recent ratio; initialised so the progress message is always
        # well-defined even before the first face is detected.
        last_openness = 0.0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb)

            # The legacy API reports faces in `multi_face_landmarks` (None when
            # no face is found); each entry holds its points in `.landmark`.
            if results.multi_face_landmarks:
                lm = results.multi_face_landmarks[0].landmark
                last_openness = _mouth_openness_ratio(lm)
                speaking = last_openness > 0.1
                if speaking:
                    speaking_frames += 1
                total_openness += last_openness
                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(last_openness, 4),
                        "is_speaking": speaking,
                    }
                )

            if publisher and processed % 50 == 0:
                publisher.progress(
                    "lip",
                    processed,
                    expected_samples,
                    f"openness={last_openness:.3f}",
                )
    finally:
        # Release native resources on every exit path, including sys.exit().
        cap.release()
        face_mesh.close()

    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
        },
    }

    # Write the file before announcing completion so that a listener reacting
    # to the "complete" message never finds a missing or partial result.
    _write_json(output_path, result)
    if publisher:
        publisher.complete("lip", f"{len(frames)} frames")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus optional job id and
    # frame-sampling interval, forwarded unchanged to process_lip().
    cli = argparse.ArgumentParser()
    cli.add_argument("video_path")
    cli.add_argument("output_path")
    cli.add_argument("--uuid", "-u", default="")
    cli.add_argument("--sample-interval", "-s", type=int, default=30)
    opts = cli.parse_args()

    process_lip(
        video_path=opts.video_path,
        output_path=opts.output_path,
        uuid=opts.uuid,
        sample_interval=opts.sample_interval,
    )