Files
momentry_core/v1.1/scripts/rebuild_parents_v1.11.py
Accusys 2cfcfdd1af feat: Phase 2.6 edges migration to Qdrant (TKG-only architecture)
Phase 2.6.1: co_occurrence_edges migration
- build_co_occurrence_edges_from_qdrant()
- Qdrant embeddings → frame grouping → YOLO objects
- Result: 6679 edges (vs 6701 PostgreSQL)

Phase 2.6.2: face_face_edges migration
- build_face_face_edges_from_qdrant()
- Qdrant embeddings → frame grouping → face pairs
- mutual_gaze detection preserved
- Result: 6 edges (exact match)

Phase 2.6.3: speaker_face_edges migration
- build_speaker_face_edges_from_qdrant()
- Qdrant embeddings → trace_id frame ranges
- SPEAKS_AS edge creation

Architecture:
- All edges use Qdrant payload (no face_detections queries)
- PostgreSQL fallback for empty Qdrant
- Estimated 3.6x performance improvement

Testing:
- Playground (3003): ✓ All Phase 2.6 logs verified
- Edge counts: ✓ Close match with PostgreSQL
- Fallback: ✓ Working

Docs:
- docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md
- docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
2026-06-21 04:47:49 +08:00

205 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Rebuild parent/story chunks (280 × 15 children) + LLM summaries + Qdrant momentry_dev_stories.

Steps performed top to bottom when the script runs:
  1. Load the file's ``sentence`` chunks from ``dev.chunks`` ordered by start time
     and group them into parents of CHILDREN_PER_PARENT consecutive sentences.
  2. Replace the file's ``story`` chunks with one row per parent and set each
     child sentence's ``parent_chunk_id`` to its parent's chunk id.
  3. Ask the chat-completions endpoint for a short summary of each parent's
     dialogue, embed that summary, and store it in ``summary_text``.
  4. Drop and recreate the ``momentry_dev_stories`` Qdrant collection and upload
     one dialogue point (zero vector) and one summary point per parent.

