#!/opt/homebrew/bin/python3.11
"""
Appearance Processor - Body part color extraction using pose keypoints

Input:
  - video_path: source video
  - pose_json: pose.json with keypoints and bbox
  - output_path: output JSON

Output: appearance.json with per-person per-frame body part colors

Regions: head, neck, front_upper_body, front_lower_body,
         back_upper_body, back_lower_body,
         left_hand, right_hand, left_foot, right_foot
"""

import sys
import os
import json
import argparse

try:
    import cv2
except ImportError:
    # Deferred failure: the geometry helpers below do not need OpenCV, so the
    # module stays importable; _require_cv2() raises as soon as pixels are
    # actually touched.
    cv2 = None
import numpy as np

# Keys every bbox dict handled by this module is expected to carry.
BBOX_KEYS = ("x", "y", "width", "height")

# Minimum confidence for a wrist/ankle/nose keypoint to be trusted as an
# anchor; below this the geometric fallback is used instead.
KEYPOINT_MIN_CONF = 0.3

# Fraction of a region's width/height added on every side after the region
# has been laid out.
REGION_MARGINS = {
    "head": 0.10,
    "neck": 0.15,
    "front_upper_body": 0.20,
    "back_upper_body": 0.20,
    "front_lower_body": 0.15,
    "back_lower_body": 0.15,
    "left_hand": 0.25,
    "right_hand": 0.25,
    "left_foot": 0.20,
    "right_foot": 0.20,
}
DEFAULT_MARGIN = 0.15

# Number of k-means clusters reported as dominant colors.
DOMINANT_COLOR_COUNT = 5


def _require_cv2():
    """Raise ImportError with a clear message when OpenCV is not installed."""
    if cv2 is None:
        raise ImportError("OpenCV (cv2) is required for appearance extraction")


def get_kp(keypoints, name):
    """Return ``(x, y, confidence)`` of the keypoint called *name*, or None.

    *keypoints* is an iterable of dicts with ``name``, ``x``, ``y`` and an
    optional ``confidence`` (treated as 1.0 when absent). The first match wins.
    """
    for kp in keypoints:
        if kp.get("name") == name:
            return (kp["x"], kp["y"], kp.get("confidence", 1.0))
    return None


def determine_facing(keypoints):
    """Classify a person as "front", "back", "profile" or "unknown".

    - confident nose (> 0.5)                          -> "front"
    - both shoulders confident, nose absent or < 0.2  -> "back"
    - at least one confident shoulder                 -> "profile"
    - otherwise                                       -> "unknown"
    """
    nose = get_kp(keypoints, "nose")
    left_shoulder = get_kp(keypoints, "left_shoulder")
    right_shoulder = get_kp(keypoints, "right_shoulder")

    if nose and nose[2] > 0.5:
        return "front"

    sh_vis = sum(1 for s in (left_shoulder, right_shoulder) if s and s[2] > 0.5)
    if sh_vis >= 2 and (not nose or nose[2] < 0.2):
        return "back"
    if sh_vis >= 1:
        return "profile"
    return "unknown"


def extract_color(roi_bgr):
    """Extract HSV histograms and dominant colors from a BGR ROI.

    Returns None when the ROI is None, empty, or smaller than 2x2 pixels.
    Otherwise returns a dict with:

    - ``hsv_histogram``: three lists (H: 30 bins over [0, 180), S and V:
      32 bins over [0, 256)), each normalised to sum to 1.
    - ``dominant_colors``: HSV k-means cluster centers ordered by descending
      cluster size (or the single mean color when fewer than 5 pixels exist).

    Raises ImportError if OpenCV is unavailable and the ROI is usable.
    """
    if roi_bgr is None or roi_bgr.size == 0:
        return None
    if roi_bgr.shape[0] < 2 or roi_bgr.shape[1] < 2:
        return None
    _require_cv2()

    hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)
    pixels = hsv.reshape(-1, 3).astype(np.float32)

    h_hist = cv2.calcHist([hsv], [0], None, [30], [0, 180]).flatten()
    s_hist = cv2.calcHist([hsv], [1], None, [32], [0, 256]).flatten()
    v_hist = cv2.calcHist([hsv], [2], None, [32], [0, 256]).flatten()
    # "or 1" guards the normalisation against an all-zero histogram.
    h_total = h_hist.sum() or 1
    s_total = s_hist.sum() or 1
    v_total = v_hist.sum() or 1

    dominant = []
    if len(pixels) >= DOMINANT_COLOR_COUNT:
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
        _, labels, centers = cv2.kmeans(
            pixels, DOMINANT_COLOR_COUNT, None, criteria, 10,
            cv2.KMEANS_RANDOM_CENTERS,
        )
        counts = np.bincount(labels.flatten())
        order = np.argsort(-counts)[:DOMINANT_COLOR_COUNT]
        dominant = centers[order].tolist()
    elif len(pixels) > 0:
        # Too few pixels to cluster (e.g. a 2x2 ROI): report the mean color.
        dominant = [pixels.mean(axis=0).tolist()]

    return {
        "hsv_histogram": [
            (h_hist / h_total).tolist(),
            (s_hist / s_total).tolist(),
            (v_hist / v_total).tolist(),
        ],
        "dominant_colors": dominant,
    }


