#!/opt/homebrew/bin/python3.11
"""
Hybrid Stamp Search: OpenCV + OWL-ViT

Stage 1: OpenCV finds frames with containers (hands/paper) - FAST
Stage 2: OWL-ViT validates those frames for actual stamps - ACCURATE
Stage 3: Detections are ranked by score and reduced to one per timestamp.

Outputs (all under OUTPUT_DIR): per-detection crops in ``crops/``,
``annotated_<sec>s.jpg`` for frames with at least one detection, and
``results.json`` with the best detection per timestamp.
"""

import json
import os
import sys
import time

import cv2
import numpy as np
import torch
from PIL import Image
from transformers import OwlViTForObjectDetection, OwlViTProcessor

UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/hybrid_stamp_search"
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")

FRAME_INTERVAL = 5  # seconds between sampled frames
MODEL_NAME = "google/owlvit-base-patch32"
STAMP_TERMS = ["postage stamp", "stamp", "small stamp", "stamp on paper"]
SCORE_THRESHOLD = 0.06  # minimum OWL-ViT score for a detection to be kept
MIN_STAMP_PX = 15  # exclusive lower bound on box width/height
MAX_STAMP_PX = 150  # exclusive upper bound on box width/height


def _has_skin_region(frame: np.ndarray) -> bool:
    """Return True if *frame* (BGR) contains a skin-coloured blob.

    A blob qualifies when its contour area is above 1500 px and below 35%
    of the frame area.
    """
    height, width = frame.shape[:2]
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Two hue bands are combined because the targeted hues wrap around the
    # ends of OpenCV's 0-179 hue range. bitwise_or cannot overflow, unlike
    # adding two uint8 masks.
    skin = cv2.bitwise_or(
        cv2.inRange(hsv, np.array([0, 20, 60]), np.array([25, 180, 255])),
        cv2.inRange(hsv, np.array([160, 20, 60]), np.array([179, 180, 255])),
    )
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
    skin = cv2.morphologyEx(skin, cv2.MORPH_CLOSE, kernel)
    skin = cv2.morphologyEx(skin, cv2.MORPH_OPEN, kernel)
    contours, _ = cv2.findContours(skin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    max_area = height * width * 0.35
    return any(1500 < cv2.contourArea(cnt) < max_area for cnt in contours)


def _has_bright_rectangle(frame: np.ndarray) -> bool:
    """Return True if *frame* (BGR) contains a bright paper-like region.

    A region qualifies when its contour area is above 3000 px and below 50%
    of the frame area, and its bounding box aspect ratio (width / height)
    lies strictly between 0.2 and 4.0.
    """
    height, width = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright = cv2.threshold(gray, 175, 255, cv2.THRESH_BINARY)
    bright = cv2.morphologyEx(
        bright, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    )
    contours, _ = cv2.findContours(bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    max_area = height * width * 0.5
    for cnt in contours:
        if not 3000 < cv2.contourArea(cnt) < max_area:
            continue
        _, _, box_w, box_h = cv2.boundingRect(cnt)
        if box_h > 0 and 0.2 < box_w / box_h < 4.0:
            return True
    return False


def _has_container(frame: np.ndarray) -> bool:
    """Return True if *frame* shows a hand-like or paper-like region."""
    return _has_skin_region(frame) or _has_bright_rectangle(frame)


def _clamp_box(box, width: int, height: int) -> tuple[int, int, int, int]:
    """Truncate *box* (x1, y1, x2, y2) to ints and clamp it to the frame.

    Clamping matters because negative coordinates would otherwise be
    interpreted as from-the-end indices when slicing the frame array.
    """
    x1, y1, x2, y2 = (int(v) for v in box)
    x1 = min(max(x1, 0), width)
    x2 = min(max(x2, 0), width)
    y1 = min(max(y1, 0), height)
    y2 = min(max(y2, 0), height)
    return x1, y1, x2, y2


def _best_per_timestamp(results: list[dict]) -> list[dict]:
    """Return the highest-scoring result for each timestamp, best first."""
    best: dict[int, dict] = {}
    for res in sorted(results, key=lambda r: r["score"], reverse=True):
        best.setdefault(res["timestamp"], res)
    return list(best.values())


os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(CROPS_DIR, exist_ok=True)

print("=" * 60)
print("🔬 Hybrid Stamp Search: OpenCV + OWL-ViT")
print("=" * 60)

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    sys.exit(f"Cannot open video: {VIDEO_PATH}")

fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # Without a valid frame rate the duration cannot be computed.
    cap.release()
    sys.exit(f"Video reports an invalid frame rate ({fps}): {VIDEO_PATH}")

total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min)")

# ═══════════════════════════════════════════
# Stage 1: OpenCV - Find container frames
# ═══════════════════════════════════════════
print("\n⚔ Stage 1: OpenCV container scanning...")

sample_seconds = range(0, total_sec, FRAME_INTERVAL)
# NOTE: full frames are held in memory until Stage 2 has processed them.
candidate_frames = []  # (sec, frame_array)
start = time.time()

try:
    for sec in sample_seconds:
        cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
        ret, frame = cap.read()
        if ret and _has_container(frame):
            candidate_frames.append((sec, frame))
finally:
    cap.release()

t1 = time.time() - start
print(f" ✅ Stage 1 done in {t1:.1f}s")
print(
    f" 📊 {len(candidate_frames)} candidate frames out of {len(sample_seconds)} total"
)

if not candidate_frames:
    print(" ❌ No containers found. Exiting.")
    sys.exit()

# ═══════════════════════════════════════════
# Stage 2: OWL-ViT - Precise stamp detection
# ═══════════════════════════════════════════
print("\n🔬 Stage 2: OWL-ViT stamp validation...")
print(" Loading model...")

processor = OwlViTProcessor.from_pretrained(MODEL_NAME)
model = OwlViTForObjectDetection.from_pretrained(MODEL_NAME)
model.eval()

all_results = []
start2 = time.time()
num_candidates = len(candidate_frames)

for idx, (sec, frame) in enumerate(candidate_frames):
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    h, w = frame.shape[:2]
    # Post-processing expects one (height, width) row per image in the batch.
    target_sizes = torch.tensor([[h, w]], dtype=torch.float32)
    # Draw on a copy so crops taken from `frame` never contain annotations.
    annotated = frame.copy()
    found = False

    for term in STAMP_TERMS:
        try:
            inputs = processor(text=[[term]], images=image, return_tensors="pt")
            with torch.no_grad():
                outputs = model(**inputs)
            detections = processor.post_process_object_detection(
                outputs=outputs,
                target_sizes=target_sizes,
                threshold=SCORE_THRESHOLD,
            )[0]
        except Exception as exc:
            # Best effort: report the failure and move on to the next term
            # instead of hiding it.
            print(f" [warn] {sec}s | {term} | detection failed: {exc!r}")
            continue

        for score, box in zip(detections["scores"], detections["boxes"]):
            s = float(score)
            x1, y1, x2, y2 = _clamp_box(box.tolist(), w, h)
            bw, bh = x2 - x1, y2 - y1

            # Filter: stamps are small (15-150px)
            if not (
                MIN_STAMP_PX < bw < MAX_STAMP_PX and MIN_STAMP_PX < bh < MAX_STAMP_PX
            ):
                continue

            crop = frame[y1:y2, x1:x2]
            if crop.size == 0:
                continue

            all_results.append(
                {
                    "timestamp": sec,
                    "term": term,
                    "score": s,
                    "bbox": [x1, y1, x2, y2],
                    "size": [bw, bh],
                }
            )
            found = True

            # Save
            crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
            cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)

            # Annotate
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(
                annotated,
                f"{term[:10]} {s:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )

            print(f" 🎯 {sec}s | {term} | {s:.2f} | {bw}x{bh}px")

    if found:
        ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
        cv2.imwrite(ann_path, annotated)

    done = idx + 1
    if idx % 10 == 0 or done == num_candidates:
        # Estimate from the average time of the frames completed so far.
        elapsed = time.time() - start2
        eta = elapsed / done * (num_candidates - done)
        print(f" Progress: {done}/{num_candidates} | ETA: {eta:.0f}s")

t2 = time.time() - start2
total_time = t1 + t2

# ═══════════════════════════════════════════
# Stage 3: Deduplicate & rank
# ═══════════════════════════════════════════
unique = _best_per_timestamp(all_results)

print(f"\n{'=' * 60}")
print(f"⏱️ Total time: {total_time:.1f}s (OpenCV: {t1:.1f}s + OWL-ViT: {t2:.1f}s)")
print(f"📊 Found {len(unique)} unique stamp candidates")
print(f"{'=' * 60}")

for r in unique:
    print(
        f" 🎯 {r['timestamp']}s | {r['term']} | {r['score']:.2f} | {r['size'][0]}x{r['size'][1]}px"
    )

with open(os.path.join(OUTPUT_DIR, "results.json"), "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2)

print(f"\n🏁 Done. Crops: {CROPS_DIR}")