Files
momentry_core/scripts/story_pipeline_full.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

231 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Story Pipeline Full — Speaker + Story + Summary
Step 1: Update sentence chunks with speaker name
Step 2: Rebuild story chunks + re-embed
Step 3: LLM summary × 228 + embed
"""
import json, urllib.request, subprocess, sys, time, os
# Identifier of the media file being processed; used to locate its ASR JSON
# files and as the prefix of every chunk_id written by this script.
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
# Directory holding the "<UUID>.asr.json" and "<UUID>.asrx.json" inputs.
DIR = "/Users/accusys/momentry/output_dev"
# psql command prefix. "-t -A" asks for tuples-only, unaligned output, which
# is what the "|"-splitting parser in Step 0 relies on.
PSQL = ["/Users/accusys/pgsql/18.3/bin/psql", "-U", "accusys", "-d", "momentry", "-t", "-A"]
# Chat-completions endpoint used by llm_summary().
LLM_URL = "http://localhost:8082/v1/chat/completions"
# Embeddings endpoint used by embed_text().
EMBED_URL = "http://localhost:11436/v1/embeddings"
# Points endpoint of the Qdrant collection that receives story vectors.
QDRANT_URL = "http://localhost:6333/collections/momentry_dev_stories/points"
def psql(sql):
    """Run a single SQL statement through the psql CLI and return its output.

    Args:
        sql: SQL text handed to psql with ``-c``.

    Returns:
        psql's stdout with surrounding whitespace stripped. PSQL is invoked
        with ``-t -A``, so rows arrive one per line with ``|`` between columns.

    Raises:
        subprocess.TimeoutExpired: if psql does not finish within 30 seconds.
    """
    r = subprocess.run(PSQL + ["-c", sql], capture_output=True, text=True, timeout=30)
    # Report server-side errors instead of silently returning an empty string,
    # using the same message format as psql_file().
    if r.stderr and "ERROR" in r.stderr:
        print(f"SQL Error: {r.stderr[:200]}")
    return r.stdout.strip()
def psql_file(path):
    """Execute a SQL script file through the psql CLI.

    Args:
        path: Path of the SQL file passed to psql with ``-f``.

    Returns:
        psql's exit code (0 on success, non-zero when the script failed).

    Raises:
        subprocess.TimeoutExpired: if psql does not finish within 60 seconds.
    """
    # ON_ERROR_STOP makes psql abort at the first failing statement and exit
    # non-zero. Without it psql keeps executing the rest of the script and
    # still exits 0, so the returned code could never signal a failed script.
    cmd = PSQL + ["-v", "ON_ERROR_STOP=1", "-f", path]
    r = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    if r.returncode != 0 or (r.stderr and "ERROR" in r.stderr):
        print(f"SQL Error: {r.stderr[:200]}")
    return r.returncode
def embed_text(text):
    """Request an embedding vector for ``text`` from the embeddings endpoint.

    Args:
        text: Text to embed. Only the first 1024 characters are sent.

    Returns:
        The ``data[0].embedding`` value of the JSON response.

    Raises:
        urllib.error.URLError: on connection failure or timeout (30 s).
        KeyError, IndexError: if the response lacks the expected fields.
    """
    body = json.dumps({"input": text[:1024]}).encode()
    req = urllib.request.Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
    # Close the HTTP response deterministically; this is called once per chunk.
    with urllib.request.urlopen(req, timeout=30) as resp:
        payload = json.loads(resp.read())
    return payload["data"][0]["embedding"]
def llm_summary(dialogue):
    """Ask the chat-completions endpoint for a short summary of ``dialogue``.

    Args:
        dialogue: Text to summarize; embedded verbatim into the user prompt.

    Returns:
        The stripped ``choices[0].message.content`` string of the response.

    Raises:
        urllib.error.URLError: on connection failure or timeout (120 s).
        KeyError, IndexError: if the response lacks the expected fields.
    """
    body = json.dumps({
        "model": "google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
        "messages": [{"role": "user", "content": f"Summarize concisely:\n{dialogue}\n\n50-word summary:"}],
        "temperature": 0.1, "max_tokens": 100,
    }).encode()
    req = urllib.request.Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
    # Close the HTTP response deterministically; this is called once per story.
    with urllib.request.urlopen(req, timeout=120) as resp:
        payload = json.loads(resp.read())
    return payload["choices"][0]["message"]["content"].strip()
# Frame rate used to convert story start/end times into frame numbers.
fps = 25.0
# file_id value written into every story chunk row inserted by Step 2.
FILE_ID = 242
# ═══════════════════════════════════════════════════
# Step 0: Load ASR + ASRX + speaker map
# ═══════════════════════════════════════════════════
print("=" * 60)
print("Step 0: Loading data...")
# Open the inputs with context managers so the handles are closed, and read
# them as UTF-8 (the JSON interchange encoding) regardless of locale.
with open(f"{DIR}/{UUID}.asr.json", encoding="utf-8") as fh:
    asr = json.load(fh)
segs = asr["segments"]
with open(f"{DIR}/{UUID}.asrx.json", encoding="utf-8") as fh:
    asrx = json.load(fh)
asrx_segs = asrx["segments"]
# Speaker map from identity_bindings: identity_value → identity name
r = psql("SELECT ib.identity_value, i.name FROM dev.identity_bindings ib JOIN dev.identities i ON i.id=ib.identity_id WHERE ib.identity_type='speaker'")
speaker_map = {}
for line in r.strip().split('\n'):
    if line.strip() and '|' in line:
        # Split on the first "|" only so a name that itself contains "|"
        # is kept whole instead of being truncated.
        p = line.split('|', 1)
        speaker_map[p[0].strip()] = p[1].strip()
# Fallback label for unbound speakers.
# NOTE(review): this overwrites any binding the query returned for
# "SPEAKER_0"; confirm that is intended rather than a setdefault().
speaker_map["SPEAKER_0"] = "Speaker_0"
# ═══════════════════════════════════════════════════
# Step 1: Update sentence chunks with speaker
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 1: Updating sentence chunks with speaker...")
# All UPDATEs are collected into one script and run as a single transaction.
sql = ["BEGIN;"]
chunk_meta = {}  # idx → {speaker_id, speaker_name}
for idx, seg in enumerate(segs):
    st, et = seg["start"], seg["end"]
    text = seg["text"].strip()
    if not text:
        continue
    # Take the speaker of the first ASRX segment whose time range fully
    # contains this sentence; default to SPEAKER_0 when none does.
    spk_id = "SPEAKER_0"
    for ax in asrx_segs:
        if ax.get("start_time", 0) <= st and ax.get("end_time", 0) >= et:
            spk_id = ax.get("speaker_id", "SPEAKER_0")
            break
    spk_name = speaker_map.get(spk_id, spk_id)
    new_text = f"[{spk_name}] {text}"
    meta = json.dumps({"speaker_id": spk_id, "speaker_name": spk_name})
    esc = new_text.replace("'", "''")
    # The JSON is spliced into a single-quoted SQL literal too, so it needs
    # the same quote doubling; otherwise a speaker name containing an
    # apostrophe breaks the statement and aborts the whole transaction.
    meta_esc = meta.replace("'", "''")
    sql.append(f"UPDATE dev.chunks SET text_content='{esc}', metadata='{meta_esc}'::jsonb WHERE file_uuid='{UUID}' AND chunk_id='{UUID}_{idx}';")
    chunk_meta[idx] = {"speaker_id": spk_id, "speaker_name": spk_name}
sql.append("COMMIT;")
with open("/tmp/s1_speaker.sql", "w") as f:
    f.write("\n".join(sql))
psql_file("/tmp/s1_speaker.sql")
print(f" Updated {len(chunk_meta)} sentence chunks with speaker")
# ═══════════════════════════════════════════════════
# Step 2: Rebuild story chunks + re-embed
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 2: Rebuilding story chunks...")
# Recreate the story chunks. The DELETE of the old rows is part of the same
# transaction as the INSERTs, so a failed rebuild rolls back and cannot leave
# the file without any story chunks.
# NOTE(review): the INSERT targets dev.chunks and fills old_chunk_id and
# chunk_index; confirm these still match the current schema.
CHUNK_SIZE = 15
sql2 = [
    "BEGIN;",
    f"DELETE FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type='story';",
]
story_meta = []
for i in range(0, len(segs), CHUNK_SIZE):
    group = segs[i:i+CHUNK_SIZE]
    st, et = group[0]["start"], group[-1]["end"]
    idx = i // CHUNK_SIZE
    chunk_id = f"{UUID}_story_{idx}"
    # Build speaker text from individual sentences; only segments that
    # Step 1 recorded in chunk_meta (non-empty text) contribute.
    texts = []
    speakers_used = {}
    for j, seg in enumerate(group):
        seg_idx = i + j
        if seg_idx in chunk_meta:
            cm = chunk_meta[seg_idx]
            text = seg["text"].strip()
            if text:
                texts.append(f"[{cm['speaker_name']}] {text}")
                speakers_used[cm['speaker_name']] = speakers_used.get(cm['speaker_name'], 0) + 1
    dialogue = " ".join(texts)
    child_ids = ", ".join([f"'{UUID}_{j}'" for j in range(i, min(i+CHUNK_SIZE, len(segs)))])
    words = sum(len(t.split()) for t in texts)
    meta = json.dumps({"method": "fixed_15", "seg_count": len(group), "words": words, "speakers": speakers_used})
    esc = dialogue.replace("'", "''")
    # Speaker names are keys in the metadata JSON, so it needs the same
    # quote doubling as the dialogue before being placed in a SQL literal.
    meta_esc = meta.replace("'", "''")
    sql2.append(f"""INSERT INTO dev.chunks (file_id,file_uuid,chunk_id,old_chunk_id,chunk_index,chunk_type,start_time,end_time,fps,start_frame,end_frame,text_content,content,metadata,frame_count,child_chunk_ids)