def safe_roi(frame, x, y, w, h):
    """Return ``frame[y:y+h, x:x+w]`` clipped to the frame, or None.

    None is returned when the requested size is non-positive or when nothing
    of the rectangle remains after clipping to the frame bounds.
    """
    if w <= 0 or h <= 0:
        return None
    x1 = max(0, int(x))
    y1 = max(0, int(y))
    x2 = min(frame.shape[1], int(x + w))
    y2 = min(frame.shape[0], int(y + h))
    if x2 <= x1 or y2 <= y1:
        return None
    return frame[y1:y2, x1:x2]


def select_face_bbox(person, faces):
    """Choose the bbox used to size one person's body regions.

    Among *faces* (bbox dicts with ``x``/``y``/``width``/``height``) the one
    whose center is closest to the person's nose keypoint is chosen, so each
    person in a multi-person frame is scaled by their own face. When the nose
    is missing or below ``KEYPOINT_MIN_CONF`` the first usable face is
    returned. With no usable face, the person's own ``bbox`` is used, and
    finally an all-zero bbox (which yields no regions).

    NOTE(review): when a frame has fewer detected faces than persons, several
    persons still share the nearest face - there is no distance cut-off.
    """
    usable = [f for f in (faces or []) if all(k in f for k in BBOX_KEYS)]
    if usable:
        nose = get_kp(person.get("keypoints", []), "nose")
        if len(usable) == 1 or not nose or nose[2] <= KEYPOINT_MIN_CONF:
            return usable[0]

        def dist_sq(face):
            cx = face["x"] + face["width"] / 2
            cy = face["y"] + face["height"] / 2
            return (cx - nose[0]) ** 2 + (cy - nose[1]) ** 2

        return min(usable, key=dist_sq)

    bbox = person.get("bbox") or {}
    if all(k in bbox for k in BBOX_KEYS):
        return bbox
    return {"x": 0, "y": 0, "width": 0, "height": 0}


def _box(x, y, width, height):
    """Build a region dict in the format written to the output JSON."""
    return {"x": x, "y": y, "width": width, "height": height}


