#!/opt/homebrew/bin/python3.11
"""
Magnifying Glass: Florence-2 AI analysis of extracted frames.

Uses multiple search terms to find stamps, envelopes, letters.

For every scene directory under ``BASE_DIR`` each ``frame_*.jpg`` is run
through Florence-2 once per entry in ``SEARCH_TERMS``. Frames with at least
one hit get an annotated copy plus one crop per detection written to
``RESULTS_DIR``; all hits are also dumped to ``detection_results.json``.
"""

import glob
import json
import os
import re
from collections import Counter

import torch
from PIL import Image
from transformers import AutoModelForCausalLM, AutoProcessor

UUID = "384b0ff44aaaa1f1"
BASE_DIR = f"output/{UUID}/magnifying_glass"
RESULTS_DIR = f"output/{UUID}/magnifying_glass_results"
os.makedirs(RESULTS_DIR, exist_ok=True)

# Florence-2 selects its task through a special token at the start of the
# prompt; the same token is the key of the dict returned by
# ``post_process_generation``.
TASK_PROMPT = "<OPEN_VOCABULARY_DETECTION>"

print("🔬 Loading Florence-2 model...")
# NOTE(review): trust_remote_code=True executes Python shipped with the model
# repository; consider pinning ``revision=`` to a commit that has been vetted.
processor = AutoProcessor.from_pretrained(
    "microsoft/Florence-2-base", trust_remote_code=True
)
model = AutoModelForCausalLM.from_pretrained(
    "microsoft/Florence-2-base", trust_remote_code=True
)
model.eval()

# Search terms for open vocabulary detection
SEARCH_TERMS = [
    "postage stamp",
    "stamp",
    "envelope with stamp",
    "letter with stamp",
    "holding a stamp",
    "stamp album",
    "collection of stamps",
]


def _normalize_detections(raw, fallback_label: str) -> list[dict]:
    """Convert a Florence-2 task result into ``[{"bbox": [...], "label": str}]``.

    Accepts either a dict of parallel lists (``bboxes`` with ``bboxes_labels``
    or ``labels``) or an existing list of ``{"bbox": ..., "label": ...}``
    dicts. Anything else (``None``, plain text, ...) yields an empty list.
    Polygon results, if present, are ignored.

    NOTE(review): the dict-of-lists layout follows the Florence-2 reference
    processor; confirm against the model revision actually installed.

    Args:
        raw: Value stored under the task key of the post-processed output.
        fallback_label: Label used when the model supplies none for a box.

    Returns:
        A list of JSON-serialisable detection dicts; empty when no box found.
    """
    if isinstance(raw, dict):
        boxes = raw.get("bboxes") or []
        labels = raw.get("bboxes_labels") or raw.get("labels") or []
        pairs = [
            (box, labels[i] if i < len(labels) else None)
            for i, box in enumerate(boxes)
        ]
    elif isinstance(raw, list):
        pairs = [
            (item.get("bbox"), item.get("label"))
            for item in raw
            if isinstance(item, dict)
        ]
    else:
        return []

    detections = []
    for box, label in pairs:
        # Skip malformed entries instead of failing the whole frame.
        if box is None or len(box) != 4:
            continue
        detections.append(
            {
                "bbox": [float(v) for v in box],
                "label": str(label) if label else fallback_label,
            }
        )
    return detections


def run_detection(image_path, search_term):
    """Run Florence-2 open-vocabulary detection on a single image.

    Args:
        image_path: Path of the image file to analyse.
        search_term: Free-text description of the object to locate.

    Returns:
        A list of ``{"bbox": [x1, y1, x2, y2], "label": str}`` dicts, empty
        when nothing was found. Any exception is reported on stdout and also
        results in an empty list, so one bad frame does not stop the run.
    """
    try:
        # The context manager closes the file handle once pixels are loaded.
        with Image.open(image_path) as handle:
            image = handle.convert("RGB")

        # Task token immediately followed by the query, with no separator.
        text_input = f"{TASK_PROMPT}{search_term}"
        inputs = processor(text=text_input, images=image, return_tensors="pt")

        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=512,
                num_beams=3,
            )

        # Special tokens must be kept: they are decoded during post-processing.
        generated_text = processor.batch_decode(
            generated_ids, skip_special_tokens=False
        )[0]
        parsed = processor.post_process_generation(
            generated_text,
            task=TASK_PROMPT,
            image_size=(image.width, image.height),
        )
        if not parsed:
            return []
        return _normalize_detections(parsed.get(TASK_PROMPT), search_term)
    except Exception as e:
        print(f" ⚠️ Error: {e}")
        return []


