Schema changes: dev.chunks -> dev.chunk; remove old_chunk_id/chunk_index. Correction: asr-1.json format; generate/apply scripts. API: 37/37 endpoints fixed and tested. Docs: HANDOVER_V2.0.md for M4.
193 lines
5.9 KiB
Python
193 lines
5.9 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Update sentence chunk metadata with new ASRX speaker_id and speaker_name.

Steps:
  1. For every ``sentence`` chunk of ``UUID`` in ``dev.chunks``, write the
     speaker-map assignment into the metadata as ``new_speaker_id`` /
     ``new_speaker_name``. The existing ``speaker_id`` / ``speaker_name`` are
     copied to ``old_speaker_id`` / ``old_speaker_name`` and left in place.
  2. Recompute ``speaker_breakdown``, ``primary_speaker`` and
     ``speaker_count`` on the ``story`` chunks from their child sentences.
  3. Drop and recreate the Qdrant ``momentry_dev_voice`` collection and
     upload one point per speaker assignment, using the ASRX embeddings.

NOTE(review): the Qdrant ``momentry_dev_v1`` collection is NOT touched by
this script.
NOTE(review): the repository's latest commit message mentions renaming
``dev.chunks`` -> ``dev.chunk`` and removing ``chunk_index``. This script
still uses the old names; confirm the target schema before running it.
"""

import json
import sys
import time
from urllib.request import Request, urlopen

import numpy as np
import psycopg2

UUID = "aeed71342a899fe4b4c57b7d41bcb692"
ASRX_PATH = f"/Users/accusys/momentry/output_dev/{UUID}.asrx.json"
SPEAKER_MAP_PATH = f"/Users/accusys/momentry/output_dev/{UUID}.speaker_map_v2.json"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
QDRANT_URL = "http://localhost:6333"

print("=== Loading data ===")
# Context managers close the files deterministically; json.load(open(...))
# leaves that to the garbage collector.
with open(ASRX_PATH, encoding="utf-8") as asrx_file:
    asrx = json.load(asrx_file)
segs = asrx["segments"]
# Voice embeddings are optional in the ASRX file: default to an empty list.
embeddings = asrx.get("embeddings", [])

with open(SPEAKER_MAP_PATH, encoding="utf-8") as map_file:
    speaker_map = json.load(map_file)
assignments = speaker_map["assignments"]
speaker_identity = speaker_map["speaker_identity"]

print(f"Loaded {len(segs)} segments, {len(embeddings)} embeddings")
print("\n=== Step 1: Update DB chunks with new speaker info ===")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()

# Fetch this file's sentence chunks. chunk_index is used as the position in
# the speaker map's "assignments" list.
cur.execute("""
    SELECT id, chunk_index, metadata
    FROM dev.chunks
    WHERE file_uuid = %s AND chunk_type = 'sentence'
    ORDER BY chunk_index
""", (UUID,))
db_chunks = cur.fetchall()
print(f"Found {len(db_chunks)} DB sentence chunks")

updated = 0
for db_id, chunk_idx, old_meta in db_chunks:
    # Reject NULL and negative indices as well as too-large ones. A negative
    # index would silently pick an assignment from the END of the list.
    if chunk_idx is None or not 0 <= chunk_idx < len(assignments):
        print(f"WARNING: chunk_idx {chunk_idx} out of range for assignments ({len(assignments)})")
        continue

    a = assignments[chunk_idx]

    # metadata may come back as a dict (jsonb), a JSON string, or NULL.
    if old_meta is None:
        old_meta = {}
    elif isinstance(old_meta, str):
        old_meta = json.loads(old_meta)

    # Keep the existing speaker fields in place. Record them under old_* and
    # store the new assignment under new_*.
    old_meta["new_speaker_id"] = a["speaker_id"]
    old_meta["new_speaker_name"] = a["speaker_name"]
    old_meta["old_speaker_id"] = old_meta.get("speaker_id", "")
    old_meta["old_speaker_name"] = old_meta.get("speaker_name", "")

    cur.execute("""
        UPDATE dev.chunks
        SET metadata = %s::jsonb, updated_at = NOW()
        WHERE id = %s
    """, (json.dumps(old_meta), db_id))
    updated += 1

conn.commit()
print(f"Updated {updated} DB chunks")
# Also update story chunks with new aggregated speaker info
print("\n=== Step 2: Update story chunk aggregates ===")
from collections import Counter

cur.execute("""
    SELECT id, chunk_index, metadata, child_chunk_ids
    FROM dev.chunks
    WHERE file_uuid = %s AND chunk_type = 'story'
    ORDER BY chunk_index
""", (UUID,))
stories = cur.fetchall()
print(f"Found {len(stories)} story chunks")

# Map sentence chunk_index -> metadata.new_speaker_name. The value is None
# (SQL NULL) for sentences that have no new_speaker_name key, e.g. rows
# skipped by the index check when the sentence metadata was written.
cur.execute("""
    SELECT chunk_index, metadata->>'new_speaker_name' AS speaker_name
    FROM dev.chunks
    WHERE file_uuid = %s AND chunk_type = 'sentence'
    ORDER BY chunk_index
