Phase 2.6.1: co_occurrence_edges migration - build_co_occurrence_edges_from_qdrant() - Qdrant embeddings → frame grouping → YOLO objects - Result: 6679 edges (vs 6701 PostgreSQL) Phase 2.6.2: face_face_edges migration - build_face_face_edges_from_qdrant() - Qdrant embeddings → frame grouping → face pairs - mutual_gaze detection preserved - Result: 6 edges (exact match) Phase 2.6.3: speaker_face_edges migration - build_speaker_face_edges_from_qdrant() - Qdrant embeddings → trace_id frame ranges - SPEAKS_AS edge creation Architecture: - All edges use Qdrant payload (no face_detections queries) - PostgreSQL fallback for empty Qdrant - Estimated 3.6x performance improvement Testing: - Playground (3003): ✓ All Phase 2.6 logs verified - Edge counts: ✓ Close match with PostgreSQL - Fallback: ✓ Working Docs: - docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md - docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
354 lines
11 KiB
Python
354 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Scene Classifier v1.11 — E4B per-CUT multimodal scene classification
|
|
|
|
For each CUT scene:
|
|
1. Extract mid-frame image via ffmpeg
|
|
2. Read ASR context from DB (optional; OCR/YOLO context is not read by this version)
|
|
3. Send multimodal request to E4B (vMLX, port 8000)
|
|
4. Parse structured JSON response
|
|
5. Output aligned with CUT scene_number, start_frame, end_frame
|
|
|
|
Output format:
|
|
{
|
|
"frame_count": N,
|
|
"fps": 30.0,
|
|
"scenes": [
|
|
{
|
|
"scene_number": 1,
|
|
"start_time": 0.0,
|
|
"end_time": 2.87,
|
|
"start_frame": 0,
|
|
"end_frame": 69,
|
|
"scene_type": "interview",
|
|
"scene_type_zh": "採訪",
|
|
"confidence": 0.95,
|
|
"top_5": [{"scene_type": "interview", "confidence": 0.95}, ...],
|
|
"summary": "...",
|
|
"key_objects": ["..."]
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import base64
|
|
|
|
# Absolute directory that holds this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Chat-completions endpoint. Resolution order: E4B_URL, then
# MOMENTRY_LLM_URL, then the local default on port 8000.
LLM_URL = os.getenv(
    "E4B_URL",
    os.getenv("MOMENTRY_LLM_URL", "http://127.0.0.1:8000/v1/chat/completions"),
)

# Model name sent in every request; overridable through E4B_MODEL.
LLM_MODEL = os.getenv("E4B_MODEL", "gemma-4-E4B")
|
|
|
|
# Traditional-Chinese display label for every scene type the classifier may
# return. The keys mirror the "Valid scene types" list sent in the prompt.
SCENE_TYPE_ZH = {
    # People talking
    "interview": "採訪",
    "presentation": "簡報",
    "monologue": "獨白",
    "dialogue": "對話",
    # Content genres
    "action": "動作",
    "news_broadcast": "新聞播報",
    "tutorial": "教學",
    "vlog": "部落格影片",
    "music_performance": "音樂表演",
    "sports": "體育",
    "cooking": "烹飪",
    "gaming": "遊戲",
    "travel": "旅遊",
    "nature": "自然",
    # Settings
    "studio": "攝影棚",
    "outdoor": "戶外",
    "indoor": "室內",
    # Events and formats
    "conference": "會議",
    "ceremony": "典禮",
    "documentary": "紀錄片",
    "commercial": "廣告",
    # Structural segments
    "opening_credits": "片頭",
    "closing_credits": "片尾",
    "transition": "轉場",
    "title_card": "字幕卡",
    # Catch-all
    "other": "其他",
}
|
|
|
|
|
|
def load_cut_json(cut_path):
    """Load the CUT scene-detection result from a JSON file.

    Args:
        cut_path: Path to the CUT JSON file. ``None`` or an empty string is
            treated the same as a missing file.

    Returns:
        The decoded JSON document, or ``None`` when the path is unset, the
        file is missing or unreadable, or its content is not valid JSON.
        A diagnostic line is written to stderr in every failure case.
    """
    # The caller passes None when it could not locate a CUT file;
    # os.path.exists(None) raises TypeError, so check for "no path" first.
    if not cut_path or not os.path.exists(cut_path):
        print(f"[SCENE] CUT JSON not found: {cut_path}", file=sys.stderr)
        return None

    try:
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with open(cut_path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[SCENE] Failed to read CUT JSON {cut_path}: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def extract_frame(video_path, timestamp_sec):
    """Extract a single video frame as a base64-encoded JPEG string.

    Runs ``ffmpeg`` seeking to ``timestamp_sec`` and reads one MJPEG frame
    from its stdout.

    Args:
        video_path: Path of the video file handed to ffmpeg.
        timestamp_sec: Seek position in seconds.

    Returns:
        The frame as a base64 string, or ``None`` when ffmpeg cannot be
        started, times out (30 s), exits non-zero, or produces 100 bytes or
        fewer. Every failure is reported on stderr.
    """
    cmd = [
        "ffmpeg", "-y", "-ss", str(timestamp_sec),
        "-i", video_path,
        "-vframes", "1",
        "-f", "image2pipe",
        "-vcodec", "mjpeg",
        "-q:v", "2",
        "pipe:1",
    ]
    try:
        result = subprocess.run(
            cmd,
            # Keep ffmpeg from consuming the parent's stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            timeout=30,
        )
        if result.returncode == 0 and len(result.stdout) > 100:
            return base64.b64encode(result.stdout).decode("utf-8")

        # Report why no frame came back instead of failing silently.
        if result.returncode != 0:
            stderr_lines = (
                result.stderr.decode("utf-8", errors="replace").strip().splitlines()
            )
            reason = stderr_lines[-1] if stderr_lines else "no stderr output"
            print(
                f"[SCENE] ffmpeg exited {result.returncode} at "
                f"{timestamp_sec:.1f}s: {reason}",
                file=sys.stderr,
            )
        else:
            print(
                f"[SCENE] ffmpeg produced no frame data at {timestamp_sec:.1f}s",
                file=sys.stderr,
            )
    except Exception as e:
        # Best effort: a missing ffmpeg binary, a timeout or a bad argument
        # must not abort the whole classification run.
        print(f"[SCENE] Frame extraction failed at {timestamp_sec:.1f}s: {e}", file=sys.stderr)
    return None
|
|
|
|
|
|
def _parse_e4b_reply(content):
    """Decode the JSON object in an LLM reply; return ``None`` if there is none."""
    if not isinstance(content, str):
        return None
    text = content.strip()

    # Strip markdown code fences
    if text.startswith("```"):
        text = text.split("\n", 1)[-1]
        if "```" in text:
            text = text.rsplit("```", 1)[0]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # The model sometimes wraps the object in prose or a one-line fence;
        # retry on the outermost pair of braces.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            return None
        try:
            parsed = json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            return None

    # Callers use .get() on the result, so anything but an object is a failure.
    return parsed if isinstance(parsed, dict) else None


def call_e4b(image_b64, context_text="", max_retries=2):
    """Send image + text to E4B for scene classification.

    Args:
        image_b64: Base64-encoded JPEG frame; omitted from the request when
            falsy.
        context_text: Optional extra context appended as a second text part.
        max_retries: Total number of request attempts.

    Returns:
        The decoded JSON object (a dict) from the model's reply, or ``None``
        when every attempt fails (HTTP error, transport error, or a reply
        that does not contain a JSON object).
    """
    prompt_parts = [
        {
            "type": "text",
            "text": (
                "Classify the scene in this video frame. "
                "Respond with JSON ONLY (no markdown, no explanation):\n"
                '{"scene_type": "type", "confidence": 0.0, "top_5": '
                '[{"scene_type": "type", "confidence": 0.0}], "summary": "brief description"}\n\n'
                "Valid scene types: interview, presentation, monologue, dialogue, "
                "action, news_broadcast, tutorial, vlog, music_performance, sports, "
                "cooking, gaming, travel, nature, studio, outdoor, indoor, conference, "
                "ceremony, documentary, commercial, opening_credits, closing_credits, "
                "transition, title_card, other"
            ),
        }
    ]

    if image_b64:
        prompt_parts.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
            }
        )

    if context_text:
        prompt_parts.append(
            {"type": "text", "text": f"\nContext: {context_text}"}
        )

    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt_parts}],
        "max_tokens": 512,
        "temperature": 0.1,
    }

    for attempt in range(max_retries):
        try:
            # Imported lazily (and inside the try) so a missing `requests`
            # package degrades to "no classification" instead of crashing.
            import requests

            resp = requests.post(LLM_URL, json=payload, timeout=120)
            if resp.status_code == 200:
                content = resp.json()["choices"][0]["message"]["content"]
                result = _parse_e4b_reply(content)
                if result is not None:
                    return result
                print(f"[SCENE] JSON parse error (attempt {attempt + 1})", file=sys.stderr)
            else:
                print(
                    f"[SCENE] E4B HTTP {resp.status_code}: {resp.text[:200]}",
                    file=sys.stderr,
                )
        except json.JSONDecodeError:
            # The HTTP body itself was not JSON.
            print(f"[SCENE] JSON parse error (attempt {attempt + 1})", file=sys.stderr)
        except Exception as e:
            print(f"[SCENE] E4B error (attempt {attempt + 1}): {e}", file=sys.stderr)

        # Back off only when another attempt follows; sleeping after the
        # final failure just delays every unclassifiable scene by a second.
        if attempt + 1 < max_retries:
            time.sleep(1)

    return None
