- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
162 lines
4.9 KiB
Python
162 lines
4.9 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Magnifying Glass: OWL-ViT fine-grained stamp search
|
|
Scans key frames with multiple stamp-related search terms.
|
|
"""
|
|
|
|
import os
|
|
import cv2
|
|
import json
|
|
import glob
|
|
from PIL import Image
|
|
import torch
|
|
from transformers import OwlViTProcessor, OwlViTForObjectDetection
|
|
|
|
UUID = "384b0ff44aaaa1f1"
|
|
BASE_DIR = f"output/{UUID}/magnifying_glass"
|
|
RESULTS_DIR = f"output/{UUID}/magnifying_glass_owl"
|
|
os.makedirs(RESULTS_DIR, exist_ok=True)
|
|
|
|
print("🔬 Loading OWL-ViT model...")
|
|
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
|
|
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
|
|
model.eval()
|
|
|
|
# Comprehensive search terms for stamp detection
|
|
SEARCH_TERMS = [
|
|
"postage stamp",
|
|
"stamp on envelope",
|
|
"stamp on paper",
|
|
"holding a stamp",
|
|
"envelope with stamp",
|
|
"letter with stamp",
|
|
"stamp collection",
|
|
"stamp album",
|
|
"rare stamp",
|
|
"British stamp",
|
|
"old stamp",
|
|
"small rectangular stamp",
|
|
"red stamp",
|
|
"blue stamp",
|
|
"stamp on document",
|
|
"envelope",
|
|
"letter",
|
|
"piece of paper",
|
|
"document",
|
|
"hand holding paper",
|
|
]
|
|
|
|
|
|
def detect_stamps(image_path, search_terms):
|
|
"""Run OWL-ViT detection with multiple search terms"""
|
|
image = Image.open(image_path).convert("RGB")
|
|
|
|
all_detections = []
|
|
|
|
for term in search_terms:
|
|
inputs = processor(text=[[term]], images=image, return_tensors="pt")
|
|
|
|
with torch.no_grad():
|
|
outputs = model(**inputs)
|
|
|
|
# Use lower threshold for small objects
|
|
threshold = 0.05
|
|
target_sizes = torch.Tensor([image.size[::-1]])
|
|
results = processor.post_process_object_detection(
|
|
outputs=outputs, target_sizes=target_sizes, threshold=threshold
|
|
)
|
|
|
|
for score, label, box in zip(
|
|
results[0]["scores"], results[0]["labels"], results[0]["boxes"]
|
|
):
|
|
if score > threshold:
|
|
all_detections.append(
|
|
{
|
|
"term": term,
|
|
"score": float(score),
|
|
"bbox": box.tolist(),
|
|
"label": f"{term} ({score:.2f})",
|
|
}
|
|
)
|
|
|
|
return all_detections
|
|
|
|
|
|
def analyze_scene(scene_dir, scene_name):
|
|
"""Analyze all frames in a scene"""
|
|
frames = sorted(glob.glob(os.path.join(scene_dir, "frame_*.jpg")))
|
|
print(f"\n🔍 Analyzing {scene_name}: {len(frames)} frames")
|
|
|
|
scene_results = []
|
|
|
|
for frame_path in frames:
|
|
frame_name = os.path.basename(frame_path)
|
|
sec = frame_name.replace("frame_", "").replace("s.jpg", "")
|
|
|
|
print(f" Processing {sec}s...")
|
|
detections = detect_stamps(frame_path, SEARCH_TERMS)
|
|
|
|
if detections:
|
|
# Sort by score
|
|
detections.sort(key=lambda x: x["score"], reverse=True)
|
|
top_dets = detections[:10] # Keep top 10
|
|
|
|
print(
|
|
f" 📍 Found {len(detections)} detections, top: {top_dets[0]['term']} ({top_dets[0]['score']:.2f})"
|
|
)
|
|
|
|
# Save annotated image
|
|
try:
|
|
img = cv2.imread(frame_path)
|
|
for det in top_dets:
|
|
x1, y1, x2, y2 = map(int, det["bbox"])
|
|
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
cv2.putText(
|
|
img,
|
|
det["label"],
|
|
(x1, y1 - 10),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
0.5,
|
|
(0, 255, 0),
|
|
1,
|
|
)
|
|
|
|
# Save crop
|
|
crop = img[y1:y2, x1:x2]
|
|
if crop.size > 0:
|
|
crop_name = (
|
|
f"{scene_name}_{sec}s_{det['term'].replace(' ', '_')}.jpg"
|
|
)
|
|
cv2.imwrite(os.path.join(RESULTS_DIR, crop_name), crop)
|
|
|
|
ann_path = os.path.join(
|
|
RESULTS_DIR, f"annotated_{scene_name}_{sec}s.jpg"
|
|
)
|
|
cv2.imwrite(ann_path, img)
|
|
except Exception as e:
|
|
print(f" ⚠️ Save error: {e}")
|
|
|
|
scene_results.append({"frame": frame_name, "detections": top_dets})
|
|
|
|
return scene_results
|
|
|
|
|
|
# Analyze all scenes
|
|
all_results = {}
|
|
scene_dirs = sorted(glob.glob(os.path.join(BASE_DIR, "*/")))
|
|
print(f"📂 Found {len(scene_dirs)} scene directories")
|
|
|
|
for scene_dir in scene_dirs:
|
|
scene_name = os.path.basename(os.path.dirname(scene_dir))
|
|
results = analyze_scene(scene_dir, scene_name)
|
|
if results:
|
|
all_results[scene_name] = results
|
|
|
|
# Save results
|
|
results_path = os.path.join(RESULTS_DIR, "detection_results.json")
|
|
with open(results_path, "w") as f:
|
|
json.dump(all_results, f, indent=2)
|
|
|
|
print(f"\n🏁 Done. Results saved to {results_path}")
|
|
print(f"📁 Check {RESULTS_DIR} for annotated images and crops.")
|