#!/opt/homebrew/bin/python3.11
"""
Pose Processor Wrapper
Calls Swift Vision Framework pose (swift_pose) with fallback to YOLOv8 Pose.
Uses VNDetectHumanBodyPoseRequest with ANE acceleration.
"""

import re
import sys
import json
import os
import subprocess
import argparse
import threading

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher

SWIFT_POSE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "swift_processors/.build/debug/swift_pose"
)
SWIFT_POSE_ALT = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "swift_processors/.build/arm64-apple-macosx/debug/swift_pose"
)

SWIFT_POSE_PROGRESS_RE = re.compile(r"\[SwiftPose\] Progress:\s*(\d+)%")

# Names assigned, by row index, to the keypoints returned by the fallback
# model (presumably the COCO-17 ordering used by YOLOv8 pose -- confirm).
_KEYPOINT_NAMES = (
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
    "left_wrist", "right_wrist", "left_hip", "right_hip",
    "left_knee", "right_knee", "left_ankle", "right_ankle",
)

# Keypoints at or below this confidence are ignored when computing a bbox.
_BBOX_MIN_CONFIDENCE = 0.1


def process_pose(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 30,
    publisher: "RedisPublisher | None" = None,
) -> dict:
    """Run pose detection on a video and return the parsed result.

    The Swift binary is tried first (SWIFT_POSE_PATH, then SWIFT_POSE_ALT).
    If neither exists, the binary cannot be launched, it exits non-zero, or
    it leaves no file at ``output_path``, the YOLOv8 fallback is used instead.

    Args:
        video_path: Path of the input video, passed through to the processor.
        output_path: Path where the processor writes its JSON result.
        uuid: Forwarded to the Swift binary via ``--uuid``.
        sample_interval: Forwarded via ``--sample-interval``; in the fallback,
            every ``sample_interval``-th frame is analysed.
        publisher: Optional publisher that receives progress and error
            notifications for the "pose" stage.

    Returns:
        The JSON document loaded from ``output_path`` (Swift path) or the
        dict built by the fallback.
    """
    swift_bin = next(
        (path for path in (SWIFT_POSE_PATH, SWIFT_POSE_ALT) if os.path.exists(path)),
        None,
    )
    if swift_bin is None:
        print("[Pose] Swift binary not found, using YOLOv8 fallback", file=sys.stderr)
        if publisher:
            publisher.error("pose", "Swift binary not found, using fallback")
        return _fallback(video_path, output_path, uuid, sample_interval)

    cmd = [
        swift_bin, video_path, output_path,
        "--sample-interval", str(sample_interval),
        "--uuid", uuid,
    ]
    print("[Pose] Running Swift Pose (Vision Framework)", file=sys.stderr)
    returncode = _run_swift_pose(cmd, publisher)

    if returncode != 0 or not os.path.exists(output_path):
        print(
            f"[Pose] Swift Pose failed (exit={returncode}), falling back to YOLOv8",
            file=sys.stderr,
        )
        if publisher:
            publisher.error("pose", "Swift Pose failed, using fallback")
        return _fallback(video_path, output_path, uuid, sample_interval)

    with open(output_path, encoding="utf-8") as f:
        return json.load(f)


def _read_all(stream, sink):
    """Read ``stream`` to EOF and append the text to ``sink`` (thread target)."""
    sink.append(stream.read())


def _run_swift_pose(cmd, publisher):
    """Run the Swift binary, relaying progress; return its exit code.

    Returns ``None`` when the process could not be started at all, which the
    caller treats the same as a non-zero exit.
    """
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",  # never abort on undecodable child output
        )
    except OSError as exc:
        print(f"[Pose] Could not launch Swift Pose: {exc}", file=sys.stderr)
        return None

    with proc:
        # Drain stderr concurrently: reading it only after stdout hits EOF
        # deadlocks once the child fills the stderr pipe buffer.
        stderr_chunks = []
        drainer = threading.Thread(
            target=_read_all, args=(proc.stderr, stderr_chunks), daemon=True
        )
        drainer.start()

        last_pct = -1
        for line in proc.stdout:
            line = line.strip()
            m = SWIFT_POSE_PROGRESS_RE.search(line)
            if m:
                pct = int(m.group(1))
                # Only report strictly increasing percentages.
                if pct > last_pct:
                    last_pct = pct
                    print(f"[Pose] Progress: {pct}%", file=sys.stderr)
                    if publisher:
                        publisher.progress("pose", pct, 100, f"{pct}%")
            elif line:
                print(f" {line}", file=sys.stderr)

        drainer.join()
        stderr_output = "".join(stderr_chunks)
        if stderr_output:
            print(stderr_output.strip(), file=sys.stderr)
        proc.wait()

    return proc.returncode