|
|
|
|
|
|
def query_asr_text(file_uuid, start_time, end_time):
    """Query ASR text from DB for context.

    Selects ``text_content`` of ``sentence`` chunks of ``file_uuid`` that lie
    entirely inside ``[start_time, end_time]``, ordered by start time.

    Args:
        file_uuid: Value matched against the ``file_uuid`` column.
        start_time: Lower bound for the chunk's ``start_time``.
        end_time: Upper bound for the chunk's ``end_time``.

    Returns:
        The non-empty texts joined with single spaces, or ``""`` when there
        are no rows, psycopg2 is not installed, or the query fails. Failures
        are reported on stderr; this lookup is optional context and never
        raises.
    """
    try:
        import psycopg2
    except ImportError as e:
        print(f"[SCENE] ASR context unavailable: {e}", file=sys.stderr)
        return ""

    db_url = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
    # NOTE(review): the schema name is interpolated into the SQL text as-is.
    # It comes from an operator-controlled environment variable; never feed
    # it from untrusted input.
    schema = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")

    conn = None
    try:
        conn = psycopg2.connect(db_url)
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT text_content FROM {schema}.chunk
                WHERE file_uuid = %s AND chunk_type = 'sentence'
                AND start_time >= %s AND end_time <= %s
                ORDER BY start_time
                """,
                (file_uuid, start_time, end_time),
            )
            texts = [row[0] for row in cur.fetchall() if row[0]]
        return " ".join(texts)
    except Exception as e:
        # Best effort: classification continues without audio context.
        print(f"[SCENE] ASR query failed: {e}", file=sys.stderr)
        return ""
    finally:
        # Close the connection even when connect/execute raised part-way.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def get_fps(cut_data):
    """Get FPS from CUT data.

    Args:
        cut_data: Mapping decoded from the CUT JSON; its ``"fps"`` entry is
            read.

    Returns:
        The ``"fps"`` value when it is a positive, finite number (numeric
        strings such as ``"29.97"`` are converted to float); otherwise the
        default ``30.0``.
    """
    default_fps = 30.0
    fps = cut_data.get("fps")

    # Tolerate producers that serialise the rate as a string.
    if isinstance(fps, str):
        try:
            fps = float(fps)
        except ValueError:
            return default_fps

    # bool is an int subclass, but True/False is never a meaningful frame rate.
    if isinstance(fps, bool) or not isinstance(fps, (int, float)):
        return default_fps

    # The chained comparison rejects zero, negatives, NaN and infinity.
    if 0 < fps < float("inf"):
        return fps
    return default_fps
|
|
|
|
|
|
def _write_scene_json(output_path, result):
    """Write ``result`` to ``output_path`` as UTF-8 JSON and return it."""
    # ensure_ascii=False emits raw CJK characters, so the file encoding must
    # be pinned rather than left to the locale default.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    return result


def _coerce_confidence(value):
    """Convert a model-supplied confidence to float; fall back to 0.0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _parse_classification(e4b_resp):
    """Normalise an E4B reply into ``(scene_type, confidence, top_5, summary)``."""
    if not isinstance(e4b_resp, dict):
        return "other", 0.0, [], ""

    raw_type = e4b_resp.get("scene_type")
    # Non-string types (e.g. a list) would be unhashable in the label lookup.
    scene_type = raw_type.strip().lower() if isinstance(raw_type, str) else ""
    scene_type = scene_type or "other"

    top_5_raw = e4b_resp.get("top_5")
    top_5 = []
    if isinstance(top_5_raw, list):
        top_5 = [
            {
                "scene_type": p.get("scene_type", "unknown"),
                "confidence": _coerce_confidence(p.get("confidence", 0.0)),
            }
            for p in top_5_raw
            if isinstance(p, dict)
        ]

    confidence = _coerce_confidence(e4b_resp.get("confidence", 0.0))
    summary = e4b_resp.get("summary") or ""
    return scene_type, confidence, top_5, summary