def _save_annotations(frame_path, scene_name, sec, frame_results):
    """Write one annotated frame and one crop per detection to RESULTS_DIR.

    Raises whatever cv2 raises (or ImportError when cv2 is missing); the
    caller treats saving as best-effort.
    """
    import cv2  # optional dependency, only needed when there is something to save

    annotated = cv2.imread(frame_path)
    if annotated is None:
        raise OSError(f"could not read {frame_path}")
    # Crops come from an untouched copy so they never contain drawn overlays.
    pristine = annotated.copy()
    height, width = annotated.shape[:2]
    name_counts = Counter()

    for term, dets in frame_results.items():
        for det in dets:
            x1, y1, x2, y2 = map(int, det.get("bbox", [0, 0, 0, 0]))
            label = det.get("label", term)

            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(
                annotated,
                label,
                # Keep the text baseline inside the image for boxes at the top.
                (x1, max(y1 - 10, 15)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )

            # Clamp to the image: negative indices would wrap in NumPy slicing.
            crop = pristine[
                max(y1, 0):min(y2, height), max(x1, 0):min(x2, width)
            ]
            if crop.size == 0:
                continue

            safe_label = re.sub(r"[^\w.-]", "_", label)
            stem = f"{scene_name}_{sec}s_{safe_label}"
            name_counts[stem] += 1
            # The first crop keeps the plain name; repeats get a numeric suffix
            # so they no longer overwrite each other.
            if name_counts[stem] > 1:
                stem = f"{stem}_{name_counts[stem]}"
            crop_path = os.path.join(RESULTS_DIR, f"{stem}.jpg")
            if not cv2.imwrite(crop_path, crop):
                print(f" ⚠️ Save error: could not write {crop_path}")

    ann_path = os.path.join(RESULTS_DIR, f"annotated_{scene_name}_{sec}s.jpg")
    if not cv2.imwrite(ann_path, annotated):
        print(f" ⚠️ Save error: could not write {ann_path}")


def analyze_scene(scene_dir, scene_name):
    """Analyze all frames in a scene.

    Args:
        scene_dir: Directory containing ``frame_*.jpg`` files.
        scene_name: Name used in log output and in saved file names.

    Returns:
        A list with one ``{"frame": <file name>, "detections": {term: [...]}}``
        entry per frame that produced at least one detection.
    """
    frames = sorted(glob.glob(os.path.join(scene_dir, "frame_*.jpg")))
    print(f"\n🔍 Analyzing {scene_name}: {len(frames)} frames")

    scene_detections = []
    for frame_path in frames:
        frame_name = os.path.basename(frame_path)

        frame_results = {}
        for term in SEARCH_TERMS:
            detections = run_detection(frame_path, term)
            if detections:
                frame_results[term] = detections

        if not frame_results:
            continue

        # "frame_12s.jpg" -> "12"
        sec = frame_name.removeprefix("frame_").removesuffix("s.jpg")
        print(f" 📍 Frame {sec}s: Found detections for {list(frame_results)}")

        # Saving images is best-effort: the detections are still recorded.
        try:
            _save_annotations(frame_path, scene_name, sec, frame_results)
        except Exception as e:
            print(f" ⚠️ Save error: {e}")

        scene_detections.append({"frame": frame_name, "detections": frame_results})

    return scene_detections


# Analyze all scenes
all_results = {}
scene_dirs = sorted(glob.glob(os.path.join(BASE_DIR, "*/")))
print(f"📂 Found {len(scene_dirs)} scene directories")

for scene_dir in scene_dirs:
    # glob returns paths with a trailing separator; dirname strips it first.
    scene_name = os.path.basename(os.path.dirname(scene_dir))
    detections = analyze_scene(scene_dir, scene_name)
    if detections:
        all_results[scene_name] = detections

# Save results
results_path = os.path.join(RESULTS_DIR, "detection_results.json")
with open(results_path, "w", encoding="utf-8") as f:
    json.dump(all_results, f, indent=2)

print(f"\n🏁 Done. Results saved to {results_path}")
print(f"📁 Check {RESULTS_DIR} for annotated images and crops.")