#!/opt/homebrew/bin/python3.11
"""
Pose Action Decoder - Convert pose_trace into human-readable action names

Purpose:
1. Decode pose transitions into action names (turn left/right, look up/down,
   shake head, nod)
2. Identify stable pose segments with duration
3. Generate action timeline for each trace

Action Types:
- Simple: turn_left, turn_right, look_up, look_down
- Complex: shake_head, nod_head, turn_full
- Stable: frontal_stable, profile_left_stable, profile_right_stable,
  three_quarter_stable

Output:
1. Action timeline (frame-based action list)
2. Action summary (total counts, duration)
3. Action visualization (timeline plot)
"""

import argparse
import json
import sys
from collections import defaultdict
from typing import Dict, List, Optional

import numpy as np

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Plotting is optional: only visualize_action_timeline() needs matplotlib,
    # so decoding, reporting and JSON export keep working without it.
    plt = None

# Action definitions: (from, to) -> action name
POSE_TO_ACTION = {
    # Turn actions (angle changes)
    ("frontal", "three_quarter"): "turn_partial",
    ("frontal", "profile_left"): "turn_left",
    ("frontal", "profile_right"): "turn_right",
    ("three_quarter", "frontal"): "return_frontal",
    ("three_quarter", "profile_left"): "turn_left",
    ("three_quarter", "profile_right"): "turn_right",
    ("profile_left", "frontal"): "turn_to_frontal",
    ("profile_left", "three_quarter"): "turn_to_three_quarter",
    ("profile_left", "profile_right"): "turn_full",
    ("profile_right", "frontal"): "turn_to_frontal",
    ("profile_right", "three_quarter"): "turn_to_three_quarter",
    ("profile_right", "profile_left"): "turn_full",
    # Pitch actions
    ("neutral", "tilted_up"): "look_up",
    ("neutral", "tilted_down"): "look_down",
    ("tilted_up", "neutral"): "return_neutral",
    ("tilted_down", "neutral"): "return_neutral",
    ("tilted_up", "tilted_down"): "nod_full",
    ("tilted_down", "tilted_up"): "nod_full",
}

# Stable pose names
STABLE_ACTION_NAMES = {
    "frontal": "frontal_stable",
    "three_quarter": "three_quarter_stable",
    "profile_left": "profile_left_stable",
    "profile_right": "profile_right_stable",
    "unknown": "pose_unknown",
}

# Complex action patterns (3+ transitions in short time)
COMPLEX_PATTERNS = {
    # Shake head: profile_left → profile_right → profile_left (or reverse)
    "shake_head": {
        "sequence": ["profile_left", "profile_right", "profile_left"],
        "min_frames": 5,
        "max_frames": 30,
    },
    "shake_head_reverse": {
        "sequence": ["profile_right", "profile_left", "profile_right"],
        "min_frames": 5,
        "max_frames": 30,
    },
    # Nod: tilted_up → tilted_down → tilted_up (or reverse)
    "nod_head": {
        "sequence": ["tilted_up", "tilted_down", "tilted_up"],
        "min_frames": 3,
        "max_frames": 20,
        "pitch_mode": True,
    },
}

# Rules applied by detect_complex_actions(), in output order. "limits" names
# the COMPLEX_PATTERNS entry that supplies min_frames / max_frames, so the
# thresholds live in exactly one place.
_COMPLEX_RULES = (
    {
        "field": "angle",
        "limits": "shake_head",
        "action": "shake_head",
        "sequences": (
            (["profile_left", "profile_right", "profile_left"],
             "shake head left-right-left"),
            (["profile_right", "profile_left", "profile_right"],
             "shake head right-left-right"),
        ),
    },
    {
        "field": "pitch",
        "limits": "nod_head",
        "action": "nod_head",
        "sequences": (
            (["tilted_up", "tilted_down", "tilted_up"], "nod head up-down"),
            (["tilted_down", "tilted_up", "tilted_down"], "nod head up-down"),
        ),
    },
)