VALUES ({FILE_ID},'{UUID}','{chunk_id}','{chunk_id}',{idx},'story',{st},{et},{fps},{int(st*fps)},{int(et*fps)},'{esc}','{{"type":"story_parent"}}'::jsonb,'{meta_esc}'::jsonb,{int((et-st)*fps)},ARRAY[{child_ids}]);""")
    story_meta.append({"idx": idx, "st": st, "et": et, "dialogue": dialogue, "words": words, "speakers": speakers_used})
sql2.append("COMMIT;")
with open("/tmp/s2_story.sql", "w") as f:
    f.write("\n".join(sql2))
psql_file("/tmp/s2_story.sql")
print(f" Created {len(story_meta)} story chunks")
# Embed + upsert to Qdrant
print("\n Embedding story chunks...")
points_dialogue = []
for sm in story_meta:
    # Stories with (almost) no dialogue are not embedded.
    if len(sm["dialogue"]) < 10:
        continue
    vec = embed_text(sm["dialogue"])
    # Dialogue vectors use point ids idx+1.
    points_dialogue.append({"id": sm["idx"] + 1, "vector": vec, "payload": {
        "chunk_id": f"{UUID}_story_{sm['idx']}", "file_uuid": UUID,
        "start_time": sm["st"], "end_time": sm["et"], "type": "story_dialogue"
    }})
# Upsert in batches of 100 points per request.
for i in range(0, len(points_dialogue), 100):
    batch = points_dialogue[i:i+100]
    data = json.dumps({"points": batch, "wait": True}).encode()
    req = urllib.request.Request(f"{QDRANT_URL}?wait=true", data=data, headers={"Content-Type": "application/json"}, method="PUT")
    # Drain and close the response so the connection is released.
    with urllib.request.urlopen(req, timeout=30) as resp:
        resp.read()
print(f" Qdrant: {len(points_dialogue)} dialogue vectors")
# ═══════════════════════════════════════════════════
# Step 3: LLM summaries + embed
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 3: LLM summaries...")
points_summary = []
summary_sql = ["BEGIN;"]
for i, sm in enumerate(story_meta):
    # Stories with (almost) no dialogue get neither a summary nor a vector.
    if len(sm["dialogue"]) < 10:
        continue
    try:
        summary = llm_summary(sm["dialogue"])
        time.sleep(0.3)
        vec = embed_text(summary)
        time.sleep(0.1)
    except Exception as e:
        # Best effort: keep going and mark this story so the failure is
        # visible in the DB and in the Qdrant payload.
        print(f" Error on story {sm['idx']}: {e}")
        summary = "[error]"
        # NOTE(review): placeholder assumes the collection uses
        # 768-dimensional vectors; confirm against the Qdrant collection,
        # since a size mismatch would fail the whole batch upsert.
        vec = [0.0] * 768
    s_esc = summary.replace("'", "''")
    summary_sql.append(f"UPDATE dev.chunks SET summary_text='{s_esc}', updated_at=CURRENT_TIMESTAMP WHERE file_uuid='{UUID}' AND chunk_id='{UUID}_story_{sm['idx']}';")
    # Summary vectors use point ids offset by 100000, while Step 2's
    # dialogue vectors use idx+1.
    points_summary.append({"id": 100000 + sm["idx"] + 1, "vector": vec, "payload": {
        "chunk_id": f"{UUID}_story_{sm['idx']}", "file_uuid": UUID,
        "start_time": sm["st"], "end_time": sm["et"],
        "summary": summary, "type": "story_summary"
    }})
    if (i + 1) % 50 == 0:
        print(f" {i+1}/{len(story_meta)}")
# Update DB with summaries
summary_sql.append("COMMIT;")
with open("/tmp/s3_summary.sql", "w") as f:
    f.write("\n".join(summary_sql))
psql_file("/tmp/s3_summary.sql")
# Upsert summary vectors to Qdrant in batches of 100 points per request
for i in range(0, len(points_summary), 100):
    batch = points_summary[i:i+100]
    data = json.dumps({"points": batch, "wait": True}).encode()
    req = urllib.request.Request(f"{QDRANT_URL}?wait=true", data=data, headers={"Content-Type": "application/json"}, method="PUT")
    # Drain and close the response so the connection is released.
    with urllib.request.urlopen(req, timeout=30) as resp:
        resp.read()
print(f" Qdrant: {len(points_summary)} summary vectors")
# ═══════════════════════════════════════════════════
# Step 4: Verify
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Done.")
# Every verification query counts this file's rows of one chunk type; only
# the tail of the WHERE clause differs.
count_prefix = f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type="
r1 = psql(count_prefix + "'sentence' AND text_content LIKE '[%'")
r2 = psql(count_prefix + "'story'")
r3 = psql(count_prefix + "'story' AND summary_text IS NOT NULL")
report = (
    ("Sentence chunks with speaker", r1),
    ("Story chunks", r2),
    ("Story chunks with summary", r3),
)
for label, count in report:
    print(f"{label}: {count}")