#!/opt/homebrew/bin/python3.11
"""
Momentry Eye — Multi-model vision detection agent
Models: grounding-dino (default), paligemma

Usage:
  python3 scripts/vision_agent.py
  curl localhost:5052/health
  curl localhost:5052/detect -d '{"time":5461,"prompt":"gun","model":"grounding-dino"}'
  curl localhost:5052/search -d '{"query":"find the gun","model":"paligemma"}'
"""
import json
import os
import re
import sys
import threading
import time

import cv2
import psycopg2
import torch
from PIL import Image, ImageDraw
from flask import Flask, request, jsonify, send_file

app = Flask(__name__)

DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
BASE_DIR = "/Users/accusys/momentry/output_dev"
SHOTS_DIR = os.path.join(BASE_DIR, "vision_shots")
os.makedirs(SHOTS_DIR, exist_ok=True)
PORT = int(os.environ.get("VISION_AGENT_PORT", 5052))
DEVICE = "mps" if torch.backends.mps.is_available() else "cpu"

VIDEO_PATHS = {
    "aeed71342a899fe4b4c57b7d41bcb692": "/Users/accusys/momentry/var/sftpgo/data/demo/Charade (1963) Cary Grant & Audrey Hepburn \uff5c Comedy Mystery Romance Thriller \uff5c Full Movie.mp4",
}

# ======================== Model Registry ========================

MODELS = {}  # name -> {"model": obj, "processor": obj, "info": dict}

# The app runs with threaded=True, so two first requests for the same model
# could otherwise both trigger a (slow, memory-hungry) load.
_MODEL_LOAD_LOCK = threading.Lock()


def load_gdino():
    """Load Grounding DINO Base and return its registry entry.

    Returns a dict with keys "model", "processor" and "info".
    """
    from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
    print("[GDINO] Loading...")
    t0 = time.time()
    proc = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
    model = AutoModelForZeroShotObjectDetection.from_pretrained(
        "IDEA-Research/grounding-dino-base").to(DEVICE)
    print(f"[GDINO] Loaded in {time.time()-t0:.1f}s")
    return {
        "model": model,
        "processor": proc,
        "info": {
            "name": "grounding-dino",
            "params_m": 232,
            "size_mb": 891,
            "resolution": 384,
            "has_confidence": True,
            "license": "Apache 2.0",
        },
    }


def load_paligemma():
    """Load PaliGemma 3B mix-224 (bfloat16) and return its registry entry.

    Returns a dict with keys "model", "processor" and "info".
    """
    from transformers import AutoProcessor, PaliGemmaForConditionalGeneration
    print("[PaliGemma] Loading...")
    t0 = time.time()
    proc = AutoProcessor.from_pretrained("google/paligemma-3b-mix-224")
    model = PaliGemmaForConditionalGeneration.from_pretrained(
        "google/paligemma-3b-mix-224", dtype=torch.bfloat16
    ).to(DEVICE)
    print(f"[PaliGemma] Loaded in {time.time()-t0:.1f}s")
    return {
        "model": model,
        "processor": proc,
        "info": {
            "name": "paligemma",
            "params_m": 2923,
            "size_mb": 3000,
            "resolution": 224,
            "has_confidence": False,
            "license": "Gemma license",
        },
    }


MODEL_REGISTRY = {
    "grounding-dino": load_gdino,
    "paligemma": load_paligemma,
}


def get_model(name):
    """Lazy-load and cache a model by name.

    Returns the cached registry entry, or None when *name* is not in
    MODEL_REGISTRY.
    """
    if name in MODELS:
        return MODELS[name]
    loader = MODEL_REGISTRY.get(name)
    if loader is None:
        return None
    with _MODEL_LOAD_LOCK:
        # Re-check under the lock: another thread may have finished loading.
        if name not in MODELS:
            MODELS[name] = loader()
    return MODELS[name]


# ======================== Inference ========================