def decode_pose_to_action(from_pose: str, to_pose: str) -> str:
    """
    Decode single pose transition to action name

    Args:
        from_pose: Source pose angle
        to_pose: Target pose angle

    Returns:
        The POSE_TO_ACTION entry for (from_pose, to_pose), or the generic
        name "pose_change_<from>_to_<to>" when the pair is not in the table.
    """
    return POSE_TO_ACTION.get(
        (from_pose, to_pose),
        f"pose_change_{from_pose}_to_{to_pose}",
    )


def detect_complex_actions(pose_trace: List[Dict]) -> List[Dict]:
    """
    Detect complex action patterns (shake head, nod, etc.)

    Every window of three consecutive trace entries is compared against the
    sequences in _COMPLEX_RULES. A match is kept only when the frame distance
    between the first and third entry lies within the min_frames/max_frames
    limits of the corresponding COMPLEX_PATTERNS entry. Entries that lack the
    inspected field ("angle" or "pitch") never match.

    NOTE(review): because the three entries must be consecutive, the middle
    pose has to occupy exactly one trace entry. If pose_trace carries one
    entry per video frame the frame limits may never be met - confirm the
    sampling of pose_trace against the producer.

    Args:
        pose_trace: Pose trace list; each entry needs "frame" and is read for
            "angle" and "pitch".

    Returns:
        List of complex action events (all shake_head events first, then all
        nod_head events, each group in trace order).
    """
    complex_actions = []

    for rule in _COMPLEX_RULES:
        field = rule["field"]
        limits = COMPLEX_PATTERNS[rule["limits"]]

        # zip over shifted slices yields every consecutive triple and nothing
        # at all for traces shorter than three entries.
        for first, middle, last in zip(pose_trace, pose_trace[1:], pose_trace[2:]):
            observed = [first.get(field), middle.get(field), last.get(field)]
            description = next(
                (text for seq, text in rule["sequences"] if observed == seq),
                None,
            )
            if description is None:
                continue

            duration_frames = last["frame"] - first["frame"]
            if limits["min_frames"] <= duration_frames <= limits["max_frames"]:
                complex_actions.append({
                    "action": rule["action"],
                    "start_frame": first["frame"],
                    "end_frame": last["frame"],
                    "duration_frames": duration_frames,
                    "description": description,
                })

    return complex_actions


def _build_pose_segments(pose_trace: List[Dict]) -> List[Dict]:
    """Collapse a non-empty pose trace into runs of identical (angle, pitch)."""
    segments = []

    def close_run(angle, pitch, start_frame, end_frame):
        segments.append({
            "angle": angle,
            "pitch": pitch,
            "start_frame": start_frame,
            "end_frame": end_frame,
            # Inclusive on both ends, hence the +1.
            "duration_frames": end_frame - start_frame + 1,
        })

    head = pose_trace[0]
    run_angle = head["angle"]
    # A missing "pitch" is treated as "neutral", the value that adds no
    # pitch suffix to the action name.
    run_pitch = head.get("pitch", "neutral")
    run_start = head["frame"]
    previous_frame = run_start

    for entry in pose_trace[1:]:
        angle = entry["angle"]
        pitch = entry.get("pitch", "neutral")
        if angle != run_angle or pitch != run_pitch:
            # The run ends on the last entry that still carried the old pose.
            close_run(run_angle, run_pitch, run_start, previous_frame)
            run_angle, run_pitch, run_start = angle, pitch, entry["frame"]
        previous_frame = entry["frame"]

    close_run(run_angle, run_pitch, run_start, previous_frame)
    return segments


