Files
momentry_core/scripts/parent_chunk_5w1h.py

382 lines
16 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Story Processor V2.0 — Dual Pipeline: Story-based + LLM-based Parent-Child Summarization
Pipeline 1 (Story): Template-based, instant, no LLM cost
→ Parent story summary + Child story summary
→ Embedding (Ollama nomic-embed) → pgvector
→ BM25 (PostgreSQL tsvector) → full-text search
Pipeline 2 (LLM): LLM-based summarization (Gemma4/Qwen when resources allow)
→ Parent LLM summary + Child LLM summary
→ Embedding → pgvector + BM25
Both pipelines store into chunks table with distinct chunk_types:
story_parent, story_child, llm_parent, llm_child
Usage:
python parent_chunk_5w1h.py --file-uuid <uuid> --mode story [--embed]
python parent_chunk_5w1h.py --file-uuid <uuid> --mode llm [--embed]
"""
import json, os, sys, argparse, time, requests, psycopg2
from collections import defaultdict
from typing import Dict, List, Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Runtime configuration; every value can be overridden through the environment.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")  # psycopg2 DSN
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")  # schema used for the identities / chunk tables
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")  # <uuid>.<stage>.json files
EMBEDDING_URL = os.environ.get("EMBEDDING_URL", "http://localhost:11436/v1/embeddings")  # embeddings HTTP endpoint
def load_speaker_map(file_uuid: str, default_confidence: float = 0.85) -> dict:
    """Load the speaker_id -> (identity name, confidence) mapping from the DB.

    Reads every row of ``identities`` (search_path: SCHEMA, then ``public``)
    whose ``metadata->>'speaker_id'`` is set. Rows without a name are skipped
    so callers fall back to the raw speaker_id instead of a ``None`` name.
    Best-effort: on a DB error, or when no mapping exists, DEFAULT_SPEAKER_MAP
    is returned instead.

    Args:
        file_uuid: Not used by the query. NOTE(review): the lookup is global,
            not per-file — confirm whether identities should be filtered by file.
        default_confidence: Confidence attached to every mapped speaker (the
            query does not read one from the DB).

    Returns:
        dict mapping speaker_id -> (name, confidence).
    """
    conn = None
    try:
        conn = psycopg2.connect(DB_URL)
        with conn.cursor() as cur:
            cur.execute("SET search_path TO %s, public", (SCHEMA,))
            cur.execute(
                "SELECT metadata->>'speaker_id', name FROM identities "
                "WHERE metadata->>'speaker_id' IS NOT NULL"
            )
            rows = cur.fetchall()
    except psycopg2.Error as e:
        # Deliberate best-effort: a missing/unreachable DB must not stop the pipeline.
        print(f" ⚠️ Speaker map lookup failed, using default: {e}")
        return DEFAULT_SPEAKER_MAP
    finally:
        # Close the connection on success AND failure (the original leaked it on errors).
        if conn is not None:
            conn.close()
    spk_map = {spk_id: (name, default_confidence) for spk_id, name in rows if name}
    return spk_map or DEFAULT_SPEAKER_MAP
# Fallback speaker map, used when the DB yields no speaker -> identity mapping
# (or cannot be queried). Empty, so unmapped speakers keep their raw speaker_id.
DEFAULT_SPEAKER_MAP = dict()

# Upstream stage/tool versions, copied into every stored chunk's
# "source_versions" field for traceability.
CURRENT_VERSIONS = dict(
    asr="faster-whisper/small/v1",
    asrx="speechbrain/ecapa-tdnn/v1",
    cut="pyscenedetect/default",
    yolo="yolov5-coreml/v2",
    face_detection="apple-vision/v2",
    face_embedding="coreml-facenet/v2",
    speaker_binding="mar-lip/v1",
    identity_clustering="cosine-threshold/v1",
    story_agent="template/v2.0",
    embedding_agent="nomic-embed-768d/v1",
)

# Chat-completions endpoint for the LLM pipeline. MOMENTRY_LLM_URL takes
# precedence over MOMENTRY_LLM_SUMMARY_URL; an empty value disables LLM summaries.
_LLM_SUMMARY_URL = os.getenv("MOMENTRY_LLM_SUMMARY_URL", "http://127.0.0.1:8082/v1/chat/completions")
LLM_URL = os.getenv("MOMENTRY_LLM_URL", _LLM_SUMMARY_URL)
LLM_MODEL = os.environ.get("MOMENTRY_LLM_SUMMARY_MODEL", "gemma4")
def load_data(file_uuid: str, output_dir: Optional[str] = None) -> dict:
    """Load the ASR, speaker-labelled (asrx) and scene-cut JSON for one file.

    Looks for ``<output_dir>/<file_uuid>.<stage>.json`` for each stage in
    ("asr", "asrx", "cut").

    Args:
        file_uuid: File identifier used as the JSON filename prefix.
        output_dir: Directory to read from; defaults to OUTPUT_DIR.

    Returns:
        dict mapping each stage name to its parsed JSON, or None when that
        stage's file does not exist.
    """
    base_dir = OUTPUT_DIR if output_dir is None else output_dir
    data = {}
    for name in ("asr", "asrx", "cut"):
        path = os.path.join(base_dir, f"{file_uuid}.{name}.json")
        if not os.path.exists(path):
            data[name] = None
            continue
        # JSON is UTF-8 by spec: don't depend on the locale encoding, and close
        # the handle deterministically (the original leaked it).
        with open(path, encoding="utf-8") as f:
            data[name] = json.load(f)
    return data
def _lookup_speaker(st, en, asrx_segs, speaker_map):
    """Return (speaker_id, speaker_name, confidence) for the ASR span [st, en].

    The first asrx segment that fully contains the span supplies speaker_id
    ("unknown" if none does). Name and confidence come from speaker_map,
    falling back to (speaker_id, 0.0) for unmapped speakers.
    """
    spk_id = next(
        (ax.get("speaker_id", "unknown") for ax in asrx_segs
         if ax["start_time"] <= st and ax["end_time"] >= en),
        "unknown",
    )
    character, spk_conf = speaker_map.get(spk_id) or (spk_id, 0.0)
    return spk_id, character, spk_conf


def _fallback_windows(asr_segs, asrx_segs, window=60):
    """Split [0, max_t) into fixed `window`-second scenes when CUT data is missing.

    max_t is the latest end time over ALL ASR and asrx segments (not just the
    last one, so unsorted input is still fully covered). Windows starting at or
    past max_t are dropped, so no window has end_time <= start_time.
    """
    max_t = max(
        [seg.get("end", 0) for seg in asr_segs]
        + [seg.get("end_time", 0) for seg in asrx_segs]
        + [0]
    )
    return [
        {"start_time": t, "end_time": min(t + window, max_t)}
        for t in range(0, int(max_t) + window, window)
        if t < max_t
    ]


def build_child_chunks(data: dict, file_uuid: str, speaker_map: Optional[dict] = None) -> List[dict]:
    """Group ASR sentences by CUT scene boundaries → parent/child structure.

    For each scene, children are first the non-empty ASR segments fully inside
    [start_time, end_time], then the segments that only partially overlap it
    (tagged ``overlap_type: "partial"``). A partially-overlapping segment can
    therefore appear in more than one scene. Without CUT scenes, fixed 60 s
    windows are used. Scenes with no children are omitted.

    Args:
        data: Output of load_data ({"asr": ..., "asrx": ..., "cut": ...}).
        file_uuid: Used to build each child's chunk_id ``<file_uuid>_<seg_idx>``.
        speaker_map: speaker_id -> (name, confidence). When None (default) it
            is loaded from the DB via load_speaker_map.

    Returns:
        List of scene dicts with start_time, end_time, duration, children,
        child_count.
    """
    asr_segs = data["asr"].get("segments", []) if data["asr"] else []
    asrx_segs = data["asrx"].get("segments", []) if data["asrx"] else []
    cut_scenes = data["cut"].get("scenes", []) if data["cut"] else []
    if speaker_map is None:
        # Dynamically load speaker→identity mapping from DB
        speaker_map = load_speaker_map(file_uuid)
    if not cut_scenes:
        cut_scenes = _fallback_windows(asr_segs, asrx_segs)

    scenes = []
    for cs in cut_scenes:
        s, e = cs["start_time"], cs["end_time"]
        # One pass per scene; keep fully-inside children ahead of partial ones.
        inside, partial = [], []
        for seg_idx, seg in enumerate(asr_segs):
            text = seg.get("text", "").strip()
            if not text:
                continue
            st, en = seg.get("start", 0), seg.get("end", 0)
            if s <= st and en <= e:
                bucket = inside
            elif st < e and en > s:
                # Boundary overlap: even otherwise-empty scenes get partial children.
                bucket = partial
            else:
                continue
            spk_id, character, spk_conf = _lookup_speaker(st, en, asrx_segs, speaker_map)
            child = {
                "start": st, "end": en, "text": text,
                "speaker_id": spk_id, "speaker_name": character,
                "speaker_confidence": spk_conf,
                "chunk_id": f"{file_uuid}_{seg_idx}",
            }
            if bucket is partial:
                child["overlap_type"] = "partial"
            bucket.append(child)
        children = inside + partial
        if children:
            scenes.append({
                "start_time": s, "end_time": e, "duration": e - s,
                "children": children, "child_count": len(children),
            })
    return scenes
# ===== Pipeline 1: Story (Template) Summaries =====
def generate_story_parent_summary(scene: dict) -> str:
    """Build a template (no-LLM) summary line for one scene.

    Shape: "[<start>s-<end>s, <duration>s] Cast: <names>. Total: <n> lines,
    <w> words. Speakers: <name> (<k> lines) | ..." — names are sorted and the
    per-speaker list is capped at the first three.
    """
    lines_per_speaker = defaultdict(int)
    word_total = 0
    for child in scene["children"]:
        lines_per_speaker[child["speaker_name"]] += 1
        word_total += len(child["text"].split())
    cast = sorted(lines_per_speaker)
    speaker_parts = [f"{name} ({lines_per_speaker[name]} lines)" for name in cast[:3]]
    header = f"[{scene['start_time']:.0f}s-{scene['end_time']:.0f}s, {scene['duration']:.0f}s] "
    totals = f"Cast: {', '.join(cast)}. Total: {len(scene['children'])} lines, {word_total} words. "
    return header + totals + f"Speakers: {' | '.join(speaker_parts)}"
def generate_story_child_summary(child: dict, parent_summary: str) -> str:
    """Render one dialogue line as '[<start>s-<end>s] <speaker>: "<text>"'.

    parent_summary is accepted for signature parity with the LLM pipeline but
    is not used.
    """
    start, end = child["start"], child["end"]
    speaker, line = child["speaker_name"], child["text"]
    return f"[{start:.0f}s-{end:.0f}s] " f"{speaker}: \"{line}\""
# ===== Pipeline 2: LLM Summaries (requires LLM server) =====
def generate_llm_parent_summary(scene: dict, max_scenes_processed: int) -> Optional[str]:
    """Summarize one scene with the chat-completions LLM at LLM_URL.

    Sends at most the first 15 dialogue lines (each truncated to 150 chars)
    and asks for a one-paragraph, 60-100 word summary.

    Args:
        scene: Scene dict from build_child_chunks (reads start_time, end_time
            and children[].start / speaker_name / text).
        max_scenes_processed: Unused; kept for call compatibility.

    Returns:
        The stripped summary text, or None when LLM_URL is empty, the request
        fails, the server answers with a non-2xx status, or the reply carries
        no string content.
    """
    if not LLM_URL:
        return None
    dialogue = "\n".join(
        f"[{c['start']:.0f}s] {c['speaker_name']}: {c['text'][:150]}"
        for c in scene["children"][:15]
    )
    prompt = (
        "You are a film analyst. Summarize this scene in one flowing paragraph (60-100 words). "
        "Include: who is present, what they discuss, tone/mood.\n\n"
        f"Scene: {scene['start_time']:.0f}s - {scene['end_time']:.0f}s\n"
        f"Dialogue:\n{dialogue}\n\nSummary:"
    )
    try:
        resp = requests.post(LLM_URL, json={
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 200, "temperature": 0.3,
        }, timeout=60)
        # Report HTTP errors (e.g. a 5xx HTML page) clearly instead of as a
        # JSON-decode or KeyError further down.
        resp.raise_for_status()
        content = resp.json()["choices"][0]["message"]["content"]
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as e:
        print(f" ⚠️ LLM parent summary failed: {e}")
        return None
    if not isinstance(content, str):
        print(f" ⚠️ LLM parent summary failed: no text content in response ({content!r})")
        return None
    return content.strip()
def generate_llm_child_summary(child: dict, parent_summary: str) -> Optional[str]:
    """Render one dialogue line for the LLM pipeline's child chunks.

    Despite the pipeline name, no LLM is called: the output uses the same
    template as the story pipeline, '[<start>s-<end>s] <speaker>: "<text>"',
    and is always a string. parent_summary is currently unused.
    """
    return f"[{child['start']:.0f}s-{child['end']:.0f}s] {child['speaker_name']}: \"{child['text']}\""
# ===== Embedding (Ollama nomic-embed) =====
def embed_text(text: str, max_retries: int = 3, retry_delay: float = 1.0) -> Optional[List[float]]:
    """Request an embedding vector for `text` from the server at EMBEDDING_URL.

    POSTs an OpenAI-style ``{"input": [text]}`` body and returns
    ``data[0]["embedding"]`` from the reply. NOTE(review): the section header
    says "Ollama nomic-embed" while the old docstring said "EmbeddingGemma" —
    confirm which model the endpoint actually serves.

    Every failed attempt is retried, up to `max_retries` attempts in total,
    sleeping `retry_delay` seconds between attempts. This covers network
    errors, non-200 status, and malformed or empty bodies. The original only
    backed off on exceptions and retried non-200 replies immediately.

    Returns:
        The embedding list, or None after the last failed attempt (a warning
        with the last error is printed).
    """
    last_error = "no attempts made"
    for attempt in range(max_retries):
        if attempt:
            time.sleep(retry_delay)
        try:
            resp = requests.post(EMBEDDING_URL, json={"input": [text]}, timeout=30)
            if resp.status_code != 200:
                last_error = f"HTTP {resp.status_code}"
                continue
            items = resp.json().get("data", [])
            if items:
                return items[0]["embedding"]
            last_error = "empty 'data' in response"
        except (requests.RequestException, ValueError, KeyError, IndexError,
                TypeError, AttributeError) as e:
            last_error = str(e)
    if max_retries > 0:
        print(f" ⚠️ Embedding failed: {last_error}")
    return None
# ===== DB Store (chunks table with embedding + BM25) =====
def _upsert_chunk(cur, chunk_id: str, file_uuid: str, chunk_type: str, chunk_index: int,
                  start_time, end_time, content: dict, text_content: str,
                  parent_chunk_id: Optional[str], embedding: Optional[List[float]],
                  update_parent: bool) -> None:
    """Insert one row into {SCHEMA}.chunk, or update it on (file_uuid, old_chunk_id) conflict.

    chunk_id is written to both chunk_id and old_chunk_id (the conflict key).
    On conflict only content/text_content are refreshed, plus parent_chunk_id
    when `update_parent` is true and embedding when one is given. When
    `embedding` is None the embedding column is neither written nor touched.
    """
    columns = ["chunk_id", "old_chunk_id", "file_uuid", "chunk_type", "chunk_index",
               "start_time", "end_time", "content", "text_content", "parent_chunk_id"]
    placeholders = ["%s"] * 7 + ["%s::jsonb", "%s", "%s"]
    updates = ["content = EXCLUDED.content", "text_content = EXCLUDED.text_content"]
    params = [chunk_id, chunk_id, file_uuid, chunk_type, chunk_index,
              start_time, end_time, json.dumps(content), text_content, parent_chunk_id]
    if update_parent:
        updates.append("parent_chunk_id = EXCLUDED.parent_chunk_id")
    if embedding is not None:
        columns.append("embedding")
        placeholders.append("%s::vector")
        updates.append("embedding = EXCLUDED.embedding")
        params.append(embedding)
    cur.execute(
        f"INSERT INTO {SCHEMA}.chunk ({', '.join(columns)}) "
        f"VALUES ({', '.join(placeholders)}) "
        f"ON CONFLICT (file_uuid, old_chunk_id) DO UPDATE SET {', '.join(updates)}",
        params,
    )


def store_chunks(file_uuid: str, scenes: List[dict], mode: str, do_embed: bool, conn):
    """Upsert one parent chunk per scene plus one child chunk per dialogue line.

    Rows go into ``{SCHEMA}.chunk`` with chunk_type ``<mode>_parent`` /
    ``<mode>_child``. Children reference their parent via parent_chunk_id.
    Rows that already exist (same file_uuid + old_chunk_id) are updated in
    place, keeping their chunk_index. New rows are numbered after the file's
    current maximum chunk_index.

    With do_embed, each text is embedded via embed_text. If that fails, the
    row is still written but its embedding column is left untouched.

    Commits once at the end; if anything raises, nothing is committed here.

    NOTE(review): BM25/tsvector indexing is not done in this function —
    presumably a DB trigger or generated column handles it; confirm.

    NOTE(review): child ids are the raw build_child_chunks ids
    (``<file_uuid>_<seg_idx>``), not mode-prefixed. Running the "llm" mode
    after "story" therefore updates the same child rows, and chunk_type is not
    updated on conflict. A segment straddling a scene boundary is also written
    once per scene: the last scene wins parent_chunk_id, and child_count counts
    every write. Confirm whether this is intended.

    Returns:
        (parents_written, children_written)
    """
    parent_type = f"{mode}_parent"
    child_type = f"{mode}_child"
    parent_count = 0
    child_count = 0
    # Context manager closes the cursor even when an insert raises.
    with conn.cursor() as cur:
        # Get base chunk_index: continue after chunks already stored for this file.
        cur.execute(
            f"SELECT COALESCE(MAX(chunk_index), 0) FROM {SCHEMA}.chunk WHERE file_uuid = %s",
            (file_uuid,),
        )
        next_index = (cur.fetchone()[0] or 0) + 1
        for scene in scenes:
            if mode == "story":
                parent_text = generate_story_parent_summary(scene)
            else:
                parent_text = generate_llm_parent_summary(scene, parent_count)
            if not parent_text:
                # The LLM path returns None on failure: skip the scene and its children.
                continue
            parent_id = f"{mode}_parent_{file_uuid}_{scene['start_time']:.0f}_{scene['end_time']:.0f}"
            parent_embedding = embed_text(parent_text) if do_embed else None
            _upsert_chunk(
                cur, parent_id, file_uuid, parent_type, next_index,
                scene["start_time"], scene["end_time"],
                {"summary": parent_text, "mode": mode, "type": "parent",
                 "source_versions": CURRENT_VERSIONS},
                parent_text, None, parent_embedding or None, update_parent=False,
            )
            next_index += 1
            parent_count += 1
            for child in scene["children"]:
                child_id = child["chunk_id"]
                if mode == "story":
                    child_text = generate_story_child_summary(child, parent_text)
                else:
                    child_text = generate_llm_child_summary(child, parent_text)
                child_embedding = embed_text(child_text) if do_embed else None
                _upsert_chunk(
                    cur, child_id, file_uuid, child_type, next_index,
                    child["start"], child["end"],
                    {"speaker": child["speaker_name"], "text": child["text"], "mode": mode,
                     "speaker_confidence": child.get("speaker_confidence", 0),
                     "source_versions": CURRENT_VERSIONS},
                    child_text, parent_id, child_embedding or None, update_parent=True,
                )
                next_index += 1
                child_count += 1
        conn.commit()
    return parent_count, child_count
def main():
    """CLI entry point: build scenes for one file, optionally store them, and dump JSON.

    Reads <OUTPUT_DIR>/<uuid>.{asr,asrx,cut}.json and groups ASR lines into
    scenes. Unless --no-db is given, it upserts parent/child chunks into the
    DB. It then writes <OUTPUT_DIR>/<uuid>.story_<mode>.json. Returns early
    (exit status 0) when there is no ASR data.
    """
    parser = argparse.ArgumentParser(description="Story Processor V2.0")
    parser.add_argument("--file-uuid", required=True)
    parser.add_argument("--mode", choices=["story", "llm"], default="story")
    parser.add_argument("--max-scenes", type=int, default=99999)
    parser.add_argument("--embed", action="store_true", help="Generate embeddings (Ollama)")
    parser.add_argument("--no-db", action="store_true", help="Skip DB storage")
    args = parser.parse_args()
    file_uuid = args.file_uuid
    print(f"[STORY] Mode: {args.mode}, Embed: {args.embed}")
    data = load_data(file_uuid)
    if not data["asr"]:
        print("[STORY] ❌ No ASR data")
        return
    scenes = build_child_chunks(data, file_uuid)[:args.max_scenes]
    total_children = sum(s["child_count"] for s in scenes)
    print(f"[STORY] {len(scenes)} scenes, {total_children} child chunks")
    if not args.no_db:
        conn = psycopg2.connect(DB_URL)
        try:
            pc, cc = store_chunks(file_uuid, scenes, args.mode, args.embed, conn)
            print(f"[STORY] DB: {pc} parent, {cc} child chunks ({args.mode})")
        finally:
            conn.close()
    # Save JSON output. ensure_ascii=False emits raw non-ASCII text, so the file
    # must be opened as UTF-8; the locale default can raise UnicodeEncodeError.
    out_path = os.path.join(OUTPUT_DIR, f"{file_uuid}.story_{args.mode}.json")
    out_data = {"file_uuid": file_uuid, "mode": args.mode, "scenes": scenes}
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(out_data, f, indent=2, ensure_ascii=False, default=str)
    print(f"[STORY] ✅ {out_path}")


if __name__ == "__main__":
    main()