#!/opt/homebrew/bin/python3.11
"""
TKG Level 1 Builder - Store Level 1 appearance features in TKG

Purpose:
1. Extract Level 1 features from pose.json + video frames
2. Store as person_trace nodes in TKG
3. Enable tracking via Level 1 feature similarity

Level 1 Features:
- body: overall color distribution
- head_top: hair color
- upper_body: upper clothing color
- lower_body: lower clothing color

Usage:
    python tkg_level1_builder.py --file-uuid <file_uuid> [--schema <schema>]
"""

import argparse
import json
import os
import sys
from collections import defaultdict

import cv2
import psycopg2
import psycopg2.extras
from psycopg2 import sql

# Make the sibling ``utils`` package importable when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))

from utils.feature_extractor import HierarchicalFeatureExtractor
from utils.proportion_calculator import calculate_proportions, get_head_region

DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")

# Body regions stored per person_trace node, in the order they are written.
LEVEL1_REGIONS = ("body", "head_top", "upper_body", "lower_body")


def get_conn():
    """Open a new psycopg2 connection to ``DB_URL``."""
    return psycopg2.connect(DB_URL)


def ensure_node(cur, schema, file_uuid, node_type, external_id, label="", properties=None):
    """Insert a graph node, or update it if it already exists.

    Rows are keyed on (file_uuid, node_type, external_id). On conflict the
    properties are overwritten and the label is replaced only when the new
    label is non-empty.

    Args:
        cur: Database cursor
        schema: Database schema containing the ``tkg_nodes`` table
        file_uuid: File UUID
        node_type: Node type (e.g. "person_trace")
        external_id: Identifier of the node within the file; stored as text
        label: Optional human-readable label
        properties: Optional JSON-serializable dict stored as jsonb

    Returns:
        The ``id`` of the inserted or updated row.
    """
    # The schema comes from the CLI / environment, so it is composed as a
    # quoted identifier instead of being pasted into the SQL text.
    table = sql.SQL("{}.{}").format(sql.Identifier(schema), sql.Identifier("tkg_nodes"))
    query = sql.SQL(
        """
        INSERT INTO {table} (node_type, external_id, file_uuid, label, properties)
        VALUES (%s, %s, %s, %s, %s::jsonb)
        ON CONFLICT (file_uuid, node_type, external_id) DO UPDATE SET
            properties = COALESCE(EXCLUDED.properties, {table}.properties),
            label = COALESCE(NULLIF(EXCLUDED.label, ''), {table}.label)
        RETURNING id
        """
    ).format(table=table)
    cur.execute(
        query,
        (node_type, str(external_id), file_uuid, label, json.dumps(properties or {})),
    )
    row = cur.fetchone()
    return row[0]


def extract_level1_features(video_path, pose_json_path):
    """
    Extract Level 1 features for each person in each frame

    Args:
        video_path: Path to video file
        pose_json_path: Path to pose.json

    Returns:
        List of dicts, one per (frame, person) pair, with keys "frame",
        "timestamp", "person_index", "bbox", "proportions" and
        "level1_features". Empty if the video cannot be opened.
    """
    with open(pose_json_path, encoding="utf-8") as f:
        pose_data = json.load(f)

    results = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[TKG-L1] Cannot open video: {video_path}", file=sys.stderr)
            return []

        # A missing, null or zero fps would break the timestamp fallback below.
        fps = pose_data.get("fps") or 30.0
        extractor = HierarchicalFeatureExtractor()

        for pose_frame in pose_data.get("frames", []):
            frame_num = pose_frame["frame"]
            persons = pose_frame.get("persons", [])
            if not persons:
                continue

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            # Only derive the timestamp from fps when the pose data has none.
            timestamp = pose_frame.get("timestamp")
            if timestamp is None:
                timestamp = frame_num / fps

            for person_idx, person in enumerate(persons):
                bbox = person.get("bbox") or {}
                keypoints = person.get("keypoints", [])

                # Skip detections without a usable bounding box.
                if bbox.get("width", 0) <= 0 or bbox.get("height", 0) <= 0:
                    continue

                proportions = calculate_proportions(keypoints, bbox)
                head_region = get_head_region(keypoints)
                level1 = extractor.extract_level1(frame, bbox, head_region)

                results.append({
                    "frame": frame_num,
                    "timestamp": timestamp,
                    "person_index": person_idx,
                    "bbox": bbox,
                    "proportions": proportions,
                    "level1_features": level1,
                })
    finally:
        # Release the capture even if feature extraction raises.
        cap.release()

    return results


def _region_color(item, region):
    """Return the "color" dict of *region* for one extracted sample, or None."""
    region_features = (item.get("level1_features") or {}).get(region) or {}
    return region_features.get("color")