def _segment_to_action(seg: Dict, stable_min_frames: int) -> Dict:
    """Turn one pose segment into a "stable" or "transitional" timeline entry."""
    angle = seg["angle"]
    frames = seg["duration_frames"]

    if frames >= stable_min_frames:
        action_name = STABLE_ACTION_NAMES.get(angle, "pose_stable")
        # Add pitch modifier
        if seg["pitch"] != "neutral":
            action_name += f"_pitch_{seg['pitch']}"
        label, entry_type = "stable", "stable"
    else:
        # Short pose (transitional)
        action_name = f"pose_{angle}_brief"
        label, entry_type = "brief", "transitional"

    return {
        "frame": seg["start_frame"],
        "action": action_name,
        "duration_frames": frames,
        "description": f"{label} {angle} pose for {frames} frames",
        "type": entry_type,
    }


def _summarize_actions(action_timeline: List[Dict], complex_count: int) -> Dict:
    """Aggregate per-action counts and frame totals for a timeline."""
    action_counts = defaultdict(int)
    action_durations = defaultdict(float)
    for act in action_timeline:
        action_counts[act["action"]] += 1
        action_durations[act["action"]] += act["duration_frames"]

    stable_entries = sum(1 for act in action_timeline if act["type"] == "stable")
    # Share of timeline ENTRIES that are stable, not share of frames.
    stable_percentage = (
        round(stable_entries / len(action_timeline) * 100, 1)
        if action_timeline else 0
    )

    return {
        "total_actions": len(action_timeline),
        "unique_actions": len(action_counts),
        "action_counts": dict(action_counts),
        "action_durations_frames": {
            name: round(total, 1) for name, total in action_durations.items()
        },
        "complex_action_count": complex_count,
        "stable_percentage": stable_percentage,
    }


def build_action_timeline(trace: Dict, stable_min_frames: int = 10) -> Dict:
    """
    Build action timeline from pose_trace

    Args:
        trace: Trace data; "trace_id", "pose_trace" and "pose_transitions"
            are read. A missing or null "pose_trace"/"pose_transitions" is
            treated as empty.
        stable_min_frames: Minimum segment length (in frames) for a segment
            to be reported as a stable pose; shorter segments are reported
            as "transitional". Defaults to 10.

    Returns:
        Dict with "trace_id", "action_timeline" (sorted by frame, longer
        entries first within a frame), "action_summary" and
        "complex_actions". For an empty pose trace the timeline and complex
        actions are empty lists and the summary is an empty dict.
    """
    # `or []` also covers an explicit JSON null, which .get() would pass on.
    pose_trace = trace.get("pose_trace") or []
    pose_transitions = trace.get("pose_transitions") or []

    if not pose_trace:
        return {
            "trace_id": trace.get("trace_id"),
            "action_timeline": [],
            "action_summary": {},
            "complex_actions": [],
        }

    complex_actions = detect_complex_actions(pose_trace)

    # Stable / brief pose segments
    action_timeline = [
        _segment_to_action(seg, stable_min_frames)
        for seg in _build_pose_segments(pose_trace)
    ]

    # Transition actions
    for trans in pose_transitions:
        action_timeline.append({
            "frame": trans["frame"],
            "action": decode_pose_to_action(trans["from_angle"], trans["to_angle"]),
            "duration_frames": 1,  # Transition is instant
            "description": f"transition from {trans['from_angle']} to {trans['to_angle']}",
            "type": "transition",
        })

    # Complex actions
    for complex_act in complex_actions:
        action_timeline.append({
            "frame": complex_act["start_frame"],
            "action": complex_act["action"],
            "duration_frames": complex_act["duration_frames"],
            "description": complex_act["description"],
            "type": "complex",
        })

    # One stable sort is enough: ties keep insertion order
    # (segments, then transitions, then complex actions).
    action_timeline.sort(key=lambda act: (act["frame"], -act["duration_frames"]))

    return {
        "trace_id": trace.get("trace_id"),
        "action_timeline": action_timeline,
        "action_summary": _summarize_actions(action_timeline, len(complex_actions)),
        "complex_actions": complex_actions,
    }


