Schema changes: dev.chunks -> dev.chunk; removed old_chunk_id/chunk_index. Correction: asr-1.json format; generate/apply scripts. API: 37/37 endpoints fixed and tested. Docs: HANDOVER_V2.0.md for M4.
231 lines
9.9 KiB
Python
231 lines
9.9 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Story Pipeline Full — Speaker + Story + Summary

Processes one media file (UUID / FILE_ID below) in the ``dev`` schema:

Step 0: Load ASR + ASRX segments and the speaker_id -> name map
Step 1: Update sentence chunks with speaker name
Step 2: Rebuild story chunks (fixed groups of sentences) + re-embed to Qdrant
Step 3: LLM summary per story chunk + embed to Qdrant
Step 4: Print verification counts

NOTE(review): a recent schema change reportedly renamed ``dev.chunks`` to
``dev.chunk`` and dropped ``old_chunk_id``/``chunk_index``; this script still
uses the old names — confirm against the live schema before running.
"""

import json
import os
import subprocess
import sys
import time
import urllib.request

UUID = "aeed71342a899fe4b4c57b7d41bcb692"  # file_uuid of the file being processed
DIR = "/Users/accusys/momentry/output_dev"  # holds {UUID}.asr.json and {UUID}.asrx.json
# -t -A: tuples only, unaligned output, so rows come back as 'col1|col2' lines.
PSQL = ["/Users/accusys/pgsql/18.3/bin/psql", "-U", "accusys", "-d", "momentry", "-t", "-A"]
LLM_URL = "http://localhost:8082/v1/chat/completions"  # OpenAI-style chat endpoint
EMBED_URL = "http://localhost:11436/v1/embeddings"  # OpenAI-style embeddings endpoint
QDRANT_URL = "http://localhost:6333/collections/momentry_dev_stories/points"

def psql(sql):
    """Run one SQL command through psql and return its stripped stdout.

    PSQL passes -t -A, so multi-column rows come back as '|'-separated lines.
    On failure (non-zero exit or an ERROR on stderr) the error is printed, the
    same way psql_file reports it, and whatever stdout psql produced is still
    returned (usually ""). Before this change, failures looked like an empty
    result.
    """
    r = subprocess.run(PSQL + ["-c", sql], capture_output=True, text=True, timeout=30)
    if r.returncode != 0 or "ERROR" in r.stderr:
        print(f"SQL Error: {r.stderr[:200]}")
    return r.stdout.strip()


def psql_file(path):
    """Execute the SQL script at *path* with psql and return psql's exit code.

    ON_ERROR_STOP makes psql stop at the first failing statement and exit
    non-zero. Without it, psql keeps going and exits 0 even when statements
    fail, so the returned code was useless to callers. For scripts wrapped in
    BEGIN/COMMIT, stopping early means COMMIT is never sent and the server
    discards the open transaction when the session ends.
    """
    r = subprocess.run(
        PSQL + ["-v", "ON_ERROR_STOP=1", "-f", path],
        capture_output=True,
        text=True,
        timeout=60,
    )
    if r.returncode != 0 or (r.stderr and "ERROR" in r.stderr):
        print(f"SQL Error: {r.stderr[:200]}")
    return r.returncode


def embed_text(text):
    """Return the embedding vector for *text* from the EMBED_URL service.

    Only the first 1024 characters are sent. The reply is read as an
    OpenAI-style embeddings payload (``data[0].embedding``). Transport or
    HTTP failures propagate as urllib exceptions.
    """
    body = json.dumps({"input": text[:1024]}).encode()
    req = urllib.request.Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
    # Use a context manager so the HTTP response is closed, not leaked.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())["data"][0]["embedding"]


def llm_summary(dialogue):
    """Ask the chat model at LLM_URL for a short summary of *dialogue*.

    Sends a single user message requesting a ~50-word summary (temperature
    0.1, at most 100 tokens) and returns the first choice's text, stripped.
    Transport or HTTP failures propagate as urllib exceptions.
    """
    body = json.dumps({
        "model": "google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
        "messages": [{"role": "user", "content": f"Summarize concisely:\n{dialogue}\n\n50-word summary:"}],
        "temperature": 0.1, "max_tokens": 100,
    }).encode()
    req = urllib.request.Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
    # Use a context manager so the HTTP response is closed, not leaked.
    with urllib.request.urlopen(req, timeout=120) as resp:
        return json.loads(resp.read())["choices"][0]["message"]["content"].strip()


fps = 25.0  # converts seconds -> frame numbers in Step 2 (assumed to match the video; confirm)
FILE_ID = 242  # file_id written into the rebuilt story chunks

# ═══════════════════════════════════════════════════
# Step 0: Load ASR + ASRX + speaker map
# ═══════════════════════════════════════════════════
print("=" * 60)
print("Step 0: Loading data...")
with open(f"{DIR}/{UUID}.asr.json") as f:
    asr = json.load(f)
segs = asr["segments"]  # sentence-level ASR segments (start/end/text)
with open(f"{DIR}/{UUID}.asrx.json") as f:
    asrx = json.load(f)
asrx_segs = asrx["segments"]  # segments carrying speaker_id (start_time/end_time)

# Speaker map from identity_bindings: speaker_id -> identity name.
# psql runs with -t -A, so each row arrives as "identity_value|name".
r = psql("SELECT ib.identity_value, i.name FROM dev.identity_bindings ib JOIN dev.identities i ON i.id=ib.identity_id WHERE ib.identity_type='speaker'")
speaker_map = {}
for line in r.splitlines():
    if line.strip() and '|' in line:
        # Split once so a '|' inside the name does not truncate it.
        spk_key, spk_label = line.split('|', 1)
        speaker_map[spk_key.strip()] = spk_label.strip()
# Fallback name for unbound speakers only. A plain assignment here used to
# overwrite a real identity binding for SPEAKER_0.
# NOTE(review): Step 1 also uses "SPEAKER_0" as the default when no ASRX
# segment matches, so unmatched sentences get this same name.
speaker_map.setdefault("SPEAKER_0", "Speaker_0")

# ═══════════════════════════════════════════════════
# Step 1: Update sentence chunks with speaker
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 1: Updating sentence chunks with speaker...")


def _speaker_for(st, et, segments, default="SPEAKER_0"):
    """Return the speaker_id of the ASRX segment that best matches [st, et].

    The first segment that fully contains the sentence wins (the original
    rule). If none contains it, the segment with the largest positive time
    overlap is used, so sentences that straddle a diarization boundary still
    get a speaker. Returns *default* when nothing overlaps.
    """
    best_id, best_overlap = default, 0.0
    for ax in segments:
        ax_st, ax_et = ax.get("start_time", 0), ax.get("end_time", 0)
        if ax_st <= st and ax_et >= et:
            return ax.get("speaker_id", default)
        overlap = min(ax_et, et) - max(ax_st, st)
        if overlap > best_overlap:
            best_id, best_overlap = ax.get("speaker_id", default), overlap
    return best_id


sql = ["BEGIN;"]
chunk_meta = {}  # segment idx -> {speaker_id, speaker_name}; reused by Step 2

for idx, seg in enumerate(segs):
    st, et = seg["start"], seg["end"]
    text = seg["text"].strip()
    if not text:
        continue

    spk_id = _speaker_for(st, et, asrx_segs)
    spk_name = speaker_map.get(spk_id, spk_id)
    new_text = f"[{spk_name}] {text}"
    meta = json.dumps({"speaker_id": spk_id, "speaker_name": spk_name})
    esc = new_text.replace("'", "''")
    # The JSON is also placed inside a SQL string literal. Without doubling
    # quotes, a name like O'Brien would end the literal and break the script.
    meta_esc = meta.replace("'", "''")

    sql.append(f"UPDATE dev.chunks SET text_content='{esc}', metadata='{meta_esc}'::jsonb WHERE file_uuid='{UUID}' AND chunk_id='{UUID}_{idx}';")
    chunk_meta[idx] = {"speaker_id": spk_id, "speaker_name": spk_name}

sql.append("COMMIT;")
with open("/tmp/s1_speaker.sql", "w", encoding="utf-8") as f:
    f.write("\n".join(sql))

s1_rc = psql_file("/tmp/s1_speaker.sql")
if s1_rc != 0:
    # Do not report success, and do not rebuild stories on top of a failed update.
    sys.exit(f"Step 1 failed: psql returned {s1_rc}; aborting.")
print(f" Updated {len(chunk_meta)} sentence chunks with speaker")

# ═══════════════════════════════════════════════════
# Step 2: Rebuild story chunks + re-embed
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 2: Rebuilding story chunks...")

CHUNK_SIZE = 15  # consecutive ASR segments per story chunk

# Delete and recreate in ONE transaction. The DELETE used to run separately
# and commit immediately, so a failed INSERT script left no story chunks.
sql2 = [
    "BEGIN;",
    f"DELETE FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type='story';",
]
story_meta = []

# NOTE(review): old_chunk_id/chunk_index (and the dev.chunks table name) may
# no longer exist after the reported schema change; confirm before running.
for i in range(0, len(segs), CHUNK_SIZE):
    group = segs[i:i + CHUNK_SIZE]
    st, et = group[0]["start"], group[-1]["end"]
    idx = i // CHUNK_SIZE
    chunk_id = f"{UUID}_story_{idx}"

    # Speaker-tagged dialogue built from the sentences Step 1 kept (non-empty text).
    texts = []
    speakers_used = {}  # speaker_name -> sentence count
    for seg_idx, seg in enumerate(group, start=i):
        cm = chunk_meta.get(seg_idx)
        if cm is None:
            continue
        text = seg["text"].strip()
        if text:
            name = cm["speaker_name"]
            texts.append(f"[{name}] {text}")
            speakers_used[name] = speakers_used.get(name, 0) + 1

    dialogue = " ".join(texts)
    # Sentence chunk ids spanned by this story, including empty-text segments.
    child_ids = ", ".join(f"'{UUID}_{j}'" for j in range(i, i + len(group)))
    words = sum(len(t.split()) for t in texts)

    meta = json.dumps({"method": f"fixed_{CHUNK_SIZE}", "seg_count": len(group), "words": words, "speakers": speakers_used})
    esc = dialogue.replace("'", "''")
    meta_esc = meta.replace("'", "''")  # speaker names inside the JSON may contain quotes

    sql2.append(f"""INSERT INTO dev.chunks (file_id,file_uuid,chunk_id,old_chunk_id,chunk_index,chunk_type,start_time,end_time,fps,start_frame,end_frame,text_content,content,metadata,frame_count,child_chunk_ids)
