- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
229 lines
6.6 KiB
Python
229 lines
6.6 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Lip Processor - OpenCV + MediaPipe Face Mesh (簡化版)
|
|
使用 OpenCV 的 DNN 模組進行 Face Mesh 檢測
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import cv2
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Report the received signal on stdout, then terminate with status 1.

    Args:
        signum: Number of the signal that triggered the handler.
        frame: Stack frame passed in by the signal machinery (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    raise SystemExit(1)
|
|
|
|
|
|
# Mouth landmark indices.
# NOTE(review): presumably MediaPipe Face Mesh indices (the module docstring
# names Face Mesh). In that topology 78/308 look like the inner mouth corners
# rather than the upper/lower lip midpoints — confirm against the landmark map.
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
LEFT_MOUTH = 61
RIGHT_MOUTH = 291
|
|
|
|
|
|
def calculate_lip_metrics(landmarks, img_width, img_height):
    """Compute mouth-opening metrics from a set of face landmarks.

    Each landmark is indexed as ``lm[0]`` (x) and ``lm[1]`` (y) and multiplied
    by the image dimensions, so coordinates are treated as fractions of the
    image size.

    Args:
        landmarks: Indexable sequence of landmarks, or None when no face was
            found. At least 468 entries are required.
        img_width: Image width in pixels, used to scale x coordinates.
        img_height: Image height in pixels, used to scale y coordinates.

    Returns:
        A tuple ``(openness, width, vertical_openness)``:
        ``openness`` is the vertical gap divided by the mouth width, capped at
        1.0 (0.0 when the width is zero); ``width`` and ``vertical_openness``
        are pixel distances. ``(0.0, 0.0, 0.0)`` is returned when landmarks
        are missing or incomplete.
    """
    # Missing or incomplete landmark sets carry no usable mouth geometry.
    if landmarks is None or len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    # Vertical opening: pixel distance between the two lip landmarks.
    upper_y = int(landmarks[UPPER_LIP_BOTTOM][1] * img_height)
    lower_y = int(landmarks[LOWER_LIP_TOP][1] * img_height)
    vertical_openness = abs(upper_y - lower_y)

    # Horizontal width: pixel distance between the mouth corners.
    left_x = int(landmarks[LEFT_MOUTH][0] * img_width)
    right_x = int(landmarks[RIGHT_MOUTH][0] * img_width)
    width = abs(left_x - right_x)

    # Normalize by width so the ratio is independent of face size. Both
    # operands are non-negative, so only the upper bound needs clamping.
    openness = min(1.0, vertical_openness / width) if width > 0 else 0.0

    return openness, width, vertical_openness
|
|
|
|
|
|
def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Process a video for lip movement detection and write a JSON report.

    Every ``sample_interval``-th frame is run through OpenCV's frontal-face
    Haar cascade. For the largest detected face, a simplified "openness" value
    is derived from the face-box width (not from actual lip landmarks), and a
    frame counts as speaking when that value exceeds 0.3. Per-frame records
    and aggregate stats are written to ``output_path`` as JSON.

    Installs SIGTERM/SIGINT handlers, and, when ``uuid`` is non-empty, reports
    progress through ``RedisPublisher``.

    Args:
        video_path: Path of the video file to read.
        output_path: Path of the JSON file to write.
        uuid: Identifier passed to ``RedisPublisher``; empty disables
            progress publishing.
        sample_interval: Process every N-th frame; must be >= 1.

    Raises:
        ValueError: If ``sample_interval`` is less than 1.
        SystemExit: Always on success, with exit code 0.
    """
    # A zero interval would divide by zero in the sampling modulo, and a
    # negative one has no sensible meaning, so reject both up front.
    if sample_interval < 1:
        raise ValueError(
            f"sample_interval must be a positive integer, got {sample_interval}"
        )

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Keep the best-effort behavior (an empty report is still
            # written) but make the failure visible.
            sys.stderr.write(f"LIP: Warning, could not open video: {video_path}\n")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")

        # Simple face detection using OpenCV's bundled Haar cascade.
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        )

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            # Some containers report an FPS of 0; avoid dividing by it.
            timestamp = (frame_count - 1) / fps if fps > 0 else 0.0

            # Detect faces on the grayscale frame.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.3, 5)

            if len(faces) == 0:
                # NOTE(review): face-less samples are recorded only when the
                # frame number is also a multiple of 10. With the default
                # interval of 30 that always holds, but other intervals drop
                # some records — confirm whether this is intended.
                if frame_count % 10 == 0:
                    frames.append(
                        {
                            "frame": frame_count - 1,
                            "timestamp": round(timestamp, 3),
                            "face_detected": False,
                            "lip_openness": 0.0,
                            "lip_width": 0.0,
                            "lip_height": 0.0,
                            "is_speaking": False,
                        }
                    )
                continue

            # Assume the largest face belongs to the speaker.
            x, y, w, h = max(faces, key=lambda f: f[2] * f[3])

            # Rough mouth height: a fixed fraction of the face height.
            mouth_h = int(h * 0.1)

            # Simplified approximation: a wider face box is treated as a more
            # open mouth, with a 200px-wide face taken as fully open.
            openness = min(1.0, w / 200.0)

            speaking = openness > 0.3
            if speaking:
                speaking_frames += 1

            total_openness += openness
            max_openness = max(max_openness, openness)

            # Explicit int/float/bool casts turn NumPy scalars into plain
            # Python values that json.dump can serialize.
            frames.append(
                {
                    "frame": int(frame_count - 1),
                    "timestamp": round(float(timestamp), 3),
                    "face_detected": True,
                    "lip_openness": round(float(openness), 4),
                    "lip_width": round(float(w), 2),
                    "lip_height": round(float(mouth_h), 2),
                    "is_speaking": bool(speaking),
                    "face_bbox": {
                        "x": int(x),
                        "y": int(y),
                        "width": int(w),
                        "height": int(h),
                    },
                }
            )

            if publisher and processed % 50 == 0:
                publisher.progress(
                    "lip",
                    processed,
                    total_frames // sample_interval,
                    f"openness={openness:.3f}",
                )
    finally:
        # Release the capture even if decoding or detection raised.
        cap.release()

    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(1 for f in frames if f.get("face_detected", False))

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    # Persist the report before announcing completion so that a listener
    # reacting to the completion message finds the file in place.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus optional
    # progress-UUID and sampling-interval flags.
    cli = argparse.ArgumentParser(description="Lip Movement Detection (OpenCV)")
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--sample-interval",
        "-s",
        default=30,
        type=int,
        help="Process every N frames (default: 30)",
    )
    opts = cli.parse_args()

    process_lip(
        video_path=opts.video_path,
        output_path=opts.output_path,
        uuid=opts.uuid,
        sample_interval=opts.sample_interval,
    )
|