Files
momentry_core/scripts/update_speaker_assignments.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

193 lines
5.9 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
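Update sentence chunk metadata with the new ASRX speaker_id and speaker_name,
refresh the per-story speaker aggregates, and rebuild the Qdrant
momentry_dev_voice collection.
NOTE(review): momentry_dev_v1 was also listed here, but no step in this script writes to it.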
"""
import json, sys, time
import psycopg2
import numpy as np
from urllib.request import Request, urlopen
# file_uuid of the chunks processed by this run; also used to name the input files.
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
# Input JSON files, both named after UUID.
ASRX_PATH = f"/Users/accusys/momentry/output_dev/{UUID}.asrx.json"
SPEAKER_MAP_PATH = f"/Users/accusys/momentry/output_dev/{UUID}.speaker_map_v2.json"
# Connection targets for PostgreSQL and the Qdrant HTTP API.
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
QDRANT_URL = "http://localhost:6333"

print("=== Loading data ===")
# Context managers close the file handles deterministically; the explicit
# encoding keeps JSON decoding independent of the platform locale.
with open(ASRX_PATH, encoding="utf-8") as asrx_file:
    asrx = json.load(asrx_file)
segs = asrx["segments"]
# "embeddings" is optional in the ASRX file; fall back to an empty list.
embeddings = asrx.get("embeddings", [])

with open(SPEAKER_MAP_PATH, encoding="utf-8") as speaker_map_file:
    speaker_map = json.load(speaker_map_file)
# Both keys are required; a missing key raises KeyError here, before any write.
assignments = speaker_map["assignments"]
speaker_identity = speaker_map["speaker_identity"]
print(f"Loaded {len(segs)} segments, {len(embeddings)} embeddings")
print("\n=== Step 1: Update DB chunks with new speaker info ===")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()

# Every sentence chunk of this file, ordered by chunk_index.
# NOTE(review): the commit description attached to this file mentions a
# dev.chunks -> dev.chunk rename and the removal of chunk_index; confirm that
# these statements still match the live schema.
select_sentences_sql = """
SELECT id, chunk_index, metadata
FROM dev.chunks
WHERE file_uuid = %s AND chunk_type = 'sentence'
ORDER BY chunk_index
"""
update_metadata_sql = """
UPDATE dev.chunks
SET metadata = %s::jsonb, updated_at = NOW()
WHERE id = %s
"""

cur.execute(select_sentences_sql, (UUID,))
db_chunks = cur.fetchall()
print(f"Found {len(db_chunks)} DB sentence chunks")

updated = 0
for db_id, chunk_idx, stored_meta in db_chunks:
    # chunk_index is used as a position into the assignments list.
    if chunk_idx >= len(assignments):
        print(f"WARNING: chunk_idx {chunk_idx} out of range for assignments ({len(assignments)})")
        continue
    assignment = assignments[chunk_idx]

    # Normalise the stored metadata to a dict: it may be NULL, a JSON string,
    # or an already-decoded object.
    if isinstance(stored_meta, str):
        chunk_meta = json.loads(stored_meta)
    elif stored_meta is None:
        chunk_meta = {}
    else:
        chunk_meta = stored_meta

    # Existing keys are preserved; the new assignment is written under new_*
    # keys and the current speaker_id / speaker_name are copied to old_* keys.
    chunk_meta["new_speaker_id"] = assignment["speaker_id"]
    chunk_meta["new_speaker_name"] = assignment["speaker_name"]
    chunk_meta["old_speaker_id"] = chunk_meta.get("speaker_id", "")
    chunk_meta["old_speaker_name"] = chunk_meta.get("speaker_name", "")

    cur.execute(update_metadata_sql, (json.dumps(chunk_meta), db_id))
    updated += 1

# All sentence updates are committed together.
conn.commit()
print(f"Updated {updated} DB chunks")
# Also update story chunks with new aggregated speaker info
print("\n=== Step 2: Update story chunk aggregates ===")
# NOTE(review): the commit description attached to this file mentions a
# dev.chunks -> dev.chunk rename and the removal of chunk_index; confirm that
# these statements still match the live schema.
select_stories_sql = """
SELECT id, chunk_index, metadata, child_chunk_ids
FROM dev.chunks
WHERE file_uuid = %s AND chunk_type = 'story'
ORDER BY chunk_index
"""
select_sentence_names_sql = """
SELECT chunk_index, metadata->>'new_speaker_name' as speaker_name
FROM dev.chunks
WHERE file_uuid = %s AND chunk_type = 'sentence'
ORDER BY chunk_index
"""
update_metadata_sql = """
UPDATE dev.chunks
SET metadata = %s::jsonb, updated_at = NOW()
WHERE id = %s
"""

cur.execute(select_stories_sql, (UUID,))
stories = cur.fetchall()
print(f"Found {len(stories)} story chunks")

# Get all sentence chunks with their new speaker info
cur.execute(select_sentence_names_sql, (UUID,))
sentences = cur.fetchall()
# chunk_index -> new speaker name. A sentence whose metadata has no
# new_speaker_name yields NULL here; count it as "Unknown" rather than under a
# None key, which json.dumps would serialise as "null".
sent_names = {
    chunk_idx: (name if name is not None else "Unknown")
    for chunk_idx, name in sentences
}

for db_id, idx, meta, child_ids in stories:
    # Normalise the stored metadata to a dict (NULL / JSON string / decoded).
    if meta is None:
        meta = {}
    elif isinstance(meta, str):
        meta = json.loads(meta)

    if child_ids:
        # Aggregate speaker info from child chunks
        speaker_counts = {}
        for cid in child_ids:
            # The sentence chunk_index is taken from the last "_"-separated
            # field of the child chunk id.
            try:
                child_idx = int(cid.split("_")[-1])
            except ValueError:
                # One malformed id should not abort the whole aggregation.
                print(f"WARNING: cannot parse chunk index from child id {cid!r} (story {db_id})")
                continue
            name = sent_names.get(child_idx)
            if name is None:
                # The child does not refer to a sentence chunk of this file.
                continue
            speaker_counts[name] = speaker_counts.get(name, 0) + 1

        meta["speaker_breakdown"] = speaker_counts
        # On ties, max() keeps the speaker encountered first.
        meta["primary_speaker"] = (
            max(speaker_counts, key=speaker_counts.get) if speaker_counts else "Unknown"
        )
        meta["speaker_count"] = len(speaker_counts)

    # Written for every story row, including those without child ids.
    cur.execute(update_metadata_sql, (json.dumps(meta), db_id))

conn.commit()
print(f"Updated {len(stories)} story chunks")
print("\n=== Step 3: Update Qdrant momentry_dev_voice ===")
import urllib.request  # noqa: E402,F401 -- retained from the original script


def _qdrant_call(method, path, body=None):
    """Send one request to the Qdrant HTTP API and close the response.

    Args:
        method: HTTP verb, e.g. "PUT" or "DELETE".
        path: URL path appended to QDRANT_URL.
        body: optional JSON-serialisable payload; when given it is sent with
            a JSON Content-Type header.

    Raises:
        Whatever ``urlopen`` raises for the request (for example
        ``urllib.error.HTTPError`` on a non-success status).
    """
    request = Request(
        f"{QDRANT_URL}{path}",
        data=None if body is None else json.dumps(body).encode(),
        headers={} if body is None else {"Content-Type": "application/json"},
        method=method,
    )
    # The context manager releases the connection once the call has returned.
    with urlopen(request):
        pass


# Delete old voice collection and recreate
try:
    _qdrant_call("DELETE", "/collections/momentry_dev_voice")
    print("Deleted old momentry_dev_voice collection")
except Exception as e:
    # Best effort: keep going, but do not swallow KeyboardInterrupt/SystemExit
    # the way a bare except would, and report the reason.
    print(f"Could not delete or doesn't exist: {e}")
time.sleep(0.5)

# Create collection
try:
    _qdrant_call(
        "PUT",
        "/collections/momentry_dev_voice",
        {"vectors": {"size": 192, "distance": "Cosine"}},
    )
    print("Created momentry_dev_voice collection (192D)")
except Exception as e:
    print(f"Create collection error: {e}")

# Each assignment is paired with the embedding at the same index. Only indices
# present in both lists can be uploaded; without this bound a short or missing
# embeddings list raised IndexError part-way through the upload.
n_points = min(len(assignments), len(embeddings))
if n_points < len(assignments):
    print(
        f"WARNING: only {len(embeddings)} embeddings for {len(assignments)} assignments; "
        f"uploading the first {n_points}"
    )

# Upload in batches
batch_size = 100
total_uploaded = 0
for start in range(0, n_points, batch_size):
    stop = min(start + batch_size, n_points)
    points = [
        {
            # Point ids are the 1-based segment position.
            "id": idx + 1,
            "vector": embeddings[idx],
            "payload": {
                "file_uuid": UUID,
                "speaker_id": assignments[idx]["speaker_id"],
                "speaker_name": assignments[idx]["speaker_name"],
                "start_time": assignments[idx]["start_time"],
                "end_time": assignments[idx]["end_time"],
                "segment_index": idx,
            },
        }
        for idx in range(start, stop)
    ]
    try:
        _qdrant_call(
            "PUT",
            "/collections/momentry_dev_voice/points?wait=true",
            {"points": points},
        )
        total_uploaded += len(points)
    except Exception as e:
        # A failed batch is reported and skipped; later batches still run.
        print(f"  Batch {start} error: {e}")
    # Progress line on every fifth batch.
    if (start // batch_size) % 5 == 0:
        print(f"  Uploaded {total_uploaded}/{len(assignments)} voice embeddings")

print(f"\nUploaded {total_uploaded} voice embeddings to momentry_dev_voice")

cur.close()
conn.close()
print("\n=== Done ===")