Phase 2.6.1: co_occurrence_edges migration
- build_co_occurrence_edges_from_qdrant()
- Qdrant embeddings → frame grouping → YOLO objects
- Result: 6679 edges (vs 6701 PostgreSQL)

Phase 2.6.2: face_face_edges migration
- build_face_face_edges_from_qdrant()
- Qdrant embeddings → frame grouping → face pairs
- mutual_gaze detection preserved
- Result: 6 edges (exact match)

Phase 2.6.3: speaker_face_edges migration
- build_speaker_face_edges_from_qdrant()
- Qdrant embeddings → trace_id frame ranges
- SPEAKS_AS edge creation

Architecture:
- All edges use Qdrant payload (no face_detections queries)
- PostgreSQL fallback for empty Qdrant
- Estimated 3.6x performance improvement

Testing:
- Playground (3003): ✓ All Phase 2.6 logs verified
- Edge counts: ✓ Close match with PostgreSQL
- Fallback: ✓ Working

Docs:
- docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md
- docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
158 lines
4.7 KiB
Python
158 lines
4.7 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Appearance Processor - HSV color feature extraction for person tracking
|
|
|
|
Input:
|
|
- video_path: source video
|
|
- pose_json: pose.json with frame bboxes
|
|
- output_path: output JSON
|
|
|
|
Output: appearance.json with HSV histogram per person per frame
|
|
|
|
Depends on pose.json (bbox). Same 0-based frame numbering as face/pose/mediapipe.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def _normalized_hsv_hist(hsv):
    """Return per-channel normalised histograms ``[H, S, V]`` of an HSV image.

    H uses 30 bins over [0, 180); S and V use 32 bins over [0, 256).
    Each histogram is divided by its own sum (or by 1 when the sum is 0)
    and returned as a plain list of floats.
    """
    channels = []
    for channel, bins, upper in ((0, 30, 180), (1, 32, 256), (2, 32, 256)):
        hist = cv2.calcHist([hsv], [channel], None, [bins], [0, upper]).flatten()
        total = hist.sum() or 1  # guard against dividing by zero
        channels.append((hist / total).tolist())
    return channels


def extract_appearance(frame, bbox):
    """Extract HSV colour features for one person bounding box.

    Args:
        frame: Image array indexed as ``frame[row, col]``; it is converted
            with ``cv2.COLOR_BGR2HSV``, i.e. treated as BGR.
        bbox: Mapping with ``"x"``, ``"y"``, ``"width"`` and ``"height"``.
            Values are rounded to integers so float coordinates are accepted.

    Returns:
        ``None`` when the box has no positive size or does not overlap the
        frame. Otherwise a dict with:

        * ``"hsv_histogram"``: normalised ``[H, S, V]`` histograms of the
          clipped box.
        * ``"dominant_colors"``: up to 5 k-means cluster centres in HSV,
          largest cluster first (the mean HSV colour when the box has fewer
          than 5 pixels).
        * ``"upper_body"`` / ``"lower_body"``: the same histograms for the
          top and bottom halves of the box, or ``None`` for an empty half.

    Raises:
        KeyError: If ``bbox`` lacks one of the four required keys.
    """
    # Slice indices must be integers; pose boxes may carry float coordinates.
    x, y, w, h = (int(round(bbox[key])) for key in ("x", "y", "width", "height"))
    if w <= 0 or h <= 0:
        return None

    # Clip the box to the frame bounds.
    x1, y1 = max(0, x), max(0, y)
    x2 = min(frame.shape[1], x + w)
    y2 = min(frame.shape[0], y + h)
    if x2 <= x1 or y2 <= y1:
        return None

    hsv = cv2.cvtColor(frame[y1:y2, x1:x2], cv2.COLOR_BGR2HSV)
    pixels = hsv.reshape(-1, 3).astype(np.float32)

    # Dominant colours via k-means. Initial centres are chosen randomly
    # (KMEANS_RANDOM_CENTERS), so centres may differ slightly between runs.
    dominant = []
    if len(pixels) >= 5:
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
        _, labels, centers = cv2.kmeans(
            pixels, 5, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS
        )
        counts = np.bincount(labels.flatten())
        dominant = centers[np.argsort(-counts)[:5]].tolist()
    elif len(pixels) > 0:
        dominant = [pixels.mean(axis=0).tolist()]

    # Upper / lower body split. The colour conversion is per-pixel, so the
    # already converted crop is sliced instead of converting each half again.
    split = (y2 - y1) // 2
    upper_hsv = hsv[:split]
    lower_hsv = hsv[split:]

    return {
        "hsv_histogram": _normalized_hsv_hist(hsv),
        "dominant_colors": dominant,
        "upper_body": _normalized_hsv_hist(upper_hsv) if upper_hsv.size else None,
        "lower_body": _normalized_hsv_hist(lower_hsv) if lower_hsv.size else None,
    }
|
|
|
|
|
|
def main():
    """Command-line entry point: write per-person appearance features as JSON.

    Reads the pose JSON, and for every listed frame that has persons, seeks
    the video to that frame number, extracts appearance features for each
    person with a positive-size bbox, and writes the collected frames to
    ``output_path``. Exits with status 1 when the video cannot be opened.
    Frames that cannot be read and persons without usable features are
    skipped.
    """
    parser = argparse.ArgumentParser(description="Appearance Processor")
    parser.add_argument("video_path", help="Video file path")
    parser.add_argument("pose_json", help="Pose JSON path (bbox input)")
    parser.add_argument("output_path", help="Output JSON path")
    # Accepted for CLI compatibility; not used within this function.
    parser.add_argument("--uuid", "-u", default="")
    args = parser.parse_args()

    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(args.pose_json, encoding="utf-8") as f:
        pose_data = json.load(f)

    # A missing, null or zero fps would break the timestamp fallback below.
    fps = pose_data.get("fps") or 30.0

    cap = cv2.VideoCapture(args.video_path)
    if not cap.isOpened():
        print("[APPEARANCE] Cannot open video", file=sys.stderr)
        sys.exit(1)

    frames_out = []
    try:
        for pose_frame in pose_data.get("frames", []):
            frame_num = pose_frame["frame"]
            persons = pose_frame.get("persons", [])
            if not persons:
                continue

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            frame_persons = []
            for pid, person in enumerate(persons):
                # Treat an explicit null bbox like a missing one.
                bbox = person.get("bbox") or {}
                if bbox.get("width", 0) <= 0 or bbox.get("height", 0) <= 0:
                    continue
                appearance = extract_appearance(frame, bbox)
                if appearance is None:
                    continue
                frame_persons.append(
                    {
                        "person_id": pid,
                        "bbox": bbox,
                        **appearance,
                    }
                )

            if not frame_persons:
                continue

            # Derive the timestamp from fps only when the pose data has none.
            if "timestamp" in pose_frame:
                timestamp = pose_frame["timestamp"]
            else:
                timestamp = frame_num / fps
            frames_out.append(
                {
                    "frame": frame_num,
                    "timestamp": timestamp,
                    "persons": frame_persons,
                }
            )
    finally:
        # Release the capture even if processing fails part-way through.
        cap.release()

    output = {
        "frame_count": len(frames_out),
        "fps": fps,
        "frames": frames_out,
    }
    # ensure_ascii=False may emit non-ASCII characters, so write UTF-8.
    with open(args.output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    print(f"[APPEARANCE] Done: {len(frames_out)} frames")
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|