#!/opt/homebrew/bin/python3.11
"""
Scene Classifier v1.11 — E4B per-CUT multimodal scene classification

For each CUT scene:
1. Extract mid-frame image via ffmpeg
2. Read ASR context from DB (optional; only ASR text is queried here)
3. Send multimodal request to E4B (vMLX, port 8000)
4. Parse structured JSON response
5. Output aligned with CUT scene_number, start_frame, end_frame

Output format:
{
  "frame_count": N,
  "fps": 30.0,
  "scenes": [
    {
      "scene_number": 1,
      "start_time": 0.0,
      "end_time": 2.87,
      "start_frame": 0,
      "end_frame": 69,
      "scene_type": "interview",
      "scene_type_zh": "採訪",
      "confidence": 0.95,
      "top_5": [{"scene_type": "interview", "confidence": 0.95}, ...],
      "summary": "..."
    }
  ]
}
"""

import argparse
import base64
import json
import math
import os
import re
import subprocess
import sys
import time

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

LLM_URL = os.environ.get(
    "E4B_URL",
    os.environ.get("MOMENTRY_LLM_URL", "http://127.0.0.1:8000/v1/chat/completions"),
)
LLM_MODEL = os.environ.get("E4B_MODEL", "gemma-4-E4B")

# Closed vocabulary of scene types (also listed in the prompt) with zh labels.
SCENE_TYPE_ZH = {
    "interview": "採訪",
    "presentation": "簡報",
    "monologue": "獨白",
    "dialogue": "對話",
    "action": "動作",
    "news_broadcast": "新聞播報",
    "tutorial": "教學",
    "vlog": "部落格影片",
    "music_performance": "音樂表演",
    "sports": "體育",
    "cooking": "烹飪",
    "gaming": "遊戲",
    "travel": "旅遊",
    "nature": "自然",
    "studio": "攝影棚",
    "outdoor": "戶外",
    "indoor": "室內",
    "conference": "會議",
    "ceremony": "典禮",
    "documentary": "紀錄片",
    "commercial": "廣告",
    "opening_credits": "片頭",
    "closing_credits": "片尾",
    "transition": "轉場",
    "title_card": "字幕卡",
    "other": "其他",
}

DEFAULT_FPS = 30.0

# The DB schema name is spliced into SQL text (identifiers cannot be bound as
# parameters), so it must be a plain identifier.
_SCHEMA_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def load_cut_json(cut_path):
    """Load CUT data from a JSON file.

    Returns the parsed JSON object, or None when the path is missing/unset,
    unreadable, malformed, or does not contain a JSON object.
    """
    # cut_path is None when no CUT file could be located; os.path.exists(None)
    # would raise TypeError, so test for it first.
    if not cut_path or not os.path.exists(cut_path):
        print(f"[SCENE] CUT JSON not found: {cut_path}", file=sys.stderr)
        return None
    try:
        with open(cut_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"[SCENE] CUT JSON unreadable: {cut_path}: {e}", file=sys.stderr)
        return None
    if not isinstance(data, dict):
        print(f"[SCENE] CUT JSON is not an object: {cut_path}", file=sys.stderr)
        return None
    return data


