Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index Correction: asr-1.json format, generate/apply scripts API: 37/37 endpoints fixed and tested Docs: HANDOVER_V2.0.md for M4
165 lines
5.4 KiB
Python
165 lines
5.4 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
Scan Charade for hand-held objects using YOLO spatial overlap + pose wrist verification.

Strategy:
1. Sample frames at regular intervals (every SAMPLE_INTERVAL frames).
2. For each detected person, find non-person objects whose bounding box overlaps the person's box.
3. Use pose wrist keypoints to verify that the object is near a hand.
4. Classify with Grounding DINO (not performed in this script; presumably a follow-up step).
"""
|
|
import json
import sys
import time
from collections import defaultdict, Counter

import psycopg2

# Identifier of the media file to scan; compared against dev.pre_chunks.file_uuid.
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
# Connection URI handed to psycopg2.connect().
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
# Frames per second, used to turn a frame number into a timestamp in seconds.
FPS = 25.0
SAMPLE_INTERVAL = 300  # every 300 frames = every 12s
HAND_RADIUS = 100  # pixels around wrist to check for objects

def iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: Box as an ``(x, y, w, h)`` sequence; ``x + w`` and ``y + h``
            are used as the opposite edges.
        box2: Second box, same layout as ``box1``.

    Returns:
        The overlap area divided by the union area as a float. ``0.0`` is
        returned when the boxes do not overlap (touching edges count as no
        overlap) or when the union area is not positive.
    """
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2

    # Extent of the overlapping rectangle; non-positive means no overlap.
    inter_w = min(x1 + w1, x2 + w2) - max(x1, x2)
    inter_h = min(y1 + h1, y2 + h2) - max(y1, y2)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0

    inter = inter_w * inter_h
    union = w1 * h1 + w2 * h2 - inter
    return inter / union if union > 0 else 0.0


def _load_pose_wrists(cur):
    """Return ``{start_frame: [(x, y), ...]}`` of confident wrist keypoints.

    Reads every 'pose' row for UUID from dev.pre_chunks. Only keypoints named
    ``left_wrist`` / ``right_wrist`` with confidence above 0.1 are kept, and
    frames without any such keypoint are omitted from the result.
    """
    cur.execute(
        """
        SELECT start_frame, data
        FROM dev.pre_chunks
        WHERE file_uuid=%s AND processor_type='pose'
          AND data->'persons' IS NOT NULL
        ORDER BY start_frame
        """,
        (UUID,),
    )
    wrists_by_frame = {}
    for frame, data in cur.fetchall():
        wrists = [
            (kp["x"], kp["y"])
            for person in data.get("persons", [])
            for kp in person.get("keypoints", [])
            if kp.get("name", "") in ("left_wrist", "right_wrist")
            and kp.get("confidence", 0) > 0.1
        ]
        if wrists:
            wrists_by_frame[frame] = wrists
    return wrists_by_frame


def _is_near_hand(center_x, center_y, wrists):
    """Return True if the point lies within HAND_RADIUS pixels of any wrist."""
    return any(
        ((center_x - wx) ** 2 + (center_y - wy) ** 2) ** 0.5 < HAND_RADIUS
        for wx, wy in wrists
    )