def compute_body_regions(keypoints, face_bbox, frame_shape):
    """Lay out the ten body-part boxes for one person.

    The face bbox provides the scale (all sizes are multiples of its width
    and height); pose keypoints provide the alignment. Any missing keypoint
    is replaced by a position estimated from the face bbox or from the
    shoulders.

    Args:
        keypoints: list of keypoint dicts (see ``get_kp``).
        face_bbox: dict with ``x``, ``y``, ``width``, ``height``.
        frame_shape: accepted for interface compatibility; not used, because
            clipping to the frame happens later in ``safe_roi``.

    Returns:
        dict mapping region name to a bbox dict, already expanded by the
        per-region margin in ``REGION_MARGINS``. Boxes may extend outside
        the frame.
    """
    fx, fy = face_bbox["x"], face_bbox["y"]
    fw, fh = face_bbox["width"], face_bbox["height"]
    face_cx = fx + fw / 2

    nose = get_kp(keypoints, "nose")
    ls = get_kp(keypoints, "left_shoulder")
    rs = get_kp(keypoints, "right_shoulder")
    lw = get_kp(keypoints, "left_wrist")
    rw = get_kp(keypoints, "right_wrist")
    lh = get_kp(keypoints, "left_hip")
    rh = get_kp(keypoints, "right_hip")
    la = get_kp(keypoints, "left_ankle")
    ra = get_kp(keypoints, "right_ankle")

    # Anchors, with face-proportional fallbacks for missing keypoints.
    kp_nose = (nose[0], nose[1]) if nose else (face_cx, fy + fh * 0.5)
    kp_sh_l = ls[0] if ls else (face_cx - fw * 1.5)
    kp_sh_r = rs[0] if rs else (face_cx + fw * 1.5)
    kp_sh_mid_x = (kp_sh_l + kp_sh_r) / 2
    kp_sh_mid_y = ((ls[1] + rs[1]) / 2) if (ls and rs) else (fy + fh + fh * 0.3)
    kp_hip_y = ((lh[1] + rh[1]) / 2) if (lh and rh) else (kp_sh_mid_y + fw * 2.0)
    kp_hip_l = lh[0] if lh else (kp_sh_mid_x - fw * 1.2)
    kp_hip_r = rh[0] if rh else (kp_sh_mid_x + fw * 1.2)
    # With both hips missing this equals the shoulder midpoint.
    kp_hip_mid_x = (kp_hip_l + kp_hip_r) / 2

    regions = {}

    # head: nose-aligned, face-proportional
    head_w = fw * 1.6
    head_h = fh * 1.5
    regions["head"] = _box(
        kp_nose[0] - head_w / 2, kp_nose[1] - head_h * 0.5, head_w, head_h
    )

    # neck: nose-to-shoulder, face-width; never thinner than 0.3 face heights
    neck_w = fw * 1.5
    regions["neck"] = _box(
        kp_sh_mid_x - neck_w / 2,
        kp_nose[1] + fh * 0.4,
        neck_w,
        max(kp_sh_mid_y - kp_nose[1] - fh * 0.4, fh * 0.3),
    )

    # upper body: shoulder-aligned
    ub_w = max(abs(kp_sh_r - kp_sh_l) * 1.3, fw * 3.0)
    ub_h = fh * 3.0
    regions["front_upper_body"] = _box(kp_sh_mid_x - ub_w / 2, kp_sh_mid_y, ub_w, ub_h)
    regions["back_upper_body"] = dict(regions["front_upper_body"])

    # lower body: hip-aligned (centered on the hips, not the shoulders, so a
    # leaning or twisted torso does not shift the box off the legs)
    lb_w = max(abs(kp_hip_r - kp_hip_l) * 1.3, fw * 3.5)
    lb_h = fh * 3.0
    regions["front_lower_body"] = _box(kp_hip_mid_x - lb_w / 2, kp_hip_y, lb_w, lb_h)
    regions["back_lower_body"] = dict(regions["front_lower_body"])

    # hands: wrist-centered square, else a guess beside the shoulders
    hand = fw * 1.0
    if lw and lw[2] > KEYPOINT_MIN_CONF:
        regions["left_hand"] = _box(lw[0] - hand / 2, lw[1] - hand / 2, hand, hand)
    else:
        regions["left_hand"] = _box(kp_sh_l - hand, kp_sh_mid_y + fh * 0.5, hand, hand)
    if rw and rw[2] > KEYPOINT_MIN_CONF:
        regions["right_hand"] = _box(rw[0] - hand / 2, rw[1] - hand / 2, hand, hand)
    else:
        regions["right_hand"] = _box(kp_sh_r, kp_sh_mid_y + fh * 0.5, hand, hand)

    # feet: box starting at the ankle, else a guess below the hips
    foot = fw * 1.0
    foot_h = foot * 0.75
    if la and la[2] > KEYPOINT_MIN_CONF:
        regions["left_foot"] = _box(la[0] - foot / 2, la[1], foot, foot_h)
    else:
        regions["left_foot"] = _box(
            kp_sh_mid_x - fw * 1.0, kp_hip_y + fh * 2.5, foot, foot_h
        )
    if ra and ra[2] > KEYPOINT_MIN_CONF:
        regions["right_foot"] = _box(ra[0] - foot / 2, ra[1], foot, foot_h)
    else:
        regions["right_foot"] = _box(
            kp_sh_mid_x + fw * 1.0 - foot, kp_hip_y + fh * 2.5, foot, foot_h
        )

    # Extrapolate each bbox outward by its margin (whole pixels per side).
    expanded = {}
    for name, rb in regions.items():
        m = REGION_MARGINS.get(name, DEFAULT_MARGIN)
        dx = int(rb["width"] * m)
        dy = int(rb["height"] * m)
        expanded[name] = _box(
            rb["x"] - dx, rb["y"] - dy, rb["width"] + dx * 2, rb["height"] + dy * 2
        )
    return expanded


