- Add Appearance_Feature_System_V1.0.md design doc - Add proportion_calculator.py for body proportions (height, body shape) - Add feature_extractor.py for hierarchical feature extraction - Add tkg_level1_builder.py for TKG person_trace nodes - Fix mediapipe_holistic_processor.py to output Top-Left pixels - Add MediaPipe format conversion in proportion_calculator Coordinate system alignment: - Swift Pose: Top-Left pixels (Y-flip done in swift_pose.swift) - MediaPipe: Top-Left pixels (norm→pixel conversion added)
341 lines
11 KiB
Python
341 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
TKG Level 1 Builder - Store Level 1 appearance features in TKG
|
|
|
|
Purpose:
|
|
1. Extract Level 1 features from pose.json + video frames
|
|
2. Store as person_trace nodes in TKG
|
|
3. Enable tracking via Level 1 feature similarity
|
|
|
|
Level 1 Features:
|
|
- body: overall color distribution
|
|
- head_top: hair color
|
|
- upper_body: upper clothing color
|
|
- lower_body: lower clothing color
|
|
|
|
Usage:
|
|
python tkg_level1_builder.py --file-uuid <uuid> [--schema <schema>]
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import cv2
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
|
|
|
|
from utils.feature_extractor import HierarchicalFeatureExtractor
|
|
from utils.proportion_calculator import calculate_proportions, get_head_region
|
|
|
|
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
|
|
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
|
|
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
|
|
|
|
|
|
def get_conn():
|
|
return psycopg2.connect(DB_URL)
|
|
|
|
|
|
def ensure_node(cur, schema, file_uuid, node_type, external_id, label="", properties=None):
|
|
"""Insert or get graph node"""
|
|
cur.execute(
|
|
f"""
|
|
INSERT INTO {schema}.tkg_nodes (node_type, external_id, file_uuid, label, properties)
|
|
VALUES (%s, %s, %s, %s, %s::jsonb)
|
|
ON CONFLICT (file_uuid, node_type, external_id)
|
|
DO UPDATE SET properties = COALESCE(EXCLUDED.properties, {schema}.tkg_nodes.properties),
|
|
label = COALESCE(NULLIF(EXCLUDED.label, ''), {schema}.tkg_nodes.label)
|
|
RETURNING id
|
|
""",
|
|
(node_type, str(external_id), file_uuid, label, json.dumps(properties or {})),
|
|
)
|
|
row = cur.fetchone()
|
|
return row[0]
|
|
|
|
|
|
def extract_level1_features(video_path, pose_json_path):
|
|
"""
|
|
Extract Level 1 features for each person in each frame
|
|
|
|
Args:
|
|
video_path: Path to video file
|
|
pose_json_path: Path to pose.json
|
|
|
|
Returns:
|
|
List of (frame, person_index, bbox, level1_features)
|
|
"""
|
|
with open(pose_json_path) as f:
|
|
pose_data = json.load(f)
|
|
|
|
cap = cv2.VideoCapture(video_path)
|
|
if not cap.isOpened():
|
|
print(f"[TKG-L1] Cannot open video: {video_path}", file=sys.stderr)
|
|
return []
|
|
|
|
fps = pose_data.get("fps", 30.0)
|
|
extractor = HierarchicalFeatureExtractor()
|
|
|
|
results = []
|
|
|
|
for pose_frame in pose_data.get("frames", []):
|
|
frame_num = pose_frame["frame"]
|
|
persons = pose_frame.get("persons", [])
|
|
|
|
if not persons:
|
|
continue
|
|
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
|
|
ret, frame = cap.read()
|
|
|
|
if not ret:
|
|
continue
|
|
|
|
for person_idx, person in enumerate(persons):
|
|
bbox = person.get("bbox", {})
|
|
keypoints = person.get("keypoints", [])
|
|
|
|
if bbox.get("width", 0) <= 0 or bbox.get("height", 0) <= 0:
|
|
continue
|
|
|
|
# Calculate proportions
|
|
proportions = calculate_proportions(keypoints, bbox)
|
|
|
|
# Get head region
|
|
head_region = get_head_region(keypoints)
|
|
|
|
# Extract Level 1 features
|
|
level1 = extractor.extract_level1(frame, bbox, head_region)
|
|
|
|
results.append({
|
|
"frame": frame_num,
|
|
"timestamp": pose_frame.get("timestamp", frame_num / fps),
|
|
"person_index": person_idx,
|
|
"bbox": bbox,
|
|
"proportions": proportions,
|
|
"level1_features": level1,
|
|
})
|
|
|
|
cap.release()
|
|
return results
|
|
|
|
|
|
def build_person_trace_nodes(cur, schema, file_uuid, level1_data):
|
|
"""
|
|
Build person_trace nodes with Level 1 features
|
|
|
|
Args:
|
|
cur: Database cursor
|
|
schema: Database schema
|
|
file_uuid: File UUID
|
|
level1_data: Level 1 extracted features
|
|
"""
|
|
print("[TKG-L1] Building person_trace nodes...")
|
|
|
|
# Group by person (assuming person_index consistency across frames)
|
|
person_groups = {}
|
|
for item in level1_data:
|
|
person_idx = item["person_index"]
|
|
if person_idx not in person_groups:
|
|
person_groups[person_idx] = []
|
|
person_groups[person_idx].append(item)
|
|
|
|
count = 0
|
|
for person_idx, items in person_groups.items():
|
|
if not items:
|
|
continue
|
|
|
|
# Aggregate Level 1 features (average across frames)
|
|
body_colors = []
|
|
head_colors = []
|
|
upper_colors = []
|
|
lower_colors = []
|
|
|
|
frames = []
|
|
bboxes = []
|
|
|
|
for item in items:
|
|
l1 = item["level1_features"]
|
|
frames.append(item["frame"])
|
|
bboxes.append(item["bbox"])
|
|
|
|
if "body" in l1 and "color" in l1["body"]:
|
|
body_colors.append(l1["body"]["color"].get("dominant_colors", []))
|
|
|
|
if "head_top" in l1 and "color" in l1["head_top"]:
|
|
head_colors.append(l1["head_top"]["color"].get("dominant_colors", []))
|
|
|
|
if "upper_body" in l1 and "color" in l1["upper_body"]:
|
|
upper_colors.append(l1["upper_body"]["color"].get("dominant_colors", []))
|
|
|
|
if "lower_body" in l1 and "color" in l1["lower_body"]:
|
|
lower_colors.append(l1["lower_body"]["color"].get("dominant_colors", []))
|
|
|
|
# Average dominant colors
|
|
avg_body_color = average_colors(body_colors) if body_colors else []
|
|
avg_head_color = average_colors(head_colors) if head_colors else []
|
|
avg_upper_color = average_colors(upper_colors) if upper_colors else []
|
|
avg_lower_color = average_colors(lower_colors) if lower_colors else []
|
|
|
|
# Build node properties
|
|
external_id = f"person_{person_idx}"
|
|
label = f"Person {person_idx}"
|
|
|
|
# Get average height and body shape
|
|
avg_height_estimate = {}
|
|
avg_body_shape = {}
|
|
|
|
for item in items:
|
|
props = item.get("proportions", {})
|
|
if "height_estimate" in props:
|
|
if not avg_height_estimate:
|
|
avg_height_estimate = props["height_estimate"]
|
|
if "body_shape" in props:
|
|
if not avg_body_shape:
|
|
avg_body_shape = props["body_shape"]
|
|
|
|
properties = {
|
|
"frame_count": len(frames),
|
|
"frames": frames,
|
|
"avg_bbox": average_bbox(bboxes) if bboxes else {},
|
|
"height_estimate": avg_height_estimate,
|
|
"body_shape": avg_body_shape,
|
|
"level1_features": {
|
|
"body": {
|
|
"dominant_colors": avg_body_color,
|
|
"h_mean": average_h_mean(items, "body"),
|
|
},
|
|
"head_top": {
|
|
"dominant_colors": avg_head_color,
|
|
"h_mean": average_h_mean(items, "head_top"),
|
|
},
|
|
"upper_body": {
|
|
"dominant_colors": avg_upper_color,
|
|
"h_mean": average_h_mean(items, "upper_body"),
|
|
},
|
|
"lower_body": {
|
|
"dominant_colors": avg_lower_color,
|
|
"h_mean": average_h_mean(items, "lower_body"),
|
|
},
|
|
},
|
|
}
|
|
|
|
# Store node
|
|
ensure_node(cur, schema, file_uuid, "person_trace", external_id, label, properties)
|
|
count += 1
|
|
print(f"[TKG-L1] Created person_trace node: {external_id} ({len(frames)} frames)")
|
|
|
|
print(f"[TKG-L1] Total: {count} person_trace nodes")
|
|
return count
|
|
|
|
|
|
def average_colors(color_lists):
|
|
"""Average multiple color lists"""
|
|
if not color_lists:
|
|
return []
|
|
|
|
valid_colors = [c for c in color_lists if c]
|
|
if not valid_colors:
|
|
return []
|
|
|
|
# Average first dominant color
|
|
first_colors = [c[0] if c else [0, 0, 0] for c in valid_colors]
|
|
avg = [sum(x) / len(x) for x in zip(*first_colors)]
|
|
return [round(x, 2) for x in avg]
|
|
|
|
|
|
def average_h_mean(items, region):
|
|
"""Average H mean from Level 1 items"""
|
|
h_means = []
|
|
for item in items:
|
|
l1 = item["level1_features"]
|
|
if region in l1 and "color" in l1[region]:
|
|
h_mean = l1[region]["color"].get("h_mean", 0)
|
|
if h_mean:
|
|
h_means.append(h_mean)
|
|
|
|
return round(sum(h_means) / len(h_means), 2) if h_means else 0
|
|
|
|
|
|
def average_bbox(bboxes):
|
|
"""Average bbox across frames"""
|
|
if not bboxes:
|
|
return {}
|
|
|
|
avg_x = sum(b.get("x", 0) for b in bboxes) / len(bboxes)
|
|
avg_y = sum(b.get("y", 0) for b in bboxes) / len(bboxes)
|
|
avg_w = sum(b.get("width", 0) for b in bboxes) / len(bboxes)
|
|
avg_h = sum(b.get("height", 0) for b in bboxes) / len(bboxes)
|
|
|
|
return {
|
|
"x": round(avg_x, 1),
|
|
"y": round(avg_y, 1),
|
|
"width": round(avg_w, 1),
|
|
"height": round(avg_h, 1),
|
|
}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="TKG Level 1 Builder")
|
|
parser.add_argument("--file-uuid", "-u", required=True, help="File UUID")
|
|
parser.add_argument("--schema", "-s", default=SCHEMA, help="Database schema")
|
|
parser.add_argument("--video", "-v", help="Video path (optional, auto-detected)")
|
|
parser.add_argument("--pose-json", "-p", help="Pose JSON path (optional, auto-detected)")
|
|
args = parser.parse_args()
|
|
|
|
file_uuid = args.file_uuid
|
|
schema = args.schema
|
|
|
|
# Auto-detect paths
|
|
video_path = args.video or f"{OUTPUT_DIR}/{file_uuid}.mp4"
|
|
pose_json_path = args.pose_json or f"{OUTPUT_DIR}/{file_uuid}.pose.json"
|
|
|
|
# Check files exist
|
|
if not os.path.exists(video_path):
|
|
print(f"[TKG-L1] Video not found: {video_path}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if not os.path.exists(pose_json_path):
|
|
print(f"[TKG-L1] Pose JSON not found: {pose_json_path}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
print(f"[TKG-L1] Processing: {file_uuid}")
|
|
print(f"[TKG-L1] Video: {video_path}")
|
|
print(f"[TKG-L1] Pose: {pose_json_path}")
|
|
|
|
# Extract Level 1 features
|
|
print("[TKG-L1] Extracting Level 1 features...")
|
|
level1_data = extract_level1_features(video_path, pose_json_path)
|
|
|
|
if not level1_data:
|
|
print("[TKG-L1] No Level 1 data extracted", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
print(f"[TKG-L1] Extracted: {len(level1_data)} frame-person pairs")
|
|
|
|
# Connect to DB
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
# Build person_trace nodes
|
|
count = build_person_trace_nodes(cur, schema, file_uuid, level1_data)
|
|
|
|
conn.commit()
|
|
print(f"[TKG-L1] Success: {count} person_trace nodes created")
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
print(f"[TKG-L1] Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
finally:
|
|
cur.close()
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |