#!/opt/homebrew/bin/python3.11
"""
Scan Charade for hand-held objects using YOLO spatial overlap + pose wrist verification.

Strategy:
1. Sample frames at regular intervals
2. For each person, check if non-person objects overlap with the person box
3. Use pose wrist keypoints to verify hand position
4. Classify with Grounding DINO (not implemented in this script)

Results are printed as a per-class summary table and saved as JSON to
OUTPUT_PATH.
"""
import json
import math
import sys
import time
from collections import defaultdict, Counter

UUID = "aeed71342a899fe4b4c57b7d41bcb692"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
FPS = 25.0
SAMPLE_INTERVAL = 300  # every 300 frames = every 12s
HAND_RADIUS = 100  # pixels around wrist to check for objects
OUTPUT_PATH = "/Users/accusys/momentry/output_dev/handheld_objects.json"

PERSON_MIN_CONFIDENCE = 0.5  # YOLO persons at or below this are ignored
ITEM_MIN_CONFIDENCE = 0.3  # YOLO non-person objects at or below this are ignored
WRIST_MIN_CONFIDENCE = 0.1  # pose wrist keypoints at or below this are ignored
MIN_OVERLAP = 0.01  # item/person IoU must exceed this to be reported
WRIST_NAMES = ("left_wrist", "right_wrist")


def iou(box1, box2):
    """Calculate intersection over union of two boxes [x,y,w,h].

    Returns 0 when the boxes do not intersect or the union area is not
    positive.
    """
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2
    inter_w = max(0, min(x1 + w1, x2 + w2) - max(x1, x2))
    inter_h = max(0, min(y1 + h1, y2 + h2) - max(y1, y2))
    inter = inter_w * inter_h
    if inter == 0:
        return 0
    union = w1 * h1 + w2 * h2 - inter
    return inter / union if union > 0 else 0


def extract_wrists(persons):
    """Return (x, y) for every sufficiently confident wrist keypoint.

    `persons` is the list stored under the "persons" key of a pose row; each
    entry may carry a "keypoints" list of dicts with "name", "x", "y" and
    "confidence".
    """
    return [
        (kp["x"], kp["y"])
        for person in persons
        for kp in person.get("keypoints", [])
        if kp.get("name", "") in WRIST_NAMES
        and kp.get("confidence", 0) > WRIST_MIN_CONFIDENCE
    ]


def is_near_wrist(center_x, center_y, wrists, radius=HAND_RADIUS):
    """Return True if the point lies strictly within `radius` px of any wrist."""
    return any(
        math.hypot(center_x - wx, center_y - wy) < radius for wx, wy in wrists
    )


def detect_in_frame(objects, wrists, frame_num):
    """Find non-person objects that overlap a person in one YOLO frame.

    Args:
        objects: YOLO detections for the frame (dicts with "class_name",
            "confidence", "x", "y", "width", "height").
        wrists: (x, y) wrist positions for the same frame; may be empty.
        frame_num: frame index, used for the timestamp fields.

    Returns:
        One record per (item, person) pair whose IoU exceeds MIN_OVERLAP.
        Each record has the keys frame, timestamp, time_str, object,
        confidence, near_hand and overlap.
    """
    persons = [
        o for o in objects
        if o.get("class_name") == "person"
        and o.get("confidence", 0) > PERSON_MIN_CONFIDENCE
    ]
    if not persons:
        return []
    items = [
        o for o in objects
        if o.get("class_name") != "person"
        and o.get("confidence", 0) > ITEM_MIN_CONFIDENCE
    ]

    ts = frame_num / FPS
    time_str = f"{int(ts//60)}:{int(ts%60):02d}"
    person_boxes = [
        (p["x"], p["y"], p["width"], p["height"]) for p in persons
    ]

    found = []
    for item in items:
        item_box = (item["x"], item["y"], item["width"], item["height"])
        # Wrist proximity depends only on the item, so compute it once rather
        # than once per overlapping person.
        near_hand = is_near_wrist(
            item["x"] + item["width"] / 2,
            item["y"] + item["height"] / 2,
            wrists,
        )
        for person_box in person_boxes:
            overlap = iou(item_box, person_box)
            if overlap > MIN_OVERLAP:
                found.append({
                    "frame": frame_num,
                    "timestamp": round(ts, 1),
                    "time_str": time_str,
                    "object": item["class_name"],
                    "confidence": round(item.get("confidence", 0), 3),
                    "near_hand": near_hand,
                    "overlap": round(overlap, 3),
                })
    return found