def extract_frame(video_path, timestamp_sec):
    """Extract a single frame at the given timestamp as a base64 JPEG string.

    Returns None when ffmpeg cannot be run, times out, exits non-zero, or
    produces 100 bytes or less of output.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(timestamp_sec),
        "-i", video_path,
        "-vframes", "1",
        "-f", "image2pipe",
        "-vcodec", "mjpeg",
        "-q:v", "2",
        "pipe:1",
    ]
    try:
        result = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,  # keep ffmpeg from reading the caller's stdin
            capture_output=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError, TypeError, ValueError) as e:
        print(f"[SCENE] Frame extraction failed at {timestamp_sec:.1f}s: {e}", file=sys.stderr)
        return None

    if result.returncode == 0 and len(result.stdout) > 100:
        return base64.b64encode(result.stdout).decode("utf-8")

    tail = result.stderr[-200:].decode("utf-8", "replace").strip()
    print(
        f"[SCENE] Frame extraction failed at {timestamp_sec}s "
        f"(ffmpeg exit {result.returncode}): {tail}",
        file=sys.stderr,
    )
    return None


def _parse_llm_json(content):
    """Parse the model's reply as JSON, tolerating code fences and stray prose.

    Raises json.JSONDecodeError when no JSON can be recovered.
    """
    text = content.strip()
    # Strip markdown code fences
    if text.startswith("```"):
        text = text.split("\n", 1)[-1]
    if "```" in text:
        text = text.rsplit("```", 1)[0]
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span in the raw reply.
        start, end = content.find("{"), content.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(content[start:end + 1])


def call_e4b(image_b64, context_text="", max_retries=2):
    """Send image + text to E4B for scene classification.

    Returns the parsed JSON object (a dict) from the model, or None when every
    attempt fails (HTTP error, network error, unparseable or non-object reply).
    """
    prompt_parts = [
        {
            "type": "text",
            "text": (
                "Classify the scene in this video frame. "
                "Respond with JSON ONLY (no markdown, no explanation):\n"
                '{"scene_type": "type", "confidence": 0.0, "top_5": '
                '[{"scene_type": "type", "confidence": 0.0}], "summary": "brief description"}\n\n'
                "Valid scene types: interview, presentation, monologue, dialogue, "
                "action, news_broadcast, tutorial, vlog, music_performance, sports, "
                "cooking, gaming, travel, nature, studio, outdoor, indoor, conference, "
                "ceremony, documentary, commercial, opening_credits, closing_credits, "
                "transition, title_card, other"
            ),
        }
    ]
    if image_b64:
        prompt_parts.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
            }
        )
    if context_text:
        prompt_parts.append({"type": "text", "text": f"\nContext: {context_text}"})

    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt_parts}],
        "max_tokens": 512,
        "temperature": 0.1,
    }

    # Imported lazily so the module loads without requests installed.
    try:
        import requests
    except ImportError as e:
        print(f"[SCENE] E4B error: {e}", file=sys.stderr)
        return None

    for attempt in range(max_retries):
        try:
            resp = requests.post(LLM_URL, json=payload, timeout=120)
            if resp.status_code == 200:
                content = resp.json()["choices"][0]["message"]["content"]
                result = _parse_llm_json(content)
                if isinstance(result, dict):
                    return result
                # Callers use .get() on the result, so only an object is usable.
                print(
                    f"[SCENE] E4B returned non-object JSON (attempt {attempt + 1})",
                    file=sys.stderr,
                )
            else:
                print(
                    f"[SCENE] E4B HTTP {resp.status_code}: {resp.text[:200]}",
                    file=sys.stderr,
                )
        except json.JSONDecodeError:
            print(f"[SCENE] JSON parse error (attempt {attempt + 1})", file=sys.stderr)
        except Exception as e:
            print(f"[SCENE] E4B error (attempt {attempt + 1}): {e}", file=sys.stderr)
        # Back off between attempts only; no point sleeping after the last one.
        if attempt + 1 < max_retries:
            time.sleep(1)
    return None


def query_asr_text(file_uuid, start_time, end_time):
    """Query ASR sentence text from the DB for use as classification context.

    Returns the sentences fully contained in [start_time, end_time], joined by
    spaces, or "" when psycopg2 is unavailable, the schema name is invalid, or
    the query fails (context is optional, so failures never raise).
    """
    try:
        import psycopg2
    except ImportError:
        return ""

    db_url = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
    schema = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
    if not _SCHEMA_RE.fullmatch(schema):
        print(f"[SCENE] Invalid MOMENTRY_DB_SCHEMA: {schema!r}", file=sys.stderr)
        return ""

    conn = None
    try:
        conn = psycopg2.connect(db_url)
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT text_content FROM {schema}.chunk
                WHERE file_uuid = %s AND chunk_type = 'sentence'
                  AND start_time >= %s AND end_time <= %s
                ORDER BY start_time
                """,
                (file_uuid, start_time, end_time),
            )
            texts = [row[0] for row in cur.fetchall() if row[0]]
        return " ".join(texts)
    except Exception as e:
        print(f"[SCENE] ASR query failed: {e}", file=sys.stderr)
        return ""
    finally:
        # Close the connection even when connect succeeded but the query failed.
        if conn is not None:
            conn.close()


