Files
momentry_core/scripts/integrated_body_action_decoder.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

436 lines
16 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Integrated Body Action Decoder - Combine InsightFace + MediaPipe Holistic
Purpose:
1. Combine InsightFace pose_angle (existing) with MediaPipe holistic
2. Generate complete body action timeline
3. Support trace-based analysis
Input:
- face.json (InsightFace: embedding, pose_angle)
- holistic.json (MediaPipe: face_mesh, pose, hands)
Output:
- Integrated action data with all body parts
"""
import json
import argparse
from typing import Dict, List
from collections import defaultdict
class IntegratedBodyActionDecoder:
    """
    Decode body actions from combined InsightFace + MediaPipe data.

    The decoder does not compute any geometry itself: it reads action labels
    (``eye_action``, ``left_arm_action``, ``gesture`` ...) that are already
    present in the input dictionaries, tags each with its source, and derives
    a few "combined" actions from co-occurring labels.

    Missing keys, JSON ``null`` values and empty ``faces`` / ``persons`` lists
    are all treated as "no data for this body part" rather than raising.
    """

    def __init__(self):
        # Action thresholds.
        # NOTE: none of the methods in this class read these tables; the
        # action labels arrive pre-computed in the input data. They are kept
        # as public attributes for callers that may rely on them.
        self.EAR_THRESHOLDS = {
            "closed": 0.15,
            "squint": 0.25,
            "wide_open": 0.4,
        }
        self.MAR_THRESHOLDS = {
            "closed": 0.2,
            "slightly_open": 0.3,
            "open": 0.5,
            "yawn": 0.7,
        }
        # Presumably degrees (the arm descriptions below print "°") - confirm.
        self.ELBOW_ANGLE_THRESHOLDS = {
            "fold": 90,
            "extend": 150,
        }
        self.KNEE_ANGLE_THRESHOLDS = {
            "knee_bend": 120,
            "standing": 160,
        }

    @staticmethod
    def _section(data, *keys) -> Dict:
        """
        Walk nested dicts along ``keys`` and return the dict found there.

        Returns an empty dict when any level is missing, ``None`` or not a
        dict, so callers can chain ``.get(...)`` without guarding each level.
        """
        current = data
        for key in keys:
            if not isinstance(current, dict):
                return {}
            current = current.get(key)
        return current if isinstance(current, dict) else {}

    @staticmethod
    def _first_entry(frame, key: str) -> Dict:
        """
        Return the first dict in ``frame[key]``, or ``{}`` when unavailable.

        Covers a missing key, a ``null`` value and an empty list (a frame in
        which nothing was detected).
        """
        entries = frame.get(key) if isinstance(frame, dict) else None
        if isinstance(entries, list) and entries and isinstance(entries[0], dict):
            return entries[0]
        return {}

    def decode_frame_actions(
        self,
        face_data: Dict,
        holistic_data: Dict,
    ) -> Dict:
        """
        Decode all actions for single frame
        Args:
            face_data: InsightFace data (pose_angle, embedding)
            holistic_data: MediaPipe data (face_mesh, pose, hands)
        Returns:
            Dict with all decoded actions, keyed by body part ("face",
            "eyes", "mouth", "arms", "hands", "legs", "combined"); each value
            is a list of action dicts carrying at least an "action" name and
            a "description".
        """
        actions = {
            "face": [],
            "eyes": [],
            "mouth": [],
            "arms": [],
            "hands": [],
            "legs": [],
            "combined": [],
        }

        # 1. Face pose (from InsightFace)
        # An empty pose_angle dict still yields "pose_unknown"; only a
        # missing or null pose_angle is skipped.
        pose_angle = face_data.get("pose_angle") if isinstance(face_data, dict) else None
        if isinstance(pose_angle, dict):
            angle = pose_angle.get("angle", "unknown")
            confidence = pose_angle.get("confidence", 0.0)
            actions["face"].append({
                "action": f"pose_{angle}",
                "description": f"Face pose: {angle}",
                "confidence": confidence,
                "source": "insightface",
            })

        # 2. Eye actions (from MediaPipe face_mesh)
        eye_features = self._section(holistic_data, "face_mesh", "eye_features")
        eye_action = eye_features.get("eye_action", "unknown")
        ear = eye_features.get("avg_ear", 0)
        gaze = eye_features.get("gaze_direction", "center")
        if eye_action != "unknown":
            actions["eyes"].append({
                "action": f"eye_{eye_action}",
                "description": f"Eye: {eye_action} (EAR: {ear:.3f})",
                "ear": ear,
                "gaze": gaze,
                "source": "mediapipe_face_mesh",
            })
        # A non-center gaze is reported as its own action, independent of
        # whether an eye action was recognised.
        if gaze != "center":
            actions["eyes"].append({
                "action": f"gaze_{gaze}",
                "description": f"Gaze: looking {gaze}",
                "source": "mediapipe_face_mesh",
            })

        # 3. Mouth actions (from MediaPipe face_mesh)
        mouth_features = self._section(holistic_data, "face_mesh", "mouth_features")
        mouth_action = mouth_features.get("mouth_action", "unknown")
        mar = mouth_features.get("mar", 0)
        if mouth_action != "unknown":
            actions["mouth"].append({
                "action": f"mouth_{mouth_action}",
                "description": f"Mouth: {mouth_action} (MAR: {mar:.3f})",
                "mar": mar,
                "source": "mediapipe_face_mesh",
            })

        # 4. Arm actions (from MediaPipe pose)
        arm_features = self._section(holistic_data, "pose", "arm_features")
        left_arm_action = arm_features.get("left_arm_action", "unknown")
        right_arm_action = arm_features.get("right_arm_action", "unknown")
        left_angle = arm_features.get("left_elbow_angle", 0)
        right_angle = arm_features.get("right_elbow_angle", 0)
        if left_arm_action != "unknown":
            actions["arms"].append({
                "action": f"left_arm_{left_arm_action}",
                "description": f"Left arm: {left_arm_action} (angle: {left_angle:.1f}°)",
                "angle": left_angle,
                "source": "mediapipe_pose",
            })
        if right_arm_action != "unknown":
            actions["arms"].append({
                "action": f"right_arm_{right_arm_action}",
                "description": f"Right arm: {right_arm_action} (angle: {right_angle:.1f}°)",
                "angle": right_angle,
                "source": "mediapipe_pose",
            })
        if arm_features.get("cross_arms", False):
            actions["arms"].append({
                "action": "cross_arms",
                "description": "Arms crossed",
                "source": "mediapipe_pose",
            })

        # 5. Hand actions (from MediaPipe hands)
        for hand_type in ("left", "right"):
            hand_data = self._section(holistic_data, "hands", hand_type)
            gesture = hand_data.get("gesture", "unknown")
            num_fingers = hand_data.get("num_fingers_extended", 0)
            if gesture != "unknown":
                actions["hands"].append({
                    "action": f"{hand_type}_hand_{gesture}",
                    "description": f"{hand_type.capitalize()} hand: {gesture} ({num_fingers} fingers)",
                    "num_fingers_extended": num_fingers,
                    "source": "mediapipe_hands",
                })

        # 6. Leg actions (from MediaPipe pose)
        leg_features = self._section(holistic_data, "pose", "leg_features")
        leg_action = leg_features.get("leg_action", "unknown")
        if leg_action != "unknown":
            actions["legs"].append({
                "action": f"leg_{leg_action}",
                "description": f"Leg: {leg_action}",
                "source": "mediapipe_pose",
            })

        # 7. Combined actions
        actions["combined"] = self._detect_combined_actions(actions)
        return actions

    def _detect_combined_actions(self, actions: Dict) -> List[Dict]:
        """
        Detect combined actions from multiple body parts
        Args:
            actions: Dict with all individual actions
        Returns:
            List of combined actions; each names its "components", i.e. the
            individual action names that must all be present in the frame.
        """
        detected_actions = {
            act["action"]
            for action_list in actions.values()
            for act in action_list
        }

        # (combined action, description, required component actions)
        rules = (
            # Thinking: face tilted down + left hand pointing.
            # NOTE(review): only the left hand is checked - confirm whether
            # "right_hand_pointing" should also qualify.
            (
                "thinking_pose",
                "Thinking pose (looking down + pointing)",
                ("pose_tilted_down", "left_hand_pointing"),
            ),
            # Defensive: crossed arms + frontal face pose.
            (
                "defensive_pose",
                "Defensive pose (crossed arms + frontal)",
                ("cross_arms", "pose_frontal"),
            ),
            # Surprise: wide-open eyes + open mouth.
            (
                "surprise_expression",
                "Surprise expression (wide eyes + open mouth)",
                ("eye_wide_open", "mouth_open"),
            ),
        )

        return [
            {
                "action": name,
                "description": description,
                "components": list(components),
            }
            for name, description, components in rules
            if all(component in detected_actions for component in components)
        ]

    def integrate_and_decode(
        self,
        face_json_path: str,
        holistic_json_path: str,
    ) -> Dict:
        """
        Integrate face.json + holistic.json and decode actions
        Args:
            face_json_path: Path to face.json (InsightFace)
            holistic_json_path: Path to holistic.json (MediaPipe)
        Returns:
            Integrated action data with three keys: "metadata", "frames"
            (per-frame actions plus a compact copy of the source data) and
            "action_summary" (action name -> number of frames it occurred in).
        Raises:
            OSError: if either file cannot be opened.
            json.JSONDecodeError: if either file is not valid JSON.
            ValueError: if a frame key is not an integer string.
        """
        with open(face_json_path, encoding="utf-8") as f:
            face_data = json.load(f)
        with open(holistic_json_path, encoding="utf-8") as f:
            holistic_data = json.load(f)

        face_frames = face_data.get("frames") or {}
        holistic_frames = holistic_data.get("frames") or {}

        # Only frames present in both inputs can be integrated.
        common_frames = set(face_frames) & set(holistic_frames)
        print(f"Face frames: {len(face_frames)}")
        print(f"Holistic frames: {len(holistic_frames)}")
        print(f"Common frames: {len(common_frames)}")
        print()

        frames = {}
        action_summary = defaultdict(int)

        # Frame keys are strings (JSON object keys); sort them numerically.
        for frame_num in sorted(common_frames, key=int):
            # Only the first face/person of each frame is decoded. An empty
            # list (nothing detected) is treated as an empty record.
            face_person = self._first_entry(face_frames[frame_num], "faces")
            holistic_person = self._first_entry(holistic_frames[frame_num], "persons")

            actions = self.decode_frame_actions(face_person, holistic_person)

            embedding = face_person.get("embedding")
            eye_features = self._section(holistic_person, "face_mesh", "eye_features")
            mouth_features = self._section(holistic_person, "face_mesh", "mouth_features")
            arm_features = self._section(holistic_person, "pose", "arm_features")
            leg_features = self._section(holistic_person, "pose", "leg_features")

            frames[frame_num] = {
                "frame_number": int(frame_num),
                "actions": actions,
                "insightface_data": {
                    "pose_angle": face_person.get("pose_angle"),
                    # Only first 10 values, to keep the output small
                    "embedding": embedding[:10] if embedding else None,
                },
                "mediapipe_data": {
                    "eye_action": eye_features.get("eye_action"),
                    "mouth_action": mouth_features.get("mouth_action"),
                    "left_arm_action": arm_features.get("left_arm_action"),
                    "right_arm_action": arm_features.get("right_arm_action"),
                    "leg_action": leg_features.get("leg_action"),
                    "left_hand_gesture": self._section(holistic_person, "hands", "left").get("gesture"),
                    "right_hand_gesture": self._section(holistic_person, "hands", "right").get("gesture"),
                },
            }

            for action_list in actions.values():
                for act in action_list:
                    action_summary[act["action"]] += 1

        return {
            "metadata": {
                "face_source": face_json_path,
                "holistic_source": holistic_json_path,
                "total_frames": len(common_frames),
                "sources": ["insightface", "mediapipe_holistic"],
            },
            "frames": frames,
            # Plain dict so the result serialises and compares like one
            "action_summary": dict(action_summary),
        }

    def print_action_report(self, integrated_data: Dict) -> None:
        """
        Print action report
        Args:
            integrated_data: Result of integrate_and_decode(). Prints the
                frame count, per-category action counts, and the actions of
                the first three frames (in numeric frame order).
        """
        rule = "=" * 70

        print("\n" + rule)
        print("Integrated Body Action Decoder Report")
        print(rule)
        print(f"\nTotal frames: {integrated_data['metadata']['total_frames']}")
        print(f"Sources: {', '.join(integrated_data['metadata']['sources'])}")

        print("\n" + rule)
        print("Action Summary")
        print(rule)
        summary = integrated_data["action_summary"]

        # Categories are recovered from the action-name prefixes; anything
        # matching no known prefix is reported as a combined action.
        eye_prefixes = ("eye_", "gaze_")
        arm_prefixes = ("left_arm_", "right_arm_")
        hand_prefixes = ("left_hand_", "right_hand_")
        known_prefixes = (
            ("pose_",) + eye_prefixes + ("mouth_",) + arm_prefixes
            + hand_prefixes + ("leg_", "cross_arms")
        )
        categories = {
            "Face": [k for k in summary if k.startswith("pose_")],
            "Eyes": [k for k in summary if k.startswith(eye_prefixes)],
            "Mouth": [k for k in summary if k.startswith("mouth_")],
            "Arms": [k for k in summary if k.startswith(arm_prefixes) or k == "cross_arms"],
            "Hands": [k for k in summary if k.startswith(hand_prefixes)],
            "Legs": [k for k in summary if k.startswith("leg_")],
            "Combined": [k for k in summary if not k.startswith(known_prefixes)],
        }
        for category, action_keys in categories.items():
            if action_keys:
                print(f"\n{category} Actions:")
                for action in sorted(action_keys):
                    count = summary[action]
                    print(f" {action}: {count} times")

        print("\n" + rule)
        print("Sample Frame Actions")
        print(rule)
        # Show first 3 frames
        ordered_frames = sorted(
            integrated_data["frames"].items(),
            key=lambda item: int(item[0]),
        )
        for frame_num, frame_data in ordered_frames[:3]:
            print(f"\nFrame {frame_num}:")
            for category, action_list in frame_data["actions"].items():
                if action_list:
                    action_names = [a["action"] for a in action_list]
                    print(f" {category}: {', '.join(action_names)}")
def main(argv=None):
    """
    Command-line entry point.

    With ``--frame N`` the actions of that single frame are decoded and
    printed. Otherwise every frame common to both inputs is processed, a
    report is printed, and the result is written to ``--output-json`` when
    that option is given.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Integrated Body Action Decoder")
    parser.add_argument("--face-json", required=True, help="Path to face.json (InsightFace)")
    parser.add_argument("--holistic-json", required=True, help="Path to holistic.json (MediaPipe)")
    parser.add_argument("--output-json", help="Output JSON path")
    parser.add_argument("--frame", type=int, help="Analyze single frame")
    args = parser.parse_args(argv)

    print("=" * 70)
    print("Integrated Body Action Decoder")
    print("=" * 70)

    decoder = IntegratedBodyActionDecoder()

    # Compare against None: a plain truthiness test would treat "--frame 0"
    # as if no frame had been requested.
    if args.frame is not None:
        # Load single frame
        with open(args.face_json, encoding="utf-8") as f:
            face_data = json.load(f)
        with open(args.holistic_json, encoding="utf-8") as f:
            holistic_data = json.load(f)

        # Frame keys are strings in the JSON files.
        frame_num = str(args.frame)
        face_frames = face_data.get("frames") or {}
        holistic_frames = holistic_data.get("frames") or {}

        if frame_num in face_frames and frame_num in holistic_frames:
            # Use the first face/person; a missing or empty list (nothing
            # detected in this frame) decodes as an empty record.
            face_person = (face_frames[frame_num].get("faces") or [{}])[0]
            holistic_person = (holistic_frames[frame_num].get("persons") or [{}])[0]
            actions = decoder.decode_frame_actions(face_person, holistic_person)

            print(f"\n=== Frame {frame_num} Actions ===")
            for category, action_list in actions.items():
                if action_list:
                    print(f"\n{category.upper()}:")
                    for act in action_list:
                        print(f" {act['action']}: {act['description']}")
        else:
            print(f"❌ Frame {frame_num} not found in both files")
    else:
        # Process all frames
        integrated_data = decoder.integrate_and_decode(
            args.face_json,
            args.holistic_json,
        )
        decoder.print_action_report(integrated_data)

        if args.output_json:
            with open(args.output_json, "w", encoding="utf-8") as f:
                json.dump(integrated_data, f, indent=2)
            print(f"\n✅ Output saved to: {args.output_json}")


if __name__ == "__main__":
    main()