def dedupe(results):
    """Keep one record per (frame, object), preferring near-hand records.

    The first record seen for a key is kept unless a later record for the
    same key is near a hand and the kept one is not; otherwise a near-hand
    detection could be hidden by an earlier same-class record in the frame.
    Output order follows the first appearance of each key.
    """
    kept = {}
    for rec in results:
        key = (rec["frame"], rec["object"])
        current = kept.get(key)
        if current is None or (rec["near_hand"] and not current["near_hand"]):
            # Re-assigning an existing key keeps its original dict position.
            kept[key] = rec
    return list(kept.values())


def format_report(deduped):
    """Return the summary table lines, most frequently seen object first."""
    by_object = defaultdict(list)
    for rec in deduped:
        by_object[rec["object"]].append(rec)

    lines = [
        f"\n=== Results: {len(deduped)} hand-held object detections ===",
        f"{'Object':<20} {'Count':>6} {'Near hand':>12} {'Timestamps':<40}",
        "-" * 80,
    ]
    for obj, items in sorted(by_object.items(), key=lambda kv: -len(kv[1])):
        near_hand = sum(1 for i in items if i["near_hand"])
        ts_list = ", ".join(i["time_str"] for i in items[:5])
        if len(items) > 5:
            ts_list += f" ... (+{len(items)-5})"
        # Column widths match the header row so the table stays aligned.
        lines.append(
            f"{obj:<20} {len(items):>6} {near_hand:>12d} {ts_list:<40}"
        )
    return lines


def load_pose_wrists(cur):
    """Load pose rows and return {frame: [(x, y), ...]} for frames with wrists."""
    cur.execute("""
        SELECT start_frame, data FROM dev.pre_chunks
        WHERE file_uuid=%s AND processor_type='pose'
        AND data->'persons' IS NOT NULL
        ORDER BY start_frame
    """, (UUID,))
    pose_wrists = {}
    for frame, data in cur.fetchall():
        wrists = extract_wrists(data.get("persons", []))
        if wrists:
            pose_wrists[frame] = wrists
    return pose_wrists


def scan_yolo(cur, pose_wrists):
    """Scan every SAMPLE_INTERVAL-th YOLO frame and collect detection records."""
    cur.execute(
        "SELECT MAX(start_frame) FROM dev.pre_chunks "
        "WHERE file_uuid=%s AND processor_type='yolo'",
        (UUID,),
    )
    max_frame = cur.fetchone()[0] or 0

    results = []
    for frame_num in range(0, max_frame + 1, SAMPLE_INTERVAL):
        # One query per sampled frame; frames without a YOLO row are skipped.
        cur.execute("""
            SELECT data->'objects' FROM dev.pre_chunks
            WHERE file_uuid=%s AND processor_type='yolo' AND start_frame=%s
        """, (UUID, frame_num))
        row = cur.fetchone()
        if not row or not row[0]:
            continue
        results.extend(
            detect_in_frame(row[0], pose_wrists.get(frame_num, []), frame_num)
        )
    return results


def main():
    """Run the scan against the database, print the report, and save JSON."""
    # Imported here so the pure helpers above can be imported (and tested)
    # on machines without the PostgreSQL driver installed.
    import psycopg2

    print("=== Hand-held Object Scanner ===")

    conn = psycopg2.connect(DB_URL)
    try:
        with conn.cursor() as cur:
            # Load pose wrist data (frame -> wrist positions)
            print("Loading pose wrist data...")
            t0 = time.time()
            pose_wrists = load_pose_wrists(cur)
            print(f" {len(pose_wrists)} frames with wrists ({time.time()-t0:.1f}s)")

            # Scan YOLO frames
            print(f"Scanning YOLO data (interval={SAMPLE_INTERVAL})...")
            t0 = time.time()
            results = scan_yolo(cur, pose_wrists)
            elapsed = time.time() - t0
            print(f" Scanned in {elapsed:.1f}s")
    finally:
        # Close the connection even if a query raised.
        conn.close()

    deduped = dedupe(results)
    print("\n".join(format_report(deduped)))

    # Save
    with open(OUTPUT_PATH, "w", encoding="utf-8") as fh:
        json.dump(deduped, fh, indent=2)
    print("\nSaved to output_dev/handheld_objects.json")


if __name__ == "__main__":
    main()