def _first_row(tensor):
    """Return ``tensor[0]`` as a NumPy array, or ``[]`` if absent or empty."""
    if tensor is None or len(tensor) == 0:
        return []
    return tensor[0].cpu().numpy()


def _bbox_from_keypoints(keypoints):
    """Return the integer bbox around keypoints above the confidence floor."""
    confident = [k for k in keypoints if k["confidence"] > _BBOX_MIN_CONFIDENCE]
    if not confident:
        return {"x": 0, "y": 0, "width": 0, "height": 0}
    xs = [k["x"] for k in confident]
    ys = [k["y"] for k in confident]
    return {
        "x": int(min(xs)),
        "y": int(min(ys)),
        "width": int(max(xs) - min(xs)),
        "height": int(max(ys) - min(ys)),
    }


def _persons_from_results(results):
    """Convert the model's results for one frame into a list of person dicts."""
    persons = []
    for r in results:
        if r.keypoints is None:
            continue
        for kp_data in r.keypoints:
            # An attribute can exist but be None (e.g. no confidences), so
            # check the value rather than relying on hasattr().
            kps = _first_row(getattr(kp_data, "xy", None))
            confs = _first_row(getattr(kp_data, "conf", None))
            keypoints = [
                {
                    "name": name,
                    "x": float(kps[j][0]),
                    "y": float(kps[j][1]),
                    "confidence": float(confs[j]) if j < len(confs) else 0,
                }
                for j, name in enumerate(_KEYPOINT_NAMES)
                if j < len(kps)
            ]
            if keypoints:
                persons.append(
                    {"keypoints": keypoints, "bbox": _bbox_from_keypoints(keypoints)}
                )
    return persons


def _fallback(video_path, output_path, uuid, sample_interval):
    """Fallback to YOLOv8 Pose.

    Runs the ``yolov8n-pose.pt`` model on the CPU over every
    ``sample_interval``-th frame, writes the result to ``output_path`` as
    JSON, and returns it. ``uuid`` is accepted for signature parity with the
    Swift path and is not used here.

    Returns:
        ``{"frame_count", "fps", "frames"}`` where ``frames`` only contains
        sampled frames in which at least one person was found.
    """
    from ultralytics import YOLO
    import cv2

    # A non-positive interval would make the modulo below raise.
    step = max(1, sample_interval)

    model = YOLO("yolov8n-pose.pt")
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % step == 0:
                ts = frame_count / fps if fps > 0 else 0
                results = model(frame, verbose=False, device="cpu")
                persons = _persons_from_results(results)
                if persons:
                    frames.append(
                        {"frame": frame_count, "timestamp": ts, "persons": persons}
                    )
            frame_count += 1
    finally:
        # Release the capture even if inference raises mid-video.
        cap.release()

    result = {"frame_count": len(frames), "fps": fps, "frames": frames}
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)
    return result


def main():
    """Command-line entry point: parse arguments, run pose, write the result."""
    parser = argparse.ArgumentParser(description="Pose Processor (Swift Vision)")
    parser.add_argument("video_path")
    parser.add_argument("output_path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", type=int, default=30)
    args = parser.parse_args()

    # Notifications are only published when a uuid was supplied.
    publisher = RedisPublisher(args.uuid) if args.uuid else None
    if publisher:
        publisher.info("pose", "POSE_START")

    result = process_pose(
        args.video_path, args.output_path, args.uuid, args.sample_interval, publisher
    )

    # Rewrite the output so it is indented JSON whichever processor produced it.
    with open(args.output_path, "w") as f:
        json.dump(result, f, indent=2)

    frames_with_poses = len(result.get("frames", []))
    print(f"Pose: {frames_with_poses} frames with poses")
    if publisher:
        publisher.complete("pose", f"{frames_with_poses} frames")


if __name__ == "__main__":
    main()