- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
520 lines
17 KiB
Python
520 lines
17 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Pose Action Decoder - Convert pose_trace into human-readable action names
|
|
|
|
Purpose:
|
|
1. Decode pose transitions into action names (turn left/right, look up/down, shake head, nod)
|
|
2. Identify stable pose segments with duration
|
|
3. Generate action timeline for each trace
|
|
|
|
Action Types:
|
|
- Simple: turn_left, turn_right, look_up, look_down
|
|
- Complex: shake_head, nod_head, turn_full
|
|
- Stable: frontal_stable, profile_left_stable, profile_right_stable, three_quarter_stable
|
|
|
|
Output:
|
|
1. Action timeline (frame-based action list)
|
|
2. Action summary (total counts, duration)
|
|
3. Action visualization (timeline plot)
|
|
"""
|
|
|
|
import json
|
|
import argparse
|
|
import matplotlib.pyplot as plt
|
|
from typing import Dict, List
|
|
from collections import defaultdict
|
|
|
|
|
|
# Action definitions
#
# Lookup table from a (from_label, to_label) pair to the action name reported
# for that change. decode_pose_to_action() builds its key the same way and
# falls back to a generic "pose_change_..." name for pairs not listed here.

# Yaw changes (head turning between frontal / three-quarter / profile).
_YAW_TRANSITION_ACTIONS = {
    ("frontal", "three_quarter"): "turn_partial",
    ("frontal", "profile_left"): "turn_left",
    ("frontal", "profile_right"): "turn_right",
    ("three_quarter", "frontal"): "return_frontal",
    ("three_quarter", "profile_left"): "turn_left",
    ("three_quarter", "profile_right"): "turn_right",
    ("profile_left", "frontal"): "turn_to_frontal",
    ("profile_left", "three_quarter"): "turn_to_three_quarter",
    ("profile_left", "profile_right"): "turn_full",
    ("profile_right", "frontal"): "turn_to_frontal",
    ("profile_right", "three_quarter"): "turn_to_three_quarter",
    ("profile_right", "profile_left"): "turn_full",
}

# Pitch changes (head tilting up / down).
_PITCH_TRANSITION_ACTIONS = {
    ("neutral", "tilted_up"): "look_up",
    ("neutral", "tilted_down"): "look_down",
    ("tilted_up", "neutral"): "return_neutral",
    ("tilted_down", "neutral"): "return_neutral",
    ("tilted_up", "tilted_down"): "nod_full",
    ("tilted_down", "tilted_up"): "nod_full",
}

# Merged table; yaw entries first, then pitch entries (insertion order kept).
POSE_TO_ACTION = {**_YAW_TRANSITION_ACTIONS, **_PITCH_TRANSITION_ACTIONS}
|
|
|
|
# Stable pose names
#
# Maps a pose angle label to the action name used when that angle is held.
# The four known angles follow the "<angle>_stable" scheme; "unknown" is the
# one irregular entry.
STABLE_ACTION_NAMES = {
    **{
        angle: f"{angle}_stable"
        for angle in ("frontal", "three_quarter", "profile_left", "profile_right")
    },
    "unknown": "pose_unknown",
}
|
|
|
|
# Complex action patterns (3+ transitions in short time)
#
# Each entry describes a three-step label sequence and the frame window
# (min_frames..max_frames) it has to fit in.
# NOTE(review): detect_complex_actions() in this file hard-codes the same
# sequences and windows instead of reading this table - confirm whether the
# table is consumed elsewhere before changing either side.

# Frame window shared by both shake-head directions.
_SHAKE_HEAD_WINDOW = {"min_frames": 5, "max_frames": 30}

COMPLEX_PATTERNS = {
    # Shake head: profile_left -> profile_right -> profile_left (or reverse)
    "shake_head": {
        "sequence": ["profile_left", "profile_right", "profile_left"],
        **_SHAKE_HEAD_WINDOW,
    },
    "shake_head_reverse": {
        "sequence": ["profile_right", "profile_left", "profile_right"],
        **_SHAKE_HEAD_WINDOW,
    },
    # Nod: tilted_up -> tilted_down -> tilted_up (or reverse)
    "nod_head": {
        "sequence": ["tilted_up", "tilted_down", "tilted_up"],
        "min_frames": 3,
        "max_frames": 20,
        # Marks the sequence as pitch labels rather than angle labels.
        "pitch_mode": True,
    },
}
|
|
|
|
|
|
def decode_pose_to_action(from_pose: str, to_pose: str) -> str:
    """
    Translate one pose change into an action name.

    Args:
        from_pose: Label of the pose before the change
        to_pose: Label of the pose after the change

    Returns:
        The name registered in POSE_TO_ACTION for the (from_pose, to_pose)
        pair, or "pose_change_<from_pose>_to_<to_pose>" when the pair is
        not registered.
    """
    try:
        return POSE_TO_ACTION[(from_pose, to_pose)]
    except KeyError:
        # Unregistered pair: fall back to a generic, self-describing name.
        return f"pose_change_{from_pose}_to_{to_pose}"
|
|
|
|
|
|
def detect_complex_actions(pose_trace: List[Dict]) -> List[Dict]:
    """
    Find shake-head and nod patterns in a pose trace.

    Every window of three consecutive trace entries is compared against a
    fixed set of three-label patterns. A match is reported only when the
    frame distance between the first and third entry lies inside the
    pattern's allowed range.

    Args:
        pose_trace: Pose trace list; entries are read through their
            "angle", "pitch" and "frame" keys

    Returns:
        List of event dicts (action, start_frame, end_frame,
        duration_frames, description): all shake-head events in trace
        order, followed by all nod events in trace order.
    """
    # (label key, action name, min frame span, max frame span, patterns)
    detectors = (
        (
            "angle", "shake_head", 5, 30,
            (
                (["profile_left", "profile_right", "profile_left"], "shake head left-right-left"),
                (["profile_right", "profile_left", "profile_right"], "shake head right-left-right"),
            ),
        ),
        (
            "pitch", "nod_head", 3, 20,
            (
                (["tilted_up", "tilted_down", "tilted_up"], "nod head up-down"),
                (["tilted_down", "tilted_up", "tilted_down"], "nod head up-down"),
            ),
        ),
    )

    events = []
    for label_key, action, min_span, max_span, patterns in detectors:
        # Slide a three-entry window over the trace.
        for first, middle, last in zip(pose_trace, pose_trace[1:], pose_trace[2:]):
            observed = [first[label_key], middle[label_key], last[label_key]]
            description = next(
                (text for pattern, text in patterns if observed == pattern),
                None,
            )
            if description is None:
                continue

            span = last["frame"] - first["frame"]
            if min_span <= span <= max_span:
                events.append({
                    "action": action,
                    "start_frame": first["frame"],
                    "end_frame": last["frame"],
                    "duration_frames": span,
                    "description": description,
                })

    return events
|
|
|
|
|
|
# Minimum segment length (in frames) for a held pose to be reported as stable.
_STABLE_MIN_FRAMES = 10


def _split_pose_segments(pose_trace: List[Dict]) -> List[Dict]:
    """Group consecutive trace entries with the same angle and pitch into segments."""
    segments = []
    angle = pose_trace[0]["angle"]
    start = pose_trace[0]["frame"]
    pitch = pose_trace[0]["pitch"]
    previous = pose_trace[0]

    def close_segment(end_frame):
        segments.append({
            "angle": angle,
            "pitch": pitch,
            "start_frame": start,
            "end_frame": end_frame,
            # Inclusive frame count of the segment.
            "duration_frames": end_frame - start + 1,
        })

    for sample in pose_trace[1:]:
        if sample["angle"] != angle or sample["pitch"] != pitch:
            # The segment ends on the last entry that still had the old pose.
            close_segment(previous["frame"])
            angle = sample["angle"]
            pitch = sample["pitch"]
            start = sample["frame"]
        previous = sample

    close_segment(pose_trace[-1]["frame"])
    return segments


def _segment_to_action(seg: Dict) -> Dict:
    """Turn one pose segment into a 'stable' or 'transitional' timeline entry."""
    if seg["duration_frames"] >= _STABLE_MIN_FRAMES:
        action_name = STABLE_ACTION_NAMES.get(seg["angle"], "pose_stable")
        # Non-neutral pitch is appended as a suffix to the stable action name.
        if seg["pitch"] != "neutral":
            action_name += f"_pitch_{seg['pitch']}"
        return {
            "frame": seg["start_frame"],
            "action": action_name,
            "duration_frames": seg["duration_frames"],
            "description": f"stable {seg['angle']} pose for {seg['duration_frames']} frames",
            "type": "stable",
        }

    return {
        "frame": seg["start_frame"],
        "action": f"pose_{seg['angle']}_brief",
        "duration_frames": seg["duration_frames"],
        "description": f"brief {seg['angle']} pose for {seg['duration_frames']} frames",
        "type": "transitional",
    }


def _summarize_actions(action_timeline: List[Dict], complex_action_count: int) -> Dict:
    """Aggregate counts and durations per action name; all zero for an empty timeline."""
    action_counts = defaultdict(int)
    action_durations = defaultdict(float)
    stable_entries = 0

    for act in action_timeline:
        action_counts[act["action"]] += 1
        action_durations[act["action"]] += act["duration_frames"]
        if act["type"] == "stable":
            stable_entries += 1

    return {
        "total_actions": len(action_timeline),
        "unique_actions": len(action_counts),
        "action_counts": dict(action_counts),
        "action_durations_frames": {k: round(v, 1) for k, v in action_durations.items()},
        "complex_action_count": complex_action_count,
        # Share of timeline entries (not of frames) whose type is "stable".
        "stable_percentage": round(
            stable_entries / len(action_timeline) * 100, 1
        ) if action_timeline else 0,
    }


def build_action_timeline(trace: Dict) -> Dict:
    """
    Build action timeline from pose_trace

    The timeline combines three kinds of entries:
    stable/transitional entries derived from runs of identical
    (angle, pitch) in "pose_trace", one-frame "transition" entries derived
    from "pose_transitions", and "complex" entries from
    detect_complex_actions(). Entries are ordered by start frame; at the
    same frame, longer entries come first.

    Args:
        trace: Trace data with pose_trace, pose_transitions. pose_trace
            entries are read through "angle", "pitch" and "frame";
            pose_transitions entries through "from_angle", "to_angle"
            and "frame". Missing or null lists are treated as empty.

    Returns:
        Action timeline dict with "trace_id", "action_timeline",
        "action_summary" and "complex_actions". For an empty pose_trace the
        summary carries the same keys as usual with zero/empty values, so
        consumers can index it without special-casing.
    """
    # `or []` also covers an explicit JSON null for either list.
    pose_trace = trace.get("pose_trace") or []
    pose_transitions = trace.get("pose_transitions") or []

    if not pose_trace:
        return {
            "trace_id": trace.get("trace_id"),
            "action_timeline": [],
            "action_summary": _summarize_actions([], 0),
            "complex_actions": [],
        }

    complex_actions = detect_complex_actions(pose_trace)

    # Stable / brief pose entries, one per segment.
    action_timeline = [_segment_to_action(seg) for seg in _split_pose_segments(pose_trace)]

    # Add transition actions
    for trans in pose_transitions:
        action_timeline.append({
            "frame": trans["frame"],
            "action": decode_pose_to_action(trans["from_angle"], trans["to_angle"]),
            "duration_frames": 1,  # Transition is instant
            "description": f"transition from {trans['from_angle']} to {trans['to_angle']}",
            "type": "transition",
        })

    # Add complex actions
    for complex_act in complex_actions:
        action_timeline.append({
            "frame": complex_act["start_frame"],
            "action": complex_act["action"],
            "duration_frames": complex_act["duration_frames"],
            "description": complex_act["description"],
            "type": "complex",
        })

    # One stable sort is enough: entries with equal (frame, duration) keep
    # their insertion order (segments, then transitions, then complex).
    action_timeline.sort(key=lambda x: (x["frame"], -x["duration_frames"]))

    return {
        "trace_id": trace.get("trace_id"),
        "action_timeline": action_timeline,
        "action_summary": _summarize_actions(action_timeline, len(complex_actions)),
        "complex_actions": complex_actions,
    }
|
|
|
|
|
|
def generate_action_description(action_timeline: List[Dict]) -> str:
    """
    Summarise an action timeline as one line of text.

    The text has up to three parts joined by ". ": the descriptions of the
    first three "stable" entries, the action names of the first five
    "transition" entries, and the action names of every "complex" entry.
    Parts with no entries are omitted; entries of any other type are
    ignored.

    Args:
        action_timeline: Action timeline list

    Returns:
        Action description string ("No actions detected" for an empty
        timeline)
    """
    if not action_timeline:
        return "No actions detected"

    def entries_of(kind):
        return [entry for entry in action_timeline if entry["type"] == kind]

    stable = entries_of("stable")
    transitions = entries_of("transition")
    complexes = entries_of("complex")

    sentences = []

    if stable:
        # Only the first three stable poses (in timeline order) are listed.
        listed = ", ".join(f"{entry['description']}" for entry in stable[:3])
        sentences.append("Stable poses: " + listed)

    if transitions:
        # Only the first five transitions (in timeline order) are listed.
        listed = ", ".join([entry["action"] for entry in transitions[:5]])
        sentences.append("Transitions: " + listed)

    if complexes:
        listed = ", ".join([entry["action"] for entry in complexes])
        sentences.append("Complex actions: " + listed)

    return ". ".join(sentences)
|
|
|
|
|
|
def visualize_action_timeline(action_data: Dict, output_path: str = None) -> None:
    """
    Visualize action timeline

    Draws one subplot per trace (traces sorted by id). Actions longer than
    one frame are drawn as horizontal bars starting at their frame; stable
    actions longer than 30 frames are labelled inside the bar. Actions of
    one frame or less are drawn as a dashed vertical line with a rotated
    label. Traces with an empty timeline leave their subplot blank.

    Args:
        action_data: Dict with a "traces" mapping of trace id to a result
            dict containing "action_timeline"
        output_path: If given, the figure is written to this path and then
            closed; otherwise it is shown interactively

    Returns:
        None. Prints "No traces found" and returns early when there is
        nothing to plot.
    """
    traces_data = action_data.get("traces", {})

    if not traces_data:
        print("No traces found")
        return

    trace_count = len(traces_data)
    # squeeze=False always yields a 2-D array of axes, so one trace and many
    # traces are handled the same way.
    fig, axes_grid = plt.subplots(
        trace_count, 1, figsize=(16, 3 * trace_count), squeeze=False
    )
    axes = axes_grid[:, 0]

    # Actions not listed here are drawn in gray.
    action_colors = {
        "frontal_stable": "green",
        "three_quarter_stable": "blue",
        "profile_left_stable": "orange",
        "profile_right_stable": "red",
        "turn_left": "purple",
        "turn_right": "purple",
        "turn_full": "darkred",
        "shake_head": "yellow",
        "nod_head": "cyan",
        "look_up": "lightgreen",
        "look_down": "brown",
    }

    for ax, (trace_id, data) in zip(axes, sorted(traces_data.items())):
        timeline = data["action_timeline"]

        if not timeline:
            continue

        # Plot action timeline as bars
        for act in timeline:
            color = action_colors.get(act["action"], "gray")

            if act["duration_frames"] > 1:
                ax.barh(
                    y=0,
                    width=act["duration_frames"],
                    left=act["frame"],
                    height=0.8,
                    color=color,
                    alpha=0.6,
                    edgecolor="black",
                    linewidth=0.5,
                )

                # Add label for stable actions
                if act["type"] == "stable" and act["duration_frames"] > 30:
                    ax.text(
                        act["frame"] + act["duration_frames"] / 2,
                        0,
                        act["action"],
                        ha="center",
                        va="center",
                        fontsize=8,
                        color="white",
                    )
            else:
                # Instant action (transition)
                ax.axvline(x=act["frame"], color=color, linestyle="--", alpha=0.8)
                ax.text(
                    act["frame"],
                    0.5,
                    act["action"],
                    fontsize=7,
                    rotation=90,
                    va="bottom",
                    ha="center",
                )

        ax.set_xlabel("Frame Number")
        ax.set_ylabel("Action")
        ax.set_title(f"Trace {trace_id} Action Timeline")
        ax.set_ylim(-0.5, 1)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()

    if output_path:
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
        # Release the figure once it is on disk; pyplot otherwise keeps every
        # figure alive until it is explicitly closed.
        plt.close(fig)
        print(f"\n✅ Visualization saved to: {output_path}")
    else:
        plt.show()
|
|
|
|
|
|
def print_action_report(action_data: Dict) -> None:
    """
    Print a text report for every trace to stdout.

    For each trace (sorted by id) the report shows the summary figures, the
    action counts ordered from most to least frequent, the first ten
    timeline entries, any complex actions, and the one-line description
    produced by generate_action_description().

    Args:
        action_data: Dict with a "traces" mapping of trace id to a result
            dict containing "action_summary", "action_timeline" and
            "complex_actions"
    """
    rule = "=" * 70

    print("\n" + rule)
    print("Pose Action Decoder Report")
    print(rule)

    for trace_id, data in sorted(action_data.get("traces", {}).items()):
        print("\n" + rule)
        print(f"Trace {trace_id}")
        print(rule)

        summary = data["action_summary"]
        print("\nSummary:")
        for label, key in (
            ("Total Actions", "total_actions"),
            ("Unique Actions", "unique_actions"),
            ("Complex Actions", "complex_action_count"),
        ):
            print(f" {label}: {summary[key]}")
        print(f" Stable Percentage: {summary['stable_percentage']}%")

        print("\nAction Counts:")
        ranked = sorted(
            summary["action_counts"].items(),
            key=lambda pair: pair[1],
            reverse=True,
        )
        for name, occurrences in ranked:
            print(f" {name}: {occurrences}")

        # Header label reads "first 10".
        print("\nAction Timeline (前 10 个):")
        for entry in data["action_timeline"][:10]:
            print(f" Frame {entry['frame']}: {entry['action']} ({entry['type']}, {entry['duration_frames']} frames)")

        complex_events = data["complex_actions"]
        if complex_events:
            print("\nComplex Actions:")
            for event in complex_events:
                print(f" {event['action']}: frames {event['start_frame']}-{event['end_frame']} ({event['duration_frames']} frames)")

        # Generate description
        print("\nHuman-readable Description:")
        print(f" {generate_action_description(data['action_timeline'])}")
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    Loads the face JSON given by --face-json, builds an action timeline for
    every trace under its "traces" key (or only the one selected with
    --trace-id), prints the report, and optionally writes the action data
    as JSON (--output-json) and a timeline plot (--output-plot).
    """
    parser = argparse.ArgumentParser(description="Decode pose_trace into action names")
    parser.add_argument("--face-json", required=True, help="Path to face_traced.json")
    parser.add_argument("--output-json", help="Output action data JSON")
    parser.add_argument("--output-plot", help="Output action timeline plot PNG")
    parser.add_argument("--trace-id", type=int, help="Analyze specific trace only")
    args = parser.parse_args()

    print("=" * 70)
    print("Pose Action Decoder")
    print("=" * 70)

    # Explicit UTF-8 so reading does not depend on the platform locale.
    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    traces = face_data.get("traces", {})

    if not traces:
        print("No traces found in face_traced.json")
        return

    # Filter by trace_id if specified. Compare against None rather than
    # truthiness so that `--trace-id 0` selects trace 0 instead of being
    # treated as "no filter".
    if args.trace_id is not None:
        trace_key = str(args.trace_id)
        selected = traces.get(trace_key)
        if not selected:
            print(f"Trace {args.trace_id} not found")
            return
        traces = {trace_key: selected}

    print(f"\nAnalyzing {len(traces)} traces...")

    action_data = {
        "traces": {
            trace_id_str: build_action_timeline(trace)
            for trace_id_str, trace in traces.items()
        }
    }

    print_action_report(action_data)

    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as f:
            json.dump(action_data, f, indent=2)
        print(f"\n✅ Action data saved to: {args.output_json}")

    if args.output_plot:
        visualize_action_timeline(action_data, args.output_plot)


if __name__ == "__main__":
    main()