""", (UUID,))
sent_names = dict(cur.fetchall())


def _child_chunk_index(child_id):
    """Return the integer after the last "_" of a child chunk id, or None.

    NOTE(review): assumes child ids are strings shaped like
    "<prefix>_<chunk_index>"; confirm against whatever writes child_chunk_ids.
    """
    if not isinstance(child_id, str):
        return None
    try:
        return int(child_id.rsplit("_", 1)[-1])
    except ValueError:
        return None


updated_stories = 0
unparsed_ids = 0
for db_id, _story_idx, meta, child_ids in stories:
    if not child_ids:
        # Nothing to aggregate; leave the row (and its updated_at) alone.
        continue

    # metadata may come back as a dict (jsonb), a JSON string, or NULL.
    if meta is None:
        meta = {}
    elif isinstance(meta, str):
        meta = json.loads(meta)

    speaker_counts = Counter()
    for cid in child_ids:
        child_idx = _child_chunk_index(cid)
        if child_idx is None:
            unparsed_ids += 1
            continue
        name = sent_names.get(child_idx)
        # Skip children without a new speaker name. Otherwise they are
        # counted under a None key, which json.dumps writes as "null".
        if name is not None:
            speaker_counts[name] += 1

    meta["speaker_breakdown"] = dict(speaker_counts)
    # On ties, max() returns the name first seen in child order.
    meta["primary_speaker"] = (
        max(speaker_counts, key=speaker_counts.get) if speaker_counts else "Unknown"
    )
    meta["speaker_count"] = len(speaker_counts)

    cur.execute("""
        UPDATE dev.chunks
        SET metadata = %s::jsonb, updated_at = NOW()
        WHERE id = %s
    """, (json.dumps(meta), db_id))
    updated_stories += 1

conn.commit()
if unparsed_ids:
    print(f"WARNING: skipped {unparsed_ids} child chunk ids without a numeric _<index> suffix")
print(f"Updated {updated_stories} story chunks")
print("\n=== Step 3: Update Qdrant momentry_dev_voice ===")
from http.client import HTTPException

# This step only talks to Qdrant. Release the DB connection first so it is
# closed even if the step aborts.
cur.close()
conn.close()

VOICE_COLLECTION_URL = f"{QDRANT_URL}/collections/momentry_dev_voice"
HTTP_TIMEOUT = 60  # seconds, per Qdrant request
# urlopen failures: URLError/HTTPError and socket timeouts are OSError
# subclasses; malformed responses surface as http.client.HTTPException.
QDRANT_ERRORS = (OSError, HTTPException)


def _qdrant_request(url, method, body=None):
    """Send one request to Qdrant, JSON-encoding *body* when given.

    The response is closed immediately. Failures propagate as one of
    QDRANT_ERRORS.
    """
    if body is None:
        req = Request(url, method=method)
    else:
        req = Request(url, data=json.dumps(body).encode(),
                      headers={"Content-Type": "application/json"}, method=method)
    with urlopen(req, timeout=HTTP_TIMEOUT):
        pass


def _voice_point(idx):
    """Build the Qdrant point for assignment/embedding ``idx`` (id = idx + 1)."""
    a = assignments[idx]
    return {
        "id": idx + 1,
        "vector": embeddings[idx],
        "payload": {
            "file_uuid": UUID,
            "speaker_id": a["speaker_id"],
            "speaker_name": a["speaker_name"],
            "start_time": a["start_time"],
            "end_time": a["end_time"],
            "segment_index": idx,
        },
    }


# Drop the old collection. This is best-effort: it may not exist yet.
try:
    _qdrant_request(VOICE_COLLECTION_URL, "DELETE")
    print("Deleted old momentry_dev_voice collection")
except QDRANT_ERRORS as e:
    print(f"Could not delete or doesn't exist: {e}")

time.sleep(0.5)

try:
    _qdrant_request(VOICE_COLLECTION_URL, "PUT",
                    {"vectors": {"size": 192, "distance": "Cosine"}})
    print("Created momentry_dev_voice collection (192D)")
except QDRANT_ERRORS as e:
    # Without the collection every upload below would fail, so stop here.
    sys.exit(f"Create collection error: {e}")

# embeddings can be shorter than assignments (it is loaded with a [] default
# when the ASRX file has none). Upload only the indices that have a vector.
n_points = min(len(assignments), len(embeddings))
if n_points < len(assignments):
    print(f"WARNING: {len(embeddings)} embeddings for {len(assignments)} assignments; "
          f"uploading only the first {n_points}")

# Upload in batches
batch_size = 100
total_uploaded = 0
for start in range(0, n_points, batch_size):
    points = [_voice_point(idx) for idx in range(start, min(start + batch_size, n_points))]
    try:
        _qdrant_request(f"{VOICE_COLLECTION_URL}/points?wait=true", "PUT", {"points": points})
        total_uploaded += len(points)
    except QDRANT_ERRORS as e:
        print(f" Batch {start} error: {e}")

    if (start // batch_size) % 5 == 0:
        print(f" Uploaded {total_uploaded}/{n_points} voice embeddings")

print(f"\nUploaded {total_uploaded} voice embeddings to momentry_dev_voice")
print("\n=== Done ===")