Files
momentry_core/scripts/pose_processor_mps.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

376 lines
11 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Pose Processor - Apple MPS Optimized Version
Uses YOLOv8 Pose with Apple Silicon MPS acceleration
Features:
- Automatic MPS/CPU fallback
- Metal GPU acceleration for inference
- YOLOv8 Pose model support
- Memory-optimized for unified memory architecture
"""
import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Dict
import cv2
import torch
from ultralytics import YOLO
# COCO keypoint names (17 keypoints). The position of a name in this list is
# the keypoint index it labels.
KEYPOINT_NAMES = [
    # Head
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    # Arms
    "left_shoulder", "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
    # Legs
    "left_hip", "right_hip",
    "left_knee", "right_knee",
    "left_ankle", "right_ankle",
]

# Pairs of keypoint names joined by a line when drawing a skeleton.
KEYPOINT_CONNECTIONS = [
    # Shoulder line and both arms
    ("left_shoulder", "right_shoulder"),
    ("left_shoulder", "left_elbow"),
    ("left_elbow", "left_wrist"),
    ("right_shoulder", "right_elbow"),
    ("right_elbow", "right_wrist"),
    # Torso sides and hip line
    ("left_shoulder", "left_hip"),
    ("right_shoulder", "right_hip"),
    ("left_hip", "right_hip"),
    # Both legs
    ("left_hip", "left_knee"),
    ("left_knee", "left_ankle"),
    ("right_hip", "right_knee"),
    ("right_knee", "right_ankle"),
]
def get_device() -> str:
    """Pick the preferred inference device that PyTorch reports as available.

    Preference order is Apple MPS, then CUDA, then CPU.

    Returns:
        One of ``"mps"``, ``"cuda"`` or ``"cpu"``.
    """
    # torch.backends may have no ``mps`` attribute on PyTorch builds without
    # MPS support; treat that as "not available" instead of letting an
    # AttributeError escape.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return "mps"
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"
def signal_handler(signum, frame):
    """Announce the received signal and terminate with exit status 0.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame supplied by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, with code 0. This handler writes nothing itself;
            any saving has to happen in code that reacts to the SystemExit.
    """
    notice = f"\n[Pose] Received signal {signum}, saving results and exiting..."
    print(notice)
    raise SystemExit(0)
def _write_json_atomic(path: str, payload: Dict) -> None:
    """Write ``payload`` as JSON to ``path`` via a temp file and ``os.replace``.

    Writing to a sibling file first means an interruption in the middle of a
    save leaves the previous contents of ``path`` intact for a later resume.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_path, path)


def _load_saved_frames(output_path: str) -> Dict:
    """Return the ``"frames"`` mapping stored by a previous run, or ``{}``.

    An unreadable or malformed file is reported and ignored rather than
    aborting the run; a payload without a dict under ``"frames"`` yields ``{}``.
    """
    try:
        with open(output_path, "r") as f:
            saved = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        print(f"[Pose] Ignoring unreadable previous results: {exc}")
        return {}
    frames = saved.get("frames") if isinstance(saved, dict) else None
    return frames if isinstance(frames, dict) else {}


def _extract_frame_poses(kp_data, confidence: float) -> list:
    """Convert one frame's keypoint array into a list of pose dicts.

    Args:
        kp_data: Per-person sequences of keypoints, each keypoint being
            ``[x, y, confidence]``.
        confidence: Keypoints whose score is not above this value are dropped.

    Returns:
        One ``{"keypoints": [...], "person_id": index}`` dict per person that
        kept at least one keypoint.
    """
    poses = []
    for person_idx, person_kps in enumerate(kp_data):
        # zip stops at the shorter input, so at most len(KEYPOINT_NAMES)
        # keypoints are read and every kept keypoint has a name.
        kept = [
            {
                "name": name,
                "x": float(kp[0]),
                "y": float(kp[1]),
                "confidence": float(kp[2]),
            }
            for name, kp in zip(KEYPOINT_NAMES, person_kps)
            if len(kp) >= 3 and kp[2] > confidence
        ]
        if kept:
            poses.append({"keypoints": kept, "person_id": person_idx})
    return poses


