Files
momentry_core/scripts/pose_processor.py

120 lines
4.7 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
Pose Processor Wrapper
Calls Swift Vision Framework pose (swift_pose) with fallback to YOLOv8 Pose.
Uses VNDetectHumanBodyPoseRequest with ANE acceleration.
"""
import sys
import json
import os
import subprocess
import argparse
SWIFT_POSE_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"swift_processors/.build/debug/swift_pose"
)
SWIFT_POSE_ALT = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"swift_processors/.build/arm64-apple-macosx/debug/swift_pose"
)
def process_pose(
video_path: str,
output_path: str,
uuid: str = "",
sample_interval: int = 30,
) -> dict:
swift_bin = SWIFT_POSE_PATH
if not os.path.exists(swift_bin):
swift_bin = SWIFT_POSE_ALT
if not os.path.exists(swift_bin):
print("[Pose] Swift binary not found, using YOLOv8 fallback", file=sys.stderr)
return _fallback(video_path, output_path, uuid, sample_interval)
cmd = [swift_bin, video_path, output_path,
"--sample-interval", str(sample_interval),
"--uuid", uuid]
print(f"[Pose] Running Swift Pose (Vision Framework)", file=sys.stderr)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
if result.stdout:
for line in result.stdout.strip().split("\n"):
print(f" {line}", file=sys.stderr)
if result.stderr:
for line in result.stderr.strip().split("\n"):
print(f" {line}", file=sys.stderr)
if result.returncode != 0 or not os.path.exists(output_path):
print(f"[Pose] Swift Pose failed, falling back to YOLOv8", file=sys.stderr)
return _fallback(video_path, output_path, uuid, sample_interval)
with open(output_path) as f:
return json.load(f)
def _fallback(video_path, output_path, uuid, sample_interval):
"""Fallback to YOLOv8 Pose"""
from ultralytics import YOLO
import cv2
model = YOLO("yolov8n-pose.pt")
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = 0
frames = []
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % sample_interval == 0:
ts = frame_count / fps if fps > 0 else 0
results = model(frame, verbose=False, device="cpu")
persons = []
for r in results:
if r.keypoints is None:
continue
for kp_data in r.keypoints:
kps = kp_data.xy[0].cpu().numpy() if hasattr(kp_data, 'xy') else []
confs = kp_data.conf[0].cpu().numpy() if hasattr(kp_data, 'conf') else []
keypoints = []
names = ["nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle"]
for j, name in enumerate(names):
if j < len(kps):
x, y = float(kps[j][0]), float(kps[j][1])
c = float(confs[j]) if j < len(confs) else 0
keypoints.append({"name": name, "x": x, "y": y, "confidence": c})
if keypoints:
xs = [k["x"] for k in keypoints if k["confidence"] > 0.1]
ys = [k["y"] for k in keypoints if k["confidence"] > 0.1]
bbox = {"x": int(min(xs)), "y": int(min(ys)), "width": int(max(xs)-min(xs)), "height": int(max(ys)-min(ys))} if xs else {"x": 0, "y": 0, "width": 0, "height": 0}
persons.append({"keypoints": keypoints, "bbox": bbox})
if persons:
frames.append({"frame": frame_count, "timestamp": ts, "persons": persons})
frame_count += 1
cap.release()
result = {"frame_count": len(frames), "fps": fps, "frames": frames}
with open(output_path, "w") as f:
json.dump(result, f, indent=2)
return result
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pose Processor (Swift Vision)")
parser.add_argument("video_path")
parser.add_argument("output_path")
parser.add_argument("--uuid", "-u", default="")
parser.add_argument("--sample-interval", type=int, default=30)
args = parser.parse_args()
result = process_pose(args.video_path, args.output_path, args.uuid, args.sample_interval)
with open(args.output_path, "w") as f:
json.dump(result, f, indent=2)
print(f"Pose: {len(result.get('frames', []))} frames with poses")