def infer_gdino(img, prompt, threshold=0.1):
    """Grounding DINO inference. Returns [{bbox, score, label}]."""
    m = get_model("grounding-dino")
    inputs = m["processor"](images=img, text=f"{prompt}.", return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        outputs = m["model"](**inputs)
    # target_sizes expects (height, width); PIL's img.size is (width, height).
    dets = m["processor"].post_process_grounded_object_detection(
        outputs, threshold=threshold, target_sizes=[img.size[::-1]])[0]
    return [
        {
            "bbox": [round(v, 1) for v in box.tolist()],
            "score": round(score.item(), 3),
            "label": prompt,
        }
        for box, score in zip(dets["boxes"], dets["scores"])
    ]


# PaliGemma location token, e.g. <loc0123>.
_PALIGEMMA_LOC_RE = re.compile(r"<loc(\d{4})>")


def _parse_paligemma_boxes(text, img_w, img_h):
    """Convert PaliGemma <locNNNN> tokens in *text* to pixel [x1, y1, x2, y2] boxes.

    Tokens are consumed in groups of four; a trailing incomplete group is
    ignored. Values are on a 0-1024 scale and are emitted by the model in
    y_min, x_min, y_max, x_max order.
    """
    locs = [int(v) for v in _PALIGEMMA_LOC_RE.findall(text)]
    boxes = []
    for i in range(0, len(locs) - 3, 4):
        y1, x1, y2, x2 = locs[i:i + 4]
        boxes.append([
            round(x1 / 1024 * img_w, 1),
            round(y1 / 1024 * img_h, 1),
            round(x2 / 1024 * img_w, 1),
            round(y2 / 1024 * img_h, 1),
        ])
    return boxes


def infer_paligemma(img, prompt, threshold=0.1):
    """PaliGemma inference. Returns [{bbox, score, label}].

    The model gives no confidence, so every detection gets score 1.0 and
    *threshold* is accepted only for signature parity with infer_gdino.
    """
    m = get_model("paligemma")
    inputs = m["processor"](text=f"detect {prompt}", images=img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        outputs = m["model"].generate(**inputs, max_new_tokens=100)
    result = m["processor"].decode(outputs[0], skip_special_tokens=True)

    # Parse PaliGemma output format: <locYYYY><locXXXX><locYYYY><locXXXX> label
    img_w, img_h = img.size
    return [
        {"bbox": bbox, "score": 1.0, "label": prompt}
        for bbox in _parse_paligemma_boxes(result, img_w, img_h)
    ]


INFERENCE = {
    "grounding-dino": infer_gdino,
    "paligemma": infer_paligemma,
}

# ======================== Utilities ========================

_VIDEO_EXTENSIONS = (".mp4", ".mov", ".avi")


def _json_body():
    """Return the request body parsed as a JSON object, or {} if absent/invalid.

    force=True parses the body regardless of Content-Type, so the documented
    `curl -d '{...}'` usage (which sends a form content type) works;
    silent=True turns parse failures into None instead of an error response.
    """
    body = request.get_json(force=True, silent=True)
    return body if isinstance(body, dict) else {}


def _parse_range(range_str, default_end=6780):
    """Parse "start-end" into (start, end) floats.

    A value without "-" yields (0.0, default_end). Raises ValueError or
    TypeError on malformed input; callers turn that into a 400.
    """
    if "-" not in range_str:
        return 0.0, float(default_end)
    parts = range_str.split("-")
    return float(parts[0]), float(parts[1])


def find_video(uuid):
    """Return the video path for *uuid*, or None if nothing matches.

    Looks in VIDEO_PATHS first, then globs the demo directory for a
    "Charade*" file, then for a file containing the first 8 characters of
    the uuid. A glob hit is cached back into VIDEO_PATHS.
    """
    if uuid in VIDEO_PATHS:
        return VIDEO_PATHS[uuid]
    import glob
    base = "/Users/accusys/momentry/var/sftpgo/data/demo"
    for pattern in (f"{base}/**/Charade*", f"{base}/**/*{uuid[:8]}*"):
        for f in glob.glob(pattern, recursive=True):
            if f.endswith(_VIDEO_EXTENSIONS):
                VIDEO_PATHS[uuid] = f
                return f
    return None


def parse_query(query):
    """Reduce a natural-language query to a bare object name.

    Lower-cases the query, strips leading command words and articles,
    trailing punctuation, and trailing context phrases.
    e.g. "Find the gun in the image." -> "gun".
    """
    query = query.lower().strip()
    prefixes = ["find ", "show ", "search ", "where is ", "where are ",
                "looking for ", "detect ", "locate ", "spot ", "scan for "]
    for p in prefixes:
        if query.startswith(p):
            query = query[len(p):]
    for a in ["a ", "an ", "the ", "some ", "any "]:
        if query.startswith(a):
            query = query[len(a):]
    query = query.rstrip(".?!,")
    for s in [" in the image", " in this scene", " in the picture",
              " being held", " in hand", " in frame", " please"]:
        if query.endswith(s):
            query = query[: -len(s)]
    return query.strip()


def resolve_target(target_str):
    """Resolve "file_uuid:identifier" to (uuid, start_time, end_time).

    Lookup order in dev.chunks:
      1. chunk_id equal to the identifier;
      2. if the identifier is all digits, chunk_id "<uuid>_<identifier>";
      3. trace chunks whose chunk_id ends in "_trace_<id>", using the
         MIN(start_time) / MAX(end_time) over all matches.
    Returns None when the string has no ":" or nothing matches.
    """
    if not target_str or ":" not in target_str:
        return None
    uuid, _, identifier = target_str.partition(":")

    chunk_sql = ("SELECT start_time, end_time FROM dev.chunks "
                 "WHERE file_uuid=%s AND chunk_id=%s LIMIT 1")
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        cur.execute(chunk_sql, (uuid, identifier))
        row = cur.fetchone()
        if row:
            return (uuid, float(row[0]), float(row[1]))

        if identifier.isdigit():
            cur.execute(chunk_sql, (uuid, f"{uuid}_{identifier}"))
            row = cur.fetchone()
            if row:
                return (uuid, float(row[0]), float(row[1]))

        tid = identifier.replace("trace_", "")
        cur.execute(
            "SELECT MIN(start_time), MAX(end_time) FROM dev.chunks "
            "WHERE file_uuid=%s AND chunk_type='trace' AND chunk_id LIKE %s",
            (uuid, f"%_trace_{tid}"))
        row = cur.fetchone()
        # Aggregates always return one row; it is all-NULL when nothing matched.
        if row and row[0] is not None:
            return (uuid, float(row[0]), float(row[1]))
        return None
    finally:
        # Closing the connection also releases its cursor, on every exit path.
        conn.close()


def register_resource(resource_id, name, info):
    """Upsert this agent's model into dev.resources with status "online".

    Best-effort: any failure is printed and swallowed so startup continues.
    """
    conn = None
    try:
        conn = psycopg2.connect(DB_URL)
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO dev.resources (resource_id, resource_type, category, capabilities, config, metadata, status, last_heartbeat)
            VALUES (%s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s, NOW())
            ON CONFLICT (resource_id) DO UPDATE SET status=%s, last_heartbeat=NOW(), config=EXCLUDED.config
        """, (
            resource_id, "vision_model", "object_detection",
            json.dumps({"detect": "Single-frame detection",
                        "search": "Range search with NL query",
                        "has_confidence": info.get("has_confidence", True)}),
            json.dumps({"name": name, "port": PORT, "device": DEVICE,
                        "params_m": info.get("params_m"),
                        "resolution": info.get("resolution"),
                        "license": info.get("license")}),
            json.dumps({"version": "2.0", "docs": "/health"}),
            "online", "online"))
        conn.commit()
        print(f"[Resource] Registered '{resource_id}'")
    except Exception as e:
        print(f"[Resource] Register '{resource_id}' failed: {e}")
    finally:
        if conn is not None:
            conn.close()


def heartbeat_loop(resource_ids):
    """Refresh last_heartbeat for each resource id every 60 seconds, forever.

    Intended to run in a daemon thread. Failures are logged and the loop
    keeps going, so a temporary DB outage does not kill the heartbeat.
    """
    while True:
        conn = None
        try:
            conn = psycopg2.connect(DB_URL)
            cur = conn.cursor()
            for rid in resource_ids:
                cur.execute(
                    "UPDATE dev.resources SET last_heartbeat = NOW() WHERE resource_id = %s",
                    (rid,))
            conn.commit()
        except Exception as e:
            print(f"[Resource] Heartbeat failed: {e}")
        finally:
            if conn is not None:
                conn.close()
        time.sleep(60)


# ======================== Annotate ========================

def annotate_image(img, detections, prompt):
    """Draw each detection's box and "<prompt> <score>" caption onto *img*.

    Mutates and returns *img*; pass a copy to keep the original clean.
    """
    draw = ImageDraw.Draw(img)
    for d in detections:
        b = d["bbox"]
        score = d.get("score", 1.0)
        draw.rectangle(b, outline="lime", width=3)
        draw.text((b[0], b[1]-18), f"{prompt} {score:.2f}", fill="lime")
    return img


# ======================== API Routes ========================

@app.route("/models", methods=["GET"])
def list_models():
    """List available models and their status."""
    result = []
    for name in MODEL_REGISTRY:
        cached = name in MODELS
        info = dict(MODELS[name]["info"]) if cached else {"name": name, "loaded": False}
        info["loaded"] = cached
        result.append(info)
    return jsonify({"models": result})


# Default fusion weights: GDINO 0.6, PaliGemma 0.4
FUSION_WEIGHTS = {"grounding-dino": 0.6, "paligemma": 0.4}


@app.route("/detect", methods=["POST"])
def detect():
    """Run detection on one video frame and save an annotated screenshot.

    JSON input: uuid, time (seconds), prompt, model ("grounding-dino",
    "paligemma" or "fusion"), threshold, weights (per-model fusion weights).
    In fusion mode every model in INFERENCE runs and overlapping boxes
    (IoU > 0.5) are collapsed, keeping the highest fused score.
    """
    data = _json_body()
    uuid = data.get("uuid", "aeed71342a899fe4b4c57b7d41bcb692")
    t_sec = data.get("time", 0)
    prompt = data.get("prompt", "gun")
    model_name = data.get("model", "grounding-dino")
    threshold = data.get("threshold", 0.1)
    weights = data.get("weights", None)  # e.g. {"grounding-dino":0.7,"paligemma":0.3}
    fusion_weights = weights if weights else \
        ({model_name: 1.0} if model_name != "fusion" else FUSION_WEIGHTS)

    # Determine which models to run
    if model_name == "fusion":
        models_to_run = list(INFERENCE.keys())
    elif model_name in INFERENCE:
        models_to_run = [model_name]
    else:
        return jsonify({"error": f"Unknown model: {model_name}"}), 400

    try:
        t_val = float(t_sec)
    except (TypeError, ValueError):
        return jsonify({"error": f"Invalid time: {t_sec}"}), 400

    video = find_video(uuid)
    if not video:
        return jsonify({"error": "Video not found"}), 404

    cap = cv2.VideoCapture(video)
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(t_val * (cap.get(cv2.CAP_PROP_FPS) or 25.0)))
        ret, frame = cap.read()
    finally:
        cap.release()
    if not ret:
        return jsonify({"error": f"Cannot read frame at {t_sec}s"}), 400

    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    all_detections = {}
    fusion_results = []
    t0 = time.time()

    for mn in models_to_run:
        detections = INFERENCE[mn](img, prompt, threshold)
        all_detections[mn] = detections
        w = fusion_weights.get(mn, 0.5)
        for d in detections:
            # PaliGemma has no score, treat detected=1.0
            model_score = d.get("score", 1.0) if mn == "grounding-dino" else 1.0
            fusion_results.append({
                "bbox": d["bbox"],
                "label": d["label"],
                "score": model_score,
                "fused_score": round(model_score * w, 3),
                "source_model": mn,
            })

    infer_ms = (time.time() - t0) * 1000

    # Deduplicate by bbox IOU for fusion mode
    if model_name == "fusion" and len(fusion_results) > 1:
        deduped = []
        # Highest fused score first, so the kept box of each cluster is the best one.
        fusion_results.sort(key=lambda x: -x["fused_score"])
        for r in fusion_results:
            if not any(calc_iou(r["bbox"], kept["bbox"]) > 0.5 for kept in deduped):
                deduped.append(r)
        fusion_results = deduped

    # Annotate with best result
    if model_name != "fusion":
        display_dets = all_detections.get(model_name, [])
    else:
        display_dets = [{"bbox": r["bbox"], "score": r["fused_score"], "label": prompt}
                        for r in fusion_results]

    img_ann = annotate_image(img.copy(), display_dets, prompt)
    # uuid and prompt are caller-supplied: strip anything that could act as a
    # path separator or otherwise escape SHOTS_DIR.
    shot_name = re.sub(
        r"[^\w .-]", "_",
        f"{uuid[:8]}_{int(t_val)}s_{prompt}_{model_name}") + ".jpg"
    img_ann.save(os.path.join(SHOTS_DIR, shot_name))

    return jsonify({
        "model": model_name,
        "fusion_weights": fusion_weights,
        "models_used": models_to_run,
        "per_model": {mn: {"detections": all_detections.get(mn, []),
                           "n_detections": len(all_detections.get(mn, []))}
                      for mn in models_to_run},
        "fusion": fusion_results if model_name == "fusion" else None,
        "detections": display_dets,
        "time_ms": round(infer_ms, 1),
        "n_detections": len(display_dets),
        "shot_url": f"/shots/{shot_name}",
    })


def calc_iou(b1, b2):
    """Return the intersection-over-union of two [x1, y1, x2, y2] boxes."""
    xi1 = max(b1[0], b2[0])
    yi1 = max(b1[1], b2[1])
    xi2 = min(b1[2], b2[2])
    yi2 = min(b1[3], b2[3])
    inter = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    a1 = (b1[2]-b1[0])*(b1[3]-b1[1])
    a2 = (b2[2]-b2[0])*(b2[3]-b2[1])
    # Epsilon keeps two zero-area boxes from dividing by zero.
    return inter / (a1 + a2 - inter + 1e-10)


@app.route("/search", methods=["POST"])
def search():
    """Scan a time range of a video at a fixed interval for an object.

    JSON input: uuid, target ("file_uuid:chunk_id", overrides range), query
    (natural language), range ("start-end" seconds), interval (seconds
    between sampled frames), threshold, model. Stops after 100 hits.
    """
    data = _json_body()
    uuid = data.get("uuid", "aeed71342a899fe4b4c57b7d41bcb692")
    target_str = data.get("target", "")
    query = data.get("query", "find the gun")
    range_str = data.get("range", "0-6780")
    interval = data.get("interval", 30)
    threshold = data.get("threshold", 0.15)
    model_name = data.get("model", "grounding-dino")

    if model_name not in INFERENCE:
        return jsonify({"error": f"Unknown model: {model_name}. Available: {list(INFERENCE.keys())}"}), 400

    # Parse query → object name
    prompt = parse_query(query)
    if not prompt:
        return jsonify({"error": f"Cannot parse query: {query}"}), 400

    # A zero/negative step would make range() raise or never advance.
    try:
        interval_secs = float(interval)
    except (TypeError, ValueError):
        return jsonify({"error": f"Invalid interval: {interval}"}), 400
    if interval_secs <= 0:
        return jsonify({"error": f"Invalid interval: {interval}"}), 400

    # Resolve target → time range
    if target_str:
        resolved = resolve_target(target_str)
        if not resolved:
            return jsonify({"error": f"Cannot resolve target: {target_str}"}), 404
        uuid, range_start, range_end = resolved
    else:
        try:
            range_start, range_end = _parse_range(range_str)
        except (TypeError, ValueError):
            return jsonify({"error": f"Invalid range: {range_str}"}), 400

    video = find_video(uuid)
    if not video:
        return jsonify({"error": "Video not found"}), 404

    cap = cv2.VideoCapture(video)
    hits = []
    t_start = time.time()
    infer_fn = INFERENCE[model_name]
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Sub-frame intervals round down to 0; clamp to one frame per step.
        frame_step = max(1, int(interval_secs * fps))

        for frame_num in range(int(range_start * fps),
                               min(int(range_end * fps), total_frames),
                               frame_step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            detections = infer_fn(img, prompt, threshold)
            if not detections:
                continue
            ts = frame_num / fps
            hits.append({
                "time": round(ts, 1),
                "time_str": f"{int(ts//60)}:{int(ts%60):02d}.{int((ts%1)*fps):02d}",
                "frame": frame_num,
                "n_detections": len(detections),
                "best_score": max(d.get("score", 1.0) for d in detections),
                "detections": detections[:3],
            })
            if len(hits) >= 100:
                break
    finally:
        cap.release()
    elapsed = time.time() - t_start

    return jsonify({
        "model": model_name,
        "query": query,
        "object": prompt,
        "target": target_str or None,
        "range": f"{range_start:.0f}-{range_end:.0f}",
        "interval_secs": interval,
        "hits": hits,
        "n_hits": len(hits),
        "elapsed_secs": round(elapsed, 1),
    })


@app.route("/multimodal", methods=["POST"])
def multimodal_search():
    """Multi-modal search across all chunk types.

    For sentence chunks: ASR text + visual confirmation.
    For trace/story/cut chunks: visual detection only (no ASR text).

    Input:
      {"keyword":"gun"} — find chunks mentioning "gun" in ASR + visually confirm
      {"keyword":"gun","chunk_type":"trace"} — search trace chunks visually (no ASR)
      {"target":"file_uuid:chunk_id"} — search a specific chunk visually

    Each chunk is checked on the single frame at the midpoint of its time
    range, always with the grounding-dino model.
    """
    data = _json_body()
    uuid = data.get("uuid", "aeed71342a899fe4b4c57b7d41bcb692")
    keyword = data.get("keyword", "")
    prompt = data.get("prompt", keyword or "")
    target_str = data.get("target", "")
    chunk_type = data.get("chunk_type", "sentence")  # sentence, trace, story, cut
    threshold = data.get("threshold", 0.15)
    model_name = "grounding-dino"

    # Resolve target first if provided (before opening our own connection, so
    # the early 404 cannot leak it).
    if target_str:
        resolved = resolve_target(target_str)
        if not resolved:
            return jsonify({"error": f"Cannot resolve target: {target_str}"}), 404
        uuid, st, et = resolved

    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        if target_str:
            cur.execute(
                "SELECT chunk_id, chunk_index, chunk_type, text_content FROM dev.chunks "
                "WHERE file_uuid=%s AND start_time=%s AND end_time=%s LIMIT 1",
                (uuid, st, et))
            chunks = [(r[0], r[1], r[2], st, et, r[3] or "") for r in cur.fetchall()]
        elif keyword and chunk_type == "sentence":
            # Search sentence chunks by ASR text keyword
            cur.execute("""
                SELECT chunk_id, chunk_index, chunk_type, start_time, end_time, text_content
                FROM dev.chunks
                WHERE file_uuid=%s AND chunk_type='sentence'
                  AND text_content ILIKE CONCAT('%%', %s, '%%')
                ORDER BY start_time
            """, (uuid, keyword))
            chunks = cur.fetchall()
        else:
            # Search any chunk type by time range (visual only, no ASR)
            range_str = data.get("range", "0-6780")
            try:
                range_start, range_end = _parse_range(range_str)
            except (TypeError, ValueError):
                return jsonify({"error": f"Invalid range: {range_str}"}), 400
            cur.execute("""
                SELECT chunk_id, chunk_index, chunk_type, start_time, end_time, COALESCE(text_content, '')
                FROM dev.chunks
                WHERE file_uuid=%s AND chunk_type=%s
                  AND start_time BETWEEN %s AND %s
                ORDER BY start_time
            """, (uuid, chunk_type, range_start, range_end))
            chunks = cur.fetchall()
    finally:
        conn.close()

    if not chunks:
        return jsonify({"error": "No matching chunks found"}), 404

    # Visual confirmation
    video = find_video(uuid)
    if not video:
        return jsonify({"error": "Video not found"}), 404

    cap = cv2.VideoCapture(video)
    infer_fn = INFERENCE.get(model_name)
    results = []
    t_start = time.time()
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        for chunk_id, chunk_idx, ctype, st, et, text in chunks:
            # Coerce to float so DB numeric (Decimal) values can be mixed
            # with the float fps below.
            st, et = float(st), float(et)
            center = (st + et) / 2
            frame_num = int(center * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            detections = infer_fn(img, prompt or keyword, threshold)
            text = text or ""

            entry = {
                "chunk_id": chunk_id,
                "chunk_index": chunk_idx,
                "chunk_type": ctype,
                "time_range": f"{st:.1f}-{et:.1f}",
                "time_str": f"{int(st//60)}:{int(st%60):02d}-{int(et//60)}:{int(et%60):02d}",
                "visual_confirmed": len(detections) > 0,
                "best_score": round(max(d.get("score", 1.0) for d in detections), 3) if detections else 0,
                "n_visual_dets": len(detections),
            }
            if keyword and ctype == "sentence":
                entry["asr_text"] = text[:150]
                entry["asr_matched"] = keyword.lower() in text.lower()
            results.append(entry)
    finally:
        cap.release()
    elapsed = time.time() - t_start

    return jsonify({
        "keyword": keyword or prompt,
        "chunk_type": chunk_type,
        "target": target_str or None,
        "total_chunks": len(chunks),
        "visual_confirmed": sum(1 for r in results if r["visual_confirmed"]),
        "asr_matched": sum(1 for r in results if r.get("asr_matched")),
        "elapsed_secs": round(elapsed, 1),
        "results": results,
    })


@app.route("/shots/<filename>")
def serve_shot(filename):
    """Serve a saved screenshot from SHOTS_DIR as JPEG, or a JSON 404."""
    # Reject anything that is not a plain file name (e.g. "..") so the
    # request cannot resolve outside SHOTS_DIR.
    if os.path.basename(filename) != filename:
        return jsonify({"error": "Not found"}), 404
    path = os.path.join(SHOTS_DIR, filename)
    if not os.path.isfile(path):
        return jsonify({"error": "Not found"}), 404
    return send_file(path, mimetype="image/jpeg")


@app.route("/health")
def health():
    """Report service status, loaded/available models, device and port."""
    loaded = list(MODELS.keys())
    available = list(MODEL_REGISTRY.keys())
    return jsonify({
        "status": "ok",
        "models_loaded": loaded,
        "models_available": available,
        "device": DEVICE,
        "port": PORT,
    })


if __name__ == "__main__":
    # Register both as resources
    gdino_info = {"params_m": 232, "resolution": 384, "has_confidence": True, "license": "Apache 2.0"}
    pg_info = {"params_m": 2923, "resolution": 224, "has_confidence": False, "license": "Gemma license"}
    register_resource("eye-gdino", "grounding-dino", gdino_info)
    register_resource("eye-paligemma", "paligemma", pg_info)

    # Start heartbeat
    t = threading.Thread(target=heartbeat_loop, args=(["eye-gdino", "eye-paligemma"],), daemon=True)
    t.start()

    # Pre-load grounding-dino by default
    print(f"\n{'='*60}")
    print(f" 👁️ Momentry Eye — port {PORT}")
    print(f"{'='*60}")
    print(f" Models: {', '.join(MODEL_REGISTRY.keys())}")
    print(f" Device: {DEVICE}")
    print(f" Resources: eye-gdino, eye-paligemma")
    print(f" Loading default model...")
    get_model("grounding-dino")
    print(f" 👁️ Ready: http://localhost:{PORT}")
    app.run(host="0.0.0.0", port=PORT, threaded=True)