Files
momentry_core/scripts/scan_handheld_objects.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

165 lines
5.4 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Scan Charade for hand-held objects using YOLO spatial overlap + pose wrist verification.
Strategy:
1. Sample frames at regular intervals
2. For each person, check if non-person objects overlap with hand area
3. Use pose wrist keypoints to verify hand position
4. Classify with Grounding DINO (not implemented in this script; it only writes the candidate detections to JSON)
"""
import json, sys, time, psycopg2
from collections import defaultdict, Counter
# file_uuid used to select this file's rows from dev.pre_chunks in every query below.
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
# Connection string passed to psycopg2.connect().
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
# Frames per second, used to turn a frame number into a timestamp in seconds.
# NOTE(review): hard-coded; assumes the scanned file is 25 fps — confirm.
FPS = 25.0
# Step between sampled YOLO frames; only frames 0, 300, 600, ... are queried.
SAMPLE_INTERVAL = 300 # every 300 frames = every 12s
# Maximum distance from an object's box center to a wrist keypoint for the
# object to be flagged as near a hand.
HAND_RADIUS = 100 # pixels around wrist to check for objects
def iou(box1, box2) -> float:
    """Return the intersection over union of two axis-aligned boxes.

    Args:
        box1: Box given as ``[x, y, w, h]``, where ``(x, y)`` is the
            minimum corner and ``w``/``h`` are the width and height.
        box2: Second box in the same ``[x, y, w, h]`` layout.

    Returns:
        The intersection area divided by the union area, as a float.
        ``0.0`` is returned when the boxes do not overlap (boxes that only
        touch along an edge count as non-overlapping) or when the union
        area is not positive.
    """
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2

    # Extent of the overlap rectangle along each axis; a non-positive
    # extent means the boxes are disjoint (or merely touching) on that axis.
    inter_w = min(x1 + w1, x2 + w2) - max(x1, x2)
    inter_h = min(y1 + h1, y2 + h2) - max(y1, y2)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0

    inter = inter_w * inter_h
    union = w1 * h1 + w2 * h2 - inter
    # Always hand back a float so callers never see a mixed int/float result.
    return inter / union if union > 0 else 0.0
print("=== Hand-held Object Scanner ===")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()

# Index wrist keypoints by frame: start_frame -> [(x, y), ...].
# Only frames that yield at least one usable wrist get an entry.
print("Loading pose wrist data...")
t0 = time.time()
cur.execute("""
SELECT start_frame, data
FROM dev.pre_chunks
WHERE file_uuid=%s AND processor_type='pose'
AND data->'persons' IS NOT NULL
ORDER BY start_frame
""", (UUID,))

pose_wrists = {}
for start_frame, pose_data in cur.fetchall():
    # Collect every left/right wrist keypoint, across all persons in the
    # frame, whose confidence exceeds 0.1.
    frame_wrists = [
        (kp["x"], kp["y"])
        for person in pose_data.get("persons", [])
        for kp in person.get("keypoints", [])
        if kp.get("name", "") in ("left_wrist", "right_wrist")
        and kp.get("confidence", 0) > 0.1
    ]
    if frame_wrists:
        pose_wrists[start_frame] = frame_wrists

print(f" {len(pose_wrists)} frames with wrists ({time.time()-t0:.1f}s)")
# Scan YOLO frames
print(f"Scanning YOLO data (interval={SAMPLE_INTERVAL})...")
t0 = time.time()

# The largest YOLO start_frame bounds the sampling range (0 when there are no rows).
cur.execute("SELECT MAX(start_frame) FROM dev.pre_chunks WHERE file_uuid=%s AND processor_type='yolo'", (UUID,))
max_frame = cur.fetchone()[0] or 0

results = []
for frame_num in range(0, max_frame + 1, SAMPLE_INTERVAL):
    # One lookup per sampled frame; frames with no row or no objects are skipped.
    cur.execute("""
SELECT data->'objects'
FROM dev.pre_chunks
WHERE file_uuid=%s AND processor_type='yolo' AND start_frame=%s
""", (UUID, frame_num))
    yolo_row = cur.fetchone()
    objects = yolo_row[0] if yolo_row else None
    if not objects:
        continue

    # Split detections into confident persons (> 0.5) and candidate
    # non-person objects (> 0.3); both groups are required to continue.
    persons = []
    items = []
    for det in objects:
        if det.get("class_name") == "person":
            if det.get("confidence", 0) > 0.5:
                persons.append(det)
        elif det.get("confidence", 0) > 0.3:
            items.append(det)
    if not persons or not items:
        continue

    # Wrist positions for this exact frame (empty when pose has no entry).
    wrists = pose_wrists.get(frame_num, [])
    ts = frame_num / FPS

    for item in items:
        item_box = (item["x"], item["y"], item["width"], item["height"])
        center_x = item["x"] + item["width"] / 2
        center_y = item["y"] + item["height"] / 2

        # Whether the box center lies within HAND_RADIUS of any wrist. This
        # depends only on the item, so it is evaluated once per item.
        near_hand = any(
            ((center_x - wx) ** 2 + (center_y - wy) ** 2) ** 0.5 < HAND_RADIUS
            for wx, wy in wrists
        )

        # Record the item once for every person box it overlaps.
        for person in persons:
            person_box = (person["x"], person["y"], person["width"], person["height"])
            overlap = iou(item_box, person_box)
            if overlap > 0.01:
                results.append({
                    "frame": frame_num,
                    "timestamp": round(ts, 1),
                    "time_str": f"{int(ts//60)}:{int(ts%60):02d}",
                    "object": item["class_name"],
                    "confidence": round(item.get("confidence", 0), 3),
                    "near_hand": near_hand,
                    "overlap": round(overlap, 3),
                })

elapsed = time.time() - t0
print(f" Scanned in {elapsed:.1f}s")
# Deduplicate: keep the first record seen for each (frame, object class).
# NOTE(review): first-wins also collapses distinct instances of the same class
# in one frame, so a later instance with near_hand=True can be dropped —
# confirm this is intended.
first_by_key = {}
for rec in results:
    first_by_key.setdefault((rec["frame"], rec["object"]), rec)
deduped = list(first_by_key.values())

# Group by object type
by_object = defaultdict(list)
for rec in deduped:
    by_object[rec["object"]].append(rec)

print(f"\n=== Results: {len(deduped)} hand-held object detections ===")
print(f"{'Object':<20} {'Count':>6} {'Near hand':>12} {'Timestamps':<40}")
print("-"*80)
# Most frequently detected object classes first.
for obj, hits in sorted(by_object.items(), key=lambda x: -len(x[1])):
    near_count = sum(1 for hit in hits if hit["near_hand"])
    # Show at most five timestamps, then summarise the remainder.
    ts_list = ", ".join(hit["time_str"] for hit in hits[:5])
    if len(hits) > 5:
        ts_list += f" ... (+{len(hits)-5})"
    # The count column is 12 wide to line up with the 'Near hand' header.
    print(f"{obj:<20} {len(hits):>6} {near_count:>12d} {ts_list:<40}")
# Save the deduplicated detections as JSON. The context manager guarantees
# the file is flushed and closed, and the finally clause releases the
# database connection even if the write fails.
try:
    with open("/Users/accusys/momentry/output_dev/handheld_objects.json", "w", encoding="utf-8") as out_file:
        json.dump(deduped, out_file, indent=2)
    print("\nSaved to output_dev/handheld_objects.json")
finally:
    conn.close()