#!/opt/homebrew/bin/python3.11
"""
Gun Detector Scan — YOLOv8n fine-tuned gun detector on Charade (1963).
Scans at ASR "gun" trigger points + fixed intervals, saves annotated screenshots.

Pipeline:
  1. Collect scan timepoints (ASR sentences mentioning "gun", a fixed 60 s
     grid, and a list of previously known pistol timestamps).
  2. Run the detector on every 3rd frame in a +/-30 frame window around each
     timepoint and save one annotated JPEG per detection.
  3. Deduplicate detections per class, print a summary, and write a JSON report.

The heavy third-party dependencies (OpenCV, Ultralytics, psycopg2) are imported
inside the functions that use them so the pure helpers in this module stay
importable without that stack installed.
"""
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path

import numpy as np

VIDEO = "/Users/accusys/momentry/var/sftpgo/data/demo/Charade (1963) Cary Grant & Audrey Hepburn \uff5c Comedy Mystery Romance Thriller \uff5c Full Movie.mp4"
MODEL = "/Users/accusys/momentry_core_0.1/models/gun/gun_detector/weights/best.pt"
OUTPUT_DIR = "/Users/accusys/momentry/output_dev/gun_detections"
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"

CLASS_NAMES = {0: "grenade", 1: "knife", 2: "pistol", 3: "rifle"}

# The original 5 pistol timestamps, in seconds.
ORIGINAL_HITS = [3188, 5461, 6309, 6377, 6479]

FIXED_INTERVAL_SEC = 60   # spacing of the fixed-interval scan grid
FRAME_WINDOW = 30         # frames scanned on each side of a timepoint
FRAME_STRIDE = 3          # only every 3rd frame inside the window is scanned
CONF_THRESHOLD = 0.25     # confidence threshold passed to the model
DEDUP_WINDOW_SEC = 2      # same-class detections closer than this are merged
FALLBACK_FPS = 25.0       # used when the container reports no frame rate


