- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
159 lines
4.9 KiB
Python
159 lines
4.9 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Magnifying Glass: Florence-2 AI analysis of extracted frames
|
|
Uses multiple search terms to find stamps, envelopes, letters.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import glob
|
|
from PIL import Image
|
|
import torch
|
|
from transformers import AutoProcessor, AutoModelForCausalLM
|
|
|
|
# Run identifier; it names the sub-directory under "output/" that this
# script reads from and writes to.
UUID = "384b0ff44aaaa1f1"
# Root folder that is scanned for per-scene sub-directories of frames.
BASE_DIR = f"output/{UUID}/magnifying_glass"
# Folder that receives annotated frames, crops and the JSON summary.
RESULTS_DIR = f"output/{UUID}/magnifying_glass_results"
# Created eagerly so later writes never fail on a missing directory.
os.makedirs(RESULTS_DIR, exist_ok=True)
|
|
|
|
print("🔬 Loading Florence-2 model...")
# Single source of truth for the checkpoint so processor and model cannot
# drift apart.
MODEL_ID = "microsoft/Florence-2-base"
# NOTE(review): trust_remote_code=True executes Python code shipped in the
# model repository. Confirm the checkpoint is trusted (and ideally pin a
# revision) before running this outside a sandbox.
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, trust_remote_code=True)
# Inference only: switch off dropout and other training-time behaviour.
model.eval()
|
|
|
|
# Search terms for open vocabulary detection.
# Each phrase is sent to the model separately for every frame, so the cost
# of a run grows linearly with the length of this list.
SEARCH_TERMS = [
    "postage stamp",
    "stamp",
    "envelope with stamp",
    "letter with stamp",
    "holding a stamp",
    "stamp album",
    "collection of stamps",
]
|
|
|
|
|
|
def _normalize_detections(raw):
    """Convert raw detection output into a list of ``{"bbox", "label"}`` dicts.

    Per the Florence-2 model card, open-vocabulary detection yields a dict
    of parallel lists (``"bboxes"`` and ``"bboxes_labels"``).  Such a dict
    is truthy even when it holds no boxes, so it is flattened here into one
    record per box.  Input that is already a sequence of records is passed
    through unchanged.
    """
    if not raw:
        return []
    if isinstance(raw, dict):
        boxes = raw.get("bboxes") or []
        labels = raw.get("bboxes_labels") or raw.get("labels") or []
        records = []
        for index, box in enumerate(boxes):
            record = {"bbox": list(box)}
            # Leave "label" out when the model gave none so that callers
            # using ``det.get("label", default)`` fall back cleanly.
            if index < len(labels) and labels[index]:
                record["label"] = labels[index]
            records.append(record)
        return records
    return list(raw)


def run_detection(image_path, search_term):
    """Run Florence-2 open-vocabulary detection on a single image.

    Args:
        image_path: Path of the image file to analyse.
        search_term: Free-text phrase describing what to look for.

    Returns:
        A list with one dict per detected box.  Each dict has a ``"bbox"``
        entry (``[x1, y1, x2, y2]``) and, when the model supplied one, a
        ``"label"`` entry.  The list is empty when nothing was found or
        when any step failed; failures are printed, never raised, so one
        bad frame cannot abort a long batch run.
    """
    task = "<OPEN_VOCABULARY_DETECTION>"
    try:
        # Close the file handle deterministically; convert() returns an
        # independent in-memory copy that outlives the handle.
        with Image.open(image_path) as handle:
            image = handle.convert("RGB")

        # The model card builds the prompt as task token + phrase with no
        # separator; an extra space would leak into the generated prompt.
        inputs = processor(
            text=task + search_term, images=image, return_tensors="pt"
        )

        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=512,
                num_beams=3,
            )

        # Special tokens must be kept: the post-processor parses the
        # location tokens out of the decoded text.
        generated_text = processor.batch_decode(
            generated_ids, skip_special_tokens=False
        )[0]

        parsed = processor.post_process_generation(
            generated_text,
            task=task,
            image_size=(image.width, image.height),
        )

        if not parsed or task not in parsed:
            return []
        return _normalize_detections(parsed[task])
    except Exception as e:  # best-effort boundary: report and carry on
        print(f" ⚠️ Error: {e}")
        return []
|
|
|
|
|
|
def _as_detection_records(raw):
    """Return *raw* detections as a list of ``{"bbox", "label"}`` dicts.

    Accepts either a sequence of such dicts or a dict of parallel
    ``"bboxes"`` / ``"bboxes_labels"`` lists (the shape the Florence-2 model
    card documents), so an empty result is always an empty list.
    """
    if isinstance(raw, dict):
        boxes = raw.get("bboxes") or []
        labels = raw.get("bboxes_labels") or []
        records = []
        for index, box in enumerate(boxes):
            record = {"bbox": list(box)}
            if index < len(labels) and labels[index]:
                record["label"] = labels[index]
            records.append(record)
        return records
    return list(raw or [])