def build_person_trace_nodes(cur, schema, file_uuid, level1_data):
    """
    Build person_trace nodes with Level 1 features

    Args:
        cur: Database cursor
        schema: Database schema
        file_uuid: File UUID
        level1_data: Level 1 extracted features

    Returns:
        Number of person_trace nodes written.
    """
    print("[TKG-L1] Building person_trace nodes...")

    # Group by person (assuming person_index consistency across frames).
    # NOTE(review): person_index is the position within each frame's "persons"
    # list, so this is only a stable identity if the pose data orders people
    # consistently - confirm against the pose producer.
    person_groups = defaultdict(list)
    for item in level1_data:
        person_groups[item["person_index"]].append(item)

    count = 0
    for person_idx, items in person_groups.items():
        frames = [item["frame"] for item in items]
        bboxes = [item["bbox"] for item in items]

        # Aggregate Level 1 features across frames, one entry per region.
        level1_features = {}
        for region in LEVEL1_REGIONS:
            region_colors = [
                color.get("dominant_colors", [])
                for color in (_region_color(item, region) for item in items)
                if color is not None
            ]
            level1_features[region] = {
                "dominant_colors": average_colors(region_colors),
                "h_mean": average_h_mean(items, region),
            }

        # Height and body shape are not averaged: the first non-empty value wins.
        proportions = [item.get("proportions") or {} for item in items]
        height_estimate = next(
            (p["height_estimate"] for p in proportions if p.get("height_estimate")), {}
        )
        body_shape = next((p["body_shape"] for p in proportions if p.get("body_shape")), {})

        external_id = f"person_{person_idx}"
        label = f"Person {person_idx}"
        properties = {
            "frame_count": len(frames),
            "frames": frames,
            "avg_bbox": average_bbox(bboxes),
            "height_estimate": height_estimate,
            "body_shape": body_shape,
            "level1_features": level1_features,
        }

        # Store node
        ensure_node(cur, schema, file_uuid, "person_trace", external_id, label, properties)
        count += 1
        print(f"[TKG-L1] Created person_trace node: {external_id} ({len(frames)} frames)")

    print(f"[TKG-L1] Total: {count} person_trace nodes")
    return count


def average_colors(color_lists):
    """Average the first dominant color of each non-empty color list.

    Args:
        color_lists: One list of dominant colors per frame; only the first
            color of each list is used and empty lists are ignored.

    Returns:
        The channel-wise mean rounded to 2 decimals, or [] if there is
        nothing to average.
    """
    if not color_lists:
        return []

    first_colors = [colors[0] for colors in color_lists if colors]
    if not first_colors:
        return []

    return [round(sum(channel) / len(channel), 2) for channel in zip(*first_colors)]


def average_h_mean(items, region):
    """Average H mean from Level 1 items

    Args:
        items: Extracted samples as produced by extract_level1_features
        region: Region name (e.g. "body")

    Returns:
        The mean of the region's "h_mean" values rounded to 2 decimals, or 0
        if no sample has one.
    """
    h_means = []
    for item in items:
        color = _region_color(item, region)
        if color is None:
            continue
        h_mean = color.get("h_mean", 0)
        # NOTE(review): a falsy h_mean is skipped, so a genuine value of 0 is
        # treated like a missing one - confirm this is intended.
        if h_mean:
            h_means.append(h_mean)

    return round(sum(h_means) / len(h_means), 2) if h_means else 0


def average_bbox(bboxes):
    """Average bbox across frames

    Args:
        bboxes: List of dicts with "x", "y", "width" and "height"; a missing
            key counts as 0.

    Returns:
        Dict with the four means rounded to 1 decimal, or {} for no bboxes.
    """
    if not bboxes:
        return {}

    total = len(bboxes)
    return {
        key: round(sum(b.get(key, 0) for b in bboxes) / total, 1)
        for key in ("x", "y", "width", "height")
    }


def main():
    """Parse CLI arguments, extract Level 1 features and store them in the TKG.

    Exits with status 1 if an input file is missing, no features could be
    extracted, or writing the nodes fails.
    """
    parser = argparse.ArgumentParser(description="TKG Level 1 Builder")
    parser.add_argument("--file-uuid", "-u", required=True, help="File UUID")
    parser.add_argument("--schema", "-s", default=SCHEMA, help="Database schema")
    parser.add_argument("--video", "-v", help="Video path (optional, auto-detected)")
    parser.add_argument("--pose-json", "-p", help="Pose JSON path (optional, auto-detected)")
    args = parser.parse_args()

    file_uuid = args.file_uuid
    schema = args.schema

    # Auto-detect paths
    video_path = args.video or os.path.join(OUTPUT_DIR, f"{file_uuid}.mp4")
    pose_json_path = args.pose_json or os.path.join(OUTPUT_DIR, f"{file_uuid}.pose.json")

    # Check files exist
    if not os.path.exists(video_path):
        print(f"[TKG-L1] Video not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    if not os.path.exists(pose_json_path):
        print(f"[TKG-L1] Pose JSON not found: {pose_json_path}", file=sys.stderr)
        sys.exit(1)

    print(f"[TKG-L1] Processing: {file_uuid}")
    print(f"[TKG-L1] Video: {video_path}")
    print(f"[TKG-L1] Pose: {pose_json_path}")

    # Extract Level 1 features
    print("[TKG-L1] Extracting Level 1 features...")
    level1_data = extract_level1_features(video_path, pose_json_path)

    if not level1_data:
        print("[TKG-L1] No Level 1 data extracted", file=sys.stderr)
        sys.exit(1)

    print(f"[TKG-L1] Extracted: {len(level1_data)} frame-person pairs")

    # Connect to DB
    conn = get_conn()
    try:
        # All nodes are written in one transaction: commit on success only.
        with conn.cursor() as cur:
            count = build_person_trace_nodes(cur, schema, file_uuid, level1_data)
        conn.commit()
        print(f"[TKG-L1] Success: {count} person_trace nodes created")
    except Exception as e:
        conn.rollback()
        print(f"[TKG-L1] Error: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        conn.close()


if __name__ == "__main__":
    main()