#!/opt/homebrew/bin/python3.11
"""
Story Processor V2.0 — Dual Pipeline: Story-based + LLM-based Parent-Child Summarization

Pipeline 1 (Story): Template-based, instant, no LLM cost
  → Parent story summary + Child story summary
  → Embedding (Ollama nomic-embed) → pgvector
  → BM25 (PostgreSQL tsvector) → full-text search

Pipeline 2 (LLM): LLM-based summarization (Gemma4/Qwen when resources allow)
  → Parent LLM summary + Child LLM summary
  → Embedding → pgvector + BM25

Both pipelines store into chunks table with distinct chunk_types:
  story_parent, story_child, llm_parent, llm_child

Usage:
  python parent_chunk_5w1h.py --file-uuid UUID --mode story [--embed]
  python parent_chunk_5w1h.py --file-uuid UUID --mode llm [--embed]
"""

import argparse
import json
import os
import sys
import time
from collections import defaultdict
from contextlib import closing
from typing import Dict, List, Optional

# The template pipeline with --no-db needs neither third-party package, so a
# missing one only disables the feature that depends on it.
try:
    import requests
except ImportError:  # HTTP client: only needed for LLM summaries / embeddings
    requests = None
try:
    import psycopg2
except ImportError:  # DB driver: only needed for speaker map / chunk storage
    psycopg2 = None

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.getenv("DATABASE_SCHEMA", "dev")
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://localhost:11436/v1/embeddings")

# Default fallback (used when DB has no speaker mapping)
DEFAULT_SPEAKER_MAP = {}


def load_speaker_map(file_uuid: str) -> dict:
    """Load speaker→identity mapping from DB (generalized, not hardcoded).

    Args:
        file_uuid: Accepted for interface compatibility; the query below is
            not filtered by file.

    Returns:
        ``{speaker_id: (name, confidence)}`` built from the ``identities``
        table, or ``DEFAULT_SPEAKER_MAP`` when the table yields no rows or the
        lookup fails for any reason (best-effort: never raises).
    """
    spk_map = {}
    try:
        # closing() guarantees the cursor and connection are released even
        # when the query raises.
        with closing(psycopg2.connect(DB_URL)) as conn, closing(conn.cursor()) as cur:
            cur.execute("SET search_path TO %s, public", (SCHEMA,))
            cur.execute(
                "SELECT metadata->>'speaker_id', name FROM identities "
                "WHERE metadata->>'speaker_id' IS NOT NULL"
            )
            for spk_id, name in cur.fetchall():
                spk_map[spk_id] = (name, 0.85)  # default confidence from MAR
    except Exception as exc:
        # Deliberately broad: any DB problem (or a missing driver) degrades to
        # the default map, but the reason is reported instead of hidden.
        print(f"[STORY] ⚠️ Speaker map unavailable, using default: {exc}")
        return DEFAULT_SPEAKER_MAP
    return spk_map if spk_map else DEFAULT_SPEAKER_MAP


CURRENT_VERSIONS = {
    "asr": "faster-whisper/small/v1",
    "asrx": "speechbrain/ecapa-tdnn/v1",
    "cut": "pyscenedetect/default",
    "yolo": "yolov5-coreml/v2",
    "face_detection": "apple-vision/v2",
    "face_embedding": "coreml-facenet/v2",
    "speaker_binding": "mar-lip/v1",
    "identity_clustering": "cosine-threshold/v1",
    "story_agent": "template/v2.0",
    "embedding_agent": "nomic-embed-768d/v1",
}

LLM_URL = os.getenv(
    "MOMENTRY_LLM_URL",
    os.getenv("MOMENTRY_LLM_SUMMARY_URL", "http://127.0.0.1:8082/v1/chat/completions"),
)
LLM_MODEL = os.getenv("MOMENTRY_LLM_SUMMARY_MODEL", "gemma4")


def load_data(file_uuid: str) -> dict:
    """Load the ASR / ASRX / CUT JSON outputs for one file from OUTPUT_DIR.

    Returns:
        Dict with keys ``"asr"``, ``"asrx"`` and ``"cut"``; each value is the
        parsed JSON document, or ``None`` when that file does not exist.
    """
    data = {}
    for name in ["asr", "asrx", "cut"]:
        path = os.path.join(OUTPUT_DIR, f"{file_uuid}.{name}.json")
        if os.path.exists(path):
            # Context manager closes the handle; JSON is read as UTF-8.
            with open(path, encoding="utf-8") as fh:
                data[name] = json.load(fh)
        else:
            data[name] = None
    return data


