#!/opt/homebrew/bin/python3.11
"""
Caption Processor - Generate image captions (LOCAL ONLY)

Uses Moondream2 (local VLM) for image captioning.
No cloud API calls - fully offline processing.

Pipeline: sample frames from the video with ffmpeg, caption each frame with
Moondream2, and fall back to a caption assembled from YOLO/OCR/scene sidecar
JSON files when the model is unavailable or fails.
"""

import argparse
import contextlib
import functools
import json
import math
import os
import subprocess
import sys
from typing import Dict, List, Optional, Tuple

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

try:
    from redis_publisher import RedisPublisher
except ImportError:
    # Progress reporting is optional: captioning itself needs no publisher,
    # so a missing module must not prevent the script from running.
    RedisPublisher = None

# Duration (seconds) assumed when ffprobe cannot report one.
_FALLBACK_DURATION = 60.0
_MOONDREAM_MODEL_ID = "vikhyatk/moondream2"
_MOONDREAM_REVISION = "2025-01-09"


def _probe_duration(video_path: str) -> Optional[float]:
    """Return the container duration in seconds, or None if it is unknown."""
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            return None
        data = json.loads(result.stdout)
        duration = float(data.get("format", {}).get("duration", 0))
    except (OSError, ValueError, TypeError, AttributeError):
        # ffprobe missing, or output that is not the expected JSON shape.
        return None
    if not math.isfinite(duration) or duration <= 0:
        return None
    return duration


def extract_frames(video_path: str, max_frames: int = 30) -> List[Dict]:
    """Extract frames from video at regular intervals.

    Frames are written as JPEGs into a ``.caption_frames`` directory next to
    the video. Sampling is spaced at ``duration / max_frames`` seconds, but
    never closer than one second apart.

    Args:
        video_path: Path to the input video.
        max_frames: Upper bound on the number of frames to extract. Values
            of zero or less yield an empty list.

    Returns:
        One dict per extracted frame with keys ``index``, ``timestamp``
        (seconds) and ``path``. Frames ffmpeg failed to produce are omitted.
    """
    if max_frames <= 0:
        return []

    probed = _probe_duration(video_path)
    duration = probed if probed is not None else _FALLBACK_DURATION
    interval = max(duration / max_frames, 1.0)

    temp_dir = os.path.join(os.path.dirname(video_path), ".caption_frames")
    os.makedirs(temp_dir, exist_ok=True)

    frames = []
    for i in range(max_frames):
        timestamp = i * interval
        # With a known duration there is nothing to grab past the end; when
        # the duration is only the fallback guess, keep trying every slot.
        if probed is not None and timestamp >= probed:
            break

        output_file = os.path.join(temp_dir, f"frame_{i:04d}.jpg")
        # A leftover file from an earlier run would otherwise be mistaken
        # for a fresh frame if ffmpeg produces no output for this slot.
        with contextlib.suppress(OSError):
            os.remove(output_file)

        cmd = [
            "ffmpeg",
            "-y",
            "-ss",
            str(timestamp),
            "-i",
            video_path,
            "-vframes",
            "1",
            "-q:v",
            "2",
            output_file,
        ]
        try:
            subprocess.run(cmd, capture_output=True, check=False)
        except OSError as e:
            # ffmpeg is missing or not executable; retrying cannot help.
            print(f"[CAPTION] ffmpeg could not be run: {e}")
            break

        if os.path.exists(output_file):
            frames.append({"index": i, "timestamp": timestamp, "path": output_file})

    return frames


@functools.lru_cache(maxsize=1)
def _load_moondream() -> Optional[Tuple[object, object]]:
    """Load Moondream2 once per process.

    Returns ``(model, tokenizer)``, or None when the dependencies are missing
    or loading fails. The outcome (including failure) is cached so that a
    batch of frames does not reload, or retry loading, the model per frame.
    """
    try:
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch
    except ImportError:
        return None

    try:
        tokenizer = AutoTokenizer.from_pretrained(
            _MOONDREAM_MODEL_ID, revision=_MOONDREAM_REVISION, trust_remote_code=True
        )
        model = AutoModelForCausalLM.from_pretrained(
            _MOONDREAM_MODEL_ID,
            revision=_MOONDREAM_REVISION,
            trust_remote_code=True,
            torch_dtype=torch.float16,
        ).to("mps" if torch.backends.mps.is_available() else "cpu")
        model.eval()
    except Exception as e:
        print(f"[CAPTION] Moondream error: {e}")
        return None
    return model, tokenizer