def generate_action_description(action_timeline: List[Dict]) -> str:
    """
    Generate human-readable action description

    Args:
        action_timeline: Action timeline list

    Returns:
        Sentence-like summary listing the first three stable poses, the first
        five transitions and all complex actions (sections without entries
        are omitted), or "No actions detected" for an empty timeline.
    """
    if not action_timeline:
        return "No actions detected"

    # Group actions by type
    by_type = defaultdict(list)
    for act in action_timeline:
        by_type[act["type"]].append(act)

    desc_parts = []

    stable_actions = by_type.get("stable")
    if stable_actions:
        stable_desc = [act["description"] for act in stable_actions[:3]]
        desc_parts.append(f"Stable poses: {', '.join(stable_desc)}")

    transition_actions = by_type.get("transition")
    if transition_actions:
        trans_desc = [act["action"] for act in transition_actions[:5]]
        desc_parts.append(f"Transitions: {', '.join(trans_desc)}")

    complex_actions = by_type.get("complex")
    if complex_actions:
        complex_desc = [act["action"] for act in complex_actions]
        desc_parts.append(f"Complex actions: {', '.join(complex_desc)}")

    return ". ".join(desc_parts)


def visualize_action_timeline(action_data: Dict, output_path: Optional[str] = None) -> None:
    """
    Visualize action timeline

    Draws one subplot per trace: entries longer than one frame become
    horizontal bars, one-frame entries become dashed vertical lines.

    Args:
        action_data: Dict with a "traces" mapping of trace id to the result
            of build_action_timeline().
        output_path: When given, the figure is saved there and closed;
            otherwise it is shown with plt.show().

    Raises:
        ImportError: If there is something to plot but matplotlib is not
            installed.
    """
    traces_data = action_data.get("traces", {})

    if not traces_data:
        print("No traces found")
        return

    if plt is None:
        raise ImportError("matplotlib is required to plot the action timeline")

    # squeeze=False always returns a 2-D array of axes, so a single trace
    # needs no special casing.
    fig, axes_grid = plt.subplots(
        len(traces_data), 1, figsize=(16, 3 * len(traces_data)), squeeze=False
    )
    axes = axes_grid[:, 0]

    action_colors = {
        "frontal_stable": "green",
        "three_quarter_stable": "blue",
        "profile_left_stable": "orange",
        "profile_right_stable": "red",
        "turn_left": "purple",
        "turn_right": "purple",
        "turn_full": "darkred",
        "shake_head": "yellow",
        "nod_head": "cyan",
        "look_up": "lightgreen",
        "look_down": "brown",
    }

    for ax, (trace_id, data) in zip(axes, sorted(traces_data.items())):
        timeline = data["action_timeline"]

        if not timeline:
            continue

        # Plot action timeline as bars
        for act in timeline:
            color = action_colors.get(act["action"], "gray")

            if act["duration_frames"] > 1:
                ax.barh(
                    y=0,
                    width=act["duration_frames"],
                    left=act["frame"],
                    height=0.8,
                    color=color,
                    alpha=0.6,
                    edgecolor="black",
                    linewidth=0.5,
                )

                # Add label for stable actions
                if act["type"] == "stable" and act["duration_frames"] > 30:
                    ax.text(
                        act["frame"] + act["duration_frames"] / 2,
                        0,
                        act["action"],
                        ha="center",
                        va="center",
                        fontsize=8,
                        color="white",
                    )
            else:
                # Instant action (transition)
                ax.axvline(x=act["frame"], color=color, linestyle="--", alpha=0.8)
                ax.text(
                    act["frame"],
                    0.5,
                    act["action"],
                    fontsize=7,
                    rotation=90,
                    va="bottom",
                    ha="center",
                )

        ax.set_xlabel("Frame Number")
        ax.set_ylabel("Action")
        ax.set_title(f"Trace {trace_id} Action Timeline")
        ax.set_ylim(-0.5, 1)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()

    if output_path:
        try:
            plt.savefig(output_path, dpi=150, bbox_inches="tight")
        finally:
            # Release the figure; pyplot otherwise keeps it alive.
            plt.close(fig)
        print(f"\n✅ Visualization saved to: {output_path}")
    else:
        plt.show()


