#!/opt/homebrew/bin/python3.11
"""
Fast Multi-Stage Stamp Search

Stage 1: OpenCV fast container detection (skin/hands, rectangles/paper)
Stage 2: OWL-ViT only on container crops for stamp detection

Outputs (all under OUTPUT_DIR):
    crops/stamp_*.jpg    one crop per accepted detection
    annotated_<sec>s.jpg sampled frame with detections drawn on it
    results.json         best detection per sampled timestamp, best score first
"""

import json
import os
import sys
import time

import cv2
import numpy as np
import torch
from PIL import Image
from transformers import OwlViTForObjectDetection, OwlViTProcessor

UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/fast_stamp_search"
os.makedirs(OUTPUT_DIR, exist_ok=True)
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")
os.makedirs(CROPS_DIR, exist_ok=True)

FRAME_INTERVAL = 5  # seconds between sampled frames
MIN_STAMP_SCORE = 0.06  # minimum OWL-ViT score for a detection to be kept

# Container crops are enlarged by an integer factor before detection: at least
# MIN_UPSCALE, and more for crops whose longest side is far below
# UPSCALE_TARGET_PX.
MIN_UPSCALE = 2
UPSCALE_TARGET_PX = 500

# Accepted stamp size range in original-frame pixels (exclusive bounds).
MIN_STAMP_PX = 15
MAX_STAMP_PX = 200

print("=" * 60)
print("⚔ Fast Multi-Stage Stamp Search")
print("=" * 60)

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    sys.exit(f"Cannot open video: {VIDEO_PATH}")
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # Without a frame rate the duration cannot be derived (division by zero).
    cap.release()
    sys.exit(f"Video reports no valid frame rate: {VIDEO_PATH}")
total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"šŸ“¹ Video: {total_sec}s ({total_sec // 60} min), {fps:.1f} fps")

# Load OWL-ViT once for stamp detection
print("šŸ”¬ Loading OWL-ViT stamp detector...")
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
model.eval()

STAMP_TERMS = ["postage stamp", "stamp on paper", "small stamp", "stamp"]


def _padded_bbox(rect, margin, frame_w, frame_h):
    """Grow an (x, y, w, h) rect by *margin* pixels and clip it to the frame.

    Returns the box as [x1, y1, x2, y2].
    """
    x, y, rect_w, rect_h = rect
    return [
        max(0, x - margin),
        max(0, y - margin),
        min(frame_w, x + rect_w + margin),
        min(frame_h, y + rect_h + margin),
    ]