def _resolve_speaker(st, en, asrx_segs: List[dict], speaker_map: dict):
    """Return (speaker_id, speaker_name, confidence) for the span [st, en].

    The first ASRX segment that fully covers the span provides the speaker id;
    ids without an entry in ``speaker_map`` keep the raw id as name with
    confidence 0.0.
    """
    spk_id = "unknown"
    for ax in asrx_segs:
        if ax["start_time"] <= st and ax["end_time"] >= en:
            spk_id = ax.get("speaker_id", "unknown")
            break
    spk_info = speaker_map.get(spk_id)
    if spk_info:
        character, spk_conf = spk_info
    else:
        character, spk_conf = spk_id, 0.0
    return spk_id, character, spk_conf


def build_child_chunks(data: dict, file_uuid: str) -> List[dict]:
    """Group ASR sentences by CUT scene boundaries → parent/child structure.

    Args:
        data: Result of ``load_data`` (keys ``asr``, ``asrx``, ``cut``).
        file_uuid: Used to build child chunk ids and to load the speaker map.

    Returns:
        One dict per scene that has at least one child, with keys
        ``start_time``, ``end_time``, ``duration``, ``children`` and
        ``child_count``. Within a scene, fully contained sentences come first,
        followed by sentences that only overlap the scene boundary (those
        carry ``"overlap_type": "partial"``).
    """
    asr_segs = data["asr"].get("segments", []) if data["asr"] else []
    asrx_segs = data["asrx"].get("segments", []) if data["asrx"] else []
    cut_scenes = data["cut"].get("scenes", []) if data["cut"] else []

    # Dynamically load speaker→identity mapping from DB
    speaker_map = load_speaker_map(file_uuid)

    if not cut_scenes:
        # No scene detection output: fall back to fixed 60-second windows
        # spanning up to the end of the last ASR / ASRX segment.
        max_t = max(
            (asr_segs[-1].get("end", 0) if asr_segs else 0),
            (asrx_segs[-1].get("end_time", 0) if asrx_segs else 0),
        )
        cut_scenes = [
            {"start_time": t, "end_time": min(t + 60, max_t)}
            for t in range(0, int(max_t) + 60, 60)
        ]

    scenes = []
    for cs in cut_scenes:
        s, e = cs["start_time"], cs["end_time"]
        contained = []
        partial = []
        for seg_idx, seg in enumerate(asr_segs):
            st, en = seg.get("start", 0), seg.get("end", 0)
            text = seg.get("text", "").strip()
            if not text:
                continue
            is_contained = st >= s and en <= e
            # Boundary overlap: a sentence straddling a cut is attached to
            # every scene it touches.
            if not is_contained and not (st < e and en > s):
                continue
            spk_id, character, spk_conf = _resolve_speaker(st, en, asrx_segs, speaker_map)
            child = {
                "start": st,
                "end": en,
                "text": text,
                "speaker_id": spk_id,
                "speaker_name": character,
                "speaker_confidence": spk_conf,
                "chunk_id": f"{file_uuid}_{seg_idx}",
            }
            if is_contained:
                contained.append(child)
            else:
                child["overlap_type"] = "partial"
                partial.append(child)

        children = contained + partial
        if children:
            scenes.append({
                "start_time": s,
                "end_time": e,
                "duration": e - s,
                "children": children,
                "child_count": len(children),
            })
    return scenes


# ===== Pipeline 1: Story (Template) Summaries =====

def generate_story_parent_summary(scene: dict) -> str:
    """Build the template summary line for a scene (time range, cast, counts).

    Only the first three speakers (sorted by name) are listed after
    ``Speakers:``.
    """
    children = scene["children"]
    characters = sorted(set(c["speaker_name"] for c in children))
    total_words = sum(len(c["text"].split()) for c in children)

    by_speaker = defaultdict(list)
    for c in children:
        by_speaker[c["speaker_name"]].append(c["text"])
    speakers = [f"{char} ({len(texts)} lines)" for char, texts in sorted(by_speaker.items())]

    return (
        f"[{scene['start_time']:.0f}s-{scene['end_time']:.0f}s, {scene['duration']:.0f}s] "
        f"Cast: {', '.join(characters)}. Total: {len(children)} lines, {total_words} words. "
        f"Speakers: {' | '.join(speakers[:3])}"
    )


def generate_story_child_summary(child: dict, parent_summary: str) -> str:
    """Format one sentence as ``[start-end] speaker: "text"``.

    ``parent_summary`` is accepted for signature parity with the LLM pipeline
    and is not used.
    """
    return (
        f"[{child['start']:.0f}s-{child['end']:.0f}s] "
        f"{child['speaker_name']}: \"{child['text']}\""
    )


# ===== Pipeline 2: LLM Summaries (requires LLM server) =====

