Files
momentry_core/scripts/gdino_comparison_test.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

143 lines
5.2 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Grounding DINO Base vs Large comparison test.
Both use Swin-B backbone; Large trained on 7 datasets vs Base's 3.
"""
import json, os, sys, time, cv2, torch
from PIL import Image
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
# Source video that frames are sampled from.
VIDEO = "/Users/accusys/momentry/var/sftpgo/data/demo/Charade (1963) Cary Grant & Audrey Hepburn \uff5c Comedy Mystery Romance Thriller \uff5c Full Movie.mp4"
# Directory that receives the comparison results.
OUTPUT_DIR = "/Users/accusys/momentry/output_dev/gdino_comparison"
# Local checkpoint directory for the "large" model variant.
LARGE_PATH = "/Users/accusys/momentry_core_0.1/models/gun/grounding-dino-large-hf"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# (offset in seconds, label) pairs; the label becomes part of each result key.
TIMEPOINTS = [
    (2646, "2646s"), (3188, "3188s"), (3697, "3697s"), (5341, "5341s"),
    (5461, "5461s"), (6309, "6309s"), (6377, "6377s"), (6479, "6479s"),
]
# Text prompts evaluated independently on every sampled frame.
PROMPTS = ["gun", "pistol", "rifle", "weapon"]

cap = cv2.VideoCapture(VIDEO)
# Fail fast: without a readable video every frame read fails and the run
# would only produce empty results after both models have been loaded.
if not cap.isOpened():
    sys.exit(f"Cannot open video: {VIDEO}")
# Fall back to 25 fps when the container reports 0 for the frame rate.
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
def get_frame(t_sec):
    """Seek the shared capture to ``t_sec`` seconds and read one frame.

    The offset is converted to a frame index using the module-level ``fps``.
    Returns the frame as delivered by ``cap.read()``, or ``None`` when the
    read fails.
    """
    frame_index = int(t_sec * fps)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
    ok, image = cap.read()
    if not ok:
        return None
    return image
# Checkpoints under comparison, keyed by the short name used in the reports.
# "path" is handed to from_pretrained(); "label" is only used for display.
models = dict(
    base=dict(path="IDEA-Research/grounding-dino-base", label="Base (3 datasets)"),
    large=dict(path=LARGE_PATH, label="Large (7 datasets)"),
)

# Filled per model with {"elapsed": ..., "detections": ...}.
all_results = dict()

# Use the MPS backend when it is available, otherwise run on the CPU.
if torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Device: {device}")
# Run every model over every (timepoint, prompt) pair and collect detections.
for model_name, model_info in models.items():
    print(f"\n{'='*60}")
    print(f"Loading {model_info['label']} ({model_name})...")
    print(f"{'='*60}")
    t_load = time.time()
    processor = AutoProcessor.from_pretrained(model_info["path"])
    model = AutoModelForZeroShotObjectDetection.from_pretrained(model_info["path"]).to(device)
    load_time = time.time() - t_load
    print(f" Loaded in {load_time:.1f}s")

    # Maps "<label>_prompt-<prompt>" to the list of detections for that pair.
    model_dets = {}
    t0 = time.time()
    for t_sec, label in TIMEPOINTS:
        frame = get_frame(t_sec)
        if frame is None:
            # Make skipped timepoints visible instead of dropping them silently.
            print(f" WARNING: could not read frame at {label}; skipping")
            continue
        # OpenCV delivers BGR; PIL and the processor expect RGB.
        img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # PIL size is (width, height); post-processing wants (height, width).
        # The value depends only on the frame, so build it once per frame.
        target = torch.tensor([img.size[::-1]])
        for prompt in PROMPTS:
            # The prompt is sent with a trailing period; result keys and
            # labels keep the bare prompt.
            inputs = processor(images=img, text=f"{prompt}.", return_tensors="pt").to(device)
            with torch.no_grad():
                outputs = model(**inputs)
            dets = processor.post_process_grounded_object_detection(
                outputs, threshold=0.05, target_sizes=target
            )[0]
            model_dets[f"{label}_prompt-{prompt}"] = [
                {
                    "bbox": [round(v, 1) for v in box.tolist()],
                    "score": round(score.item(), 3),
                    "label": prompt,
                }
                for box, score in zip(dets["boxes"], dets["scores"])
            ]
    elapsed = time.time() - t0
    all_results[model_name] = {"elapsed": round(elapsed, 1), "detections": model_dets}
    print(f" Inference: {elapsed:.1f}s")

    # Drop references before the next checkpoint is loaded to limit peak memory.
    del model, processor
    # The MPS allocator cache is only relevant when the MPS backend is in
    # use; skip the call on the CPU fallback path.
    if device == "mps":
        torch.mps.empty_cache()
cap.release()
# ========== Summary ==========
print(f"\n{'='*60}")
print("COMPARISON SUMMARY")
print(f"{'='*60}")
# Denominator for the hit count: one slot per (timepoint, prompt) pair,
# derived from the configuration instead of being hard-coded.
expected_slots = len(TIMEPOINTS) * len(PROMPTS)
for model_name, result in all_results.items():
    dets = result["detections"]
    # A "hit" is a (timepoint, prompt) pair with at least one detection.
    hits = sum(1 for v in dets.values() if v)
    total = sum(len(v) for v in dets.values())
    print(f"\n{model_name.upper()} ({result['elapsed']}s): {hits}/{expected_slots} prompt-timepoint hits, {total} total detections")
    for t_sec, label in TIMEPOINTS:
        # Gather (prompt, score) for every detection at this timepoint.
        # Keys are stored without a trailing period, so one lookup suffices.
        candidates = []
        for p in PROMPTS:
            candidates.extend((p, det["score"]) for det in dets.get(f"{label}_prompt-{p}", []))
        if candidates:
            best = max(candidates, key=lambda x: x[1])
            print(f" {t_sec}s ({(t_sec//60)}:{t_sec%60:02d}): best={best[1]:.3f} (prompt='{best[0]}')")
        else:
            print(f" {t_sec}s: no detections")
# Per-timepoint comparison
def _best_score(model_key, label):
    """Return ``(prompt, score)`` for the top detection of one model at one timepoint.

    Looks up every prompt's detections under ``"<label>_prompt-<prompt>"`` in
    ``all_results[model_key]["detections"]``. Returns ``None`` when no prompt
    produced a detection. On equal scores the earliest prompt in ``PROMPTS``
    wins.
    """
    detections = all_results[model_key]["detections"]
    best = None
    for p in PROMPTS:
        scores = [det["score"] for det in detections.get(f"{label}_prompt-{p}", [])]
        if not scores:
            continue
        top = max(scores)
        # Strict comparison keeps the first prompt on ties.
        if best is None or top > best[1]:
            best = (p, top)
    return best


print(f"\n{'='*60}")
print("PER-TIMEPOINT COMPARISON")
print(f"{'='*60}")
for t_sec, label in TIMEPOINTS:
    base_best = _best_score("base", label)
    large_best = _best_score("large", label)
    b_str = f"base={base_best[1]:.3f} ({base_best[0]})" if base_best else "base=no det"
    l_str = f"large={large_best[1]:.3f} ({large_best[0]})" if large_best else "large=no det"
    delta = ""
    if base_best and large_best:
        # Positive means the large model scored higher than the base model.
        diff = large_best[1] - base_best[1]
        delta = f" ({'+'if diff>0 else ''}{diff:.3f})"
    print(f" {t_sec}s: {b_str:30s} | {l_str:30s}{delta}")
# Save
# Write through a context manager so the file is flushed and closed even if
# serialization fails part-way.
results_path = os.path.join(OUTPUT_DIR, "comparison_results.json")
with open(results_path, "w", encoding="utf-8") as fh:
    json.dump(all_results, fh, indent=2)
print(f"\nSaved to {OUTPUT_DIR}/")