def find_containers_fast(frame):
    """Fast OpenCV-based container detection.

    Looks for two kinds of regions that may hold a stamp:

    * "hand":  skin-coloured blobs (HSV threshold plus morphological cleanup)
    * "paper": bright, roughly rectangular blobs (grayscale threshold)

    Args:
        frame: BGR image array as returned by ``cv2.VideoCapture.read``.

    Returns:
        A list of dicts ``{"type": "hand" | "paper", "bbox": [x1, y1, x2, y2]}``
        with boxes padded by a margin and clipped to the frame.
    """
    containers = []
    h, w = frame.shape[:2]

    # 1. Skin color detection (hands). Skin hues sit at both ends of the hue
    # axis, so two ranges are combined.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    skin_mask = cv2.bitwise_or(
        cv2.inRange(hsv, np.array([0, 20, 70]), np.array([20, 150, 255])),
        cv2.inRange(hsv, np.array([160, 20, 70]), np.array([179, 150, 255])),
    )

    # Morphological cleanup: close small holes, then drop small specks.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))
    skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel)
    skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel)

    # Find hand contours
    contours, _ = cv2.findContours(
        skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for cnt in contours:
        if 2000 < cv2.contourArea(cnt) < h * w * 0.4:
            containers.append(
                {
                    "type": "hand",
                    "bbox": _padded_bbox(cv2.boundingRect(cnt), 40, w, h),
                }
            )

    # 2. Bright rectangular regions (envelopes/paper)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(
        bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for cnt in contours:
        if not 5000 < cv2.contourArea(cnt) < h * w * 0.5:
            continue
        rect = cv2.boundingRect(cnt)
        _, _, w_cnt, h_cnt = rect
        aspect = w_cnt / h_cnt if h_cnt > 0 else 0
        if 0.3 < aspect < 3.0:
            containers.append(
                {"type": "paper", "bbox": _padded_bbox(rect, 30, w, h)}
            )

    return containers


def _detect_term(image_pil, term, img_h, img_w):
    """Run OWL-ViT on *image_pil* for a single text query.

    Returns a list of ``(score, [x1, y1, x2, y2])`` tuples, with boxes in the
    pixel coordinates of an image of size ``img_h`` x ``img_w``.
    """
    inputs = processor(text=[[term]], images=image_pil, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    # Post-processing needs one (height, width) row per image in the batch,
    # i.e. shape (1, 2) here. A flat [h, w] tensor is treated as a batch of
    # two sizes and rejected.
    target_sizes = torch.tensor([[img_h, img_w]])
    results = processor.post_process_object_detection(
        outputs=outputs,
        target_sizes=target_sizes,
        threshold=MIN_STAMP_SCORE,
    )
    detections = results[0]
    return [
        (float(score), box.tolist())
        for score, box in zip(detections["scores"], detections["boxes"])
    ]


def _unique_crop_name(stem, used_names):
    """Return ``<stem>.jpg``, adding a numeric suffix only if already used."""
    name = f"{stem}.jpg"
    attempt = 1
    while name in used_names:
        attempt += 1
        name = f"{stem}_{attempt}.jpg"
    used_names.add(name)
    return name


all_results = []
used_crop_names = set()
sample_times = range(0, total_sec, FRAME_INTERVAL)
start_time = time.time()

try:
    for frame_idx, sec in enumerate(sample_times):
        cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
        ret, frame = cap.read()
        if not ret:
            continue
        frame_h, frame_w = frame.shape[:2]

        # Average time per already-visited sample times the samples left
        # (including this one); never negative.
        elapsed = time.time() - start_time
        if frame_idx:
            eta = elapsed / frame_idx * (len(sample_times) - frame_idx)
        else:
            eta = 0.0

        # Stage 1: Fast container detection
        containers = find_containers_fast(frame)
        if not containers:
            if sec % 60 == 0:
                print(
                    f" [{sec // 60}min/{total_sec // 60}min] No containers | ETA: {eta:.0f}s"
                )
            continue

        print(
            f" [{sec}s] Found {len(containers)} containers ({[c['type'] for c in containers]})"
        )

        # Drawn-on copy of the frame, created on the first hit. `frame` itself
        # stays clean so later container crops and stamp crops never contain
        # boxes or labels from earlier detections.
        annotated = None

        # Stage 2: OWL-ViT stamp detection on each container
        for container in containers:
            cx1, cy1, cx2, cy2 = container["bbox"]
            container_img = frame[cy1:cy2, cx1:cx2]
            if container_img.size == 0:
                continue

            ch, cw = container_img.shape[:2]

            # Scale up for better detection (integer factor, always >= 2).
            scale = max(MIN_UPSCALE, UPSCALE_TARGET_PX // max(ch, cw))
            scaled = cv2.resize(
                container_img,
                None,
                fx=scale,
                fy=scale,
                interpolation=cv2.INTER_CUBIC,
            )
            scaled_pil = Image.fromarray(cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB))
            sh, sw = scaled.shape[:2]

            for term in STAMP_TERMS:
                try:
                    detections = _detect_term(scaled_pil, term, sh, sw)
                except Exception as exc:
                    # Best effort: one failed query must not stop the scan,
                    # but it must not disappear silently either.
                    print(
                        f" ! OWL-ViT failed at {sec}s for '{term}': {exc!r}",
                        file=sys.stderr,
                    )
                    continue

                for s, (sx1, sy1, sx2, sy2) in detections:
                    if s <= MIN_STAMP_SCORE:
                        continue

                    # Box size back in original-frame pixels.
                    orig_w = (sx2 - sx1) / scale
                    orig_h = (sy2 - sy1) / scale
                    if not (
                        MIN_STAMP_PX < orig_w < MAX_STAMP_PX
                        and MIN_STAMP_PX < orig_h < MAX_STAMP_PX
                    ):
                        continue

                    # Map to frame coordinates and clamp. A negative index
                    # would otherwise wrap around when slicing.
                    ox1 = min(max(cx1 + int(sx1 / scale), 0), frame_w)
                    oy1 = min(max(cy1 + int(sy1 / scale), 0), frame_h)
                    ox2 = min(max(cx1 + int(sx2 / scale), 0), frame_w)
                    oy2 = min(max(cy1 + int(sy2 / scale), 0), frame_h)

                    crop = frame[oy1:oy2, ox1:ox2]
                    if crop.size == 0:
                        continue

                    all_results.append(
                        {
                            "timestamp": sec,
                            "container": container["type"],
                            "stamp_term": term,
                            "score": s,
                            "bbox": [ox1, oy1, ox2, oy2],
                            "size": [int(orig_w), int(orig_h)],
                        }
                    )

                    # Save
                    crop_name = _unique_crop_name(
                        f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}",
                        used_crop_names,
                    )
                    cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)

                    # Annotate full frame (on the copy)
                    if annotated is None:
                        annotated = frame.copy()
                    cv2.rectangle(
                        annotated, (ox1, oy1), (ox2, oy2), (0, 255, 0), 3
                    )
                    cv2.putText(
                        annotated,
                        f"{term[:8]} {s:.2f}",
                        (ox1, oy1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                    )

                    print(
                        f" šŸŽÆ {sec}s | {term} | {s:.2f} | {int(orig_w)}x{int(orig_h)}px"
                    )

        # Save annotated frame if stamps found
        if annotated is not None:
            ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
            cv2.imwrite(ann_path, annotated)
finally:
    # Release the capture handle even if the scan is interrupted or fails.
    cap.release()

# Deduplicate by timestamp, keeping the highest-scoring detection for each.
# Iterating in descending score order means the first hit per timestamp is the
# best one, and `unique` ends up sorted by score.
seen = set()
unique = []
for r in sorted(all_results, key=lambda x: x["score"], reverse=True):
    ts = r["timestamp"]
    if ts not in seen:
        seen.add(ts)
        unique.append(r)

print(f"\n{'=' * 60}")
print(f"šŸ“Š Found {len(unique)} unique stamp candidates")
for r in unique:
    print(
        f" šŸŽÆ {r['timestamp']}s | {r['stamp_term']} | {r['score']:.2f} | via: {r['container']}"
    )

with open(os.path.join(OUTPUT_DIR, "results.json"), "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2)

print(f"\nšŸ Done. Crops: {CROPS_DIR}")