def classify_scenes(video_path, cut_path, output_path, file_uuid=None):
    """Main classification: iterate CUT scenes, classify each via E4B.

    For every scene in the CUT JSON the mid-point frame is extracted,
    optional ASR text is fetched when ``file_uuid`` is given, and the frame
    plus context is sent to E4B. Scenes that cannot be classified are
    emitted as ``"other"`` with confidence ``0.0``.

    Args:
        video_path: Video file the frames are extracted from.
        cut_path: Path to the CUT JSON; ``None`` is treated as "no CUT data".
        output_path: Destination of the result JSON (always written).
        file_uuid: Optional file UUID used to look up ASR context.

    Returns:
        The result dict that was written to ``output_path``, with keys
        ``frame_count``, ``fps`` and ``scenes`` (plus ``metadata`` when no
        CUT data could be loaded).
    """
    # Load CUT data
    cut_data = load_cut_json(cut_path) if cut_path else None
    if not cut_data or not isinstance(cut_data, dict):
        print("[SCENE] No CUT data available, returning empty result", file=sys.stderr)
        return _write_scene_json(
            output_path,
            {
                "frame_count": 0,
                "fps": 0.0,
                "scenes": [],
                "metadata": {"error": "No CUT data"},
            },
        )

    scenes = cut_data.get("scenes", [])
    if not scenes:
        print("[SCENE] No scenes in CUT data", file=sys.stderr)
        return _write_scene_json(
            output_path, {"frame_count": 0, "fps": 0.0, "scenes": []}
        )

    fps = get_fps(cut_data)
    frame_count = cut_data.get("total_frames", cut_data.get("frame_count", 0))
    print(f"[SCENE] Loaded {len(scenes)} CUT scenes, fps={fps}", file=sys.stderr)

    results = []
    for i, scene in enumerate(scenes):
        start_time = scene.get("start_time", 0)
        end_time = scene.get("end_time", 0)
        # Frame numbers fall back to time * fps when the CUT data omits them.
        start_frame = scene.get("start_frame", int(start_time * fps))
        end_frame = scene.get("end_frame", int(end_time * fps))
        scene_number = scene.get("scene_number", i + 1)

        # Extract mid-frame image
        mid_time = (start_time + end_time) / 2
        image_b64 = extract_frame(video_path, mid_time)

        # Get ASR context
        asr_text = ""
        if file_uuid:
            asr_text = query_asr_text(file_uuid, start_time, end_time)

        context_parts = []
        if asr_text:
            # Cap the transcript so the prompt stays short.
            context_parts.append(f"Audio: {asr_text[:200]}")
        context_text = " | ".join(context_parts)

        # Call E4B (skipped when no frame could be extracted)
        e4b_resp = call_e4b(image_b64, context_text) if image_b64 else None
        scene_type, confidence, top_5, summary = _parse_classification(e4b_resp)

        results.append(
            {
                "scene_number": scene_number,
                "start_time": start_time,
                "end_time": end_time,
                "start_frame": start_frame,
                "end_frame": end_frame,
                "scene_type": scene_type,
                # Types outside the known list get the "other" label
                # instead of a null.
                "scene_type_zh": SCENE_TYPE_ZH.get(
                    scene_type, SCENE_TYPE_ZH.get("other")
                ),
                "confidence": confidence,
                "top_5": top_5,
                "summary": summary,
            }
        )

        if i % 10 == 0:
            print(f"[SCENE] Processed {i + 1}/{len(scenes)} scenes", file=sys.stderr)

    result = _write_scene_json(
        output_path,
        {
            "frame_count": frame_count,
            "fps": fps,
            "scenes": results,
        },
    )

    print(f"[SCENE] Classified {len(results)} scenes → {output_path}", file=sys.stderr)
    return result
