Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index Correction: asr-1.json format, generate/apply scripts API: 37/37 endpoints fixed and tested Docs: HANDOVER_V2.0.md for M4
139 lines
4.9 KiB
Python
139 lines
4.9 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Lip Analyzer — from face_test.json (Apple Vision outer_lips 14pts) + ASRX
|
|
Computes lip_openness per frame, compares with speaker segments.
|
|
"""
|
|
|
|
import json, sys, os
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
def calc_lip_height(face):
    """Return the vertical extent of a face's outer-lip contour.

    ``face["lips"]`` may be either a dict that stores the contour under
    ``"outer_lips"`` or the list of contour points itself. Each point is
    either an ``[x, y]`` sequence or a mapping with a ``"y"`` key.

    Args:
        face: One face record from the face-detection JSON.

    Returns:
        ``max(y) - min(y)`` over the usable points, in the same units as the
        input coordinates, or ``None`` when the record has no lip data or
        fewer than three usable points.
    """
    if not isinstance(face, dict):
        return None

    lips_data = face.get("lips", {})
    if isinstance(lips_data, dict):
        pts = lips_data.get("outer_lips", [])
    elif isinstance(lips_data, list):
        pts = lips_data
    else:
        return None

    if not isinstance(pts, (list, tuple)):
        return None

    ys = []
    for pt in pts:
        if isinstance(pt, (list, tuple)):
            y = pt[1] if len(pt) > 1 else None
        elif isinstance(pt, dict):
            y = pt.get("y")
        else:
            y = None
        # Skip points without a numeric y instead of substituting 0: a
        # made-up 0 would stretch the contour and inflate the height.
        if isinstance(y, (int, float)) and not isinstance(y, bool):
            ys.append(y)

    if len(ys) < 3:
        return None
    return max(ys) - min(ys)
|
|
|
|
def _load_json(path):
    """Read and parse a UTF-8 encoded JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _extract_frames(face_data):
    """Return the list of per-frame records from the face JSON.

    The file is either a bare list of frame records or a dict that stores
    that list under ``"frames"``; any other shape yields an empty list.
    """
    if isinstance(face_data, list):
        return face_data
    if isinstance(face_data, dict):
        frames = face_data.get("frames", [])
        return frames if isinstance(frames, list) else []
    return []


def _collect_lip_heights(frames_data):
    """Map frame number -> lip-height stats for frames with usable lip data."""
    lip_by_frame = {}
    for fdata in frames_data:
        if not isinstance(fdata, dict):
            continue  # a non-dict record cannot carry a face list
        fn = fdata.get("frame", 0)
        faces = fdata.get("faces", fdata.get("detections", []))
        if not isinstance(faces, list):
            continue
        heights = []
        for face in faces:
            if not isinstance(face, dict):
                continue
            h = calc_lip_height(face)
            if h is not None:
                heights.append(h)
        if heights:
            lip_by_frame[fn] = {
                "heights": heights,
                "avg": sum(heights) / len(heights),
                "count": len(heights),
            }
    return lip_by_frame


def _window_average(lip_by_frame, sorted_frames, lo, hi):
    """Return ``(average lip height, frame count)`` for frames in [lo, hi].

    ``sorted_frames`` is the sorted list of keys of ``lip_by_frame``; bisect
    locates the window without scanning every frame. An empty window yields
    ``(0.0, 0)``.
    """
    from bisect import bisect_left, bisect_right

    window = sorted_frames[bisect_left(sorted_frames, lo):bisect_right(sorted_frames, hi)]
    if not window:
        return 0.0, 0
    return sum(lip_by_frame[fn]["avg"] for fn in window) / len(window), len(window)