def get_fps(cut_data):
    """Get FPS from CUT data, falling back to 30.0 when absent or not positive."""
    fps = cut_data.get("fps")
    if isinstance(fps, str):
        try:
            fps = float(fps)
        except ValueError:
            return DEFAULT_FPS
    if isinstance(fps, (int, float)) and not isinstance(fps, bool) and fps > 0:
        return fps
    return DEFAULT_FPS


def _to_float(value, default=0.0):
    """Coerce value to a finite float, returning default when that is impossible."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default


def _write_result(output_path, result):
    """Write the result as UTF-8 JSON (the zh labels are emitted unescaped)."""
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)


def _normalize_response(e4b_resp):
    """Turn a raw E4B reply into (scene_type, confidence, top_5, summary)."""
    if not e4b_resp:
        return "other", 0.0, [], ""

    raw_type = e4b_resp.get("scene_type", "other")
    scene_type = raw_type.strip().lower() if isinstance(raw_type, str) else ""
    if scene_type not in SCENE_TYPE_ZH:
        # The prompt defines a closed vocabulary; anything else becomes "other".
        print(f"[SCENE] Unknown scene_type {raw_type!r}, using 'other'", file=sys.stderr)
        scene_type = "other"

    top_5_raw = e4b_resp.get("top_5", [])
    if not isinstance(top_5_raw, list):
        top_5_raw = []
    top_5 = [
        {
            "scene_type": p.get("scene_type", "unknown"),
            "confidence": _to_float(p.get("confidence", 0.0)),
        }
        for p in top_5_raw
        if isinstance(p, dict)
    ]

    summary = e4b_resp.get("summary", "")
    if not isinstance(summary, str):
        summary = "" if summary is None else str(summary)

    return scene_type, _to_float(e4b_resp.get("confidence", 0.0)), top_5, summary


def classify_scenes(video_path, cut_path, output_path, file_uuid=None):
    """Main classification: iterate CUT scenes, classify each via E4B.

    Args:
        video_path: Video file the mid-frames are extracted from.
        cut_path: Path to the CUT JSON (may be None when none was found).
        output_path: Where the result JSON is written.
        file_uuid: Optional file UUID; when given, ASR text is used as context.

    Returns:
        The result dict that was written to output_path. When CUT data is
        missing or has no scenes, an empty result is written and returned.
    """
    # Load CUT data
    cut_data = load_cut_json(cut_path)
    if not cut_data:
        print("[SCENE] No CUT data available, returning empty result", file=sys.stderr)
        result = {
            "frame_count": 0,
            "fps": 0.0,
            "scenes": [],
            "metadata": {"error": "No CUT data"},
        }
        _write_result(output_path, result)
        return result

    scenes = cut_data.get("scenes", [])
    if not scenes:
        print("[SCENE] No scenes in CUT data", file=sys.stderr)
        result = {"frame_count": 0, "fps": 0.0, "scenes": []}
        _write_result(output_path, result)
        return result

    fps = get_fps(cut_data)
    frame_count = cut_data.get("total_frames", cut_data.get("frame_count", 0))
    print(f"[SCENE] Loaded {len(scenes)} CUT scenes, fps={fps}", file=sys.stderr)

    results = []
    for i, scene in enumerate(scenes):
        start_time = scene.get("start_time", 0)
        end_time = scene.get("end_time", 0)
        # Frame numbers come from CUT when present, else are derived from time.
        start_frame = scene.get("start_frame", int(start_time * fps))
        end_frame = scene.get("end_frame", int(end_time * fps))
        scene_number = scene.get("scene_number", i + 1)

        # Extract mid-frame image
        mid_time = (start_time + end_time) / 2
        image_b64 = extract_frame(video_path, mid_time)

        # Get ASR context
        asr_text = query_asr_text(file_uuid, start_time, end_time) if file_uuid else ""
        context_text = f"Audio: {asr_text[:200]}" if asr_text else ""

        # Call E4B (only when a frame is available)
        e4b_resp = call_e4b(image_b64, context_text) if image_b64 else None
        scene_type, confidence, top_5, summary = _normalize_response(e4b_resp)

        results.append(
            {
                "scene_number": scene_number,
                "start_time": start_time,
                "end_time": end_time,
                "start_frame": start_frame,
                "end_frame": end_frame,
                "scene_type": scene_type,
                "scene_type_zh": SCENE_TYPE_ZH.get(scene_type),
                "confidence": confidence,
                "top_5": top_5,
                "summary": summary,
            }
        )

        if i % 10 == 0:
            print(f"[SCENE] Processed {i + 1}/{len(scenes)} scenes", file=sys.stderr)

    result = {
        "frame_count": frame_count,
        "fps": fps,
        "scenes": results,
    }
    _write_result(output_path, result)

    print(f"[SCENE] Classified {len(results)} scenes → {output_path}", file=sys.stderr)
    return result


def _find_cut_json(output_path):
    """Locate the CUT JSON that sits next to output_path, or return None.

    Tries "<name with .scene replaced by .cut>.json" and then
    "<name minus last extension>.cut.json". Only the filename is rewritten, so
    dots or ".scene" in directory names are left alone, and the output file
    itself is never returned as a candidate.
    """
    directory, filename = os.path.split(output_path)
    stem = os.path.splitext(filename)[0]
    own_path = os.path.abspath(output_path)
    candidate_names = (
        stem.replace(".scene", ".cut") + ".json",
        stem.rsplit(".", 1)[0] + ".cut.json",
    )
    for name in candidate_names:
        candidate = os.path.join(directory, name)
        if os.path.abspath(candidate) == own_path:
            continue
        if os.path.isfile(candidate):
            return candidate
    return None


def main(argv=None):
    """Command-line entry point; returns the process exit code."""
    parser = argparse.ArgumentParser(description="Scene Classifier v1.11 (E4B per-CUT)")
    parser.add_argument("video_path", nargs="?", help="Video file path")
    parser.add_argument("output_path", nargs="?", help="Output JSON path")
    parser.add_argument("--uuid", help="File UUID", default=None)
    parser.add_argument("--cut-json", help="Path to CUT JSON file", default=None)
    parser.add_argument("--check-health", action="store_true", help="Check E4B availability")
    args = parser.parse_args(argv)

    if args.check_health:
        print("=== Scene Classifier v1.11 (E4B) ===")
        print(f"E4B URL: {LLM_URL}")
        print(f"E4B Model: {LLM_MODEL}")
        try:
            import requests

            resp = requests.get(LLM_URL.replace("/v1/chat/completions", "/health"), timeout=5)
            print(f"E4B Health: {resp.status_code}")
        except Exception as e:
            print(f"E4B Health: ERROR - {e}")
        return 0

    if not args.video_path or not args.output_path:
        parser.print_help()
        return 1

    # Find cut JSON if not specified
    cut_path = args.cut_json or _find_cut_json(args.output_path)

    classify_scenes(args.video_path, cut_path, args.output_path, file_uuid=args.uuid)
    return 0


if __name__ == "__main__":
    sys.exit(main())