def generate_llm_parent_summary(scene: dict, max_scenes_processed: int) -> Optional[str]:
    """LLM-based parent summary.

    Sends at most the first 15 children (each text truncated to 150
    characters) to the chat-completions endpoint at ``LLM_URL``.

    Args:
        scene: Scene dict as produced by ``build_child_chunks``.
        max_scenes_processed: Accepted for interface compatibility; not used.

    Returns:
        The stripped summary text, or ``None`` when no LLM URL is configured
        or the request fails (best-effort: never raises).
    """
    if not LLM_URL:
        return None
    children = scene["children"]
    dialogue = "\n".join(
        f"[{c['start']:.0f}s] {c['speaker_name']}: {c['text'][:150]}"
        for c in children[:15]
    )
    prompt = (
        "You are a film analyst. Summarize this scene in one flowing paragraph (60-100 words). "
        "Include: who is present, what they discuss, tone/mood.\n\n"
        f"Scene: {scene['start_time']:.0f}s - {scene['end_time']:.0f}s\n"
        f"Dialogue:\n{dialogue}\n\nSummary:"
    )
    try:
        resp = requests.post(LLM_URL, json={
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 200,
            "temperature": 0.3,
        }, timeout=60)
        # Surface HTTP errors as such instead of a confusing KeyError on
        # 'choices' further down.
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f" ⚠️ LLM parent summary failed: {e}")
        return None


def generate_llm_child_summary(child: dict, parent_summary: str) -> Optional[str]:
    """LLM-based child (sentence) summary.

    Currently makes no LLM call: it returns the same template line as the
    story pipeline. ``parent_summary`` is not used.
    """
    return f"[{child['start']:.0f}s-{child['end']:.0f}s] {child['speaker_name']}: \"{child['text']}\""


# ===== Embedding (Ollama nomic-embed) =====