def generate_caption_with_moondream(
    image_path: str, prompt: str = "Describe this image in detail."
) -> Optional[str]:
    """Generate caption using Moondream2 (local VLM).

    Args:
        image_path: Path to the image file to caption.
        prompt: Question passed to the model along with the image.

    Returns:
        The caption text, or None if the model is unavailable, inference
        fails, or the model returns an empty answer.
    """
    loaded = _load_moondream()
    if loaded is None:
        return None
    moondream, tokenizer = loaded

    try:
        from PIL import Image

        with Image.open(image_path) as image:
            enc_image = moondream.encode_image(image)
            caption = moondream.answer_question(enc_image, prompt, tokenizer)
    except ImportError:
        return None
    except Exception as e:
        print(f"[CAPTION] Moondream error: {e}")
        return None
    return caption if caption else None


def generate_caption_from_metadata(
    image_path: str, existing_data: Optional[Dict] = None
) -> str:
    """Generate caption using YOLO/OCR metadata (fallback).

    Args:
        image_path: Path of the frame; accepted for interface compatibility
            and not read by this function.
        existing_data: Optional dict with any of the keys ``objects`` (dicts
            carrying a ``class``), ``texts`` (dicts carrying a ``text``) and
            ``scene_type``.

    Returns:
        The available parts joined with ``" | "``, or ``"Video frame"`` when
        there is no usable metadata.
    """
    data = existing_data or {}
    caption_parts = []

    if data.get("objects"):
        names = [
            str(o["class"])
            for o in data["objects"]
            if isinstance(o, dict) and o.get("class")
        ]
        # dict.fromkeys dedupes while keeping first-seen order, so both the
        # selection of five classes and their order are reproducible.
        objects = list(dict.fromkeys(names))[:5]
        if objects:
            caption_parts.append(f"Objects: {', '.join(objects)}")

    if data.get("texts"):
        texts = [t["text"] for t in data["texts"] if t.get("text")]
        if texts:
            caption_parts.append(f"Text: {' '.join(texts[:3])}")

    if data.get("scene_type"):
        caption_parts.append(f"Scene: {data['scene_type']}")

    if caption_parts:
        return " | ".join(caption_parts)
    return "Video frame"


def _collect_frame_metadata(
    timestamp: float,
    yolo_data: Optional[List],
    ocr_data: Optional[List],
    scene_data: Optional[Dict],
) -> Dict:
    """Gather the YOLO/OCR/scene metadata that applies to one timestamp."""
    combined = {"objects": [], "texts": [], "scene_type": ""}
    # NOTE(review): objects/texts are matched by exact timestamp equality, so
    # they only match when the other processors sampled the same instants.
    if yolo_data:
        combined["objects"] = [o for o in yolo_data if o.get("timestamp") == timestamp]
    if ocr_data:
        combined["texts"] = [t for t in ocr_data if t.get("timestamp") == timestamp]
    if scene_data:
        for scene in scene_data.get("scenes", []):
            if scene.get("start_time", 0) <= timestamp <= scene.get("end_time", 0):
                combined["scene_type"] = scene.get("scene_type_zh") or scene.get(
                    "scene_type", ""
                )
                break
    return combined


def process_frame(
    frame_info: Dict,
    yolo_data: Optional[List] = None,
    ocr_data: Optional[List] = None,
    scene_data: Optional[Dict] = None,
) -> Dict:
    """Process a single frame and generate caption (LOCAL ONLY).

    Args:
        frame_info: Dict with ``index``, ``timestamp`` and ``path`` as
            produced by ``extract_frames``.
        yolo_data: Flat list of detected objects, each with a ``timestamp``.
        ocr_data: Flat list of recognised texts, each with a ``timestamp``.
        scene_data: Dict with a ``scenes`` list of time-ranged scene entries.

    Returns:
        Dict with ``index``, ``timestamp``, ``caption`` and ``source``, where
        ``source`` is ``"moondream2"`` or ``"metadata"``.
    """
    frame_path = frame_info["path"]
    timestamp = frame_info["timestamp"]

    # Try Moondream2 (local VLM) first.
    caption = generate_caption_with_moondream(frame_path)
    if caption:
        source = "moondream2"
    else:
        # Fallback: use metadata from YOLO/OCR/Scene.
        combined_data = _collect_frame_metadata(
            timestamp, yolo_data, ocr_data, scene_data
        )
        caption = generate_caption_from_metadata(frame_path, combined_data)
        source = "metadata"

    return {
        "index": frame_info["index"],
        "timestamp": timestamp,
        "caption": caption,
        "source": source,
    }