VALUES ({FILE_ID},'{UUID}','{chunk_id}','{chunk_id}',{idx},'story',{st},{et},{fps},{int(st*fps)},{int(et*fps)},'{esc}','{{"type":"story_parent"}}'::jsonb,'{meta_esc}'::jsonb,{int((et-st)*fps)},ARRAY[{child_ids}]);""")

    story_meta.append({"idx": idx, "st": st, "et": et, "dialogue": dialogue, "words": words, "speakers": speakers_used})

sql2.append("COMMIT;")
with open("/tmp/s2_story.sql", "w", encoding="utf-8") as f:
    f.write("\n".join(sql2))
s2_rc = psql_file("/tmp/s2_story.sql")
if s2_rc != 0:
    sys.exit(f"Step 2 failed: psql returned {s2_rc}; story chunks may not have been rebuilt, aborting.")
print(f" Created {len(story_meta)} story chunks")

# Embed + upsert to Qdrant: one "story_dialogue" point per story with real dialogue.
# NOTE(review): point ids are idx + 1. If this collection holds stories from more
# than one file (the payload's file_uuid suggests it may), ids can collide across
# files. Points from an earlier, longer run are also never removed. Consider ids
# derived from chunk_id (e.g. uuid5) plus a delete-by-file_uuid filter. Confirm.
print("\n Embedding story chunks...")
points_dialogue = []
for sm in story_meta:
    if len(sm["dialogue"]) < 10:  # skip empty / near-empty dialogue
        continue
    vec = embed_text(sm["dialogue"])
    points_dialogue.append({"id": sm["idx"] + 1, "vector": vec, "payload": {
        "chunk_id": f"{UUID}_story_{sm['idx']}", "file_uuid": UUID,
        "start_time": sm["st"], "end_time": sm["et"], "type": "story_dialogue"
    }})

for i in range(0, len(points_dialogue), 100):
    batch = points_dialogue[i:i+100]
    data = json.dumps({"points": batch, "wait": True}).encode()
    req = urllib.request.Request(f"{QDRANT_URL}?wait=true", data=data, headers={"Content-Type": "application/json"}, method="PUT")
    # urlopen raises HTTPError on a non-2xx reply; the with-block just closes the response.
    with urllib.request.urlopen(req, timeout=30):
        pass
print(f" Qdrant: {len(points_dialogue)} dialogue vectors")

# ═══════════════════════════════════════════════════
# Step 3: LLM summaries + embed
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Step 3: LLM summaries...")

points_summary = []
summary_sql = ["BEGIN;"]
failed_stories = []  # story idx values whose summary or embedding failed

for i, sm in enumerate(story_meta):
    if len(sm["dialogue"]) < 10:
        continue

    try:
        summary = llm_summary(sm["dialogue"])
        time.sleep(0.3)
        vec = embed_text(summary)
        time.sleep(0.1)
    except Exception as e:
        # Skip the story instead of storing "[error]" with a hard-coded
        # 768-dim zero vector. That placeholder was counted as "with summary"
        # in Step 4, carried no direction for similarity search, and would
        # fail the whole 100-point Qdrant batch if the embedding model's
        # dimension is not 768.
        print(f" Error on story {sm['idx']}: {e}")
        failed_stories.append(sm["idx"])
    else:
        s_esc = summary.replace("'", "''")
        summary_sql.append(f"UPDATE dev.chunks SET summary_text='{s_esc}', updated_at=CURRENT_TIMESTAMP WHERE file_uuid='{UUID}' AND chunk_id='{UUID}_story_{sm['idx']}';")
        points_summary.append({"id": 100000 + sm["idx"] + 1, "vector": vec, "payload": {
            "chunk_id": f"{UUID}_story_{sm['idx']}", "file_uuid": UUID,
            "start_time": sm["st"], "end_time": sm["et"],
            "summary": summary, "type": "story_summary"
        }})

    if (i + 1) % 50 == 0:
        print(f" {i+1}/{len(story_meta)}")

# Update DB with summaries
summary_sql.append("COMMIT;")
with open("/tmp/s3_summary.sql", "w", encoding="utf-8") as f:
    f.write("\n".join(summary_sql))
s3_rc = psql_file("/tmp/s3_summary.sql")
if s3_rc != 0:
    print(f" WARNING: summary UPDATE script failed (psql returned {s3_rc}); summaries not saved to the DB")
if failed_stories:
    print(f" {len(failed_stories)} stories failed to summarize/embed: {failed_stories}")

# Upsert summary vectors to Qdrant in batches of 100.
for i in range(0, len(points_summary), 100):
    batch = points_summary[i:i+100]
    data = json.dumps({"points": batch, "wait": True}).encode()
    req = urllib.request.Request(f"{QDRANT_URL}?wait=true", data=data, headers={"Content-Type": "application/json"}, method="PUT")
    # urlopen raises HTTPError on a non-2xx reply; the with-block just closes the response.
    with urllib.request.urlopen(req, timeout=30):
        pass

print(f" Qdrant: {len(points_summary)} summary vectors")

# ═══════════════════════════════════════════════════
# Step 4: Verify — report what the previous steps left in dev.chunks
# ═══════════════════════════════════════════════════
print("\n" + "=" * 60)
print("Done.")

# All three checks share this prefix and differ only in the chunk_type filter.
_count_prefix = f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type="
r1, r2, r3 = (
    psql(_count_prefix + condition)
    for condition in (
        "'sentence' AND text_content LIKE '[%'",    # sentences tagged "[speaker] ..."
        "'story'",                                  # rebuilt story chunks
        "'story' AND summary_text IS NOT NULL",     # stories that received a summary
    )
)

for label, count in (
    ("Sentence chunks with speaker", r1),
    ("Story chunks", r2),
    ("Story chunks with summary", r3),
):
    print(f"{label}: {count}")