Files
momentry_core/scripts/fast_stamp_search.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

255 lines
8.3 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Fast Multi-Stage Stamp Search
Stage 1: OpenCV fast container detection (skin/hands, rectangles/paper)
Stage 2: OWL-ViT only on container crops for stamp detection
"""
import os
import cv2
import json
import time
import numpy as np
from PIL import Image
import torch
from transformers import OwlViTProcessor, OwlViTForObjectDetection
# --- Configuration ---------------------------------------------------------
# Identifier of the video to scan; the input and output paths derive from it.
UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/fast_stamp_search"
os.makedirs(OUTPUT_DIR, exist_ok=True)
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")
os.makedirs(CROPS_DIR, exist_ok=True)
FRAME_INTERVAL = 5  # seconds of video between sampled frames
MIN_STAMP_SCORE = 0.06  # minimum detector score for a stamp candidate

print("=" * 60)
print("⚡ Fast Multi-Stage Stamp Search")
print("=" * 60)

# --- Video -----------------------------------------------------------------
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Fail with a clear message instead of a ZeroDivisionError further down.
    raise SystemExit(f"❌ Cannot open video: {VIDEO_PATH}")
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Written as "not > 0" so that a NaN frame rate is rejected as well.
    cap.release()
    raise SystemExit(f"❌ Invalid frame rate ({fps}) reported for: {VIDEO_PATH}")
total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min), {fps:.1f} fps")

# --- Model -----------------------------------------------------------------
# Load OWL-ViT once for stamp detection
print("🔬 Loading OWL-ViT stamp detector...")
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
model.eval()

# Text queries tried against every container crop.
STAMP_TERMS = ["postage stamp", "stamp on paper", "small stamp", "stamp"]
def _padded_bbox(x, y, box_w, box_h, margin, frame_w, frame_h):
    """Return [x1, y1, x2, y2] grown by ``margin`` and clamped to the frame."""
    return [
        max(0, x - margin),
        max(0, y - margin),
        min(frame_w, x + box_w + margin),
        min(frame_h, y + box_h + margin),
    ]


def find_containers_fast(frame):
    """Fast OpenCV-based container detection.

    Looks for two kinds of regions that may hold a stamp:

    * ``"hand"``  - skin-coloured blobs found by HSV thresholding.
    * ``"paper"`` - bright, roughly rectangular blobs (envelopes/paper).

    Args:
        frame: Image array that is converted with ``COLOR_BGR2HSV`` /
            ``COLOR_BGR2GRAY``, i.e. treated as a BGR frame.

    Returns:
        A list of dicts ``{"type": "hand" | "paper", "bbox": [x1, y1, x2, y2]}``
        with the bounding boxes padded by a margin and clamped to the frame.
        Hand regions come first, followed by paper regions.
    """
    containers = []
    h, w = frame.shape[:2]
    frame_area = h * w

    # 1. Skin color detection (hands)
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Two hue bands (low and high end of the hue axis). Combine them with a
    # bitwise OR rather than uint8 addition, which would wrap around if the
    # bands were ever changed to overlap.
    skin_mask = cv2.bitwise_or(
        cv2.inRange(hsv, np.array([0, 20, 70]), np.array([20, 150, 255])),
        cv2.inRange(hsv, np.array([160, 20, 70]), np.array([179, 150, 255])),
    )

    # Morphological cleanup: close small holes, then remove small specks
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))
    skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel)
    skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel)

    # Find hand contours
    contours, _ = cv2.findContours(
        skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for cnt in contours:
        area = cv2.contourArea(cnt)
        # Ignore tiny blobs and blobs covering 40% or more of the frame
        if not 2000 < area < frame_area * 0.4:
            continue
        x, y, w_cnt, h_cnt = cv2.boundingRect(cnt)
        containers.append(
            {
                "type": "hand",
                "bbox": _padded_bbox(x, y, w_cnt, h_cnt, 40, w, h),
            }
        )

    # 2. Bright rectangular regions (envelopes/paper)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(
        bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if not 5000 < area < frame_area * 0.5:
            continue
        x, y, w_cnt, h_cnt = cv2.boundingRect(cnt)
        aspect = w_cnt / h_cnt if h_cnt > 0 else 0
        # Reject very elongated regions
        if not 0.3 < aspect < 3.0:
            continue
        containers.append(
            {
                "type": "paper",
                "bbox": _padded_bbox(x, y, w_cnt, h_cnt, 30, w, h),
            }
        )

    return containers
# Main scan: sample one frame every FRAME_INTERVAL seconds, find candidate
# containers with OpenCV (stage 1), then run OWL-ViT on each container crop
# for every query in STAMP_TERMS (stage 2). Accepted detections are appended
# to all_results, their crops are written to CROPS_DIR, and an annotated copy
# of the frame is written to OUTPUT_DIR.
all_results = []
start_time = time.time()

for sec in range(0, total_sec, FRAME_INTERVAL):
    cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
    ret, frame = cap.read()
    if not ret:
        continue

    frame_h, frame_w = frame.shape[:2]
    # Draw on a copy so annotations never leak into later crops or into the
    # images handed to the detector for the remaining containers.
    annotated = frame.copy()
    found_in_frame = False

    elapsed = time.time() - start_time
    frames_done = sec / FRAME_INTERVAL + 1
    frames_total = total_sec / FRAME_INTERVAL
    eta = (elapsed / frames_done) * (frames_total - frames_done)

    # Stage 1: Fast container detection
    containers = find_containers_fast(frame)
    if not containers:
        if sec % 60 == 0:
            print(
                f" [{sec // 60}min/{total_sec // 60}min] No containers | ETA: {eta:.0f}s"
            )
        continue
    print(
        f" [{sec}s] Found {len(containers)} containers ({[c['type'] for c in containers]})"
    )

    # Stage 2: OWL-ViT stamp detection on each container
    for container in containers:
        cx1, cy1, cx2, cy2 = container["bbox"]
        container_img = frame[cy1:cy2, cx1:cx2]
        if container_img.size == 0:
            continue
        ch, cw = container_img.shape[:2]

        # Scale up for better detection (the factor is always at least 2)
        scale = max(2, 500 // max(ch, cw))
        scaled = cv2.resize(
            container_img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC
        )
        scaled_pil = Image.fromarray(cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB))
        sh, sw = scaled.shape[:2]
        # Post-processing needs one (height, width) row per image in the
        # batch, i.e. shape (1, 2) here; a flat 2-element tensor is rejected.
        target_sizes = torch.Tensor([[sh, sw]])

        for term in STAMP_TERMS:
            try:
                inputs = processor(
                    text=[[term]], images=scaled_pil, return_tensors="pt"
                )
                with torch.no_grad():
                    outputs = model(**inputs)
                results = processor.post_process_object_detection(
                    outputs=outputs,
                    target_sizes=target_sizes,
                    threshold=MIN_STAMP_SCORE,
                )
                for score, box in zip(results[0]["scores"], results[0]["boxes"]):
                    s = float(score)
                    if s <= MIN_STAMP_SCORE:
                        continue
                    sx1, sy1, sx2, sy2 = box.tolist()
                    # Detection size in original-frame pixels
                    orig_w = (sx2 - sx1) / scale
                    orig_h = (sy2 - sy1) / scale
                    if not (15 < orig_w < 200 and 15 < orig_h < 200):
                        continue
                    # Map back to full-frame coordinates and clamp: predicted
                    # boxes can extend past the crop, and negative indices
                    # would wrap around when slicing the frame.
                    ox1 = min(max(cx1 + int(sx1 / scale), 0), frame_w)
                    oy1 = min(max(cy1 + int(sy1 / scale), 0), frame_h)
                    ox2 = min(max(cx1 + int(sx2 / scale), 0), frame_w)
                    oy2 = min(max(cy1 + int(sy2 / scale), 0), frame_h)
                    crop = frame[oy1:oy2, ox1:ox2]
                    if crop.size == 0:
                        continue
                    all_results.append(
                        {
                            "timestamp": sec,
                            "container": container["type"],
                            "stamp_term": term,
                            "score": s,
                            "bbox": [ox1, oy1, ox2, oy2],
                            "size": [int(orig_w), int(orig_h)],
                        }
                    )
                    found_in_frame = True
                    # Save
                    crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
                    cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)
                    # Annotate full frame (the copy, not the source pixels)
                    cv2.rectangle(
                        annotated, (ox1, oy1), (ox2, oy2), (0, 255, 0), 3
                    )
                    cv2.putText(
                        annotated,
                        f"{term[:8]} {s:.2f}",
                        (ox1, oy1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                    )
                    print(
                        f" 🎯 {sec}s | {term} | {s:.2f} | {int(orig_w)}x{int(orig_h)}px"
                    )
            except Exception as exc:
                # Best effort: keep scanning, but report the failure instead
                # of hiding it.
                print(f" ⚠️ {sec}s | {term} | detection failed: {exc!r}")

    # Save annotated frame if stamps found
    if found_in_frame:
        ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
        cv2.imwrite(ann_path, annotated)

cap.release()
# Deduplicate by timestamp, keeping the highest-scoring candidate for each
# sampled second (ties keep the candidate that was found first).
best_by_ts = {}
for r in all_results:
    ts = r["timestamp"]
    if ts not in best_by_ts or r["score"] > best_by_ts[ts]["score"]:
        best_by_ts[ts] = r

# Rank the surviving candidates from most to least confident.
unique = sorted(best_by_ts.values(), key=lambda x: x["score"], reverse=True)

print(f"\n{'=' * 60}")
print(f"📊 Found {len(unique)} unique stamp candidates")
for r in unique:
    print(
        f" 🎯 {r['timestamp']}s | {r['stamp_term']} | {r['score']:.2f} | via: {r['container']}"
    )

# Persist the ranked candidates next to the crops.
with open(os.path.join(OUTPUT_DIR, "results.json"), "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2)
print(f"\n🏁 Done. Crops: {CROPS_DIR}")