def print_action_report(action_data: Dict) -> None:
    """
    Print action report

    Prints, per trace (sorted by id): the summary numbers, action counts in
    descending order, the first ten timeline entries, any complex actions and
    the generated description. Traces whose summary is empty (empty pose
    trace) are reported with zero values instead of raising KeyError.

    Args:
        action_data: Dict with a "traces" mapping of trace id to the result
            of build_action_timeline().
    """
    traces_data = action_data.get("traces", {})
    rule = "=" * 70

    print("\n" + rule)
    print("Pose Action Decoder Report")
    print(rule)

    for trace_id, data in sorted(traces_data.items()):
        print(f"\n{rule}")
        print(f"Trace {trace_id}")
        print(f"{rule}")

        # build_action_timeline() returns {} here for an empty pose trace.
        summary = data["action_summary"]
        total_actions = summary.get("total_actions", 0)
        unique_actions = summary.get("unique_actions", 0)
        complex_count = summary.get("complex_action_count", 0)
        stable_percentage = summary.get("stable_percentage", 0)
        action_counts = summary.get("action_counts", {})

        print("\nSummary:")
        print(f" Total Actions: {total_actions}")
        print(f" Unique Actions: {unique_actions}")
        print(f" Complex Actions: {complex_count}")
        print(f" Stable Percentage: {stable_percentage}%")

        print("\nAction Counts:")
        ranked = sorted(action_counts.items(), key=lambda item: item[1], reverse=True)
        for action, count in ranked:
            print(f" {action}: {count}")

        # The heading string reads "first 10 entries".
        print("\nAction Timeline (前 10 个):")
        timeline = data["action_timeline"]
        for act in timeline[:10]:
            print(f" Frame {act['frame']}: {act['action']} ({act['type']}, {act['duration_frames']} frames)")

        if data["complex_actions"]:
            print("\nComplex Actions:")
            for act in data["complex_actions"]:
                print(f" {act['action']}: frames {act['start_frame']}-{act['end_frame']} ({act['duration_frames']} frames)")

        # Generate description
        desc = generate_action_description(data["action_timeline"])
        print("\nHuman-readable Description:")
        print(f" {desc}")


def main():
    """
    Command-line entry point.

    Reads the JSON file given by --face-json, builds an action timeline for
    every trace under its "traces" key (or only --trace-id), prints the
    report and optionally writes --output-json and --output-plot.
    """
    parser = argparse.ArgumentParser(description="Decode pose_trace into action names")
    parser.add_argument("--face-json", required=True, help="Path to face_traced.json")
    parser.add_argument("--output-json", help="Output action data JSON")
    parser.add_argument("--output-plot", help="Output action timeline plot PNG")
    parser.add_argument("--trace-id", type=int, help="Analyze specific trace only")

    args = parser.parse_args()

    print("=" * 70)
    print("Pose Action Decoder")
    print("=" * 70)

    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    traces = face_data.get("traces", {})

    if not traces:
        print("No traces found in face_traced.json")
        return

    # Filter by trace_id if specified. Compare against None: a plain
    # truthiness test would silently ignore "--trace-id 0".
    if args.trace_id is not None:
        wanted = str(args.trace_id)
        selected = traces.get(wanted)
        if not selected:
            print(f"Trace {args.trace_id} not found")
            return
        traces = {wanted: selected}

    print(f"\nAnalyzing {len(traces)} traces...")

    action_data = {"traces": {}}

    for trace_id_str, trace in traces.items():
        action_data["traces"][trace_id_str] = build_action_timeline(trace)

    print_action_report(action_data)

    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as f:
            json.dump(action_data, f, indent=2)
        print(f"\n✅ Action data saved to: {args.output_json}")

    if args.output_plot:
        visualize_action_timeline(action_data, args.output_plot)


if __name__ == "__main__":
    main()