def _analyze_segment(seg, lip_by_frame, sorted_frames, fps, threshold):
    """Compare lip openness around a segment's start with openness during it."""
    st = seg.get("start_time") or 0
    et = seg.get("end_time") or 0
    speaker = seg.get("speaker_id", "?")
    text = seg.get("text") or ""

    start_frame = int(st * fps)
    end_frame = int(et * fps) + 10  # allow some frames after the segment ends

    # Baseline: frames within 10 frames of the segment start, on either side.
    # NOTE(review): this window overlaps the first frames of the "during"
    # window; confirm whether a strictly-before baseline was intended.
    baseline_avg, baseline_count = _window_average(
        lip_by_frame, sorted_frames, start_frame - 10, start_frame + 10
    )
    # During: frames from the segment start through the padded end.
    during_avg, during_count = _window_average(
        lip_by_frame, sorted_frames, start_frame, end_frame
    )

    motion = (during_avg - baseline_avg) / max(baseline_avg, 1)
    # Without frames in both windows there is nothing to compare: an empty
    # baseline would otherwise count any visible lip height as motion.
    is_speaking = bool(baseline_count and during_count and motion > threshold)

    return {
        "start_time": st, "end_time": et, "speaker": speaker,
        "text": text[:40],
        "baseline_avg": round(baseline_avg, 2),
        "during_avg": round(during_avg, 2),
        "motion_ratio": round(motion, 4),
        "is_speaking": is_speaking,
        "baseline_frames": baseline_count,
        "during_frames": during_count,
    }


def main():
    """Command-line entry point: correlate lip openness with ASRX segments.

    Reads the face-detection JSON (``--face``) and the ASRX segment JSON
    (``--asrx``), computes a lip-motion ratio for every segment, prints a
    summary, and writes the full result as JSON to ``--output``.

    Options:
        --threshold: motion ratio above which a segment counts as speaking
            (default 0.05).
        --fps: frame rate used to convert segment times to frame numbers
            (default 25.0).
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--face", required=True)
    parser.add_argument("--asrx", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--threshold", type=float, default=0.05)
    parser.add_argument("--fps", type=float, default=25.0)
    args = parser.parse_args()
    if args.fps <= 0:
        parser.error("--fps must be positive")
    fps = args.fps

    # Load face data (either a bare frame list or a dict with "frames").
    face_data = _load_json(args.face)
    frames_data = _extract_frames(face_data)
    frame_count = face_data.get("frame_count", "?") if isinstance(face_data, dict) else "?"
    print(f"\nFace data: {len(frames_data)} frames, {frame_count} total")

    # Extract lip openness per frame, across all faces in the frame.
    lip_by_frame = _collect_lip_heights(frames_data)
    sorted_frames = sorted(lip_by_frame)
    print(f"Frames with lip data: {len(lip_by_frame)}")

    # Load ASRX speaker segments.
    asrx = _load_json(args.asrx)
    segs = asrx.get("segments", [])
    print(f"ASRX segments: {len(segs)}")

    # Analyze every ASR segment (no time limit).
    results = [
        _analyze_segment(seg, lip_by_frame, sorted_frames, fps, args.threshold)
        for seg in segs
    ]
    speakable = sum(1 for r in results if r["during_frames"] > 0)

    # Summary
    print("\n=== Results ===")
    print(f"ASRX segments analyzed: {len(results)}")
    print(f"With face data: {speakable} ({speakable*100//max(len(results),1)}%)")
    speech_detected = sum(1 for r in results if r["is_speaking"] and r["during_frames"] > 0)
    print(f"Lip motion detected: {speech_detected} ({speech_detected*100//max(speakable,1)}% of face-present)")

    print("\n=== Sample: first 5 segments ===")
    for r in results[:5]:
        icon = "🗣" if r["is_speaking"] else "🤐"
        # str() keeps the column format working for non-string speaker ids.
        print(f"  {icon} {r['start_time']:.0f}s {str(r['speaker']):12s} motion={r['motion_ratio']:.3f} baseline={r['baseline_avg']:.1f} during={r['during_avg']:.1f} faces={r['during_frames']}")

    # Save
    output = {
        "fps": fps,
        "total_asrx_segments": len(results),
        "segments_with_faces": speakable,
        "segments_with_lip_motion": speech_detected,
        "lip_by_frame_count": len(lip_by_frame),
        "results": results,
    }
    # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding.
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"\nSaved: {args.output}")
|
|
|
|
# Run the analyzer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|