def _save_frame_outputs(frame_path, scene_name, sec, frame_results):
    """Write the annotated frame and one crop per detection to RESULTS_DIR."""
    import cv2  # imported lazily, as before, so detection works without it

    clean = cv2.imread(frame_path)
    if clean is None:
        # imread signals failure by returning None instead of raising.
        raise OSError(f"could not read image {frame_path}")

    # Draw on a copy so that crops are taken from untouched pixels.
    annotated = clean.copy()
    height, width = clean.shape[:2]
    name_counts = {}

    for term, dets in frame_results.items():
        for det in dets:
            x1, y1, x2, y2 = map(int, det.get("bbox", [0, 0, 0, 0]))
            label = det.get("label", term)

            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(
                annotated,
                label,
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )

            # Clamp to the image: a negative index would otherwise wrap
            # around in NumPy slicing and yield the wrong region.
            left, right = min(max(x1, 0), width), min(max(x2, 0), width)
            top, bottom = min(max(y1, 0), height), min(max(y2, 0), height)
            crop = clean[top:bottom, left:right]
            if crop.size > 0:
                safe_label = (
                    label.replace(" ", "_").replace(os.sep, "_").replace("/", "_")
                )
                stem = f"{scene_name}_{sec}s_{safe_label}"
                # The first crop keeps the historical name; repeats of the
                # same label get a numeric suffix instead of overwriting it.
                seen = name_counts.get(stem, 0) + 1
                name_counts[stem] = seen
                suffix = "" if seen == 1 else f"_{seen}"
                crop_name = f"{stem}{suffix}.jpg"
                cv2.imwrite(os.path.join(RESULTS_DIR, crop_name), crop)

    ann_path = os.path.join(RESULTS_DIR, f"annotated_{scene_name}_{sec}s.jpg")
    cv2.imwrite(ann_path, annotated)


def analyze_scene(scene_dir, scene_name):
    """Analyze all frames in a scene.

    Every ``frame_*.jpg`` file in *scene_dir* is checked against each entry
    of ``SEARCH_TERMS``.  For frames with at least one hit, an annotated
    copy and per-detection crops are written to ``RESULTS_DIR``.

    Args:
        scene_dir: Directory holding the extracted frames of one scene.
        scene_name: Name used in log lines and output file names.

    Returns:
        A list of ``{"frame": <file name>, "detections": {term: [...]}}``
        dicts, one per frame that produced detections; empty when the
        directory has no matching frames or nothing was detected.
    """
    frames = sorted(glob.glob(os.path.join(scene_dir, "frame_*.jpg")))
    print(f"\n🔍 Analyzing {scene_name}: {len(frames)} frames")

    scene_detections = []

    for frame_path in frames:
        frame_name = os.path.basename(frame_path)
        frame_results = {}

        for term in SEARCH_TERMS:
            records = _as_detection_records(run_detection(frame_path, term))
            if records:
                frame_results[term] = records

        if not frame_results:
            continue

        sec = frame_name.replace("frame_", "").replace("s.jpg", "")
        print(
            f" 📍 Frame {sec}s: Found detections for {list(frame_results.keys())}"
        )

        # Saving images is best-effort: a failure here must not discard the
        # detections that are about to be recorded.
        try:
            _save_frame_outputs(frame_path, scene_name, sec, frame_results)
        except Exception as e:
            print(f" ⚠️ Save error: {e}")

        scene_detections.append({"frame": frame_name, "detections": frame_results})

    return scene_detections
|
|
|
|
|
|
# Analyze all scenes: every immediate sub-directory of BASE_DIR is one scene.
all_results = {}
scene_dirs = sorted(glob.glob(os.path.join(BASE_DIR, "*/")))
print(f"📂 Found {len(scene_dirs)} scene directories")

for scene_dir in scene_dirs:
    # The "*/" pattern leaves a trailing separator on each match; dirname()
    # strips it so that basename() yields the folder name.
    scene_name = os.path.basename(os.path.dirname(scene_dir))
    detections = analyze_scene(scene_dir, scene_name)
    # Scenes without any detection are left out of the summary.
    if detections:
        all_results[scene_name] = detections

# Save results
results_path = os.path.join(RESULTS_DIR, "detection_results.json")
# Explicit encoding keeps the output independent of the platform locale.
with open(results_path, "w", encoding="utf-8") as f:
    json.dump(all_results, f, indent=2)

print(f"\n🏁 Done. Results saved to {results_path}")
print(f"📁 Check {RESULTS_DIR} for annotated images and crops.")
|