|
|
|
|
|
|
def _find_cut_json(output_path):
    """Guess the CUT JSON that sits next to ``output_path``.

    Two names are tried in the output file's directory: the output stem with
    ``.scene`` replaced by ``.cut`` plus ``.json``, then the stem without its
    last dotted part plus ``.cut.json``. A candidate identical to the output
    file is skipped so a previous result is never read back as CUT data.

    Returns:
        The first candidate that exists; otherwise the first candidate (so
        the caller reports a concrete "not found" path), or ``None`` when
        every candidate is the output file itself.
    """
    # Work on the file name only so dots or ".scene" inside directory names
    # cannot corrupt the candidate paths.
    directory, filename = os.path.split(output_path)
    stem = os.path.splitext(filename)[0]
    names = [
        stem.replace(".scene", ".cut") + ".json",
        stem.rsplit(".", 1)[0] + ".cut.json",
    ]

    output_abs = os.path.abspath(output_path)
    candidates = []
    for name in names:
        path = os.path.join(directory, name)
        if os.path.abspath(path) != output_abs and path not in candidates:
            candidates.append(path)

    for path in candidates:
        if os.path.exists(path):
            return path
    return candidates[0] if candidates else None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scene Classifier v1.11 (E4B per-CUT)")
    parser.add_argument("video_path", nargs="?", help="Video file path")
    parser.add_argument("output_path", nargs="?", help="Output JSON path")
    parser.add_argument("--uuid", help="File UUID", default=None)
    parser.add_argument("--cut-json", help="Path to CUT JSON file", default=None)
    parser.add_argument("--check-health", action="store_true", help="Check E4B availability")
    args = parser.parse_args()

    if args.check_health:
        print("=== Scene Classifier v1.11 (E4B) ===")
        print(f"E4B URL: {LLM_URL}")
        print(f"E4B Model: {LLM_MODEL}")
        try:
            import requests

            resp = requests.get(LLM_URL.replace("/v1/chat/completions", "/health"), timeout=5)
            print(f"E4B Health: {resp.status_code}")
        except Exception as e:
            print(f"E4B Health: ERROR - {e}")
        # The health check is informational and always exits 0.
        sys.exit(0)

    # Positionals are optional only so that --check-health works without them.
    if not args.video_path or not args.output_path:
        parser.print_help()
        sys.exit(1)

    # Find cut JSON if not specified. When nothing exists on disk a concrete
    # (missing) path is still passed on, so classify_scenes reports it and
    # writes an empty result instead of crashing on None.
    cut_path = args.cut_json or _find_cut_json(args.output_path)
    if not cut_path:
        print(
            "[SCENE] Cannot derive a CUT JSON path from the output path; pass --cut-json",
            file=sys.stderr,
        )
        sys.exit(1)

    classify_scenes(args.video_path, cut_path, args.output_path, file_uuid=args.uuid)