#!/opt/homebrew/bin/python3.11
"""
Integrated Body Action Decoder - Combine InsightFace + MediaPipe Holistic

Purpose:
1. Combine InsightFace pose_angle (existing) with MediaPipe holistic
2. Generate complete body action timeline
3. Support trace-based analysis

Input:
- face.json (InsightFace: embedding, pose_angle)
- holistic.json (MediaPipe: face_mesh, pose, hands)

Output:
- Integrated action data with all body parts
"""

import json
import argparse
from typing import Any, Dict, List, Optional
from collections import defaultdict


def _load_json(path: str) -> Any:
    """Read and parse the JSON document stored at ``path`` (UTF-8)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


class IntegratedBodyActionDecoder:
    """
    Decode body actions from combined InsightFace + MediaPipe data

    The decoder does not classify raw landmarks itself: it reads action
    labels that are already present in the per-person dicts
    (``eye_action``, ``mouth_action``, ``left_arm_action``, ``gesture``, ...)
    and turns them into a uniform list of action records per body part.
    """

    # (action, description, components) for every multi-part pose.  A rule
    # fires when all of its components were detected in the frame.
    _COMBINED_RULES = (
        (
            "thinking_pose",
            "Thinking pose (looking down + pointing)",
            ("pose_tilted_down", "left_hand_pointing"),
        ),
        (
            "defensive_pose",
            "Defensive pose (crossed arms + frontal)",
            ("cross_arms", "pose_frontal"),
        ),
        (
            "surprise_expression",
            "Surprise expression (wide eyes + open mouth)",
            ("eye_wide_open", "mouth_open"),
        ),
    )

    def __init__(self):
        # Action thresholds
        # NOTE(review): none of the methods in this file read these tables;
        # decoding relies on labels already present in the input.  They are
        # kept as public attributes in case external code uses them.
        # Presumably EAR/MAR are aspect ratios and the angles are degrees -
        # confirm against the upstream extractor.
        self.EAR_THRESHOLDS = {
            "closed": 0.15,
            "squint": 0.25,
            "wide_open": 0.4,
        }
        self.MAR_THRESHOLDS = {
            "closed": 0.2,
            "slightly_open": 0.3,
            "open": 0.5,
            "yawn": 0.7,
        }
        self.ELBOW_ANGLE_THRESHOLDS = {
            "fold": 90,
            "extend": 150,
        }
        self.KNEE_ANGLE_THRESHOLDS = {
            "knee_bend": 120,
            "standing": 160,
        }

    @staticmethod
    def _section(data: Optional[Dict], key: str) -> Dict:
        """Return ``data[key]`` when it is a dict, otherwise an empty dict.

        Covers a missing container, a missing key and a key stored as
        ``null``, so callers can chain ``.get`` without guarding each level.
        """
        if not data:
            return {}
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _first_entry(entries: Optional[List]) -> Dict:
        """Return the first dict in ``entries``, or ``{}`` if there is none.

        Unlike ``frame.get("faces", [{}])[0]`` this also handles a list that
        is present but empty (or ``None``).
        """
        if not entries:
            return {}
        first = entries[0]
        return first if isinstance(first, dict) else {}

    def _decode_face(self, face_data: Optional[Dict]) -> List[Dict]:
        """Build the face-pose action from the InsightFace ``pose_angle``."""
        pose_angle = face_data.get("pose_angle") if face_data else None
        # An empty dict still yields "pose_unknown"; only a missing or null
        # pose_angle is skipped.
        if not isinstance(pose_angle, dict):
            return []
        angle = pose_angle.get("angle", "unknown")
        return [{
            "action": f"pose_{angle}",
            "description": f"Face pose: {angle}",
            "confidence": pose_angle.get("confidence", 0.0),
            "source": "insightface",
        }]

    def _decode_eyes(self, face_mesh: Dict) -> List[Dict]:
        """Build eye-state and gaze actions from face_mesh ``eye_features``."""
        eye_features = self._section(face_mesh, "eye_features")
        eye_action = eye_features.get("eye_action", "unknown")
        ear = eye_features.get("avg_ear", 0)
        gaze = eye_features.get("gaze_direction", "center")

        found = []
        if eye_action != "unknown":
            found.append({
                "action": f"eye_{eye_action}",
                "description": f"Eye: {eye_action} (EAR: {ear:.3f})",
                "ear": ear,
                "gaze": gaze,
                "source": "mediapipe_face_mesh",
            })
        # A centred gaze is the neutral case and is not reported.
        if gaze != "center":
            found.append({
                "action": f"gaze_{gaze}",
                "description": f"Gaze: looking {gaze}",
                "source": "mediapipe_face_mesh",
            })
        return found

    def _decode_mouth(self, face_mesh: Dict) -> List[Dict]:
        """Build the mouth action from face_mesh ``mouth_features``."""
        mouth_features = self._section(face_mesh, "mouth_features")
        mouth_action = mouth_features.get("mouth_action", "unknown")
        if mouth_action == "unknown":
            return []
        mar = mouth_features.get("mar", 0)
        return [{
            "action": f"mouth_{mouth_action}",
            "description": f"Mouth: {mouth_action} (MAR: {mar:.3f})",
            "mar": mar,
            "source": "mediapipe_face_mesh",
        }]

    def _decode_arms(self, pose: Dict) -> List[Dict]:
        """Build left/right arm and crossed-arms actions from ``arm_features``."""
        arm_features = self._section(pose, "arm_features")
        found = []
        for side in ("left", "right"):
            arm_action = arm_features.get(f"{side}_arm_action", "unknown")
            if arm_action == "unknown":
                continue
            angle = arm_features.get(f"{side}_elbow_angle", 0)
            found.append({
                "action": f"{side}_arm_{arm_action}",
                "description": f"{side.capitalize()} arm: {arm_action} (angle: {angle:.1f}°)",
                "angle": angle,
                "source": "mediapipe_pose",
            })
        if arm_features.get("cross_arms", False):
            found.append({
                "action": "cross_arms",
                "description": "Arms crossed",
                "source": "mediapipe_pose",
            })
        return found

    def _decode_hands(self, hands: Dict) -> List[Dict]:
        """Build one gesture action per detected hand (left, then right)."""
        found = []
        for hand_type in ("left", "right"):
            hand_data = hands.get(hand_type)
            if not hand_data:
                continue
            gesture = hand_data.get("gesture", "unknown")
            if gesture == "unknown":
                continue
            num_fingers = hand_data.get("num_fingers_extended", 0)
            found.append({
                "action": f"{hand_type}_hand_{gesture}",
                "description": f"{hand_type.capitalize()} hand: {gesture} ({num_fingers} fingers)",
                "num_fingers_extended": num_fingers,
                "source": "mediapipe_hands",
            })
        return found

    def _decode_legs(self, pose: Dict) -> List[Dict]:
        """Build the leg action from pose ``leg_features``."""
        leg_action = self._section(pose, "leg_features").get("leg_action", "unknown")
        if leg_action == "unknown":
            return []
        return [{
            "action": f"leg_{leg_action}",
            "description": f"Leg: {leg_action}",
            "source": "mediapipe_pose",
        }]

    def decode_frame_actions(
        self,
        face_data: Dict,
        holistic_data: Dict,
    ) -> Dict:
        """
        Decode all actions for single frame

        Sections that are missing or stored as ``null`` (no face, pose or
        hands in the frame) simply contribute no actions.

        Args:
            face_data: InsightFace data (pose_angle, embedding)
            holistic_data: MediaPipe data (face_mesh, pose, hands)

        Returns:
            Dict with all decoded actions, keyed by "face", "eyes", "mouth",
            "arms", "hands", "legs" and "combined"; each value is a list of
            action dicts.
        """
        face_mesh = self._section(holistic_data, "face_mesh")
        pose = self._section(holistic_data, "pose")
        hands = self._section(holistic_data, "hands")

        actions = {
            "face": self._decode_face(face_data),       # 1. InsightFace pose
            "eyes": self._decode_eyes(face_mesh),       # 2. MediaPipe face_mesh
            "mouth": self._decode_mouth(face_mesh),     # 3. MediaPipe face_mesh
            "arms": self._decode_arms(pose),            # 4. MediaPipe pose
            "hands": self._decode_hands(hands),         # 5. MediaPipe hands
            "legs": self._decode_legs(pose),            # 6. MediaPipe pose
            "combined": [],
        }

        # 7. Combined actions
        actions["combined"] = self._detect_combined_actions(actions)

        return actions

    def _detect_combined_actions(self, actions: Dict) -> List[Dict]:
        """
        Detect combined actions from multiple body parts

        Args:
            actions: Dict with all individual actions

        Returns:
            List of combined actions, in the order of ``_COMBINED_RULES``
        """
        detected = {
            act["action"]
            for action_list in actions.values()
            for act in action_list
        }
        return [
            {
                "action": name,
                "description": description,
                "components": list(components),
            }
            for name, description, components in self._COMBINED_RULES
            if detected.issuperset(components)
        ]

    def _build_frame_record(
        self,
        frame_num: str,
        face_frame: Dict,
        holistic_frame: Dict,
    ) -> Dict:
        """Decode one frame and return its entry for the integrated output.

        Only the first face and the first person of the frame are used.
        """
        # Get first face/person
        face_person = self._first_entry(face_frame.get("faces"))
        holistic_person = self._first_entry(holistic_frame.get("persons"))

        face_mesh = self._section(holistic_person, "face_mesh")
        pose = self._section(holistic_person, "pose")
        hands = self._section(holistic_person, "hands")
        arm_features = self._section(pose, "arm_features")
        embedding = face_person.get("embedding")

        return {
            "frame_number": int(frame_num),
            "actions": self.decode_frame_actions(face_person, holistic_person),
            "insightface_data": {
                "pose_angle": face_person.get("pose_angle"),
                # Only first 10 values
                "embedding": embedding[:10] if embedding else None,
            },
            "mediapipe_data": {
                "eye_action": self._section(face_mesh, "eye_features").get("eye_action"),
                "mouth_action": self._section(face_mesh, "mouth_features").get("mouth_action"),
                "left_arm_action": arm_features.get("left_arm_action"),
                "right_arm_action": arm_features.get("right_arm_action"),
                "leg_action": self._section(pose, "leg_features").get("leg_action"),
                "left_hand_gesture": self._section(hands, "left").get("gesture"),
                "right_hand_gesture": self._section(hands, "right").get("gesture"),
            },
        }

    def integrate_and_decode(
        self,
        face_json_path: str,
        holistic_json_path: str,
    ) -> Dict:
        """
        Integrate face.json + holistic.json and decode actions

        Only frames whose key appears in both files are processed.  Frame
        keys are sorted with ``int``, so they must be numeric strings.

        Args:
            face_json_path: Path to face.json (InsightFace)
            holistic_json_path: Path to holistic.json (MediaPipe)

        Returns:
            Integrated action data with "metadata", "frames" and
            "action_summary" (action name -> number of occurrences)
        """
        face_frames = _load_json(face_json_path).get("frames") or {}
        holistic_frames = _load_json(holistic_json_path).get("frames") or {}

        # Find common frames
        common_frames = face_frames.keys() & holistic_frames.keys()

        print(f"Face frames: {len(face_frames)}")
        print(f"Holistic frames: {len(holistic_frames)}")
        print(f"Common frames: {len(common_frames)}")
        print()

        frames = {}
        summary = defaultdict(int)
        for frame_num in sorted(common_frames, key=int):
            record = self._build_frame_record(
                frame_num, face_frames[frame_num], holistic_frames[frame_num]
            )
            frames[frame_num] = record

            # Update summary
            for action_list in record["actions"].values():
                for act in action_list:
                    summary[act["action"]] += 1

        return {
            "metadata": {
                "face_source": face_json_path,
                "holistic_source": holistic_json_path,
                "total_frames": len(common_frames),
                "sources": ["insightface", "mediapipe_holistic"],
            },
            "frames": frames,
            # Plain dict so the result serialises and compares like one
            "action_summary": dict(summary),
        }

    def print_action_report(self, integrated_data: Dict) -> None:
        """
        Print action report

        Writes the per-category action counts and the actions of the first
        three frames (by frame number) to stdout.

        Args:
            integrated_data: Result of ``integrate_and_decode``
        """
        rule = "=" * 70
        metadata = integrated_data["metadata"]

        print("\n" + rule)
        print("Integrated Body Action Decoder Report")
        print(rule)

        print(f"\nTotal frames: {metadata['total_frames']}")
        print(f"Sources: {', '.join(metadata['sources'])}")

        print("\n" + rule)
        print("Action Summary")
        print(rule)

        summary = integrated_data["action_summary"]

        # Everything that matches none of these prefixes is a combined action
        known_prefixes = (
            "pose_", "eye_", "gaze_", "mouth_", "left_arm_", "right_arm_",
            "left_hand_", "right_hand_", "leg_", "cross_arms",
        )

        # Group by category
        categories = {
            "Face": [k for k in summary if k.startswith("pose_")],
            "Eyes": [k for k in summary if k.startswith(("eye_", "gaze_"))],
            "Mouth": [k for k in summary if k.startswith("mouth_")],
            "Arms": [
                k for k in summary
                if k.startswith(("left_arm_", "right_arm_")) or k == "cross_arms"
            ],
            "Hands": [k for k in summary if k.startswith(("left_hand_", "right_hand_"))],
            "Legs": [k for k in summary if k.startswith("leg_")],
            "Combined": [k for k in summary if not k.startswith(known_prefixes)],
        }

        for category, action_keys in categories.items():
            if not action_keys:
                continue
            print(f"\n{category} Actions:")
            for action in sorted(action_keys):
                print(f" {action}: {summary[action]} times")

        print("\n" + rule)
        print("Sample Frame Actions")
        print(rule)

        # Show first 3 frames
        ordered = sorted(integrated_data["frames"].items(), key=lambda item: int(item[0]))
        for frame_num, frame_data in ordered[:3]:
            print(f"\nFrame {frame_num}:")
            for category, action_list in frame_data["actions"].items():
                if action_list:
                    action_names = [a["action"] for a in action_list]
                    print(f" {category}: {', '.join(action_names)}")


def main():
    """Command-line entry point: decode one frame or the whole recording."""
    parser = argparse.ArgumentParser(description="Integrated Body Action Decoder")
    parser.add_argument("--face-json", required=True, help="Path to face.json (InsightFace)")
    parser.add_argument("--holistic-json", required=True, help="Path to holistic.json (MediaPipe)")
    parser.add_argument("--output-json", help="Output JSON path")
    parser.add_argument("--frame", type=int, help="Analyze single frame")

    args = parser.parse_args()

    print("=" * 70)
    print("Integrated Body Action Decoder")
    print("=" * 70)

    decoder = IntegratedBodyActionDecoder()

    # Compare against None: "--frame 0" is a valid request but 0 is falsy.
    if args.frame is not None:
        # Load single frame
        face_frames = _load_json(args.face_json).get("frames") or {}
        holistic_frames = _load_json(args.holistic_json).get("frames") or {}

        frame_num = str(args.frame)

        if frame_num in face_frames and frame_num in holistic_frames:
            face_person = decoder._first_entry(face_frames[frame_num].get("faces"))
            holistic_person = decoder._first_entry(holistic_frames[frame_num].get("persons"))

            actions = decoder.decode_frame_actions(face_person, holistic_person)

            print(f"\n=== Frame {frame_num} Actions ===")
            for category, action_list in actions.items():
                if action_list:
                    print(f"\n{category.upper()}:")
                    for act in action_list:
                        print(f" {act['action']}: {act['description']}")
        else:
            print(f"❌ Frame {frame_num} not found in both files")
    else:
        # Process all frames
        integrated_data = decoder.integrate_and_decode(
            args.face_json,
            args.holistic_json,
        )

        decoder.print_action_report(integrated_data)

        if args.output_json:
            with open(args.output_json, "w", encoding="utf-8") as f:
                json.dump(integrated_data, f, indent=2)
            print(f"\n✅ Output saved to: {args.output_json}")


if __name__ == "__main__":
    main()