def _scan_frame(cur, frame_num, wrists_by_frame):
    """Return one record per (item, person) pair that overlaps in this frame.

    Persons are YOLO objects of class "person" with confidence > 0.5; items
    are all other objects with confidence > 0.3. A pair is recorded when the
    IoU of the two boxes exceeds 0.01. An empty list is returned when the
    frame has no YOLO row, no persons, or no items.
    """
    cur.execute(
        """
        SELECT data->'objects'
        FROM dev.pre_chunks
        WHERE file_uuid=%s AND processor_type='yolo' AND start_frame=%s
        """,
        (UUID, frame_num),
    )
    row = cur.fetchone()
    if not row or not row[0]:
        return []

    objects = row[0]
    persons = [
        o for o in objects
        if o.get("class_name") == "person" and o.get("confidence", 0) > 0.5
    ]
    if not persons:
        return []
    items = [
        o for o in objects
        if o.get("class_name") != "person" and o.get("confidence", 0) > 0.3
    ]
    if not items:
        return []

    # Pose data only helps when a pose row exists for exactly this frame.
    wrists = wrists_by_frame.get(frame_num, [])
    ts = frame_num / FPS
    time_str = f"{int(ts//60)}:{int(ts%60):02d}"

    records = []
    for item in items:
        item_box = (item["x"], item["y"], item["width"], item["height"])
        center_x = item["x"] + item["width"] / 2
        center_y = item["y"] + item["height"] / 2

        for person in persons:
            person_box = (person["x"], person["y"], person["width"], person["height"])
            overlap = iou(item_box, person_box)
            if overlap <= 0.01:
                continue

            records.append({
                "frame": frame_num,
                "timestamp": round(ts, 1),
                "time_str": time_str,
                "object": item["class_name"],
                "confidence": round(item.get("confidence", 0), 3),
                "near_hand": _is_near_hand(center_x, center_y, wrists),
                "overlap": round(overlap, 3),
            })
    return records


def _dedupe(records):
    """Keep the first record seen for each (frame, object class) pair.

    NOTE(review): when several objects of one class appear in a frame, only
    the first is kept, even if a later one is the one near a hand.
    """
    seen = set()
    unique = []
    for rec in records:
        key = (rec["frame"], rec["object"])
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique


print("=== Hand-held Object Scanner ===")
conn = psycopg2.connect(DB_URL)
try:
    with conn.cursor() as cur:
        # Load pose wrist data (frame -> wrist positions)
        print("Loading pose wrist data...")
        t0 = time.time()
        pose_wrists = _load_pose_wrists(cur)
        print(f" {len(pose_wrists)} frames with wrists ({time.time()-t0:.1f}s)")

        # Scan YOLO frames
        print(f"Scanning YOLO data (interval={SAMPLE_INTERVAL})...")
        t0 = time.time()

        # Highest YOLO frame number bounds the sampling range (0 if no rows).
        cur.execute(
            "SELECT MAX(start_frame) FROM dev.pre_chunks WHERE file_uuid=%s AND processor_type='yolo'",
            (UUID,),
        )
        max_frame = cur.fetchone()[0] or 0

        results = []
        for frame_num in range(0, max_frame + 1, SAMPLE_INTERVAL):
            results.extend(_scan_frame(cur, frame_num, pose_wrists))

        elapsed = time.time() - t0
        print(f" Scanned in {elapsed:.1f}s")
finally:
    # Release the connection even if a query fails part-way through.
    conn.close()

# Deduplicate
deduped = _dedupe(results)

# Group by object type
by_object = defaultdict(list)
for rec in deduped:
    by_object[rec["object"]].append(rec)

print(f"\n=== Results: {len(deduped)} hand-held object detections ===")
print(f"{'Object':<20} {'Count':>6} {'Near hand':>12} {'Timestamps':<40}")
print("-"*80)
# Most frequent object classes first; ties keep first-seen order.
for obj, hits in sorted(by_object.items(), key=lambda kv: -len(kv[1])):
    near_count = sum(1 for hit in hits if hit["near_hand"])
    ts_list = ", ".join(hit["time_str"] for hit in hits[:5])
    if len(hits) > 5:
        ts_list += f" ... (+{len(hits)-5})"
    # Width 12 matches the 'Near hand' header column so the table lines up.
    print(f"{obj:<20} {len(hits):>6} {near_count:>12d} {ts_list:<40}")

# Save
with open("/Users/accusys/momentry/output_dev/handheld_objects.json", "w", encoding="utf-8") as out_file:
    json.dump(deduped, out_file, indent=2)
print("\nSaved to output_dev/handheld_objects.json")