#!/opt/homebrew/bin/python3.11
"""
Pose Processor Wrapper
Calls Swift Vision Framework pose (swift_pose) with fallback to YOLOv8 Pose.
Uses VNDetectHumanBodyPoseRequest with ANE acceleration.
"""

import sys
import json
import os
import subprocess
import argparse

SWIFT_POSE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "swift_processors/.build/debug/swift_pose"
)
SWIFT_POSE_ALT = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "swift_processors/.build/arm64-apple-macosx/debug/swift_pose"
)

# Upper bound, in seconds, on how long the Swift binary may run.
SWIFT_POSE_TIMEOUT_SECONDS = 7200

# Names assigned, by index, to the keypoints returned by the YOLOv8 pose model.
_KEYPOINT_NAMES = (
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
    "left_wrist", "right_wrist", "left_hip", "right_hip",
    "left_knee", "right_knee", "left_ankle", "right_ankle",
)


def _echo_indented(text):
    """Forward captured subprocess output to stderr, one indented line each."""
    if not text:
        return
    for line in text.strip().split("\n"):
        print(f" {line}", file=sys.stderr)


def process_pose(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 30,
) -> dict:
    """Run pose detection on a video and return the parsed result.

    The Swift binary is looked up at SWIFT_POSE_PATH, then SWIFT_POSE_ALT.
    When it is missing, cannot be launched, times out, exits non-zero, or
    leaves no readable JSON at ``output_path``, the YOLOv8 fallback is used
    instead.

    Args:
        video_path: Path of the video to analyse.
        output_path: Path where the JSON result is written.
        uuid: Identifier forwarded to the Swift binary via ``--uuid``.
        sample_interval: Forwarded via ``--sample-interval``; the fallback
            analyses every ``sample_interval``-th frame.

    Returns:
        The dict loaded from the Swift binary's JSON output, or the dict
        built by the fallback.

    Raises:
        ImportError: If the fallback is needed but ``ultralytics`` or
            ``cv2`` cannot be imported.
    """
    swift_bin = next(
        (path for path in (SWIFT_POSE_PATH, SWIFT_POSE_ALT) if os.path.exists(path)),
        None,
    )
    if swift_bin is None:
        print("[Pose] Swift binary not found, using YOLOv8 fallback", file=sys.stderr)
        return _fallback(video_path, output_path, uuid, sample_interval)

    cmd = [swift_bin, video_path, output_path,
           "--sample-interval", str(sample_interval),
           "--uuid", uuid]

    print("[Pose] Running Swift Pose (Vision Framework)", file=sys.stderr)
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable output bytes must not abort the run
            timeout=SWIFT_POSE_TIMEOUT_SECONDS,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        # A hung or non-executable binary is handled like any other Swift
        # failure rather than crashing the caller.
        print(f"[Pose] Swift Pose could not complete ({exc}), falling back to YOLOv8",
              file=sys.stderr)
        return _fallback(video_path, output_path, uuid, sample_interval)

    _echo_indented(result.stdout)
    _echo_indented(result.stderr)

    if result.returncode != 0 or not os.path.exists(output_path):
        print("[Pose] Swift Pose failed, falling back to YOLOv8", file=sys.stderr)
        return _fallback(video_path, output_path, uuid, sample_interval)

    try:
        with open(output_path) as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError: a truncated or malformed
        # output file is treated as a failed Swift run.
        print(f"[Pose] Swift Pose output unreadable ({exc}), falling back to YOLOv8",
              file=sys.stderr)
        return _fallback(video_path, output_path, uuid, sample_interval)


def _bounding_box(keypoints, min_confidence=0.1):
    """Return the integer bbox around keypoints above ``min_confidence``.

    An all-zero box is returned when no keypoint passes the threshold.
    """
    # NOTE(review): Ultralytics may zero the coordinates of low-confidence
    # keypoints; if so, such points would pull the box toward the origin.
    # Confirm against the installed version before relying on the bbox.
    xs = [k["x"] for k in keypoints if k["confidence"] > min_confidence]
    ys = [k["y"] for k in keypoints if k["confidence"] > min_confidence]
    if not xs:
        return {"x": 0, "y": 0, "width": 0, "height": 0}
    left, top = min(xs), min(ys)
    return {
        "x": int(left),
        "y": int(top),
        "width": int(max(xs) - left),
        "height": int(max(ys) - top),
    }


def _extract_persons(results):
    """Convert YOLO pose results for one frame into a list of person dicts."""
    persons = []
    for r in results:
        if r.keypoints is None:
            continue
        for kp_data in r.keypoints:
            xy = getattr(kp_data, "xy", None)
            conf = getattr(kp_data, "conf", None)
            # The attributes can exist but be None, so hasattr() is not enough.
            kps = xy[0].cpu().numpy() if xy is not None else []
            confs = conf[0].cpu().numpy() if conf is not None else []

            keypoints = []
            # zip() stops at the shorter sequence, so names without a
            # matching detected point are skipped.
            for j, (name, point) in enumerate(zip(_KEYPOINT_NAMES, kps)):
                c = float(confs[j]) if j < len(confs) else 0
                keypoints.append({
                    "name": name,
                    "x": float(point[0]),
                    "y": float(point[1]),
                    "confidence": c,
                })

            if keypoints:
                persons.append({"keypoints": keypoints, "bbox": _bounding_box(keypoints)})
    return persons


def _fallback(video_path, output_path, uuid, sample_interval):
    """Fallback to YOLOv8 Pose.

    Reads the video with OpenCV, runs the ``yolov8n-pose.pt`` model on the
    CPU for every ``sample_interval``-th frame, writes the result as JSON to
    ``output_path`` and returns it. ``uuid`` is accepted for signature
    parity with the Swift path but is not used here.
    """
    # Imported lazily so the Swift path works without these packages.
    from ultralytics import YOLO
    import cv2

    # An interval of 0 would make the modulo below raise ZeroDivisionError;
    # sample every frame in that case.
    step = sample_interval if sample_interval else 1

    model = YOLO("yolov8n-pose.pt")
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % step == 0:
                ts = frame_count / fps if fps > 0 else 0
                results = model(frame, verbose=False, device="cpu")
                persons = _extract_persons(results)
                if persons:
                    frames.append({"frame": frame_count, "timestamp": ts, "persons": persons})

            frame_count += 1
    finally:
        # Release the capture handle even when inference raises.
        cap.release()

    # "frame_count" is the number of frames that contained at least one pose.
    result = {"frame_count": len(frames), "fps": fps, "frames": frames}
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)
    return result


def main():
    """Command-line entry point: parse arguments, run, and report."""
    parser = argparse.ArgumentParser(description="Pose Processor (Swift Vision)")
    parser.add_argument("video_path")
    parser.add_argument("output_path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", type=int, default=30)
    args = parser.parse_args()

    result = process_pose(args.video_path, args.output_path, args.uuid, args.sample_interval)

    # Re-written so the file has the same indent=2 layout whichever
    # backend produced it.
    with open(args.output_path, "w") as f:
        json.dump(result, f, indent=2)

    print(f"Pose: {len(result.get('frames', []))} frames with poses")


if __name__ == "__main__":
    main()