def filter_by_facing(regions, facing):
    """Drop the torso regions that cannot be seen for the given *facing*.

    "front" removes the back_* regions, "back" removes the front_* regions;
    any other facing keeps everything. *regions* is modified in place and
    also returned.
    """
    if facing == "front":
        regions.pop("back_upper_body", None)
        regions.pop("back_lower_body", None)
    elif facing == "back":
        regions.pop("front_upper_body", None)
        regions.pop("front_lower_body", None)
    return regions


def _load_faces_by_frame(pose_json_path):
    """Load the sibling ``*.face.json`` and map frame number -> list of faces.

    Returns an empty dict when the pose path does not contain ".pose.json"
    (the derived path would be the pose file itself) or when the face file
    does not exist. Frames without faces are omitted.
    """
    face_path = pose_json_path.replace(".pose.json", ".face.json")
    if face_path == pose_json_path or not os.path.exists(face_path):
        return {}
    with open(face_path, encoding="utf-8") as f:
        face_data = json.load(f)

    faces_by_frame = {}
    for face_frame in face_data.get("frames", []):
        faces = face_frame.get("faces", [])
        if faces:
            faces_by_frame[face_frame.get("frame")] = faces
    return faces_by_frame


def _analyze_person(frame, pid, person, faces):
    """Build the output record for one person, or None without keypoints."""
    keypoints = person.get("keypoints", [])
    if not keypoints:
        return None
    bbox = person.get("bbox", {})

    facing = determine_facing(keypoints)
    face_bbox = select_face_bbox(person, faces)
    regions = filter_by_facing(
        compute_body_regions(keypoints, face_bbox, frame.shape), facing
    )

    body_parts = []
    for name, rb in regions.items():
        roi = safe_roi(frame, rb["x"], rb["y"], rb["width"], rb["height"])
        color = extract_color(roi)
        if color is None:
            continue
        body_parts.append({
            "name": name,
            "bbox": rb,
            "hsv_histogram": color["hsv_histogram"],
            "dominant_colors": color["dominant_colors"],
        })

    # Full bbox reference colors
    full = None
    if bbox.get("width", 0) > 0 and bbox.get("height", 0) > 0:
        full_roi = safe_roi(frame, bbox["x"], bbox["y"], bbox["width"], bbox["height"])
        full = extract_color(full_roi)

    return {
        "person_id": pid,
        "bbox": bbox,
        "facing": facing,
        "body_parts": body_parts,
        "dominant_colors": full["dominant_colors"] if full else [],
        "hsv_histogram": full["hsv_histogram"] if full else [[], [], []],
    }


def main():
    """CLI entry point: read pose (and optional face) JSON, write appearance JSON.

    Exits with status 1 when the video cannot be opened. Frames that have no
    persons, cannot be read from the video, or yield no person record are
    skipped.
    """
    parser = argparse.ArgumentParser(description="Appearance Processor")
    parser.add_argument("video_path")
    parser.add_argument("pose_json")
    parser.add_argument("output_path")
    parser.add_argument("--uuid", "-u", default="")
    args = parser.parse_args()
    _require_cv2()

    with open(args.pose_json, encoding="utf-8") as f:
        pose_data = json.load(f)

    # Face bboxes (same directory as pose_json) anchor the region sizes.
    faces_by_frame = _load_faces_by_frame(args.pose_json)

    # A missing, null or zero fps falls back to 30 so the timestamp fallback
    # below can never divide by zero.
    fps = pose_data.get("fps") or 30.0

    cap = cv2.VideoCapture(args.video_path)
    if not cap.isOpened():
        print("[APPEARANCE] Cannot open video", file=sys.stderr)
        sys.exit(1)

    frames_out = []
    try:
        for pose_frame in pose_data.get("frames", []):
            frame_num = pose_frame["frame"]
            persons = pose_frame.get("persons", [])
            if not persons:
                continue

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            faces = faces_by_frame.get(frame_num, [])
            frame_persons = []
            for pid, person in enumerate(persons):
                record = _analyze_person(frame, pid, person, faces)
                if record is not None:
                    frame_persons.append(record)

            if frame_persons:
                # Only derive a timestamp when the pose data has none.
                if "timestamp" in pose_frame:
                    timestamp = pose_frame["timestamp"]
                else:
                    timestamp = frame_num / fps
                frames_out.append({
                    "frame": frame_num,
                    "timestamp": timestamp,
                    "persons": frame_persons,
                })
    finally:
        # Release the capture even if a frame fails part-way through.
        cap.release()

    output = {"frame_count": len(frames_out), "fps": fps, "frames": frames_out}
    with open(args.output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"[APPEARANCE] Done: {len(frames_out)} frames")


if __name__ == "__main__":
    main()