- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
436 lines
16 KiB
Python
436 lines
16 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
Integrated Body Action Decoder - Combine InsightFace + MediaPipe Holistic

Purpose:
1. Combine the existing InsightFace pose_angle with MediaPipe Holistic output
2. Generate a complete body action timeline
3. Support trace-based analysis

Input:
- face.json (InsightFace: embedding, pose_angle)
- holistic.json (MediaPipe: face_mesh, pose, hands)

Output:
- Integrated action data with all body parts
"""
|
|
|
|
import json
|
|
import argparse
|
|
from typing import Dict, List
|
|
from collections import defaultdict
|
|
|
|
|
|
class IntegratedBodyActionDecoder:
    """
    Decode body actions from combined InsightFace + MediaPipe data.

    The decoder does not classify raw landmarks itself: it reads the
    pre-computed labels found in the input dictionaries (``eye_action``,
    ``mouth_action``, ``left_arm_action``, ``gesture``, ...) and turns them
    into a uniform list of action records per body part, plus a few
    "combined" actions derived from several body parts at once.
    """

    # (action name, description, required component actions).
    # A combined action is reported when every component was detected in the
    # frame; rules are evaluated, and results emitted, in this order.
    _COMBINED_RULES = (
        (
            "thinking_pose",
            "Thinking pose (looking down + pointing)",
            ("pose_tilted_down", "left_hand_pointing"),
        ),
        (
            "defensive_pose",
            "Defensive pose (crossed arms + frontal)",
            ("cross_arms", "pose_frontal"),
        ),
        (
            "surprise_expression",
            "Surprise expression (wide eyes + open mouth)",
            ("eye_wide_open", "mouth_open"),
        ),
    )

    def __init__(self):
        # Action thresholds. None of the methods of this class read them;
        # they are kept as public attributes for external callers.
        self.EAR_THRESHOLDS = {
            "closed": 0.15,
            "squint": 0.25,
            "wide_open": 0.4,
        }

        self.MAR_THRESHOLDS = {
            "closed": 0.2,
            "slightly_open": 0.3,
            "open": 0.5,
            "yawn": 0.7,
        }

        self.ELBOW_ANGLE_THRESHOLDS = {
            "fold": 90,
            "extend": 150,
        }

        self.KNEE_ANGLE_THRESHOLDS = {
            "knee_bend": 120,
            "standing": 160,
        }

    @staticmethod
    def _section(container, key: str) -> Dict:
        """Return ``container[key]``, mapping a missing/None/empty value to ``{}``."""
        if not container:
            return {}
        return container.get(key) or {}

    @staticmethod
    def _first_entry(frame, key: str) -> Dict:
        """Return the first element of the list ``frame[key]``, or ``{}``.

        A missing key, a ``None`` value, an empty list and a ``None`` first
        element all yield ``{}`` so that frames without any detection do not
        raise.
        """
        entries = (frame or {}).get(key) or [{}]
        return entries[0] or {}

    def decode_frame_actions(
        self,
        face_data: Dict,
        holistic_data: Dict,
    ) -> Dict:
        """
        Decode all actions for a single frame.

        Args:
            face_data: InsightFace data for one face; only ``pose_angle``
                (a dict with ``angle`` and ``confidence``) is read here.
            holistic_data: MediaPipe data for one person; the ``face_mesh``,
                ``pose`` and ``hands`` sections are read. Any section may be
                missing or ``None``.

        Returns:
            Dict mapping each of ``face``, ``eyes``, ``mouth``, ``arms``,
            ``hands``, ``legs`` and ``combined`` to a list of action records
            (dicts that always contain ``action`` and ``description``).
        """
        actions = {
            "face": [],
            "eyes": [],
            "mouth": [],
            "arms": [],
            "hands": [],
            "legs": [],
            "combined": [],
        }

        # 1. Face pose (from InsightFace). A present-but-empty pose_angle
        # still yields "pose_unknown"; a missing or None one is skipped.
        pose_angle = (face_data or {}).get("pose_angle")
        if pose_angle is not None:
            angle = pose_angle.get("angle", "unknown")
            actions["face"].append({
                "action": f"pose_{angle}",
                "description": f"Face pose: {angle}",
                "confidence": pose_angle.get("confidence", 0.0),
                "source": "insightface",
            })

        face_mesh = self._section(holistic_data, "face_mesh")
        pose = self._section(holistic_data, "pose")
        hands = self._section(holistic_data, "hands")

        # 2. Eye actions (from MediaPipe face_mesh)
        eye_features = self._section(face_mesh, "eye_features")
        eye_action = eye_features.get("eye_action", "unknown")
        ear = eye_features.get("avg_ear", 0)
        gaze = eye_features.get("gaze_direction", "center")

        if eye_action != "unknown":
            actions["eyes"].append({
                "action": f"eye_{eye_action}",
                "description": f"Eye: {eye_action} (EAR: {ear:.3f})",
                "ear": ear,
                "gaze": gaze,
                "source": "mediapipe_face_mesh",
            })

        if gaze != "center":
            actions["eyes"].append({
                "action": f"gaze_{gaze}",
                "description": f"Gaze: looking {gaze}",
                "source": "mediapipe_face_mesh",
            })

        # 3. Mouth actions (from MediaPipe face_mesh)
        mouth_features = self._section(face_mesh, "mouth_features")
        mouth_action = mouth_features.get("mouth_action", "unknown")
        mar = mouth_features.get("mar", 0)

        if mouth_action != "unknown":
            actions["mouth"].append({
                "action": f"mouth_{mouth_action}",
                "description": f"Mouth: {mouth_action} (MAR: {mar:.3f})",
                "mar": mar,
                "source": "mediapipe_face_mesh",
            })

        # 4. Arm actions (from MediaPipe pose): left, then right, then crossed
        arm_features = self._section(pose, "arm_features")
        for side in ("left", "right"):
            arm_action = arm_features.get(f"{side}_arm_action", "unknown")
            if arm_action == "unknown":
                continue
            elbow_angle = arm_features.get(f"{side}_elbow_angle", 0)
            actions["arms"].append({
                "action": f"{side}_arm_{arm_action}",
                "description": f"{side.capitalize()} arm: {arm_action} (angle: {elbow_angle:.1f}°)",
                "angle": elbow_angle,
                "source": "mediapipe_pose",
            })

        if arm_features.get("cross_arms", False):
            actions["arms"].append({
                "action": "cross_arms",
                "description": "Arms crossed",
                "source": "mediapipe_pose",
            })

        # 5. Hand actions (from MediaPipe hands)
        for hand_type in ("left", "right"):
            hand_data = hands.get(hand_type)
            if not hand_data:
                continue

            gesture = hand_data.get("gesture", "unknown")
            num_fingers = hand_data.get("num_fingers_extended", 0)

            if gesture != "unknown":
                actions["hands"].append({
                    "action": f"{hand_type}_hand_{gesture}",
                    "description": f"{hand_type.capitalize()} hand: {gesture} ({num_fingers} fingers)",
                    "num_fingers_extended": num_fingers,
                    "source": "mediapipe_hands",
                })

        # 6. Leg actions (from MediaPipe pose)
        leg_action = self._section(pose, "leg_features").get("leg_action", "unknown")
        if leg_action != "unknown":
            actions["legs"].append({
                "action": f"leg_{leg_action}",
                "description": f"Leg: {leg_action}",
                "source": "mediapipe_pose",
            })

        # 7. Combined actions
        actions["combined"] = self._detect_combined_actions(actions)

        return actions

    def _detect_combined_actions(self, actions: Dict) -> List[Dict]:
        """
        Detect combined actions from multiple body parts.

        Args:
            actions: Dict mapping category name to a list of action records,
                each carrying an ``action`` key.

        Returns:
            List of combined action records (``action``, ``description``,
            ``components``), in the order of ``_COMBINED_RULES``.
        """
        detected = {
            act["action"]
            for action_list in actions.values()
            for act in action_list
        }

        return [
            {
                "action": name,
                "description": description,
                "components": list(components),
            }
            for name, description, components in self._COMBINED_RULES
            if all(component in detected for component in components)
        ]

    def integrate_and_decode(
        self,
        face_json_path: str,
        holistic_json_path: str,
    ) -> Dict:
        """
        Integrate face.json + holistic.json and decode actions.

        Only frames present in both files are decoded, and only the first
        face / first person of each frame is used. Frame counts are printed
        to stdout.

        Args:
            face_json_path: Path to face.json (InsightFace)
            holistic_json_path: Path to holistic.json (MediaPipe)

        Returns:
            Dict with ``metadata``, ``frames`` (keyed by the original frame
            key, inserted in ascending numeric order) and ``action_summary``
            (action name -> number of frames in which it occurred).

        Raises:
            OSError: if either file cannot be opened.
            json.JSONDecodeError: if either file is not valid JSON.
            ValueError: if a common frame key is not an integer string.
        """
        # JSON interchange text is UTF-8; do not depend on the locale default.
        with open(face_json_path, encoding="utf-8") as f:
            face_data = json.load(f)

        with open(holistic_json_path, encoding="utf-8") as f:
            holistic_data = json.load(f)

        face_frames = face_data.get("frames", {})
        holistic_frames = holistic_data.get("frames", {})

        # Only frames known to both sources can be integrated.
        common_frames = set(face_frames.keys()) & set(holistic_frames.keys())

        print(f"Face frames: {len(face_frames)}")
        print(f"Holistic frames: {len(holistic_frames)}")
        print(f"Common frames: {len(common_frames)}")
        print()

        action_summary = defaultdict(int)
        frames = {}

        for frame_num in sorted(common_frames, key=int):
            # First face/person only; empty detection lists fall back to {}.
            face_person = self._first_entry(face_frames[frame_num], "faces")
            holistic_person = self._first_entry(holistic_frames[frame_num], "persons")

            actions = self.decode_frame_actions(face_person, holistic_person)

            face_mesh = self._section(holistic_person, "face_mesh")
            pose = self._section(holistic_person, "pose")
            hands = self._section(holistic_person, "hands")
            arm_features = self._section(pose, "arm_features")
            embedding = face_person.get("embedding")

            frames[frame_num] = {
                "frame_number": int(frame_num),
                "actions": actions,
                "insightface_data": {
                    "pose_angle": face_person.get("pose_angle"),
                    # Only the first 10 values, to keep the output small.
                    "embedding": embedding[:10] if embedding else None,
                },
                "mediapipe_data": {
                    "eye_action": self._section(face_mesh, "eye_features").get("eye_action"),
                    "mouth_action": self._section(face_mesh, "mouth_features").get("mouth_action"),
                    "left_arm_action": arm_features.get("left_arm_action"),
                    "right_arm_action": arm_features.get("right_arm_action"),
                    "leg_action": self._section(pose, "leg_features").get("leg_action"),
                    "left_hand_gesture": self._section(hands, "left").get("gesture"),
                    "right_hand_gesture": self._section(hands, "right").get("gesture"),
                },
            }

            for action_list in actions.values():
                for act in action_list:
                    action_summary[act["action"]] += 1

        return {
            "metadata": {
                "face_source": face_json_path,
                "holistic_source": holistic_json_path,
                "total_frames": len(common_frames),
                "sources": ["insightface", "mediapipe_holistic"],
            },
            "frames": frames,
            # Plain dict so the result is JSON-serialisable without surprises.
            "action_summary": dict(action_summary),
        }

    def print_action_report(self, integrated_data: Dict) -> None:
        """
        Print a human-readable report to stdout.

        Args:
            integrated_data: Result of ``integrate_and_decode``; the
                ``metadata``, ``action_summary`` and ``frames`` entries are
                read.
        """
        rule = "=" * 70

        print("\n" + rule)
        print("Integrated Body Action Decoder Report")
        print(rule)

        print(f"\nTotal frames: {integrated_data['metadata']['total_frames']}")
        print(f"Sources: {', '.join(integrated_data['metadata']['sources'])}")

        print("\n" + rule)
        print("Action Summary")
        print(rule)

        summary = integrated_data["action_summary"]

        # Group action names by prefix; whatever matches no known prefix is
        # reported under "Combined".
        arm_prefixes = ("left_arm_", "right_arm_")
        hand_prefixes = ("left_hand_", "right_hand_")
        known_prefixes = (
            ("pose_", "eye_", "gaze_", "mouth_")
            + arm_prefixes
            + hand_prefixes
            + ("leg_", "cross_arms")
        )
        categories = {
            "Face": [k for k in summary if k.startswith("pose_")],
            "Eyes": [k for k in summary if k.startswith(("eye_", "gaze_"))],
            "Mouth": [k for k in summary if k.startswith("mouth_")],
            "Arms": [k for k in summary if k.startswith(arm_prefixes) or k == "cross_arms"],
            "Hands": [k for k in summary if k.startswith(hand_prefixes)],
            "Legs": [k for k in summary if k.startswith("leg_")],
            "Combined": [k for k in summary if not k.startswith(known_prefixes)],
        }

        for category, action_keys in categories.items():
            if action_keys:
                print(f"\n{category} Actions:")
                for action in sorted(action_keys):
                    count = summary[action]
                    print(f" {action}: {count} times")

        print("\n" + rule)
        print("Sample Frame Actions")
        print(rule)

        # Show the first 3 frames in numeric order.
        ordered_frames = sorted(integrated_data["frames"].items(), key=lambda x: int(x[0]))
        for frame_num, frame_data in ordered_frames[:3]:
            print(f"\nFrame {frame_num}:")

            for category, action_list in frame_data["actions"].items():
                if action_list:
                    action_names = [a["action"] for a in action_list]
                    print(f" {category}: {', '.join(action_names)}")
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    With ``--frame N`` only frame ``N`` is decoded and its actions are
    printed. Otherwise every frame common to both input files is decoded, a
    report is printed, and the integrated data is written to
    ``--output-json`` when that option is given.
    """
    parser = argparse.ArgumentParser(description="Integrated Body Action Decoder")
    parser.add_argument("--face-json", required=True, help="Path to face.json (InsightFace)")
    parser.add_argument("--holistic-json", required=True, help="Path to holistic.json (MediaPipe)")
    parser.add_argument("--output-json", help="Output JSON path")
    parser.add_argument("--frame", type=int, help="Analyze single frame")
    args = parser.parse_args()

    print("=" * 70)
    print("Integrated Body Action Decoder")
    print("=" * 70)

    decoder = IntegratedBodyActionDecoder()

    # Compare against None: frame 0 is a valid frame number but is falsy.
    if args.frame is not None:
        # Load single frame
        with open(args.face_json, encoding="utf-8") as f:
            face_data = json.load(f)

        with open(args.holistic_json, encoding="utf-8") as f:
            holistic_data = json.load(f)

        frame_num = str(args.frame)
        face_frames = face_data.get("frames", {})
        holistic_frames = holistic_data.get("frames", {})

        if frame_num in face_frames and frame_num in holistic_frames:
            # First face/person only; a frame without detections yields {}
            # instead of raising on an empty or missing list.
            face_person = ((face_frames[frame_num] or {}).get("faces") or [{}])[0] or {}
            holistic_person = ((holistic_frames[frame_num] or {}).get("persons") or [{}])[0] or {}

            actions = decoder.decode_frame_actions(face_person, holistic_person)

            print(f"\n=== Frame {frame_num} Actions ===")

            for category, action_list in actions.items():
                if action_list:
                    print(f"\n{category.upper()}:")
                    for act in action_list:
                        print(f" {act['action']}: {act['description']}")
        else:
            print(f"❌ Frame {frame_num} not found in both files")

    else:
        # Process all frames
        integrated_data = decoder.integrate_and_decode(
            args.face_json,
            args.holistic_json,
        )

        decoder.print_action_report(integrated_data)

        if args.output_json:
            with open(args.output_json, "w", encoding="utf-8") as f:
                json.dump(integrated_data, f, indent=2)

            print(f"\n✅ Output saved to: {args.output_json}")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()