The "280" above is presumably the parent count for the file this was written for;
the actual count is ceil(number_of_sentences / CHILDREN_PER_PARENT).
"""
import json
import os
import sys
import time
from collections import Counter
from urllib.request import Request, urlopen

import psycopg2
# Target file and service endpoints. Each value can be overridden through the
# environment so the rebuild can be re-run for another file or host without
# editing the script; the defaults are the values it was written for.
UUID = os.environ.get("MOMENTRY_FILE_UUID", "aeed71342a899fe4b4c57b7d41bcb692")
DB_URL = os.environ.get(
    "MOMENTRY_DB_URL", "postgresql://accusys@localhost:5432/momentry?host=/tmp"
)
QDRANT_URL = os.environ.get("MOMENTRY_QDRANT_URL", "http://localhost:6333")
LLM_URL = os.environ.get(
    "MOMENTRY_LLM_URL", "http://localhost:8082/v1/chat/completions"
)
EMBED_URL = os.environ.get(
    "MOMENTRY_EMBED_URL", "http://localhost:11436/v1/embeddings"
)
# Frame rate used to turn parent start/end seconds into frame numbers.
FPS = 25.0
# Written to the file_id column of every story row; presumably the numeric id of
# the same file as UUID, so keep the two in sync when overriding.
FILE_ID = int(os.environ.get("MOMENTRY_FILE_ID", "242"))
# Number of consecutive sentence chunks grouped into one story/parent chunk.
CHILDREN_PER_PARENT = 15
print("=== Step 1: Load sentence chunks sorted by time ===")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
cur.execute("""
SELECT chunk_index, chunk_id, start_time, end_time, text_content,
metadata->>'speaker_name', file_uuid
FROM dev.chunks
WHERE file_uuid=%s AND chunk_type='sentence'
ORDER BY start_time, chunk_index
""", (UUID,))
children = cur.fetchall()
print(f"Loaded {len(children)} sentence chunks")
if not children:
    # Nothing to group. Stopping here keeps the later steps from deleting the
    # existing story chunks and dropping the Qdrant collection for no result.
    conn.close()
    sys.exit(f"No sentence chunks found for file_uuid={UUID}; nothing rebuilt")


def _build_parent(group):
    """Summarise one run of consecutive sentence rows as a parent/story record.

    *group* is a non-empty slice of the rows selected above, each shaped
    (chunk_index, chunk_id, start_time, end_time, text_content,
    speaker_name, file_uuid).
    """
    # Sentences without a speaker name are counted as "Unknown" instead of
    # None, which would otherwise surface as a null speaker / "null" JSON key.
    spk_names = Counter(row[5] or "Unknown" for row in group)
    return {
        "start": group[0][2],
        "end": group[-1][3],
        "child_ids": [row[1] for row in group],
        "child_indices": [row[0] for row in group],
        "speakers": dict(spk_names.most_common()),
        "primary": spk_names.most_common(1)[0][0],
        "texts": [row[4] for row in group],
    }


# range() with this step only yields starts < len(children), so every slice is
# non-empty; the last parent may hold fewer than CHILDREN_PER_PARENT sentences.
parents = [
    _build_parent(children[i:i + CHILDREN_PER_PARENT])
    for i in range(0, len(children), CHILDREN_PER_PARENT)
]
print(f"Parent chunks: {len(parents)}")
print(f"Speakers per parent: {[len(p['speakers']) for p in parents[:5]]}")
# Remove the story chunks written by any previous run. No commit is issued here;
# the deletion is committed together with the new story rows inserted next.
cur.execute(
    "DELETE FROM dev.chunks WHERE file_uuid=%s AND chunk_type='story'",
    (UUID,),
)
removed = cur.rowcount
print(f"Deleted old story chunks: {removed}")
# Insert parent chunks
print("\n=== Step 2: Insert parent chunks ===")


def _story_row(index, parent):
    """Return the dev.chunks column values for story chunk *index* of *parent*.

    The tuple order matches the column list of the INSERT below.
    """
    pid = f"{UUID}_story_{index}"
    # NULL sentence texts contribute an empty string instead of breaking join().
    dialogue = " ".join([t or "" for t in parent["texts"]])
    sf = int(parent["start"] * FPS)
    ef = int(parent["end"] * FPS)
    metadata = json.dumps({
        # Derived from the grouping size actually used instead of hard-coding 15.
        "method": f"fixed_{CHILDREN_PER_PARENT}",
        "seg_count": len(parent["child_ids"]),
        "speakers": parent["speakers"],
        "speaker_count": len(parent["speakers"]),
        "primary_speaker": parent["primary"],
        "words": len(dialogue.split()),
    })
    return (
        UUID, pid, index, "story", parent["start"], parent["end"],
        json.dumps({"type": "story_parent"}),
        dialogue, FPS, sf, ef, ef - sf, FILE_ID,
        pid,  # old_chunk_id is filled with the new chunk id itself
        metadata, parent["child_ids"], [], None,
    )


parent_records = [_story_row(pi, p) for pi, p in enumerate(parents)]
cur.executemany("""
INSERT INTO dev.chunks
(file_uuid, chunk_id, chunk_index, chunk_type,
start_time, end_time, content, text_content,
fps, start_frame, end_frame, frame_count,
file_id, old_chunk_id, metadata, child_chunk_ids, pre_chunk_ids, summary_text)
VALUES (%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s)
""", parent_records)
# Commits the preceding DELETE and these INSERTs as one transaction.
conn.commit()
print(f"Inserted {len(parent_records)} parent chunks")
# Point every sentence chunk at the story chunk that now contains it. The match
# is restricted to this file's rows in case chunk_id is not unique across files.
child_links = [
    (f"{UUID}_story_{pi}", cid, UUID)
    for pi, p in enumerate(parents)
    for cid in p["child_ids"]
]
cur.executemany(
    "UPDATE dev.chunks SET parent_chunk_id=%s WHERE chunk_id=%s AND file_uuid=%s",
    child_links,
)
conn.commit()
print("Updated child parent references")
print("\n=== Step 3: Generate LLM summaries ===")
def call_llm(prompt, *, model="google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
             temperature=0.1, max_tokens=100, timeout=120):
    """Send *prompt* as a single user message to LLM_URL and return the reply.

    The request uses the OpenAI-style chat-completions body; the reply text is
    read from ``choices[0].message.content`` and stripped of whitespace.

    Args:
        prompt: User message text.
        model, temperature, max_tokens: Forwarded in the request body; defaults
            are the values this script has always used.
        timeout: Seconds passed to urlopen.

    Raises:
        Whatever urlopen raises on transport/HTTP failure, and KeyError /
        IndexError / AttributeError if the response lacks the expected fields.
    """
    body = json.dumps({"model": model,
                       "messages": [{"role": "user", "content": prompt}],
                       "temperature": temperature, "max_tokens": max_tokens}).encode()
    req = Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
    # Context manager closes the HTTP response instead of leaking it.
    with urlopen(req, timeout=timeout) as resp:
        payload = json.loads(resp.read())
    return payload["choices"][0]["message"]["content"].strip()
def call_embed(text, *, timeout=30):
    """Return the embedding vector for *text* from the EMBED_URL endpoint.

    Posts ``{"input": text}`` and returns ``data[0].embedding`` from the JSON
    reply. The vector length is whatever the server returns; step 4 creates the
    Qdrant collection with size 768, so the server is assumed to produce
    768-dim vectors (TODO confirm).

    Raises:
        Whatever urlopen raises on transport/HTTP failure, and KeyError /
        IndexError if the response lacks the expected fields.
    """
    body = json.dumps({"input": text}).encode()
    req = Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
    # Context manager closes the HTTP response instead of leaking it.
    with urlopen(req, timeout=timeout) as resp:
        payload = json.loads(resp.read())
    return payload["data"][0]["embedding"]
t0 = time.time()
summaries = []
for pi, p in enumerate(parents):
    chunk_id = f"{UUID}_story_{pi}"
    dialogue = " ".join([t or "" for t in p["texts"]])
    # Zero vector used whenever no real embedding is available; its length must
    # match the "size" of the Qdrant collection created in step 4 (768).
    embedding = [0.0] * 768
    if len(dialogue) < 10:
        summary = "[no dialogue]"
    else:
        prompt = f"Act as a film scene analyst. Summarize this dialogue in 50 words:\n{dialogue[:3000]}"
        try:
            summary = call_llm(prompt)
        except Exception as e:
            print(f" P{pi} ERROR: {e}")
            summary = "[error]"
        else:
            time.sleep(0.2)
            try:
                embedding = call_embed(summary)
            except Exception as e:
                # Keep the summary that was generated; only its vector is missing.
                print(f" P{pi} EMBED ERROR: {e}")
    cur.execute("UPDATE dev.chunks SET summary_text=%s, updated_at=NOW() WHERE chunk_id=%s",
                (summary, chunk_id))
    summaries.append({"index": pi, "chunk_id": chunk_id,
                      "summary": summary, "start": p["start"], "end": p["end"],
                      "embedding": embedding})
    if (pi + 1) % 20 == 0:
        print(f" [{pi+1}/{len(parents)}] [{time.time()-t0:.0f}s]")
        # Periodic commit so progress survives an interrupted run.
        conn.commit()
# Final commit covers the last partial batch of fewer than 20 parents.
conn.commit()
print(f"Summaries: {len(summaries)}")
print("\n=== Step 4: Update Qdrant momentry_dev_stories ===")
# Drop the collection from a previous run. This is best-effort (the collection
# may not exist yet), but failures are now reported instead of silently
# swallowed; KeyboardInterrupt/SystemExit are no longer caught.
delete_req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories", method="DELETE")
try:
    with urlopen(delete_req):
        pass
    time.sleep(0.5)
except Exception as e:
    print(f" delete collection (ignored): {e}")
# Recreate it with a 768-dim cosine vector space; step 3 zero-fills missing
# vectors to the same length.
create_req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories",
                     data=json.dumps({"vectors": {"size": 768, "distance": "Cosine"}}).encode(),
                     headers={"Content-Type": "application/json"}, method="PUT")
with urlopen(create_req):
    pass
time.sleep(0.5)
# Upload two points per parent. Ids are 1-based: dialogue points use
# 1..N and summary points N+1..2N, where N = len(parents).
n_parents = len(parents)
points = []
for pi, p in enumerate(parents):
    # Dialogue point (zero vector). NULL sentence texts are joined as empty
    # strings, matching how the dialogue is built in steps 2 and 3.
    points.append({
        "id": pi + 1,
        "vector": [0.0] * 768,
        "payload": {"chunk_id": f"{UUID}_story_{pi}", "file_uuid": UUID,
                    "start_time": p["start"], "end_time": p["end"],
                    "type": "story_dialogue",
                    "text": " ".join([t or "" for t in p["texts"]])[:500]},
    })
    # Summary point
    s = summaries[pi]
    points.append({
        "id": pi + 1 + n_parents,
        "vector": s["embedding"],
        "payload": {"chunk_id": s["chunk_id"], "file_uuid": UUID,
                    "start_time": s["start"], "end_time": s["end"],
                    "type": "story_summary", "summary": s["summary"]},
    })
batch_size = 100
failed = 0
for start in range(0, len(points), batch_size):
    batch = points[start:start + batch_size]
    req = Request(f"{QDRANT_URL}/collections/momentry_dev_stories/points?wait=true",
                  data=json.dumps({"points": batch}).encode(),
                  headers={"Content-Type": "application/json"}, method="PUT")
    try:
        with urlopen(req):
            pass
    except Exception as e:
        # Best-effort per batch, but keep count so the summary line is accurate.
        failed += len(batch)
        print(f" batch {start}: {e}")
print(f"Uploaded {len(points) - failed} points")
if failed:
    print(f"Failed to upload {failed} points")
# Verify: read the collection back to report its point count and vector size.
with urlopen(f"{QDRANT_URL}/collections/momentry_dev_stories") as resp:
    info = json.loads(resp.read())["result"]
print(f"Verified: {info['points_count']} pts, {info['config']['params']['vectors'].get('size','?')}D")
conn.close()
# t0 is set when summary generation starts, so this elapsed time covers steps 3-4.
print(f"\n=== Done [{time.time()-t0:.0f}s] ===")