Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index Correction: asr-1.json format, generate/apply scripts API: 37/37 endpoints fixed and tested Docs: HANDOVER_V2.0.md for M4
205 lines
7.2 KiB
Python
205 lines
7.2 KiB
Python
#!/opt/homebrew/bin/python3.11
|
||
"""
|
||
Rebuild parent/story chunks (280 × 15 children) + LLM summaries + Qdrant momentry_dev_stories.
|
||
"""
|
||
import json, sys, time, psycopg2
|
||
from collections import Counter
|
||
from urllib.request import Request, urlopen
|
||
|
||
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
|
||
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
|
||
QDRANT_URL = "http://localhost:6333"
|
||
LLM_URL = "http://localhost:8082/v1/chat/completions"
|
||
EMBED_URL = "http://localhost:11436/v1/embeddings"
|
||
FPS = 25.0
|
||
FILE_ID = 242
|
||
CHILDREN_PER_PARENT = 15
|
||
|
||
print("=== Step 1: Load sentence chunks sorted by time ===")
|
||
conn = psycopg2.connect(DB_URL)
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT chunk_index, chunk_id, start_time, end_time, text_content,
|
||
metadata->>'speaker_name', file_uuid
|
||
FROM dev.chunks
|
||
WHERE file_uuid=%s AND chunk_type='sentence'
|
||
ORDER BY start_time, chunk_index
|
||
""", (UUID,))
|
||
children = cur.fetchall()
|
||
print(f"Loaded {len(children)} sentence chunks")
|
||
|
||
# Group into parents of 15
|
||
parents = []
|
||
for i in range(0, len(children), CHILDREN_PER_PARENT):
|
||
group = children[i:i+CHILDREN_PER_PARENT]
|
||
if not group: continue
|
||
p_start = group[0][2]
|
||
p_end = group[-1][3]
|
||
child_ids = [c[1] for c in group]
|
||
|
||
# Speaker breakdown
|
||
spk_counter = Counter(c[4] for c in group)
|
||
# Actually count speaker names
|
||
spk_names = Counter(c[5] for c in group)
|
||
primary = spk_names.most_common(1)[0][0] if spk_names else "Unknown"
|
||
|
||
parents.append({
|
||
"start": p_start, "end": p_end,
|
||
"child_ids": child_ids,
|
||
"child_indices": [c[0] for c in group],
|
||
"speakers": dict(spk_names.most_common()),
|
||
"primary": primary,
|
||
"texts": [c[4] for c in group],
|
||
})
|
||
|
||
print(f"Parent chunks: {len(parents)}")
|
||
print(f"Speakers per parent: {[len(p['speakers']) for p in parents[:5]]}")
|
||
|
||
# Delete old story chunks
|
||
cur.execute("DELETE FROM dev.chunks WHERE file_uuid=%s AND chunk_type='story'", (UUID,))
|
||
print(f"Deleted old story chunks: {cur.rowcount}")
|
||
|
||
# Insert parent chunks
|
||
print("\n=== Step 2: Insert parent chunks ===")
|
||
parent_records = []
|
||
for pi, p in enumerate(parents):
|
||
pid = f"{UUID}_story_{pi}"
|
||
dialogue = " ".join([t or "" for t in p["texts"]])
|
||
sf = int(p["start"] * FPS)
|
||
ef = int(p["end"] * FPS)
|
||
fc = ef - sf
|
||
|
||
metadata = json.dumps({
|
||
"method": "fixed_15",
|
||
"seg_count": len(p["child_ids"]),
|
||
"speakers": p["speakers"],
|
||
"speaker_count": len(p["speakers"]),
|
||
"primary_speaker": p["primary"],
|
||
"words": len(dialogue.split()),
|
||
})
|
||
|
||
parent_records.append((
|
||
UUID, pid, pi, "story", p["start"], p["end"],
|
||
json.dumps({"type": "story_parent"}),
|
||
dialogue, FPS, sf, ef, fc, FILE_ID, pid,
|
||
metadata, p["child_ids"], [], None,
|
||
))
|
||
|
||
cur.executemany("""
|
||
INSERT INTO dev.chunks
|
||
(file_uuid, chunk_id, chunk_index, chunk_type,
|
||
start_time, end_time, content, text_content,
|
||
fps, start_frame, end_frame, frame_count,
|
||
file_id, old_chunk_id, metadata, child_chunk_ids, pre_chunk_ids, summary_text)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s)
|
||
""", parent_records)
|
||
conn.commit()
|
||
print(f"Inserted {len(parent_records)} parent chunks")
|
||
|
||
# Update sentence chunks with parent_chunk_id
|
||
for pi, p in enumerate(parents):
|
||
pid = f"{UUID}_story_{pi}"
|
||
for cid in p["child_ids"]:
|
||
cur.execute("UPDATE dev.chunks SET parent_chunk_id=%s WHERE chunk_id=%s", (pid, cid))
|
||
conn.commit()
|
||
print("Updated child parent references")
|
||
|
||
print("\n=== Step 3: Generate LLM summaries ===")
|
||
def call_llm(prompt):
|
||
body = json.dumps({"model": "google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
|
||
"messages": [{"role": "user", "content": prompt}],
|
||
"temperature": 0.1, "max_tokens": 100}).encode()
|
||
req = Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
|
||
resp = urlopen(req, timeout=120)
|
||
return json.loads(resp.read())["choices"][0]["message"]["content"].strip()
|
||
|
||
def call_embed(text):
|
||
body = json.dumps({"input": text}).encode()
|
||
req = Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
|
||
resp = urlopen(req, timeout=30)
|
||
return json.loads(resp.read())["data"][0]["embedding"]
|
||
|
||
t0 = time.time()
|
||
summaries = []
|
||
for pi, p in enumerate(parents):
|
||
dialogue = " ".join([t or "" for t in p["texts"]])
|
||
if len(dialogue) < 10:
|
||
summary = "[no dialogue]"
|
||
embedding = [0.0] * 768
|
||
else:
|
||
try:
|
||
prompt = f"Act as a film scene analyst. Summarize this dialogue in 50 words:\n{dialogue[:3000]}"
|
||
summary = call_llm(prompt)
|
||
time.sleep(0.2)
|
||
embedding = call_embed(summary)
|
||
except Exception as e:
|
||
print(f" P{pi} ERROR: {e}")
|
||
summary = "[error]"
|
||
embedding = [0.0] * 768
|
||
|
||
cur.execute("UPDATE dev.chunks SET summary_text=%s, updated_at=NOW() WHERE chunk_id=%s",
|
||
(summary, f"{UUID}_story_{pi}"))
|
||
|
||
summaries.append({"index": pi, "chunk_id": f"{UUID}_story_{pi}",
|
||
"summary": summary, "start": p["start"], "end": p["end"],
|
||
"embedding": embedding})
|
||
|
||
if (pi + 1) % 20 == 0:
|
||
print(f" [{pi+1}/{len(parents)}] [{time.time()-t0:.0f}s]")
|
||
|
||
conn.commit()
|
||
print(f"Summaries: {len(summaries)}")
|
||
|
||
print("\n=== Step 4: Update Qdrant momentry_dev_stories ===")
|
||
# Delete old
|
||
req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories", method="DELETE")
|
||
try: urlopen(req); time.sleep(0.5)
|
||
except: pass
|
||
|
||
# Create
|
||
req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories",
|
||
data=json.dumps({"vectors": {"size": 768, "distance": "Cosine"}}).encode(),
|
||
headers={"Content-Type": "application/json"}, method="PUT")
|
||
urlopen(req)
|
||
time.sleep(0.5)
|
||
|
||
# Upload dialogue + summary points (dialogue=0..279, summary=280..559)
|
||
points = []
|
||
for pi, p in enumerate(parents):
|
||
# Dialogue point (zero vector)
|
||
points.append({
|
||
"id": pi + 1,
|
||
"vector": [0.0] * 768,
|
||
"payload": {"chunk_id": f"{UUID}_story_{pi}", "file_uuid": UUID,
|
||
"start_time": p["start"], "end_time": p["end"],
|
||
"type": "story_dialogue", "text": " ".join(p["texts"])[:500]},
|
||
})
|
||
# Summary point
|
||
s = summaries[pi]
|
||
points.append({
|
||
"id": pi + 1 + len(parents),
|
||
"vector": s["embedding"],
|
||
"payload": {"chunk_id": s["chunk_id"], "file_uuid": UUID,
|
||
"start_time": s["start"], "end_time": s["end"],
|
||
"type": "story_summary", "summary": s["summary"]},
|
||
})
|
||
|
||
batch_size = 100
|
||
for start in range(0, len(points), batch_size):
|
||
batch = points[start:start+batch_size]
|
||
req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories/points?wait=true",
|
||
data=json.dumps({"points": batch}).encode(),
|
||
headers={"Content-Type": "application/json"}, method="PUT")
|
||
try: urlopen(req)
|
||
except Exception as e: print(f" batch {start}: {e}")
|
||
|
||
print(f"Uploaded {len(points)} points")
|
||
|
||
# Verify
|
||
resp = json.loads(urlopen(f"{QDRANT_URL}/collections/momentry_dev_stories").read())
|
||
info = resp["result"]
|
||
print(f"Verifed: {info['points_count']} pts, {info['config']['params']['vectors'].get('size','?')}D")
|
||
|
||
conn.close()
|
||
print(f"\n=== Done [{time.time()-t0:.0f}s] ===")
|