Files
momentry_core/scripts/migrate_to_4188.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

186 lines
6.3 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Full pipeline migration: delete old chunks, create 4188 fine-grained chunks
with yolo_objects, face_ids, metadata per (recalculated) frame range.
"""
import json, sys, time, psycopg2
from collections import defaultdict
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
BASE = "/Users/accusys/momentry/output_dev"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
FPS = 25.0
FILE_ID = 242
print("=== Load asrx_fine ===")
fine = json.load(open(f"{BASE}/{UUID}.asrx.json"))
segs = fine["segments"]
print(f"Segments: {len(segs)}")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
# Step 2: Delete old chunks
print("\n=== Step 2: Delete old chunks ===")
for ctype in ['sentence', 'story', 'trace']:
cur.execute(
"DELETE FROM dev.chunks WHERE file_uuid=%s AND chunk_type=%s",
(UUID, ctype))
print(f" Deleted {cur.rowcount} {ctype} chunks")
conn.commit()
# Step 3: Build frame → data lookup for YOLO and faces
print("\n=== Step 3: Load yolo + face data ===")
# YOLO: frame → set of object class names (dedup, confidence > 0.5)
print(" Loading YOLO data...")
t0 = time.time()
cur.execute(
"SELECT start_frame, data FROM dev.pre_chunks "
"WHERE file_uuid=%s AND processor_type='yolo' "
"ORDER BY start_frame", (UUID,))
yolo_by_frame = {} # frame → set of class names
row_count = 0
for r in cur:
fn = r[0]
data = r[1]
if data and "objects" in data:
objects = data["objects"]
names = set()
for obj in objects:
if obj.get("confidence", 0) > 0.5:
names.add(obj.get("class_name", ""))
if names:
yolo_by_frame[fn] = names
row_count += 1
print(f" YOLO: {row_count} entries, {len(yolo_by_frame)} frames with objects ({time.time()-t0:.1f}s)")
# Face: frame → set of face_ids
print(" Loading face data...")
t0 = time.time()
cur.execute(
"SELECT frame_number, face_id FROM dev.face_detections "
"WHERE file_uuid=%s AND trace_id IS NOT NULL "
"ORDER BY frame_number", (UUID,))
face_by_frame = defaultdict(set) # frame → set of face_ids
row_count = 0
for r in cur:
fn = r[0]
fid = r[1]
if fid:
face_by_frame[fn].add(fid)
row_count += 1
print(f" Faces: {row_count} entries, {len(face_by_frame)} frames ({time.time()-t0:.1f}s)")
# Step 4: Create new chunks
print("\n=== Step 4: Create 4188 sentence chunks ===")
t0 = time.time()
batch_size = 100
inserted = 0
yolo_hit = 0
face_hit = 0
yolo_frames_sorted = sorted(yolo_by_frame.keys())
face_frames_sorted = sorted(face_by_frame.keys())
for batch_start in range(0, len(segs), batch_size):
batch = segs[batch_start:batch_start + batch_size]
values = []
for si, s in enumerate(batch):
idx = batch_start + si
st = s["start_time"]
et = s["end_time"]
sf = int(st * FPS)
ef = int(et * FPS)
spk_name = s.get("speaker_name", "Unknown")
spk_id = s.get("speaker_id", "SPEAKER_?")
raw_text = s.get("text", "")
# Query YOLO objects in frame range (binary search on sorted list)
yolo_objs = []
import bisect
left = bisect.bisect_left(yolo_frames_sorted, sf)
right = bisect.bisect_right(yolo_frames_sorted, ef)
for i in range(left, right):
fn = yolo_frames_sorted[i]
yolo_objs.extend(yolo_by_frame[fn])
yolo_objs = list(set(yolo_objs)) # dedup
if yolo_objs:
yolo_hit += 1
# Query face IDs in frame range
face_ids = []
left = bisect.bisect_left(face_frames_sorted, sf)
right = bisect.bisect_right(face_frames_sorted, ef)
for i in range(left, right):
fn = face_frames_sorted[i]
face_ids.extend(face_by_frame[fn])
face_ids = list(set(face_ids)) # dedup
if face_ids:
face_hit += 1
chunk_id = f"{UUID}_{idx}"
values.append((
UUID, # file_uuid
chunk_id, # old_chunk_id
idx, # chunk_index
"sentence", # chunk_type
st, # start_time
et, # end_time
json.dumps({"data": {"text": raw_text, "text_normalized": raw_text.lower()}, "rule": "rule_1"}), # content
json.dumps({ # metadata
"speaker_id": spk_id,
"speaker_name": spk_name,
"yolo_objects": yolo_objs,
"face_ids": face_ids,
"language": "en",
}),
f"[{spk_name}] {raw_text}", # text_content
FPS, # fps
sf, # start_frame
ef, # end_frame
ef - sf, # frame_count
FILE_ID, # file_id
chunk_id, # chunk_id
[], # pre_chunk_ids
[], # child_chunk_ids
))
cur.executemany("""
INSERT INTO dev.chunks
(file_uuid, old_chunk_id, chunk_index, chunk_type,
start_time, end_time, content, metadata,
text_content, fps, start_frame, end_frame, frame_count,
file_id, chunk_id, pre_chunk_ids, child_chunk_ids)
VALUES (%s,%s,%s,%s,%s,%s,%s::jsonb,%s::jsonb,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", values)
conn.commit()
inserted += len(batch)
if (batch_start // batch_size) % 5 == 0:
pct = inserted * 100 // len(segs)
print(f" {inserted}/{len(segs)} ({pct}%) yolo_hit={yolo_hit} face_hit={face_hit} [{time.time()-t0:.0f}s]")
print(f"\n Inserted: {inserted} chunks")
print(f" Chunks with YOLO objects: {yolo_hit}/{inserted}")
print(f" Chunks with face IDs: {face_hit}/{inserted}")
print(f" Time: {time.time()-t0:.1f}s")
# Verify
cur.execute(
"SELECT COUNT(*) FROM dev.chunks WHERE file_uuid=%s AND chunk_type='sentence'",
(UUID,))
cnt = cur.fetchone()[0]
print(f"\n DB sentence chunks: {cnt}")
cur.execute(
"SELECT metadata->>'speaker_name', COUNT(*) FROM dev.chunks "
"WHERE file_uuid=%s AND chunk_type='sentence' "
"GROUP BY 1 ORDER BY 2 DESC", (UUID,))
print(" Speaker distribution:")
for r in cur.fetchall():
print(f" {r[0]}: {r[1]}")
conn.close()
print("\n=== Done ===")