Schema changes: dev.chunks -> dev.chunk; remove old_chunk_id/chunk_index. Correction: asr-1.json format, generate/apply scripts. API: 37/37 endpoints fixed and tested. Docs: HANDOVER_V2.0.md for M4.
199 lines
6.4 KiB
Python
199 lines
6.4 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Generate sentence-level summaries using parent story context.
|
|
Each sentence gets an LLM summary informed by the parent chunk scene overview.
|
|
"""
|
|
|
|
import json, time, sys, os
|
|
from urllib.request import Request, urlopen
|
|
import psycopg2
|
|
|
|
# Identifier of the file being processed; used as the `file_uuid` filter in the
# chunk queries and as the prefix of every generated chunk_id.
UUID = "aeed71342a899fe4b4c57b7d41bcb692"

# Connection string handed to psycopg2.connect().
# NOTE(review): `host=/tmp` presumably selects a Unix-socket directory — confirm.
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"

# Base URL of the Qdrant REST API (collection and point requests are built on it).
QDRANT_URL = "http://localhost:6333"

# Chat-completions endpoint queried by call_llm().
LLM_URL = "http://localhost:8082/v1/chat/completions"

# Embeddings endpoint queried by call_embed().
EMBED_URL = "http://localhost:11436/v1/embeddings"

# Per-file checkpoint holding the indices already processed and their results.
CHECKPOINT = "/tmp/sentence_summaries_" + UUID + ".json"
|
|
|
|
def call_llm(prompt):
    """Send ``prompt`` as a single user message to the chat endpoint.

    The request goes to ``LLM_URL`` with temperature 0.1 and a cap of 80
    generated tokens; the stripped text of the first choice is returned.

    Args:
        prompt: Full prompt text to send as the user message.

    Returns:
        The reply text, or ``""`` if the request fails or the response does
        not have the expected shape. The caller treats an empty string as
        "no summary available" and falls back to the raw utterance.
    """
    body = json.dumps({
        "model": "google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 80,
    }).encode()
    req = Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
    try:
        # The context manager closes the HTTP response even when parsing fails.
        with urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        return data["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        # Deliberately best-effort (one bad call must not abort a long batch),
        # but report the cause so an unreachable server is not mistaken for
        # an empty reply.
        print(f"  call_llm failed: {exc!r}", file=sys.stderr)
        return ""
|
|
|
|
def call_embed(text):
    """Request an embedding vector for ``text`` from the embeddings endpoint.

    Posts ``{"input": text}`` to ``EMBED_URL`` and returns the ``embedding``
    field of the first item in the response's ``data`` list.

    Args:
        text: The string to embed.

    Returns:
        The embedding as returned by the service, or ``None`` if the request
        fails or the response does not have the expected shape. The caller
        substitutes a zero vector for ``None``.
    """
    body = json.dumps({"input": text}).encode()
    req = Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
    try:
        # The context manager closes the HTTP response even when parsing fails.
        with urlopen(req, timeout=30) as resp:
            payload = json.loads(resp.read())
        return payload["data"][0]["embedding"]
    except Exception as exc:
        # Deliberately best-effort, but surface the reason instead of hiding it.
        print(f"  call_embed failed: {exc!r}", file=sys.stderr)
        return None
|
|
|
|
print("=== Step 1: Build sentence→parent mapping ===")
conn = psycopg2.connect(DB_URL)
try:
    cur = conn.cursor()
    try:
        # Story chunks: each carries a scene summary and the ids of its
        # child sentence chunks.
        cur.execute("""
            SELECT chunk_index, summary_text, child_chunk_ids
            FROM dev.chunks
            WHERE file_uuid = %s AND chunk_type = 'story'
            ORDER BY chunk_index
        """, (UUID,))
        stories = cur.fetchall()
        print(f"Loaded {len(stories)} story chunks")

        # Sentence chunks, keyed by chunk_index for direct lookup below.
        cur.execute("""
            SELECT chunk_index, text_content, metadata->>'new_speaker_name' as speaker
            FROM dev.chunks
            WHERE file_uuid = %s AND chunk_type = 'sentence'
            ORDER BY chunk_index
        """, (UUID,))
        all_sentences = {
            row[0]: {"text": row[1], "speaker": row[2]} for row in cur.fetchall()
        }
        print(f"Loaded {len(all_sentences)} sentence chunks")
    finally:
        cur.close()
finally:
    # Release the connection even if a query fails; nothing below needs the DB.
    conn.close()

# Build: sentence_index → (parent_summary, sentence_text, speaker)
sentence_map = {}
for story_idx, summary_text, child_ids in stories:
    for cid in child_ids or ():
        # The sentence index is the text after the last underscore of the id.
        # NOTE(review): assumes child ids are strings shaped "<prefix>_<int>" — confirm.
        try:
            child_idx = int(cid.rsplit("_", 1)[-1])
        except (AttributeError, ValueError):
            # One malformed id should not abort the whole run.
            print(f"  Skipping malformed child id {cid!r} in story {story_idx}",
                  file=sys.stderr)
            continue
        sentence = all_sentences.get(child_idx)
        if sentence is None:
            continue
        sentence_map[child_idx] = {
            "parent_summary": summary_text or "",
            "sentence_text": sentence["text"] or "",
            "speaker": sentence["speaker"] or "Unknown",
        }

# Load checkpoint if exists
completed = set()
if os.path.exists(CHECKPOINT):
    with open(CHECKPOINT) as f:
        old = json.load(f)
    completed = set(old.get("completed", []))
    print(f"Loaded checkpoint: {len(completed)} already completed")
|
|
|
|
print("\n=== Step 2: Generate summaries ===")

# Length of the placeholder zero vector stored when no real embedding exists.
EMBED_DIM = 768

# Results persisted by an earlier run, keyed by sentence index. They must be
# carried forward: every checkpoint write replaces the whole file, so writing
# only this run's results would silently drop the earlier ones.
prior_results = {}
if os.path.exists(CHECKPOINT):
    with open(CHECKPOINT) as f:
        prior_results = {r["index"]: r for r in json.load(f).get("results", [])}


def _save_checkpoint(results_by_index):
    """Atomically write every known result (earlier runs + this run) to CHECKPOINT."""
    ordered = [results_by_index[k] for k in sorted(results_by_index)]
    payload = {
        "completed": sorted(set(completed) | set(results_by_index)),
        "results": ordered,
    }
    tmp_path = CHECKPOINT + ".tmp"
    with open(tmp_path, "w") as out:
        json.dump(payload, out)
    # Rename over the old file so an interrupted write cannot truncate it.
    os.replace(tmp_path, CHECKPOINT)


results = []   # records produced by this run only
errors = 0     # LLM or embedding calls that failed and fell back to a placeholder
sorted_indices = sorted(sentence_map)
merged = dict(prior_results)  # earlier results overlaid with this run's results

for i, idx in enumerate(sorted_indices):
    # Skip only when the earlier result is really available; an index marked
    # completed without a stored result is regenerated.
    if idx in completed and idx in prior_results:
        continue

    info = sentence_map[idx]
    parent_summary = info["parent_summary"]
    sent_text = info["sentence_text"]
    speaker = info["speaker"]

    if not parent_summary or not sent_text:
        # Nothing to contextualise: keep the raw text and a placeholder vector.
        summary = sent_text or ""
        embedding = [0.0] * EMBED_DIM
    else:
        prompt = f"Context: {parent_summary}\nUtterance: {sent_text}\n\nIn one short sentence, explain what the speaker communicates with this line within the context above."
        summary = call_llm(prompt)
        if not summary:
            # LLM failure: fall back to the utterance itself.
            errors += 1
            summary = sent_text
            embedding = [0.0] * EMBED_DIM
        else:
            embedding = call_embed(summary)
            if embedding is None:
                errors += 1
                embedding = [0.0] * EMBED_DIM
        # Brief pause between requests to the local model servers.
        time.sleep(0.15)

    # NOTE(review): failed items are still recorded (and later treated as
    # completed) with a zero vector, so a resumed run will not retry them.
    record = {
        "index": idx,
        "chunk_id": f"{UUID}_{idx}",
        "speaker_name": speaker,
        "utterance": sent_text,
        "summary": summary,
        "embedding": embedding,
    }
    results.append(record)
    merged[idx] = record

    if (i + 1) % 50 == 0:
        print(f"  [{i+1}/{len(sorted_indices)}] idx={idx} summary_len={len(summary)} errs={errors}")
        _save_checkpoint(merged)

# Persist the tail of the run as well, so a failure in a later step does not
# force the last (< 50) summaries to be regenerated.
if results:
    _save_checkpoint(merged)

print(f"Generated {len(results)} summaries, {errors} errors")

# Everything known for this file: checkpointed results plus this run's, by index.
all_results = [merged[k] for k in sorted(merged)]

print(f"\nTotal summaries: {len(all_results)}")
|
|
|
|
print("\n=== Step 3: Update Qdrant sentence_summary ===")
collection_url = f"{QDRANT_URL}/collections/sentence_summary"
json_headers = {"Content-Type": "application/json"}

# Delete old collection. Best-effort: a failure here is reported, not fatal.
# NOTE(review): this drops the whole `sentence_summary` collection, not just
# the points of this UUID — confirm no other file shares the collection.
req = Request(collection_url, method="DELETE")
try:
    with urlopen(req):
        pass
    time.sleep(0.5)
except OSError as exc:
    # urllib's HTTPError/URLError derive from OSError.
    print(f"  Could not delete old collection (continuing): {exc}")

# Recreate with 768-dimensional cosine vectors; a failure here aborts the run.
req = Request(collection_url,
              data=json.dumps({"vectors": {"size": 768, "distance": "Cosine"}}).encode(),
              headers=json_headers, method="PUT")
with urlopen(req):
    pass
time.sleep(0.5)

# Upload
batch_size = 100
points = [
    {
        # Point ids are the sentence index shifted by one.
        "id": r["index"] + 1,
        "vector": r["embedding"],
        "payload": {
            "chunk_type": "sentence",
            "uuid": UUID,
            "chunk_id": r["chunk_id"],
            "speaker_name": r["speaker_name"],
            "utterance": r["utterance"],
            "summary": r["summary"],
        },
    }
    for r in all_results
]

uploaded = 0
failed_batches = []
for start in range(0, len(points), batch_size):
    batch = points[start:start + batch_size]
    req = Request(f"{collection_url}/points?wait=true",
                  data=json.dumps({"points": batch}).encode(),
                  headers=json_headers, method="PUT")
    try:
        with urlopen(req):
            pass
        uploaded += len(batch)
    except Exception as e:
        # Keep going so one bad batch does not block the rest, but remember
        # it so the final report is truthful.
        failed_batches.append(start)
        print(f"  Batch {start}: {e}")
    if (start // batch_size) % 5 == 0:
        print(f"  Uploaded {start + len(batch)}/{len(points)}")

print(f"Done: {uploaded} points in sentence_summary")
if failed_batches:
    print(f"WARNING: {len(failed_batches)} batch(es) failed "
          f"(starting at offsets {failed_batches}); "
          f"{len(points) - uploaded} points were not uploaded")

# Verify
with urlopen(collection_url) as verify_resp:
    resp = json.loads(verify_resp.read())
info = resp["result"]
print(f"Verified: points={info['points_count']}, dim={info['config']['params']['vectors'].get('size','?')}")
|