def embed_text(text: str, max_retries: int = 3) -> Optional[List[float]]:
    """Get embedding via the embedding server at ``EMBEDDING_URL``.

    Args:
        text: Text to embed (sent as a single-element ``input`` list).
        max_retries: Maximum number of attempts.

    Returns:
        The embedding vector of the first result, or ``None`` when every
        attempt failed (best-effort: never raises).
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            resp = requests.post(EMBEDDING_URL, json={"input": [text]}, timeout=30)
            if resp.status_code == 200:
                items = resp.json().get("data", [])
                if items:
                    return items[0]["embedding"]
                last_error = "empty embedding response"
            else:
                last_error = f"HTTP {resp.status_code}"
        except Exception as e:
            last_error = e
        # Back off before every retry, including after non-200 / empty
        # responses, so a struggling server is not hit in a tight loop.
        if attempt < max_retries - 1:
            time.sleep(1)
    if last_error is not None:
        print(f" ⚠️ Embedding failed: {last_error}")
    return None


# ===== DB Store (chunks table with embedding + BM25) =====

def _upsert_chunk(cur, chunk_id: str, file_uuid: str, chunk_type: str, chunk_index: int,
                  start, end, content: dict, text: str, parent_chunk_id: Optional[str],
                  embedding: Optional[List[float]], update_parent: bool) -> None:
    """Insert one chunk row, updating it on (file_uuid, old_chunk_id) conflict.

    The embedding column is written (and refreshed on conflict) only when an
    embedding is supplied; ``parent_chunk_id`` is refreshed on conflict only
    when ``update_parent`` is true.
    """
    columns = [
        "chunk_id", "old_chunk_id", "file_uuid", "chunk_type", "chunk_index",
        "start_time", "end_time", "content", "text_content", "parent_chunk_id",
    ]
    placeholders = ["%s"] * 7 + ["%s::jsonb", "%s", "%s"]
    params = [
        chunk_id, chunk_id, file_uuid, chunk_type, chunk_index,
        start, end, json.dumps(content), text, parent_chunk_id,
    ]
    updates = ["content = EXCLUDED.content", "text_content = EXCLUDED.text_content"]
    if update_parent:
        updates.append("parent_chunk_id = EXCLUDED.parent_chunk_id")
    if embedding:
        columns.append("embedding")
        placeholders.append("%s::vector")
        params.append(embedding)
        updates.append("embedding = EXCLUDED.embedding")

    cur.execute(
        f"INSERT INTO {SCHEMA}.chunk ({', '.join(columns)}) "
        f"VALUES ({', '.join(placeholders)}) "
        f"ON CONFLICT (file_uuid, old_chunk_id) DO UPDATE SET {', '.join(updates)}",
        tuple(params),
    )


def store_chunks(file_uuid: str, scenes: List[dict], mode: str, do_embed: bool, conn):
    """Store parent + child summaries into chunks table.

    Args:
        file_uuid: File the chunks belong to.
        scenes: Scene dicts as produced by ``build_child_chunks``.
        mode: ``"story"`` or ``"llm"``; selects the summary generators and the
            ``{mode}_parent`` / ``{mode}_child`` chunk types.
        do_embed: When true, each summary is embedded and stored with it.
        conn: Open DB connection; committed once after all rows are written.

    Returns:
        ``(parent_count, child_count)`` of rows written. Scenes whose parent
        summary could not be generated are skipped together with their
        children.
    """
    parent_type = f"{mode}_parent"
    child_type = f"{mode}_child"
    parent_count = 0
    child_count = 0

    cur = conn.cursor()
    try:
        # Get base chunk_index
        cur.execute(
            f"SELECT COALESCE(MAX(chunk_index), 0) FROM {SCHEMA}.chunk WHERE file_uuid = %s",
            (file_uuid,),
        )
        next_index = (cur.fetchone()[0] or 0) + 1

        for scene in scenes:
            if mode == "story":
                parent_text = generate_story_parent_summary(scene)
            else:
                parent_text = generate_llm_parent_summary(scene, parent_count)
            if not parent_text:
                continue

            parent_id = f"{mode}_parent_{file_uuid}_{scene['start_time']:.0f}_{scene['end_time']:.0f}"
            parent_embedding = embed_text(parent_text) if do_embed else None
            _upsert_chunk(
                cur, parent_id, file_uuid, parent_type, next_index,
                scene["start_time"], scene["end_time"],
                {"summary": parent_text, "mode": mode, "type": "parent",
                 "source_versions": CURRENT_VERSIONS},
                parent_text, None, parent_embedding, update_parent=False,
            )
            next_index += 1
            parent_count += 1

            for child in scene["children"]:
                # NOTE(review): child ids do not include the mode, and a
                # conflict does not update chunk_type, so running both modes
                # on one file appears to overwrite the earlier mode's child
                # rows in place — confirm this is intended.
                child_id = child["chunk_id"]
                if mode == "story":
                    child_text = generate_story_child_summary(child, parent_text)
                else:
                    child_text = generate_llm_child_summary(child, parent_text)
                child_embedding = embed_text(child_text) if do_embed else None
                _upsert_chunk(
                    cur, child_id, file_uuid, child_type, next_index,
                    child["start"], child["end"],
                    {"speaker": child["speaker_name"], "text": child["text"], "mode": mode,
                     "speaker_confidence": child.get("speaker_confidence", 0),
                     "source_versions": CURRENT_VERSIONS},
                    child_text, parent_id, child_embedding, update_parent=True,
                )
                next_index += 1
                child_count += 1

        conn.commit()
    finally:
        # Release the cursor even when an insert or an embedding call raises.
        cur.close()
    return parent_count, child_count


def main():
    """CLI entry point: build scene chunks, optionally store them, dump JSON."""
    parser = argparse.ArgumentParser(description="Story Processor V2.0")
    parser.add_argument("--file-uuid", required=True)
    parser.add_argument("--mode", choices=["story", "llm"], default="story")
    parser.add_argument("--max-scenes", type=int, default=99999)
    parser.add_argument("--embed", action="store_true", help="Generate embeddings (Ollama)")
    parser.add_argument("--no-db", action="store_true", help="Skip DB storage")
    args = parser.parse_args()

    file_uuid = args.file_uuid
    print(f"[STORY] Mode: {args.mode}, Embed: {args.embed}")

    data = load_data(file_uuid)
    if not data["asr"]:
        print("[STORY] ❌ No ASR data")
        return

    scenes = build_child_chunks(data, file_uuid)[:args.max_scenes]
    total_children = sum(s["child_count"] for s in scenes)
    print(f"[STORY] {len(scenes)} scenes, {total_children} child chunks")

    if not args.no_db:
        if psycopg2 is None:
            sys.exit("[STORY] ❌ psycopg2 is required for DB storage (use --no-db to skip)")
        conn = psycopg2.connect(DB_URL)
        try:
            pc, cc = store_chunks(file_uuid, scenes, args.mode, args.embed, conn)
            print(f"[STORY] DB: {pc} parent, {cc} child chunks ({args.mode})")
        finally:
            conn.close()

    # Save JSON output
    out_path = os.path.join(OUTPUT_DIR, f"{file_uuid}.story_{args.mode}.json")
    out_data = {"file_uuid": file_uuid, "mode": args.mode, "scenes": scenes}
    # ensure_ascii=False emits raw non-ASCII text, so the encoding must not
    # depend on the locale.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(out_data, f, indent=2, ensure_ascii=False, default=str)
    print(f"[STORY] ✅ {out_path}")


if __name__ == "__main__":
    main()