Files
momentry_core/scripts/tkg_level1_builder.py
Accusys 606f31f13c feat: add appearance feature system with coordinate/scale fixes
- Add Appearance_Feature_System_V1.0.md design doc
- Add proportion_calculator.py for body proportions (height, body shape)
- Add feature_extractor.py for hierarchical feature extraction
- Add tkg_level1_builder.py for TKG person_trace nodes
- Fix mediapipe_holistic_processor.py to output Top-Left pixels
- Add MediaPipe format conversion in proportion_calculator

Coordinate system alignment:
- Swift Pose: Top-Left pixels (Y-flip done in swift_pose.swift)
- MediaPipe: Top-Left pixels (norm→pixel conversion added)
2026-06-22 02:27:03 +08:00

341 lines
11 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
TKG Level 1 Builder - Store Level 1 appearance features in TKG
Purpose:
1. Extract Level 1 features from pose.json + video frames
2. Store as person_trace nodes in TKG
3. Enable tracking via Level 1 feature similarity
Level 1 Features:
- body: overall color distribution
- head_top: hair color
- upper_body: upper clothing color
- lower_body: lower clothing color
Usage:
python tkg_level1_builder.py --file-uuid <uuid> [--schema <schema>]
"""
import sys
import os
import json
import argparse
import psycopg2
import psycopg2.extras
import cv2
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
from utils.feature_extractor import HierarchicalFeatureExtractor
from utils.proportion_calculator import calculate_proportions, get_head_region
# PostgreSQL connection string: read from the DATABASE_URL environment
# variable, falling back to a local development database.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
# Default value of the --schema command-line option: read from
# DATABASE_SCHEMA, falling back to "dev".
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
# Directory in which main() looks for "<file_uuid>.mp4" and
# "<file_uuid>.pose.json" when no explicit paths are given: read from
# MOMENTRY_OUTPUT_DIR, falling back to a developer-machine path.
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
def get_conn():
    """Open a new database connection to the URL held in ``DB_URL``.

    Returns:
        The connection object produced by ``psycopg2.connect``. This function
        does not commit or close it; that is left to the caller.
    """
    connection = psycopg2.connect(DB_URL)
    return connection
def ensure_node(cur, schema, file_uuid, node_type, external_id, label="", properties=None):
    """Insert a graph node, or update the existing one, and return its id.

    Rows in ``<schema>.tkg_nodes`` are keyed by
    ``(file_uuid, node_type, external_id)``. When a row with that key already
    exists, a non-empty ``label`` / non-empty ``properties`` replaces the
    stored value, while an empty one leaves the stored value untouched.

    Args:
        cur: Database cursor exposing ``execute`` and ``fetchone``.
        schema: Name of the schema that holds ``tkg_nodes``. It is
            interpolated into the SQL text, so it must be a plain identifier.
        file_uuid: UUID of the file the node belongs to.
        node_type: Node type string (e.g. ``"person_trace"``).
        external_id: Caller-side identifier; stored as ``str(external_id)``.
        label: Optional label; ``""`` keeps an existing label on update.
        properties: Optional JSON-serialisable dict; ``None`` or ``{}`` keeps
            the existing properties on update.

    Returns:
        The ``id`` column of the inserted or updated row.

    Raises:
        ValueError: If ``schema`` is not a plain identifier.
    """
    # Identifiers cannot be passed as bound parameters, so the schema name is
    # spliced into the statement. Reject anything that is not a bare
    # identifier to keep arbitrary SQL out of the query text.
    if not isinstance(schema, str) or not schema.isidentifier():
        raise ValueError(f"Invalid schema name: {schema!r}")

    # The serialised value is never NULL (None becomes '{}'), so a plain
    # COALESCE on EXCLUDED.properties would always pick the new value and wipe
    # stored properties. NULLIF turns the empty object into NULL first,
    # mirroring the handling of the empty label.
    cur.execute(
        f"""
        INSERT INTO {schema}.tkg_nodes (node_type, external_id, file_uuid, label, properties)
        VALUES (%s, %s, %s, %s, %s::jsonb)
        ON CONFLICT (file_uuid, node_type, external_id)
        DO UPDATE SET
            properties = COALESCE(NULLIF(EXCLUDED.properties, '{{}}'::jsonb), {schema}.tkg_nodes.properties),
            label = COALESCE(NULLIF(EXCLUDED.label, ''), {schema}.tkg_nodes.label)
        RETURNING id
        """,
        (node_type, str(external_id), file_uuid, label, json.dumps(properties or {})),
    )
    row = cur.fetchone()
    return row[0]
def extract_level1_features(video_path, pose_json_path):
    """Extract Level 1 features for each person in each pose frame.

    For every frame listed in the pose JSON that contains persons, the
    matching video frame is read and each person with a positive-size bbox is
    passed to the feature extractor. Frames that cannot be read are skipped.

    Args:
        video_path: Path to the video file.
        pose_json_path: Path to the pose JSON file.

    Returns:
        A list of dicts, one per (frame, person) pair, with the keys
        ``frame``, ``timestamp``, ``person_index``, ``bbox``, ``proportions``
        and ``level1_features``. An empty list is returned when the video
        cannot be opened.
    """
    with open(pose_json_path, encoding="utf-8") as f:
        pose_data = json.load(f)

    # A missing, null or zero fps would make the timestamp fallback below
    # fail, so fall back to 30 fps in all of those cases.
    fps = pose_data.get("fps") or 30.0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"[TKG-L1] Cannot open video: {video_path}", file=sys.stderr)
            return []

        extractor = HierarchicalFeatureExtractor()
        results = []
        for pose_frame in pose_data.get("frames", []):
            frame_num = pose_frame["frame"]
            persons = pose_frame.get("persons", [])
            if not persons:
                continue

            # Seek to the frame referenced by the pose data before reading.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            # Only derive a timestamp from fps when the pose frame has none.
            if "timestamp" in pose_frame:
                timestamp = pose_frame["timestamp"]
            else:
                timestamp = frame_num / fps

            for person_idx, person in enumerate(persons):
                bbox = person.get("bbox", {})
                keypoints = person.get("keypoints", [])
                if bbox.get("width", 0) <= 0 or bbox.get("height", 0) <= 0:
                    continue

                proportions = calculate_proportions(keypoints, bbox)
                head_region = get_head_region(keypoints)
                level1 = extractor.extract_level1(frame, bbox, head_region)
                results.append({
                    "frame": frame_num,
                    "timestamp": timestamp,
                    "person_index": person_idx,
                    "bbox": bbox,
                    "proportions": proportions,
                    "level1_features": level1,
                })
        return results
    finally:
        # Release the capture on every exit path, including exceptions
        # raised by the extractor or by malformed pose data.
        cap.release()
def build_person_trace_nodes(cur, schema, file_uuid, level1_data):
    """Aggregate per-frame Level 1 features into one person_trace node per person.

    Records are grouped by ``person_index``, which assumes that an index
    refers to the same person in every frame. For each group the colour
    features of the four regions are averaged, the bounding boxes are
    averaged, and the result is stored through ``ensure_node``.

    Args:
        cur: Database cursor.
        schema: Database schema.
        file_uuid: File UUID.
        level1_data: Records produced by the Level 1 extraction step.

    Returns:
        The number of person_trace nodes stored.
    """
    print("[TKG-L1] Building person_trace nodes...")

    regions = ("body", "head_top", "upper_body", "lower_body")

    # Bucket the records by person index, keeping first-seen order.
    by_person = {}
    for record in level1_data:
        by_person.setdefault(record["person_index"], []).append(record)

    written = 0
    for person_idx, records in by_person.items():
        frame_ids = []
        boxes = []
        palettes = {region: [] for region in regions}
        for record in records:
            feats = record["level1_features"]
            frame_ids.append(record["frame"])
            boxes.append(record["bbox"])
            for region in regions:
                if region in feats and "color" in feats[region]:
                    palettes[region].append(
                        feats[region]["color"].get("dominant_colors", [])
                    )

        mean_colors = {
            region: average_colors(samples) if samples else []
            for region, samples in palettes.items()
        }

        external_id = f"person_{person_idx}"
        label = f"Person {person_idx}"

        # No real averaging here: the first truthy value found across the
        # person's records is kept for each of the two proportion fields.
        height_estimate = {}
        body_shape = {}
        for record in records:
            proportions = record.get("proportions", {})
            if "height_estimate" in proportions and not height_estimate:
                height_estimate = proportions["height_estimate"]
            if "body_shape" in proportions and not body_shape:
                body_shape = proportions["body_shape"]

        properties = {
            "frame_count": len(frame_ids),
            "frames": frame_ids,
            "avg_bbox": average_bbox(boxes) if boxes else {},
            "height_estimate": height_estimate,
            "body_shape": body_shape,
            "level1_features": {
                region: {
                    "dominant_colors": mean_colors[region],
                    "h_mean": average_h_mean(records, region),
                }
                for region in regions
            },
        }

        ensure_node(cur, schema, file_uuid, "person_trace", external_id, label, properties)
        written += 1
        print(f"[TKG-L1] Created person_trace node: {external_id} ({len(frame_ids)} frames)")

    print(f"[TKG-L1] Total: {written} person_trace nodes")
    return written
def average_colors(color_lists):
    """Average the first dominant colour of each non-empty colour list.

    Args:
        color_lists: Sequence of colour lists; only the first colour of each
            list is used and empty lists are ignored.

    Returns:
        The channel-wise mean of those first colours, each channel rounded to
        two decimals, or ``[]`` when there is nothing to average.
    """
    if not color_lists:
        return []

    primaries = [palette[0] for palette in color_lists if palette]
    if not primaries:
        return []

    # zip(*...) regroups the colours channel by channel.
    return [round(sum(channel) / len(channel), 2) for channel in zip(*primaries)]
def average_h_mean(items, region):
    """Average the ``h_mean`` colour value of one region across items.

    Args:
        items: Records that each carry a ``level1_features`` dict.
        region: Region key to read (e.g. ``"body"``).

    Returns:
        The arithmetic mean of the collected values rounded to two decimals,
        or ``0`` when no value was collected. Falsy values (a missing
        ``h_mean`` or an ``h_mean`` of 0) are left out of the average.
    """
    feature_sets = (entry["level1_features"] for entry in items)
    candidates = (
        feats[region]["color"].get("h_mean", 0)
        for feats in feature_sets
        if region in feats and "color" in feats[region]
    )
    hues = [hue for hue in candidates if hue]
    if not hues:
        return 0
    return round(sum(hues) / len(hues), 2)
def average_bbox(bboxes):
    """Compute the mean bounding box of a list of bbox dicts.

    Args:
        bboxes: Dicts with ``x``, ``y``, ``width`` and ``height`` entries; a
            missing entry counts as 0.

    Returns:
        A dict with the four averaged fields, each rounded to one decimal, or
        ``{}`` when ``bboxes`` is empty.
    """
    if not bboxes:
        return {}

    total = len(bboxes)
    return {
        field: round(sum(box.get(field, 0) for box in bboxes) / total, 1)
        for field in ("x", "y", "width", "height")
    }
def main():
    """Command-line entry point: extract Level 1 features and store them.

    Parses the arguments, resolves the video and pose JSON paths (defaulting
    to files named after the file UUID inside ``OUTPUT_DIR``), runs the
    extraction and writes the person_trace nodes in a single transaction.
    Exits with status 1 when an input file is missing, when nothing was
    extracted, or when the database step fails.
    """
    parser = argparse.ArgumentParser(description="TKG Level 1 Builder")
    parser.add_argument("--file-uuid", "-u", required=True, help="File UUID")
    parser.add_argument("--schema", "-s", default=SCHEMA, help="Database schema")
    parser.add_argument("--video", "-v", help="Video path (optional, auto-detected)")
    parser.add_argument("--pose-json", "-p", help="Pose JSON path (optional, auto-detected)")
    args = parser.parse_args()

    file_uuid, schema = args.file_uuid, args.schema

    # Fall back to the conventional file names when no path was supplied.
    video_path = args.video if args.video else f"{OUTPUT_DIR}/{file_uuid}.mp4"
    pose_json_path = args.pose_json if args.pose_json else f"{OUTPUT_DIR}/{file_uuid}.pose.json"

    # Both inputs must exist; the video is checked first.
    for kind, path in (("Video", video_path), ("Pose JSON", pose_json_path)):
        if not os.path.exists(path):
            print(f"[TKG-L1] {kind} not found: {path}", file=sys.stderr)
            sys.exit(1)

    print(f"[TKG-L1] Processing: {file_uuid}")
    print(f"[TKG-L1] Video: {video_path}")
    print(f"[TKG-L1] Pose: {pose_json_path}")

    print("[TKG-L1] Extracting Level 1 features...")
    level1_data = extract_level1_features(video_path, pose_json_path)
    if not level1_data:
        print("[TKG-L1] No Level 1 data extracted", file=sys.stderr)
        sys.exit(1)
    print(f"[TKG-L1] Extracted: {len(level1_data)} frame-person pairs")

    conn = get_conn()
    cur = conn.cursor()
    try:
        node_count = build_person_trace_nodes(cur, schema, file_uuid, level1_data)
        conn.commit()
        print(f"[TKG-L1] Success: {node_count} person_trace nodes created")
    except Exception as exc:
        # Undo any partial writes before reporting the failure.
        conn.rollback()
        print(f"[TKG-L1] Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        cur.close()
        conn.close()


if __name__ == "__main__":
    main()