def _load_frame_items(path: str, key: str) -> List[Dict]:
    """Flatten ``frames[*][key]`` from a sidecar JSON file.

    Each returned item is stamped with its frame's ``timestamp``. A missing
    file yields an empty list.
    """
    if not os.path.exists(path):
        return []
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    items = []
    for frame in data.get("frames", []):
        frame_timestamp = frame.get("timestamp", 0)
        for item in frame.get(key, []):
            item["timestamp"] = frame_timestamp
            items.append(item)
    return items


def _cleanup_frames(frames: List[Dict], video_path: str) -> None:
    """Best-effort removal of extracted frame files and their directory."""
    for frame in frames:
        with contextlib.suppress(OSError):
            os.remove(frame["path"])
    temp_dir = os.path.join(os.path.dirname(video_path), ".caption_frames")
    # rmdir only succeeds on an empty directory, so files that do not belong
    # to this run are never deleted.
    with contextlib.suppress(OSError):
        os.rmdir(temp_dir)


def run_caption(
    video_path: str, output_path: str, uuid: str = "", max_frames: int = 30
):
    """Caption a video and write the result as JSON to ``output_path``.

    Sidecar files ``<name>.yolo.json``, ``<name>.ocr.json`` and
    ``<name>.scene.json`` located next to ``output_path`` (where ``<name>``
    is the output file name up to its first dot) are used for fallback
    captions when present.

    Args:
        video_path: Path to the input video.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress is published through RedisPublisher
            (if that module is importable).
        max_frames: Maximum number of frames to caption.

    Returns:
        The result dict that was written to ``output_path``.
    """
    publisher = None
    if uuid:
        if RedisPublisher is None:
            print(
                "[CAPTION] redis_publisher unavailable; progress updates disabled",
                file=sys.stderr,
            )
        else:
            publisher = RedisPublisher(uuid)

    if publisher:
        publisher.info("caption", "CAPTION_START")
        publisher.info("caption", "Extracting frames from video...")

    frames = extract_frames(video_path, max_frames)

    captions = []
    try:
        if publisher:
            publisher.info("caption", f"Extracted {len(frames)} frames")

        base_path = os.path.dirname(output_path)
        uuid_name = os.path.basename(output_path).split(".")[0]

        yolo_objects = _load_frame_items(
            os.path.join(base_path, f"{uuid_name}.yolo.json"), "objects"
        )
        ocr_texts = _load_frame_items(
            os.path.join(base_path, f"{uuid_name}.ocr.json"), "texts"
        )

        scene_info = {}
        scene_path = os.path.join(base_path, f"{uuid_name}.scene.json")
        if os.path.exists(scene_path):
            with open(scene_path, encoding="utf-8") as f:
                scene_info = json.load(f)

        for i, frame in enumerate(frames):
            if publisher and i % 5 == 0:
                publisher.progress(
                    "caption", i, len(frames), f"Frame {i + 1}/{len(frames)}"
                )
            captions.append(process_frame(frame, yolo_objects, ocr_texts, scene_info))
    finally:
        # Temp frames must not outlive the run, even if captioning fails.
        _cleanup_frames(frames, video_path)

    result = {
        "video_path": video_path,
        "total_frames": len(frames),
        "captions": captions,
        "summary": {
            "avg_caption_length": sum(len(c.get("caption", "")) for c in captions)
            / max(len(captions), 1),
            "moondream_count": sum(
                1 for c in captions if c.get("source") == "moondream2"
            ),
            "metadata_count": sum(1 for c in captions if c.get("source") == "metadata"),
            "cloud_api_count": 0,
        },
    }

    # Explicit UTF-8: ensure_ascii=False emits non-ASCII text verbatim, which
    # would fail under a non-UTF-8 default locale encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    if publisher:
        publisher.complete("caption", f"{len(captions)} frames captioned (LOCAL)")

    return result


def main() -> None:
    """Parse command-line arguments and run the caption pipeline."""
    parser = argparse.ArgumentParser(description="Video Caption Generator (LOCAL ONLY)")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", help="UUID for progress tracking", default="")
    parser.add_argument(
        "--max-frames", type=int, default=30, help="Maximum frames to caption"
    )

    args = parser.parse_args()

    result = run_caption(args.video_path, args.output_path, args.uuid, args.max_frames)
    print(f"Caption generated: {result['total_frames']} frames (LOCAL)")


if __name__ == "__main__":
    main()