Files
momentry_core/scripts/utils/pose_action_decoder.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

520 lines
17 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Pose Action Decoder - Convert pose_trace into human-readable action names
Purpose:
1. Decode pose transitions into action names (turn left/right, look up/down, shake head, nod)
2. Identify stable pose segments with duration
3. Generate action timeline for each trace
Action Types:
- Simple: turn_left, turn_right, look_up, look_down
- Complex: shake_head, nod_head, turn_full
- Stable: frontal_stable, profile_left_stable, profile_right_stable, three_quarter_stable
Output:
1. Action timeline (frame-based action list)
2. Action summary (total counts, duration)
3. Action visualization (timeline plot)
"""
import json
import argparse
import matplotlib.pyplot as plt
from typing import Dict, List
from collections import defaultdict
# Action definitions
# Each key is a (from_pose, to_pose) pair and each value is the action name
# that decode_pose_to_action() reports for that transition.

# Turn actions (horizontal angle changes)
_YAW_TRANSITIONS = {
    ("frontal", "three_quarter"): "turn_partial",
    ("frontal", "profile_left"): "turn_left",
    ("frontal", "profile_right"): "turn_right",
    ("three_quarter", "frontal"): "return_frontal",
    ("three_quarter", "profile_left"): "turn_left",
    ("three_quarter", "profile_right"): "turn_right",
    ("profile_left", "frontal"): "turn_to_frontal",
    ("profile_left", "three_quarter"): "turn_to_three_quarter",
    ("profile_left", "profile_right"): "turn_full",
    ("profile_right", "frontal"): "turn_to_frontal",
    ("profile_right", "three_quarter"): "turn_to_three_quarter",
    ("profile_right", "profile_left"): "turn_full",
}

# Pitch actions (vertical tilt changes)
_PITCH_TRANSITIONS = {
    ("neutral", "tilted_up"): "look_up",
    ("neutral", "tilted_down"): "look_down",
    ("tilted_up", "neutral"): "return_neutral",
    ("tilted_down", "neutral"): "return_neutral",
    ("tilted_up", "tilted_down"): "nod_full",
    ("tilted_down", "tilted_up"): "nod_full",
}

# Merged lookup table; turn entries come first, then pitch entries.
POSE_TO_ACTION = {**_YAW_TRANSITIONS, **_PITCH_TRANSITIONS}
# Stable pose names
# Maps a pose angle to the action name used for a long-lived segment of that
# pose. Every known angle follows the "<angle>_stable" scheme; "unknown" is the
# one irregular entry.
STABLE_ACTION_NAMES = {
    **{
        angle: f"{angle}_stable"
        for angle in ("frontal", "three_quarter", "profile_left", "profile_right")
    },
    "unknown": "pose_unknown",
}
# Complex action patterns (3+ transitions in short time)
# Each entry describes a three-step pose sequence and the frame span
# (min_frames..max_frames, inclusive) within which it counts as one gesture.
# The thresholds mirror the ones hard-coded in detect_complex_actions().
# NOTE(review): nothing in this file reads this table; "pitch_mode" presumably
# marks sequences matched against the "pitch" field instead of "angle" - confirm
# against external consumers.
COMPLEX_PATTERNS = {
    # Shake head: profile_left → profile_right → profile_left (or reverse)
    "shake_head": {
        "sequence": ["profile_left", "profile_right", "profile_left"],
        "min_frames": 5,
        "max_frames": 30,
    },
    "shake_head_reverse": {
        "sequence": ["profile_right", "profile_left", "profile_right"],
        "min_frames": 5,
        "max_frames": 30,
    },
    # Nod: tilted_up → tilted_down → tilted_up (or reverse)
    "nod_head": {
        "sequence": ["tilted_up", "tilted_down", "tilted_up"],
        "min_frames": 3,
        "max_frames": 20,
        "pitch_mode": True,
    },
    # The reverse nod is listed explicitly, like shake_head_reverse above, so
    # the table covers both directions that detect_complex_actions() reports.
    "nod_head_reverse": {
        "sequence": ["tilted_down", "tilted_up", "tilted_down"],
        "min_frames": 3,
        "max_frames": 20,
        "pitch_mode": True,
    },
}
def decode_pose_to_action(from_pose: str, to_pose: str) -> str:
    """
    Translate one pose transition into its action name
    Args:
        from_pose: Pose label before the transition
        to_pose: Pose label after the transition
    Returns:
        The name registered in POSE_TO_ACTION for (from_pose, to_pose), or a
        generic "pose_change_<from>_to_<to>" name when the pair is not listed
    """
    try:
        return POSE_TO_ACTION[(from_pose, to_pose)]
    except KeyError:
        # Unlisted pair: fall back to a descriptive generic name
        return f"pose_change_{from_pose}_to_{to_pose}"
def detect_complex_actions(pose_trace: List[Dict]) -> List[Dict]:
    """
    Find three-sample gestures (head shakes and nods) in a pose trace
    Every window of three consecutive samples is compared against the known
    sequences. A match is reported only when the frame distance between the
    first and third sample lies inside the gesture's inclusive frame range.
    Args:
        pose_trace: Pose samples; "angle" and "pitch" are read from every
            sample, "frame" from the first and last sample of a matching window
    Returns:
        List of event dicts (action, start_frame, end_frame, duration_frames,
        description). All head shakes come first, followed by all nods; each
        group is in trace order.
    """
    shake_sequences = (
        (["profile_left", "profile_right", "profile_left"], "shake head left-right-left"),
        (["profile_right", "profile_left", "profile_right"], "shake head right-left-right"),
    )
    nod_sequences = (
        (["tilted_up", "tilted_down", "tilted_up"], "nod head up-down"),
        (["tilted_down", "tilted_up", "tilted_down"], "nod head up-down"),
    )
    # (sample field, action name, min span, max span, candidate sequences)
    scans = (
        ("angle", "shake_head", 5, 30, shake_sequences),
        ("pitch", "nod_head", 3, 20, nod_sequences),
    )

    events = []
    for field, action, min_span, max_span, sequences in scans:
        for first, middle, last in zip(pose_trace, pose_trace[1:], pose_trace[2:]):
            window = [first[field], middle[field], last[field]]
            description = next(
                (text for sequence, text in sequences if window == sequence),
                None,
            )
            if description is None:
                continue
            span = last["frame"] - first["frame"]
            if min_span <= span <= max_span:
                events.append({
                    "action": action,
                    "start_frame": first["frame"],
                    "end_frame": last["frame"],
                    "duration_frames": span,
                    "description": description,
                })
    return events
def _split_into_pose_segments(pose_trace: List[Dict]) -> List[Dict]:
    """Merge consecutive samples that share angle and pitch into segments."""

    def close_segment(angle, pitch, start_frame, end_frame):
        return {
            "angle": angle,
            "pitch": pitch,
            "start_frame": start_frame,
            "end_frame": end_frame,
            # Both boundary frames belong to the segment, hence the +1
            "duration_frames": end_frame - start_frame + 1,
        }

    head = pose_trace[0]
    run_angle = head["angle"]
    run_start = head["frame"]
    run_pitch = head["pitch"]

    segments = []
    for previous, sample in zip(pose_trace, pose_trace[1:]):
        if sample["angle"] == run_angle and sample["pitch"] == run_pitch:
            continue
        # The running segment ends on the last sample before the change
        segments.append(close_segment(run_angle, run_pitch, run_start, previous["frame"]))
        run_angle = sample["angle"]
        run_pitch = sample["pitch"]
        run_start = sample["frame"]
    segments.append(close_segment(run_angle, run_pitch, run_start, pose_trace[-1]["frame"]))
    return segments


def _segment_to_timeline_entry(segment: Dict) -> Dict:
    """Describe one pose segment as a 'stable' or 'transitional' timeline entry."""
    frames = segment["duration_frames"]
    if frames >= 10:  # Stable pose (>= 10 frames)
        name = STABLE_ACTION_NAMES.get(segment["angle"], "pose_stable")
        # Non-neutral pitch is appended as a modifier to the stable name
        if segment["pitch"] != "neutral":
            name += f"_pitch_{segment['pitch']}"
        kind = "stable"
        adjective = "stable"
    else:  # Short pose (transitional)
        name = f"pose_{segment['angle']}_brief"
        kind = "transitional"
        adjective = "brief"
    return {
        "frame": segment["start_frame"],
        "action": name,
        "duration_frames": frames,
        "description": f"{adjective} {segment['angle']} pose for {frames} frames",
        "type": kind,
    }


def _summarize_timeline(timeline: List[Dict], complex_count: int) -> Dict:
    """Aggregate per-action counts and durations for a finished timeline."""
    counts = {}
    durations = {}
    for entry in timeline:
        name = entry["action"]
        counts[name] = counts.get(name, 0) + 1
        durations[name] = durations.get(name, 0.0) + entry["duration_frames"]

    if timeline:
        stable_entries = sum(1 for entry in timeline if entry["type"] == "stable")
        # Share of timeline ENTRIES that are stable (not a share of frames)
        stable_percentage = round(stable_entries / len(timeline) * 100, 1)
    else:
        stable_percentage = 0

    return {
        "total_actions": len(timeline),
        "unique_actions": len(counts),
        "action_counts": counts,
        "action_durations_frames": {name: round(total, 1) for name, total in durations.items()},
        "complex_action_count": complex_count,
        "stable_percentage": stable_percentage,
    }


def build_action_timeline(trace: Dict) -> Dict:
    """
    Turn one trace into an action timeline with summary statistics
    The timeline combines three kinds of entries: pose segments ("stable" or
    "transitional"), decoded pose transitions ("transition") and detected
    gestures ("complex"). Entries are ordered by start frame; at the same
    frame the longer entry comes first.
    Args:
        trace: Trace data; reads "trace_id", "pose_trace" and "pose_transitions"
    Returns:
        Dict with "trace_id", "action_timeline", "action_summary" and
        "complex_actions". For an empty pose_trace the timeline and
        complex_actions are empty lists and the summary is an empty dict.
    """
    pose_trace = trace.get("pose_trace", [])
    pose_transitions = trace.get("pose_transitions", [])

    if len(pose_trace) == 0:
        return {
            "trace_id": trace.get("trace_id"),
            "action_timeline": [],
            "action_summary": {},
            "complex_actions": [],
        }

    complex_actions = detect_complex_actions(pose_trace)

    # Segment entries (stable / brief poses)
    action_timeline = [
        _segment_to_timeline_entry(segment)
        for segment in _split_into_pose_segments(pose_trace)
    ]

    # Transition entries
    for transition in pose_transitions:
        action_name = decode_pose_to_action(transition["from_angle"], transition["to_angle"])
        action_timeline.append({
            "frame": transition["frame"],
            "action": action_name,
            "duration_frames": 1,  # Transition is instant
            "description": f"transition from {transition['from_angle']} to {transition['to_angle']}",
            "type": "transition",
        })

    # Complex-gesture entries
    for gesture in complex_actions:
        action_timeline.append({
            "frame": gesture["start_frame"],
            "action": gesture["action"],
            "duration_frames": gesture["duration_frames"],
            "description": gesture["description"],
            "type": "complex",
        })

    # One stable sort is enough: entries with the same frame and duration keep
    # their insertion order (segments, then transitions, then gestures).
    action_timeline.sort(key=lambda entry: (entry["frame"], -entry["duration_frames"]))

    return {
        "trace_id": trace.get("trace_id"),
        "action_timeline": action_timeline,
        "action_summary": _summarize_timeline(action_timeline, len(complex_actions)),
        "complex_actions": complex_actions,
    }
def generate_action_description(action_timeline: List[Dict]) -> str:
    """
    Summarize an action timeline as one short sentence-like string
    Args:
        action_timeline: Timeline entries; reads "type", "action" and
            "description" from each entry
    Returns:
        "No actions detected" for an empty timeline. Otherwise up to three
        parts joined by ". ": the descriptions of the first 3 stable entries,
        the names of the first 5 transitions, and the names of all complex
        actions. Parts with no entries are omitted.
    """
    if not action_timeline:
        return "No actions detected"

    def entries_of(kind):
        return [entry for entry in action_timeline if entry["type"] == kind]

    stable_entries = entries_of("stable")
    transition_entries = entries_of("transition")
    complex_entries = entries_of("complex")

    sections = []

    # Stable poses: first 3 in timeline order
    if stable_entries:
        listed = ", ".join(f"{entry['description']}" for entry in stable_entries[:3])
        sections.append(f"Stable poses: {listed}")

    # Transitions: first 5 in timeline order
    if transition_entries:
        listed = ", ".join(entry["action"] for entry in transition_entries[:5])
        sections.append(f"Transitions: {listed}")

    # Complex actions: all of them
    if complex_entries:
        listed = ", ".join(entry["action"] for entry in complex_entries)
        sections.append(f"Complex actions: {listed}")

    return ". ".join(sections)
def visualize_action_timeline(action_data: Dict, output_path: str = None) -> None:
    """
    Visualize action timeline
    Draws one horizontal subplot per trace. Actions longer than one frame are
    drawn as bars starting at their frame; one-frame actions are drawn as
    dashed vertical lines labelled with the action name.
    Args:
        action_data: Dict with a "traces" mapping of trace id -> dict holding
            an "action_timeline" list
        output_path: When given, the figure is saved there and then closed;
            otherwise it is shown interactively
    """
    traces_data = action_data.get("traces", {})
    if not traces_data:
        print("No traces found")
        return

    trace_count = len(traces_data)
    # squeeze=False always yields a 2-D array of axes, so a single trace needs
    # no special case.
    fig, axes_grid = plt.subplots(
        trace_count, 1, figsize=(16, 3 * trace_count), squeeze=False
    )
    axes = axes_grid[:, 0]

    # Actions without an entry here (e.g. pitch-modified stable names) are
    # drawn in gray.
    action_colors = {
        "frontal_stable": "green",
        "three_quarter_stable": "blue",
        "profile_left_stable": "orange",
        "profile_right_stable": "red",
        "turn_left": "purple",
        "turn_right": "purple",
        "turn_full": "darkred",
        "shake_head": "yellow",
        "nod_head": "cyan",
        "look_up": "lightgreen",
        "look_down": "brown",
    }

    for ax, (trace_id, data) in zip(axes, sorted(traces_data.items())):
        # Plot action timeline as bars. An empty timeline draws nothing but
        # still gets its title and labels below, so the blank panel stays
        # identifiable.
        for act in data["action_timeline"]:
            color = action_colors.get(act["action"], "gray")
            if act["duration_frames"] > 1:
                ax.barh(
                    y=0,
                    width=act["duration_frames"],
                    left=act["frame"],
                    height=0.8,
                    color=color,
                    alpha=0.6,
                    edgecolor="black",
                    linewidth=0.5,
                )
                # Add label for stable actions (only on bars wider than 30 frames)
                if act["type"] == "stable" and act["duration_frames"] > 30:
                    ax.text(
                        act["frame"] + act["duration_frames"] / 2,
                        0,
                        act["action"],
                        ha="center",
                        va="center",
                        fontsize=8,
                        color="white",
                    )
            else:
                # Instant action (transition)
                ax.axvline(x=act["frame"], color=color, linestyle="--", alpha=0.8)
                ax.text(
                    act["frame"],
                    0.5,
                    act["action"],
                    fontsize=7,
                    rotation=90,
                    va="bottom",
                    ha="center",
                )
        ax.set_xlabel("Frame Number")
        ax.set_ylabel("Action")
        ax.set_title(f"Trace {trace_id} Action Timeline")
        ax.set_ylim(-0.5, 1)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    if output_path:
        try:
            fig.savefig(output_path, dpi=150, bbox_inches="tight")
        finally:
            # pyplot keeps every figure alive until it is closed; release it so
            # repeated calls do not accumulate figures.
            plt.close(fig)
        print(f"\n✅ Visualization saved to: {output_path}")
    else:
        plt.show()
def print_action_report(action_data: Dict) -> None:
    """
    Print action report
    For every trace (sorted by trace id) this prints the summary numbers, the
    per-action counts (most frequent first), the first 10 timeline entries,
    any complex actions, and a generated human-readable description.
    Args:
        action_data: Dict with a "traces" mapping of trace id -> result dict
            holding "action_summary", "action_timeline" and "complex_actions"
    """
    traces_data = action_data.get("traces", {})
    print("\n" + "=" * 70)
    print("Pose Action Decoder Report")
    print("=" * 70)
    for trace_id, data in sorted(traces_data.items()):
        print(f"\n{'='*70}")
        print(f"Trace {trace_id}")
        print(f"{'='*70}")
        # A trace without pose samples carries an empty summary dict, so every
        # field is read with a zero/empty default instead of raising KeyError.
        summary = data["action_summary"]
        print("\nSummary:")
        print(f" Total Actions: {summary.get('total_actions', 0)}")
        print(f" Unique Actions: {summary.get('unique_actions', 0)}")
        print(f" Complex Actions: {summary.get('complex_action_count', 0)}")
        print(f" Stable Percentage: {summary.get('stable_percentage', 0)}%")
        print("\nAction Counts:")
        action_counts = summary.get("action_counts", {})
        for action, count in sorted(action_counts.items(), key=lambda x: x[1], reverse=True):
            print(f" {action}: {count}")
        # The heading text is runtime output and is kept unchanged
        # (the parenthesised part means "first 10").
        print("\nAction Timeline (前 10 个):")
        timeline = data["action_timeline"]
        for act in timeline[:10]:
            print(f" Frame {act['frame']}: {act['action']} ({act['type']}, {act['duration_frames']} frames)")
        if data["complex_actions"]:
            print("\nComplex Actions:")
            for act in data["complex_actions"]:
                print(f" {act['action']}: frames {act['start_frame']}-{act['end_frame']} ({act['duration_frames']} frames)")
        # Generate description
        desc = generate_action_description(data["action_timeline"])
        print("\nHuman-readable Description:")
        print(f" {desc}")
def main():
    """
    Command-line entry point
    Loads the face-trace JSON, builds an action timeline for every trace (or
    only the one selected with --trace-id), prints the report, and optionally
    writes the action data as JSON and the timeline plot as an image.
    """
    parser = argparse.ArgumentParser(description="Decode pose_trace into action names")
    parser.add_argument("--face-json", required=True, help="Path to face_traced.json")
    parser.add_argument("--output-json", help="Output action data JSON")
    parser.add_argument("--output-plot", help="Output action timeline plot PNG")
    parser.add_argument("--trace-id", type=int, help="Analyze specific trace only")
    args = parser.parse_args()

    print("=" * 70)
    print("Pose Action Decoder")
    print("=" * 70)

    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    traces = face_data.get("traces", {})
    if not traces:
        print("No traces found in face_traced.json")
        return

    # Filter by trace_id if specified. Compare against None so that
    # "--trace-id 0" selects trace 0 instead of being treated as "not given".
    if args.trace_id is not None:
        selected_key = str(args.trace_id)
        selected_trace = traces.get(selected_key)
        if not selected_trace:
            print(f"Trace {args.trace_id} not found")
            return
        traces = {selected_key: selected_trace}

    print(f"\nAnalyzing {len(traces)} traces...")
    action_data = {"traces": {}}
    for trace_id_str, trace in traces.items():
        action_data["traces"][trace_id_str] = build_action_timeline(trace)

    print_action_report(action_data)

    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as f:
            json.dump(action_data, f, indent=2)
        print(f"\n✅ Action data saved to: {args.output_json}")

    if args.output_plot:
        visualize_action_timeline(action_data, args.output_plot)


if __name__ == "__main__":
    main()