Files
momentry_core/scripts/utils/pose_transition_analyzer.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

237 lines
8.2 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Pose Transition Analyzer - Analyze pose changes within traces
Purpose:
1. Visualize pose transitions over time
2. Calculate transition frequency and duration
3. Identify pose stability patterns
Output:
1. Pose transition timeline
2. Pose duration statistics
3. Stability score per trace
"""
import json
import argparse
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict
def _segment_pose_trace(pose_trace: list) -> list:
    """Split a pose trace into runs of consecutive samples sharing one angle.

    Run boundaries are tracked by list index rather than derived from frame
    numbers, so traces whose frames are sampled or have gaps still average
    confidence over exactly the samples that belong to each run.

    Args:
        pose_trace: Non-empty list of samples, each a dict providing the
            "angle", "frame" and "confidence" keys.

    Returns:
        One dict per run with "angle", "start_frame", "end_frame",
        "duration_frames" (inclusive frame span) and "avg_confidence".
    """
    segments = []
    sample_count = len(pose_trace)
    run_start = 0
    for idx in range(1, sample_count + 1):
        # A run ends when the angle changes or the trace is exhausted.
        same_run = (
            idx < sample_count
            and pose_trace[idx]["angle"] == pose_trace[run_start]["angle"]
        )
        if same_run:
            continue
        run = pose_trace[run_start:idx]
        first_frame = run[0]["frame"]
        last_frame = run[-1]["frame"]
        segments.append({
            "angle": run[0]["angle"],
            "start_frame": first_frame,
            "end_frame": last_frame,
            "duration_frames": last_frame - first_frame + 1,
            "avg_confidence": float(np.mean([p["confidence"] for p in run])),
        })
        run_start = idx
    return segments


def analyze_pose_transitions(face_data: Dict) -> Dict:
    """Analyze pose transitions for all traces.

    Traces with fewer than two pose samples are skipped.

    Args:
        face_data: Dict with a "traces" mapping of trace id (a string
            convertible to int) to a trace dict. Each trace may provide
            "pose_trace" (list of samples), "pose_transitions" (list of
            transition events) and "duration_seconds".

    Returns:
        Dict keyed by integer trace id. Each value holds "trace_id",
        "total_transitions", "transition_frequency" (transitions per second),
        "stability_score" (0-1, higher = more stable), "pose_segments",
        "pose_avg_duration" (mean segment length in frames per angle),
        "longest_stable_pose" and "transition_events". Empty when there are
        no traces.
    """
    traces = face_data.get("traces", {})
    if not traces:
        return {}

    analysis = {}
    for trace_id_str, trace in traces.items():
        trace_id = int(trace_id_str)
        pose_trace = trace.get("pose_trace", [])
        transitions = trace.get("pose_transitions", [])
        if len(pose_trace) < 2:
            continue

        pose_segments = _segment_pose_trace(pose_trace)

        # A missing or non-positive duration yields a frequency of 0 instead
        # of raising or dividing by zero.
        duration_seconds = trace.get("duration_seconds") or 0
        if duration_seconds > 0:
            transition_frequency = len(transitions) / duration_seconds
        else:
            transition_frequency = 0

        # Inverse of transition frequency: 2 transitions/second or more is
        # treated as fully unstable (score 0).
        stability_score = 1.0 - min(transition_frequency / 2.0, 1.0)

        # Group segment lengths per angle in one pass; a dict keeps angles in
        # first-seen order so the output is deterministic.
        durations_by_angle = {}
        for segment in pose_segments:
            durations_by_angle.setdefault(segment["angle"], []).append(
                segment["duration_frames"]
            )
        pose_avg_duration = {
            angle: round(float(np.mean(durations)), 1)
            for angle, durations in durations_by_angle.items()
        }

        analysis[trace_id] = {
            "trace_id": trace_id,
            "total_transitions": len(transitions),
            "transition_frequency": round(transition_frequency, 3),
            "stability_score": round(stability_score, 3),
            "pose_segments": pose_segments,
            "pose_avg_duration": pose_avg_duration,
            "longest_stable_pose": max(
                pose_segments, key=lambda seg: seg["duration_frames"]
            ),
            "transition_events": transitions,
        }
    return analysis
def visualize_pose_transitions(face_data: Dict, output_path: str = None) -> None:
    """Plot a pose timeline for every trace, one subplot per trace.

    Each subplot shades the interval between consecutive pose samples with
    the colour of the pose at the start of that interval, marks every entry
    of the trace's "pose_transitions" with a dashed vertical line and a
    "from→to" label, and overlays the per-sample confidence on a secondary
    y-axis. Traces are ordered by "duration_frames", longest first. Traces
    without pose samples leave their subplot empty.

    Args:
        face_data: Dict with a "traces" mapping whose values provide
            "trace_id", "duration_frames", "start_frame", "end_frame" and
            optionally "pose_trace" and "pose_transitions".
        output_path: When given, the figure is saved there and then closed;
            otherwise it is shown with ``plt.show()``.
    """
    traces = face_data.get("traces", {})
    if not traces:
        print("No traces found")
        return

    sorted_traces = sorted(
        traces.values(), key=lambda x: x["duration_frames"], reverse=True
    )
    # squeeze=False always yields a 2-D axes array, so a single trace needs
    # no special-casing.
    fig, axes_grid = plt.subplots(
        len(sorted_traces),
        1,
        figsize=(16, 4 * len(sorted_traces)),
        squeeze=False,
    )
    axes = axes_grid[:, 0]

    pose_colors = {
        "frontal": "green",
        "three_quarter": "blue",
        "profile_left": "orange",
        "profile_right": "red",
        "unknown": "gray",
    }

    for ax, trace in zip(axes, sorted_traces):
        trace_id = trace["trace_id"]
        pose_trace = trace.get("pose_trace", [])
        if not pose_trace:
            continue

        frames = [p["frame"] for p in pose_trace]
        angles = [p["angle"] for p in pose_trace]
        confidences = [p["confidence"] for p in pose_trace]

        # Shade each interval between consecutive samples; angles missing
        # from the colour table fall back to gray.
        for start, end, angle in zip(frames, frames[1:], angles):
            ax.fill_between(
                [start, end],
                [0, 0],
                [1, 1],
                color=pose_colors.get(angle, "gray"),
                alpha=0.6,
            )

        # Mark transitions. The arrow keeps the two angle names from running
        # together into one unreadable word.
        for t in trace.get("pose_transitions", []):
            ax.axvline(
                x=t["frame"], color="black", linestyle="--", alpha=0.5, linewidth=1
            )
            ax.text(
                t["frame"],
                1.05,
                f"{t['from_angle']}→{t['to_angle']}",
                fontsize=8,
                rotation=90,
                va="bottom",
                ha="center",
            )

        # Confidence on a secondary axis sharing the frame axis.
        ax2 = ax.twinx()
        ax2.plot(
            frames,
            confidences,
            color="purple",
            linewidth=1,
            alpha=0.7,
            label="Confidence",
        )
        ax2.set_ylabel("Confidence", color="purple")
        ax2.set_ylim(0, 1)

        ax.set_xlabel("Frame Number")
        ax.set_ylabel("Pose Angle")
        ax.set_title(
            f"Trace {trace_id} Pose Timeline "
            f"(Frames {trace['start_frame']}-{trace['end_frame']})"
        )
        ax.set_ylim(0, 1.2)

        # Legend lists each pose once, in order of first appearance, so the
        # layout is the same on every run.
        legend_elements = [
            plt.Rectangle(
                (0, 0), 1, 1, fc=pose_colors.get(pose, "gray"), alpha=0.6, label=pose
            )
            for pose in dict.fromkeys(angles)
        ]
        ax.legend(handles=legend_elements, loc="upper right", fontsize=8)

    plt.tight_layout()
    if output_path:
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
        print(f"\n✅ Visualization saved to: {output_path}")
        # Release the figure; pyplot otherwise keeps it alive across calls.
        plt.close(fig)
    else:
        plt.show()
def print_transition_analysis(analysis: Dict, max_segments: int = 5) -> None:
    """Print a human-readable report of the transition analysis.

    Traces are printed in ascending key order. For each trace the summary
    metrics, the per-angle average duration and the leading pose segments
    are written to stdout.

    Args:
        analysis: Mapping of trace id to a result dict providing
            "total_transitions", "transition_frequency", "stability_score",
            "longest_stable_pose", "pose_avg_duration" and "pose_segments".
        max_segments: Maximum number of pose segments listed per trace. The
            header line always reports the full segment count.
    """
    print("\n" + "=" * 60)
    print("Pose Transition Analysis")
    print("=" * 60)
    for trace_id, data in sorted(analysis.items()):
        longest = data["longest_stable_pose"]
        segments = data["pose_segments"]

        print(f"\n=== Trace {trace_id} ===")
        print(f"Total Transitions: {data['total_transitions']}")
        print(f"Transition Frequency: {data['transition_frequency']} transitions/second")
        print(f"Stability Score: {data['stability_score']} (0-1, higher = more stable)")
        print(f"Longest Stable Pose: {longest['angle']} ({longest['duration_frames']} frames)")

        print("\nPose Average Duration:")
        for angle, avg_dur in data["pose_avg_duration"].items():
            print(f" {angle}: {avg_dur} frames")

        print(f"\nPose Segments (共 {len(segments)} 个):")
        for seg in segments[:max_segments]:
            print(
                f" {seg['angle']}: frames {seg['start_frame']}-{seg['end_frame']} "
                f"({seg['duration_frames']} frames, confidence: {seg['avg_confidence']:.3f})"
            )
def main():
    """Command-line entry point.

    Loads the face-trace JSON given by --face-json, prints the transition
    analysis, and optionally writes the analysis as JSON (--output-json)
    and a timeline plot (--output-plot).
    """
    parser = argparse.ArgumentParser(description="Analyze pose transitions in face traces")
    parser.add_argument("--face-json", required=True, help="Path to face_traced.json")
    parser.add_argument("--output-plot", help="Output plot path (PNG)")
    parser.add_argument("--output-json", help="Output analysis JSON path")
    args = parser.parse_args()

    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    print("=" * 60)
    print("Pose Transition Analyzer")
    print("=" * 60)

    analysis = analyze_pose_transitions(face_data)
    print_transition_analysis(analysis)

    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as f:
            json.dump(analysis, f, indent=2)
        print(f"\n✅ Analysis saved to: {args.output_json}")

    if args.output_plot:
        visualize_pose_transitions(face_data, args.output_plot)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()