Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index. Correction: asr-1.json format, generate/apply scripts. API: 37/37 endpoints fixed and tested. Docs: HANDOVER_V2.0.md for M4.
186 lines
6.3 KiB
Python
186 lines
6.3 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
Full pipeline migration: delete old chunks, create 4188 fine-grained chunks
with yolo_objects, face_ids, metadata per (recalculated) frame range.
"""
|
|
import json, sys, time, psycopg2
|
|
from collections import defaultdict
|
|
|
|
# --- Run configuration (hard-coded for this one-off migration) ---
UUID = "aeed71342a899fe4b4c57b7d41bcb692"  # file_uuid whose chunks are rebuilt
BASE = "/Users/accusys/momentry/output_dev"  # directory holding <UUID>.asrx.json
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
FPS = 25.0  # multiplier used to turn segment times into frame numbers
FILE_ID = 242  # written to the file_id column of every inserted chunk

print("=== Load asrx_fine ===")
# Open the file in a context manager so the handle is closed as soon as the
# JSON is parsed, and decode explicitly as UTF-8 instead of relying on the
# locale default.
with open(f"{BASE}/{UUID}.asrx.json", encoding="utf-8") as asrx_file:
    fine = json.load(asrx_file)
segs = fine["segments"]
print(f"Segments: {len(segs)}")

# One connection and one cursor are shared by every step below.
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
|
|
|
|
# Step 2: Delete old chunks
print("\n=== Step 2: Delete old chunks ===")
delete_sql = "DELETE FROM dev.chunks WHERE file_uuid=%s AND chunk_type=%s"
# One DELETE per chunk type so the removed-row count can be reported per type.
for chunk_kind in ("sentence", "story", "trace"):
    cur.execute(delete_sql, (UUID, chunk_kind))
    removed = cur.rowcount
    print(f" Deleted {removed} {chunk_kind} chunks")
# NOTE(review): this commit makes the deletion permanent before any
# replacement rows are inserted; if a later step fails, the old chunks are
# already gone. Confirm that is acceptable for this migration.
conn.commit()
|
|
|
|
# Step 3: Build frame → data lookup for YOLO and faces
print("\n=== Step 3: Load yolo + face data ===")
# YOLO: frame → set of object class names (dedup, confidence > 0.5)
print(" Loading YOLO data...")
t0 = time.time()
cur.execute(
    "SELECT start_frame, data FROM dev.pre_chunks "
    "WHERE file_uuid=%s AND processor_type='yolo' "
    "ORDER BY start_frame",
    (UUID,),
)
yolo_by_frame = {}  # frame → set of class names
row_count = 0  # every row returned, whether or not it contributed objects
for start_frame, data in cur:
    row_count += 1
    if not data or "objects" not in data:
        continue
    # Keep only confident detections that actually carry a class name, so a
    # missing/empty class_name never ends up as "" in the chunk metadata.
    names = {
        obj["class_name"]
        for obj in data["objects"]
        if obj.get("confidence", 0) > 0.5 and obj.get("class_name")
    }
    if names:
        # Merge rather than overwrite: several rows may share a start_frame,
        # and the face lookup below accumulates per frame in the same way.
        yolo_by_frame.setdefault(start_frame, set()).update(names)
print(f" YOLO: {row_count} entries, {len(yolo_by_frame)} frames with objects ({time.time()-t0:.1f}s)")
|
|
|
|
# Face: frame → set of face_ids
print(" Loading face data...")
t0 = time.time()
face_sql = (
    "SELECT frame_number, face_id FROM dev.face_detections "
    "WHERE file_uuid=%s AND trace_id IS NOT NULL "
    "ORDER BY frame_number"
)
cur.execute(face_sql, (UUID,))
face_by_frame = defaultdict(set)  # frame → set of face_ids
row_count = 0  # every row returned, including rows whose face_id is skipped
for frame_number, face_id in cur:
    row_count += 1
    # NOTE(review): this is a truthiness test, so a face_id of 0 or "" is
    # skipped along with NULL — confirm that no valid id is falsy.
    if not face_id:
        continue
    face_by_frame[frame_number].add(face_id)
print(f" Faces: {row_count} entries, {len(face_by_frame)} frames ({time.time()-t0:.1f}s)")
|
|
|
|
# Step 4: Create new chunks
import bisect  # hoisted: the original re-executed this import for every segment


def _values_in_frame_range(sorted_frames, by_frame, first_frame, last_frame):
    """Collect the de-duplicated values recorded for frames in a closed range.

    Args:
        sorted_frames: Ascending list of the keys of ``by_frame``.
        by_frame: Mapping of frame number to an iterable of values.
        first_frame: Lowest frame number to include.
        last_frame: Highest frame number to include (inclusive).

    Returns:
        A list of the distinct values found, in a stable order so that
        repeated runs write identical metadata.
    """
    lo = bisect.bisect_left(sorted_frames, first_frame)
    hi = bisect.bisect_right(sorted_frames, last_frame)
    found = set()
    for frame in sorted_frames[lo:hi]:
        found.update(by_frame[frame])
    # Sort by (type name, value) so the order is deterministic and the sort
    # cannot fail if values of different types ever end up in the same set.
    return sorted(found, key=lambda v: (type(v).__name__, v))


# Report the real segment count instead of a hard-coded number.
print(f"\n=== Step 4: Create {len(segs)} sentence chunks ===")
t0 = time.time()
batch_size = 100
inserted = 0
yolo_hit = 0  # chunks that received at least one YOLO object
face_hit = 0  # chunks that received at least one face id

# Sorted key lists let each segment find its frames by binary search.
yolo_frames_sorted = sorted(yolo_by_frame)
face_frames_sorted = sorted(face_by_frame)

# NOTE(review): the text preceding this script mentions dev.chunks -> dev.chunk
# and removal of old_chunk_id/chunk_index; confirm the table and column names
# below match the schema this is run against.
insert_sql = """
    INSERT INTO dev.chunks
        (file_uuid, old_chunk_id, chunk_index, chunk_type,
         start_time, end_time, content, metadata,
         text_content, fps, start_frame, end_frame, frame_count,
         file_id, chunk_id, pre_chunk_ids, child_chunk_ids)
    VALUES (%s,%s,%s,%s,%s,%s,%s::jsonb,%s::jsonb,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""

for batch_start in range(0, len(segs), batch_size):
    batch = segs[batch_start:batch_start + batch_size]
    values = []
    # idx is the segment's position in the full list, not within the batch.
    for idx, s in enumerate(batch, start=batch_start):
        st = s["start_time"]
        et = s["end_time"]
        sf = int(st * FPS)
        ef = int(et * FPS)
        spk_name = s.get("speaker_name", "Unknown")
        spk_id = s.get("speaker_id", "SPEAKER_?")
        raw_text = s.get("text", "")

        # YOLO objects seen on any frame in [sf, ef]
        yolo_objs = _values_in_frame_range(yolo_frames_sorted, yolo_by_frame, sf, ef)
        if yolo_objs:
            yolo_hit += 1

        # Face IDs seen on any frame in [sf, ef]
        face_ids = _values_in_frame_range(face_frames_sorted, face_by_frame, sf, ef)
        if face_ids:
            face_hit += 1

        chunk_id = f"{UUID}_{idx}"

        values.append((
            UUID,        # file_uuid
            chunk_id,    # old_chunk_id
            idx,         # chunk_index
            "sentence",  # chunk_type
            st,          # start_time
            et,          # end_time
            json.dumps({"data": {"text": raw_text, "text_normalized": raw_text.lower()}, "rule": "rule_1"}),  # content
            json.dumps({  # metadata
                "speaker_id": spk_id,
                "speaker_name": spk_name,
                "yolo_objects": yolo_objs,
                "face_ids": face_ids,
                "language": "en",
            }),
            f"[{spk_name}] {raw_text}",  # text_content
            FPS,         # fps
            sf,          # start_frame
            ef,          # end_frame
            # NOTE(review): the lookups above treat ef as inclusive while this
            # count excludes it — confirm which convention the schema expects.
            ef - sf,     # frame_count
            FILE_ID,     # file_id
            chunk_id,    # chunk_id
            [],          # pre_chunk_ids
            [],          # child_chunk_ids
        ))

    cur.executemany(insert_sql, values)
    conn.commit()  # each batch is committed on its own
    inserted += len(batch)

    # Progress line after every fifth batch (batches 0, 5, 10, ...).
    if (batch_start // batch_size) % 5 == 0:
        pct = inserted * 100 // len(segs)
        print(f" {inserted}/{len(segs)} ({pct}%) yolo_hit={yolo_hit} face_hit={face_hit} [{time.time()-t0:.0f}s]")

print(f"\n Inserted: {inserted} chunks")
print(f" Chunks with YOLO objects: {yolo_hit}/{inserted}")
print(f" Chunks with face IDs: {face_hit}/{inserted}")
print(f" Time: {time.time()-t0:.1f}s")
|
|
|
|
# Verify: read back what was written for this file.
count_sql = "SELECT COUNT(*) FROM dev.chunks WHERE file_uuid=%s AND chunk_type='sentence'"
cur.execute(count_sql, (UUID,))
(sentence_total,) = cur.fetchone()
print(f"\n DB sentence chunks: {sentence_total}")

# Sentence chunks per speaker name, largest group first.
speaker_sql = (
    "SELECT metadata->>'speaker_name', COUNT(*) FROM dev.chunks "
    "WHERE file_uuid=%s AND chunk_type='sentence' "
    "GROUP BY 1 ORDER BY 2 DESC"
)
cur.execute(speaker_sql, (UUID,))
print(" Speaker distribution:")
for speaker, total in cur.fetchall():
    print(f" {speaker}: {total}")

conn.close()
print("\n=== Done ===")
|