def process_video_pose(
    video_path: str,
    output_path: str,
    model_name: str = "yolov8n-pose",
    confidence: float = 0.5,
    device: str = "auto",
    sample_interval: int = 30,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for pose estimation with MPS acceleration

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        model_name: YOLO Pose model name (yolov8n-pose/s/m/l/x); ".pt" is
            appended to form the weights file name
        confidence: Threshold passed to the model as ``conf`` and also used
            as the minimum score for an individual keypoint to be kept
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        sample_interval: Record every N-th result of the model stream; the
            results in between are yielded by the stream and discarded
        resume: Whether to merge frames from an existing output file
        save_interval: Auto-save interval in seconds (<= 0 disables auto-save)

    Returns:
        Dictionary with pose estimation results and metadata

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        Exception: Any error raised while running the model is logged and
            re-raised. ``SystemExit``/``KeyboardInterrupt`` are re-raised
            after the partial results have been written.
    """
    # Reject this before any expensive work; 0 would otherwise surface as a
    # ZeroDivisionError from the modulo below, after the model is loaded.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Determine device
    if device == "auto":
        device = get_device()
    print(f"[Pose] Starting pose estimation with device: {device}")
    print(f"[Pose] Model: {model_name}, Confidence: {confidence}")

    # Load model
    print(f"[Pose] Loading model: {model_name}")
    model = YOLO(f"{model_name}.pt")

    # Move to device
    if device in ["mps", "cuda"]:
        model.to(device)

    # Get video info; release the capture even if a property read fails.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[Pose] Warning: could not open video for metadata: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()
    print(f"[Pose] Video: {width}x{height} @ {fps:.2f} FPS, {total_frames} frames")

    # Load existing data if resuming.
    # NOTE: saved frames are merged into the result, but the model stream
    # below still starts at the first frame, so frames that overlap with the
    # saved ones are recomputed and overwritten.
    saved_frames: Dict = {}
    if resume and os.path.exists(output_path):
        saved_frames = _load_saved_frames(output_path)
        frame_numbers = [int(k) for k in saved_frames if str(k).isdigit()]
        if frame_numbers:
            print(f"[Pose] Resuming from frame {max(frame_numbers)}")

    # Initialize result structure
    result = {
        "video_path": video_path,
        "model": model_name,
        "device": device,
        "confidence_threshold": confidence,
        "processed_at": datetime.now().isoformat(),
        "keypoint_names": KEYPOINT_NAMES,
        "connections": KEYPOINT_CONNECTIONS,
        "frames": saved_frames,
    }

    # Process video
    print(f"[Pose] Processing video: {video_path}")
    start_time = time.time()
    frame_count = 0
    pose_count = 0
    last_save_time = start_time

    try:
        # Use stream mode for memory efficiency
        # NOTE(review): `pose=True` is forwarded exactly as in the original
        # call; confirm it is a meaningful argument for prediction.
        results = model(
            video_path,
            conf=confidence,
            device=device,
            stream=True,
            imgsz=640,
            pose=True,
            verbose=False,
        )
        for idx, r in enumerate(results):
            # Skip frames based on sample_interval
            if idx % sample_interval != 0:
                continue

            keypoints = r.keypoints
            if keypoints is not None and len(keypoints) > 0:
                frame_poses = _extract_frame_poses(
                    keypoints.data.cpu().numpy(), confidence
                )
                pose_count += len(frame_poses)
                if frame_poses:
                    result["frames"][str(idx)] = {
                        "timestamp": idx / fps if fps > 0 else 0,
                        "poses": frame_poses,
                    }

            frame_count += 1

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps_rate = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[Pose] Processed {frame_count} frames, {pose_count} poses, {fps_rate:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _write_json_atomic(output_path, result)
                last_save_time = time.time()
                print(f"[Pose] Auto-saved at frame {frame_count}")
    except (KeyboardInterrupt, SystemExit):
        # signal_handler terminates via SystemExit, which `except Exception`
        # does not catch; persist what has been collected before exiting.
        try:
            _write_json_atomic(output_path, result)
            print(f"[Pose] Interrupted, partial results saved to: {output_path}")
        except OSError as save_error:
            print(f"[Pose] Could not save partial results: {save_error}")
        raise
    except Exception as e:
        print(f"[Pose] Error during processing: {e}")
        raise

    # Final save
    elapsed_time = time.time() - start_time
    avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
    result["summary"] = {
        "total_frames": frame_count,
        "total_poses": pose_count,
        "processing_time": round(elapsed_time, 2),
        "average_fps": round(avg_fps, 2),
        "model": model_name,
        "device": device,
    }

    # Save final results
    _write_json_atomic(output_path, result)
    print(
        f"[Pose] Completed: {frame_count} frames, {pose_count} poses in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[Pose] Results saved to: {output_path}")
    return result
def benchmark_pose_models(video_path: str, num_frames: int = 100) -> Dict:
    """Benchmark different YOLO Pose models and devices

    Runs "yolov8n-pose" and "yolov8s-pose" on the CPU and on every
    accelerator PyTorch reports as available, timing up to ``num_frames``
    streamed results per combination. The timer starts after the model is
    constructed, so it includes the first inference but not weight loading.

    Args:
        video_path: Video to run the models on.
        num_frames: Maximum number of results to pull per combination;
            values below 0 are treated as 0.

    Returns:
        Mapping of ``"<model>_<device>"`` to a dict with ``"frames"``,
        ``"time"`` (seconds, rounded to 2 places) and ``"fps"`` (rounded to
        2 places). Combinations whose run raised are reported on stdout and
        left out of the mapping.
    """
    from itertools import islice

    devices = ["cpu"]
    # torch.backends may have no ``mps`` attribute on builds without MPS
    # support; treat that as "not available".
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        devices.append("mps")
    if torch.cuda.is_available():
        devices.append("cuda")

    models = ["yolov8n-pose", "yolov8s-pose"]
    frame_limit = max(num_frames, 0)
    results = {}
    for model_name in models:
        for device in devices:
            print(f"[Pose] Benchmarking {model_name} on {device}...")
            model = YOLO(f"{model_name}.pt")
            if device != "cpu":
                model.to(device)

            start_time = time.time()
            count = 0
            try:
                stream = model(
                    video_path, device=device, stream=True, imgsz=320, pose=True
                )
                # islice stops after exactly frame_limit results. An
                # index-check-then-break loop would pull one extra result
                # first, so the timed window would cover one more inference
                # than is counted.
                for _ in islice(stream, frame_limit):
                    count += 1
            except Exception as e:
                print(f"[Pose] Error: {e}")
                continue

            elapsed = time.time() - start_time
            fps = count / elapsed if elapsed > 0 else 0
            results[f"{model_name}_{device}"] = {
                "frames": count,
                "time": round(elapsed, 2),
                "fps": round(fps, 2),
            }
    return results
def main():
    """Command-line entry point.

    With ``--benchmark`` the benchmark is run on ``--video`` and its results
    are printed as JSON; ``--output`` is not needed in that mode. Otherwise
    ``--output`` is mandatory and the video is processed with the options
    given on the command line.
    """
    parser = argparse.ArgumentParser(description="Pose Processor with MPS Support")
    parser.add_argument("--video", required=True, help="Input video path")
    # Not marked required here because benchmark mode never writes a file;
    # processing mode enforces it after parsing.
    parser.add_argument(
        "--output", help="Output JSON path (required unless --benchmark is given)"
    )
    parser.add_argument(
        "--model", default="yolov8n-pose", help="YOLO Pose model (yolov8n-pose/s/m/l/x)"
    )
    parser.add_argument(
        "--confidence", type=float, default=0.5, help="Confidence threshold"
    )
    parser.add_argument(
        "--device",
        default="auto",
        choices=["auto", "mps", "cuda", "cpu"],
        help="Device to use",
    )
    parser.add_argument(
        "--sample-interval", type=int, default=30, help="Process every N frames"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Do not resume from existing results"
    )
    parser.add_argument(
        "--save-interval", type=int, default=30, help="Auto-save interval in seconds"
    )
    parser.add_argument(
        "--benchmark", action="store_true", help="Run benchmark instead of processing"
    )
    args = parser.parse_args()

    if args.benchmark:
        results = benchmark_pose_models(args.video)
        print("\n[Benchmark Results]")
        print(json.dumps(results, indent=2))
        return

    if not args.output:
        # Same exit status (2) argparse uses for a missing required option.
        parser.error("the following arguments are required: --output")

    process_video_pose(
        video_path=args.video,
        output_path=args.output,
        model_name=args.model,
        confidence=args.confidence,
        device=args.device,
        sample_interval=args.sample_interval,
        resume=not args.no_resume,
        save_interval=args.save_interval,
    )


if __name__ == "__main__":
    main()