def fetch_asr_times(db_url: str, file_uuid: str, keyword: str = "gun") -> list:
    """Return the distinct start times of ASR sentences containing *keyword*.

    The connection is always closed, even when the query raises.
    """
    import psycopg2

    conn = psycopg2.connect(db_url)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT DISTINCT start_time
                FROM dev.chunks
                WHERE file_uuid=%s
                  AND chunk_type='sentence'
                  AND text_content ILIKE CONCAT('%%', %s, '%%')
                ORDER BY start_time
                """,
                (file_uuid, keyword),
            )
            return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()


def merge_timepoints(*groups) -> list[int]:
    """Merge several iterables of timestamps into sorted, unique whole seconds.

    Each value is rounded to the nearest second; ``None`` entries (e.g. NULL
    database values) are skipped instead of crashing ``round()``.
    """
    merged = set()
    for group in groups:
        for t in group:
            if t is not None:
                merged.add(int(round(t)))
    return sorted(merged)


def frame_window(
    t_sec: float,
    fps: float,
    total_frames: int,
    half_width: int = FRAME_WINDOW,
    stride: int = FRAME_STRIDE,
) -> range:
    """Return the frame indices to scan around second *t_sec*.

    The window is clamped to ``[0, total_frames - 1]`` so that the index
    ``total_frames`` itself (one past the last frame) is never requested.
    The range is empty when the timepoint lies beyond the end of the video.
    """
    center = int(t_sec * fps)
    first = max(0, center - half_width)
    last = min(total_frames - 1, center + half_width)
    return range(first, last + 1, stride)


def format_time(ts: float, fps: float) -> str:
    """Format *ts* seconds as ``M:SS.FF`` where FF is the frame within the second."""
    minutes = int(ts // 60)
    seconds = int(ts % 60)
    frame_in_second = int((ts % 1) * fps)
    return f"{minutes}:{seconds:02d}.{frame_in_second:02d}"


def dedupe_detections(detections, window_sec: float = DEDUP_WINDOW_SEC) -> list:
    """Collapse same-class detections that lie within *window_sec* of each other.

    Detections are processed in (timestamp, class) order. Each one is compared
    with the most recently kept detection of the SAME class, so interleaved
    detections of other classes cannot break up a cluster. Within a cluster the
    highest-confidence detection is retained. The input list is not modified;
    the result is ordered by timestamp.
    """
    ordered = sorted(detections, key=lambda d: (d["timestamp"], d["class"]))
    kept = []
    last_index = {}  # class name -> index in `kept` of its latest cluster
    for det in ordered:
        idx = last_index.get(det["class"])
        if idx is not None and det["timestamp"] - kept[idx]["timestamp"] < window_sec:
            if det["confidence"] > kept[idx]["confidence"]:
                kept[idx] = det
        else:
            last_index[det["class"]] = len(kept)
            kept.append(det)
    # Replacing a cluster representative can move its timestamp forward,
    # so restore chronological order before returning.
    return sorted(kept, key=lambda d: d["timestamp"])


def scan_timepoints(cap, model, timepoints, fps: float, total_frames: int,
                    output_dir: str) -> list:
    """Run the detector around every timepoint and save annotated screenshots.

    Returns one dict per detection (timestamp, time_str, frame, class,
    confidence, image). A frame that was already scanned for an earlier,
    overlapping window is skipped, so it is neither re-inferred nor counted
    twice.
    """
    import cv2

    results = []
    seen_frames = set()
    t0 = time.time()
    for scan_idx, t_sec in enumerate(timepoints):
        for frame_num in frame_window(t_sec, fps, total_frames):
            if frame_num in seen_frames:
                continue
            seen_frames.add(frame_num)

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                # Decoder could not deliver this frame; give up on this window.
                break

            dets = model(frame, conf=CONF_THRESHOLD, verbose=False)[0]
            # Each row is read as x1, y1, x2, y2, confidence, class id.
            for det in dets.boxes.data:
                cls_id = int(det[5])
                conf = float(det[4])
                class_name = CLASS_NAMES.get(cls_id, f"class_{cls_id}")

                # Draw annotation. Boxes accumulate on the same frame, so a
                # later image of this frame also shows the earlier boxes.
                x1, y1, x2, y2 = map(int, det[:4])
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                label = f"{class_name} {conf:.2f}"
                cv2.putText(frame, label, (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                ts = frame_num / fps
                filename = f"{int(ts)}s_{class_name}_{conf:.3f}.jpg"
                filepath = os.path.join(output_dir, filename)
                cv2.imwrite(filepath, frame, [cv2.IMWRITE_JPEG_QUALITY, 85])

                results.append({
                    "timestamp": round(ts, 1),
                    "time_str": format_time(ts, fps),
                    "frame": frame_num,
                    "class": class_name,
                    "confidence": round(conf, 3),
                    "image": filename,
                })

        if (scan_idx + 1) % 20 == 0:
            elapsed = time.time() - t0
            print(f"  [{scan_idx+1}/{len(timepoints)}] {len(results)} detections so far [{elapsed:.0f}s]")
    return results


def main() -> None:
    """Run the full scan: collect timepoints, detect, summarize, write JSON."""
    import cv2
    from ultralytics import YOLO

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Load model
    print(f"Loading model: {MODEL}")
    model = YOLO(MODEL)
    print(f"Classes: {model.names}")

    # Open video
    cap = cv2.VideoCapture(VIDEO)
    if not cap.isOpened():
        sys.exit(f"Cannot open video: {VIDEO}")
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or FALLBACK_FPS
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video: {fps:.1f} fps, {total_frames} frames ({total_frames/fps/60:.1f} min)")

        # === Collect scan timepoints ===
        print("\n=== Collecting scan timepoints ===")

        # 1. ASR mentions of "gun"
        asr_times = fetch_asr_times(DB_URL, UUID, "gun")
        print(f"ASR 'gun' mentions: {len(asr_times)} timepoints")

        # 2. Fixed interval scan (every 60 seconds)
        fixed_times = list(range(0, int(total_frames / fps), FIXED_INTERVAL_SEC))
        print(f"Fixed interval (60s): {len(fixed_times)} timepoints")

        # 3. The original 5 pistol timestamps (3188, 5461, 6309, 6377, 6479)
        original_hits = ORIGINAL_HITS

        # Merge all timepoints, rounded to nearest second
        all_times = merge_timepoints(asr_times, fixed_times, original_hits)
        print(f"Total unique scan points: {len(all_times)}")
        print(f"Range: {all_times[0]}s - {all_times[-1]}s")

        # === Scan ===
        print("\n=== Scanning ===")
        results = scan_timepoints(cap, model, all_times, fps, total_frames, OUTPUT_DIR)
    finally:
        cap.release()

    print("\n=== Scan Complete ===")
    print(f"Scan points: {len(all_times)}")
    print(f"Total detections: {len(results)}")

    # Deduplicate nearby detections (same class within 2 seconds)
    deduped = dedupe_detections(results)
    print(f"After dedup: {len(deduped)} detections")

    # Group by class
    by_class = defaultdict(list)
    for r in deduped:
        by_class[r["class"]].append(r)

    print("\nDetections by class:")
    for cls, items in sorted(by_class.items()):
        print(f"  {cls}: {len(items)}")
        for r in sorted(items, key=lambda x: -x["confidence"])[:5]:
            print(f"    {r['time_str']} conf={r['confidence']:.3f} frame={r['frame']} {r['image']}")

    # Check if original 5 were found
    print("\nOriginal 5 pistol timestamps:")
    for t in original_hits:
        found = [r for r in deduped
                 if abs(r["timestamp"] - t) < 3 and r["class"] == "pistol"]
        if found:
            best = max(found, key=lambda x: x["confidence"])
            print(f"  {t}s: ✅ FOUND conf={best['confidence']:.3f} {best['image']}")
        else:
            print(f"  {t}s: ❌ NOT FOUND")

    # Save JSON
    output = {
        "uuid": UUID,
        "model": str(MODEL),
        "scan_points": len(all_times),
        "total_detections": len(results),
        "after_dedup": len(deduped),
        "detections": sorted(deduped, key=lambda x: x["timestamp"]),
    }
    json_path = os.path.join(OUTPUT_DIR, "gun_detections.json")
    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(output, fh, indent=2)
    print(f"\nSaved: {json_path}")
    print(f"Images: {OUTPUT_DIR}/")


if __name__ == "__main__":
    main()