Files
momentry_core/scripts/lip_processor_cv.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

229 lines
6.6 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
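Lip Processor - OpenCV face detection (simplified version)
Detects faces with OpenCV's Haar cascade classifier and derives a rough
lip-openness estimate from the face bounding box. The processing loop does
not run MediaPipe Face Mesh or OpenCV's DNN module; a landmark-based helper
for 468-point meshes is defined alongside it.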
"""
import sys
import json
import argparse
import os
import signal
import cv2
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
def signal_handler(signum, frame):
    """Announce the received signal on stdout and terminate with exit code 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the signal machinery (unused).
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    # Equivalent to sys.exit(1): unwinds the interpreter with status 1.
    raise SystemExit(1)
# Mouth landmark indices (presumably MediaPipe Face Mesh numbering — confirm).
# NOTE(review): in the published Face Mesh topology 78 and 308 appear to be the
# inner mouth corners, while 13 / 14 are the inner upper / lower lip centres.
# If so, the vertical gap computed from the first two constants will be close
# to zero regardless of how far the mouth is open — verify before relying on it.
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
LEFT_MOUTH = 61
RIGHT_MOUTH = 291
def calculate_lip_metrics(landmarks, img_width, img_height):
    """Compute mouth-opening metrics from a list of face landmarks.

    Args:
        landmarks: Indexable sequence of landmarks, each indexed as ``lm[0]``
            (x) and ``lm[1]`` (y). The values are multiplied by the image
            size, so they are presumably normalized to [0, 1] — confirm
            against the caller. ``None`` is treated as "no landmarks".
        img_width: Image width in pixels.
        img_height: Image height in pixels.

    Returns:
        Tuple ``(openness, width, vertical_openness)``. ``openness`` is the
        vertical gap divided by the mouth width, clamped to [0.0, 1.0] (0.0
        when the width is zero). ``width`` and ``vertical_openness`` are pixel
        distances. All three are 0.0 when ``landmarks`` is ``None`` or holds
        fewer than 468 entries.
    """
    # A missing or partial landmark set cannot be measured.
    if landmarks is None or len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    # Vertical opening: pixel distance between the two lip landmarks.
    upper_y = int(landmarks[UPPER_LIP_BOTTOM][1] * img_height)
    lower_y = int(landmarks[LOWER_LIP_TOP][1] * img_height)
    vertical_openness = abs(upper_y - lower_y)

    # Horizontal width: pixel distance between the two mouth corners.
    left_x = int(landmarks[LEFT_MOUTH][0] * img_width)
    right_x = int(landmarks[RIGHT_MOUTH][0] * img_width)
    width = abs(left_x - right_x)

    # Normalize by the width so the ratio does not depend on face size.
    if width == 0:
        return 0.0, width, vertical_openness
    openness = min(1.0, max(0.0, vertical_openness / width))
    return openness, width, vertical_openness
def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Estimate lip movement for sampled video frames and write a JSON report.

    Every ``sample_interval``-th frame is converted to grayscale and scanned
    with OpenCV's frontal-face Haar cascade. The largest detected face is
    used, and ``lip_openness`` is a coarse proxy derived from the face-box
    width (``min(1.0, width / 200)``), not a measurement of the mouth itself.
    A sample counts as "speaking" when that value exceeds 0.3.

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress is reported through
            ``RedisPublisher(uuid)``; when empty, nothing is published.
        sample_interval: Analyse every N-th frame; must be at least 1.

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        SystemExit: Always raised with code 0 once the report is written.
    """
    # Reject intervals that would make the modulo below fail or misbehave.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Keep going: the read loop ends at once and an empty report is
            # written, but leave a trace of why no frames were processed.
            sys.stderr.write(f"LIP: Could not open video: {video_path}\n")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")

        # Simple face detection with OpenCV's bundled Haar cascade.
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        )

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            # Some containers report no frame rate; avoid dividing by zero.
            timestamp = (frame_count - 1) / fps if fps > 0 else 0.0

            # Detect faces on the grayscale frame.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.3, 5)

            # len() rather than truthiness: the result may be a NumPy array.
            if len(faces) > 0:
                # Assume the largest face belongs to the speaker.
                x, y, w, h = max(faces, key=lambda f: f[2] * f[3])

                # Rough mouth height: 10% of the face-box height.
                mouth_h = int(h * 0.1)

                # Simplified approximation: a wider face box is treated as a
                # more open mouth, with a 200 px wide face as the maximum.
                openness = min(1.0, w / 200.0)
                speaking = openness > 0.3

                if speaking:
                    speaking_frames += 1

                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": int(frame_count - 1),
                        "timestamp": round(float(timestamp), 3),
                        "face_detected": True,
                        "lip_openness": round(float(openness), 4),
                        # Holds the face-box width, not a measured lip width.
                        "lip_width": round(float(w), 2),
                        "lip_height": round(float(mouth_h), 2),
                        "is_speaking": bool(speaking),
                        "face_bbox": {
                            "x": int(x),
                            "y": int(y),
                            "width": int(w),
                            "height": int(h),
                        },
                    }
                )

                if publisher and processed % 50 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        total_frames // sample_interval,
                        f"openness={openness:.3f}",
                    )
            elif frame_count % 10 == 0:
                # NOTE(review): face-less samples are recorded only on frame
                # counts divisible by 10, so intervals that are not multiples
                # of 10 drop most of them from `frames` while still counting
                # them in `processed` — confirm this throttle is intended.
                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "face_detected": False,
                        "lip_openness": 0.0,
                        "lip_width": 0.0,
                        "lip_height": 0.0,
                        "is_speaking": False,
                    }
                )
    finally:
        # Release the capture even if processing fails or a signal exits.
        cap.release()

    # Averages are taken over all sampled frames, including face-less ones.
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(1 for entry in frames if entry.get("face_detected", False))

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    # Write the report before announcing completion, so a consumer reacting to
    # the completion message never finds a missing or partial file.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, then run the processor.
    cli = argparse.ArgumentParser(description="Lip Movement Detection (OpenCV)")
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--sample-interval",
        "-s",
        default=30,
        type=int,
        help="Process every N frames (default: 30)",
    )

    opts = cli.parse_args()
    process_lip(
        video_path=opts.video_path,
        output_path=opts.output_path,
        uuid=opts.uuid,
        sample_interval=opts.sample_interval,
    )