Files
momentry_core/scripts/apply_asr_corrections.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

164 lines
6.1 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Apply asr-1.json corrections to dev.chunks.
DELETE old chunks, INSERT corrected chunks.
PRESERVE chunk_vectors by cloning each corrected parent's embedding onto its new child chunk IDs.
"""
import json, os, subprocess, sys, time
# Directory containing the PostgreSQL client binaries; psql() runs "{PG_BIN}/psql".
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
# Database role passed to psql via -U.
DB_USER = "accusys"
# Database name passed to psql via -d.
DB_NAME = "momentry"
# Directory from which main() reads "<UUID>.asr-1.json".
OUTPUT_DIR = "/Users/accusys/momentry/output_dev"
# Identifier of the file being corrected: names the input JSON and filters rows
# in dev.chunks (file_uuid) and dev.chunk_vectors (uuid).
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
# True when "--dry-run" appears on the command line; main() then issues no psql calls.
DRY_RUN = "--dry-run" in sys.argv
def psql(sql, raw=False):
    """Run one SQL command through the psql command-line client.

    Args:
        sql: SQL text handed to psql via ``-c``.
        raw: When false (the default) psql is invoked with ``-t -A`` so the
            output is tuples-only and unaligned; when true psql's default
            table formatting is kept.

    Returns:
        ``(stdout, None)`` with surrounding whitespace stripped when psql
        exits with status 0, otherwise ``(None, stderr)`` where stderr is
        truncated to its first 200 characters.

    Raises:
        subprocess.TimeoutExpired: if psql does not finish within 15 seconds.
    """
    format_flags = [] if raw else ["-t", "-A"]
    command = [
        f"{PG_BIN}/psql",
        "-U", DB_USER,
        "-d", DB_NAME,
        *format_flags,
        "-c", sql,
    ]
    result = subprocess.run(command, capture_output=True, text=True, timeout=15)
    if result.returncode == 0:
        return result.stdout.strip(), None
    return None, result.stderr[:200]
def esc(val):
    """Render *val* as a SQL literal.

    ``None`` becomes the bare keyword ``NULL``. Any other value is converted
    with ``str()`` and wrapped in single quotes, with embedded single quotes
    doubled.
    """
    if val is not None:
        doubled = str(val).replace("'", "''")
        return f"'{doubled}'"
    return "NULL"
def _child_chunk_id(correction, position, child):
    """Return the chunk_id for *child*, the 0-based *position*-th entry of *correction*.

    Uses the child's explicit ``new_chunk_id`` when present, otherwise
    ``"<parent_chunk_index>-NN"`` with NN being the 1-based, zero-padded position.
    """
    return child.get(
        "new_chunk_id",
        f"{correction['parent_chunk_index']}-{position + 1:02d}",
    )


def _chunk_insert_sql(chunk_id, start_frame, end_frame, text, fps):
    """Build the INSERT statement for one 'sentence' row of dev.chunks.

    Start/end times are derived from the frame numbers and *fps*, rounded to
    milliseconds. All string values go through esc().
    """
    # NOTE(review): the commit description above this file mentions
    # dev.chunks -> dev.chunk and the removal of old_chunk_id/chunk_index;
    # this statement still targets the old names - confirm against the live schema.
    start_time = round(start_frame / fps, 3)
    end_time = round(end_frame / fps, 3)
    return (
        f"INSERT INTO dev.chunks "
        f"(file_uuid, chunk_id, old_chunk_id, chunk_index, chunk_type, "
        f"start_time, end_time, start_frame, end_frame, text_content, fps, content) "
        f"VALUES ("
        f"{esc(UUID)}, {esc(chunk_id)}, {esc(chunk_id)}, 0, 'sentence', "
        f"{esc(start_time)}, {esc(end_time)}, {start_frame}, {end_frame}, "
        f"{esc(text)}, {fps}, "
        "'{\"source\": \"asr-1\"}'::jsonb"
        ");"
    )


def _clone_vector(old_id, new_ids):
    """Copy the embedding stored under *old_id* onto every id in *new_ids*.

    The old row is removed only after every clone was inserted successfully,
    so a failed insert never loses the original embedding.

    Returns:
        ``(preserved, failures)``: the number of ids in *new_ids* that now
        carry the embedding, and the number of psql commands that failed.
    """
    where = f"WHERE uuid={esc(UUID)} AND chunk_id={esc(old_id)}"
    embedding, err = psql(f"SELECT embedding FROM dev.chunk_vectors {where} LIMIT 1")
    if err is not None:
        print(f"  ERROR: reading vector {old_id}: {err[:120]}")
        return 0, 1
    if not embedding:
        # No row (or a NULL embedding) for this parent: nothing to clone.
        return 0, 0

    # An id equal to old_id already holds the embedding; keep that row as is.
    targets = [new_id for new_id in new_ids if str(new_id) != old_id]
    keeps_old_row = len(targets) != len(new_ids)
    preserved = len(new_ids) - len(targets)
    failures = 0
    for new_id in targets:
        _, err = psql(
            f"INSERT INTO dev.chunk_vectors (chunk_id, uuid, chunk_type, embedding) "
            f"VALUES ({esc(new_id)}, {esc(UUID)}, 'sentence', {esc(embedding)}::jsonb)"
        )
        if err is not None:
            failures += 1
            print(f"  ERROR: cloning vector {old_id} -> {new_id}: {err[:120]}")
        else:
            preserved += 1

    if failures == 0 and not keeps_old_row:
        _, err = psql(f"DELETE FROM dev.chunk_vectors {where}")
        if err is not None:
            failures += 1
            print(f"  ERROR: deleting old vector {old_id}: {err[:120]}")
    return preserved, failures


def _count_or_unknown(sql):
    """Run a count query and return its text result, or '?' if psql failed."""
    out, err = psql(sql)
    if err is not None or out is None:
        return "?"
    return out.strip()


def main():
    """Apply the corrections in "<UUID>.asr-1.json" to the database.

    Steps:
      1. Delete the file's existing 'sentence' rows from dev.chunks.
      2. For every correction, clone the parent's chunk_vectors embedding onto
         the new child chunk ids, then drop the parent's vector row.
      3. Insert the kept chunks.
      4. Insert the corrected (child) chunks.
    Finally the resulting row counts are printed for verification.

    All INSERT statements are built before anything is deleted, so malformed
    input (a missing key) fails before the database is touched. With
    DRY_RUN set, no psql command is issued and only the planned counts are
    reported.

    Raises:
        SystemExit: if the Step 1 DELETE fails; continuing would insert
            duplicates next to the rows that could not be removed.
        OSError, json.JSONDecodeError, KeyError: for an unreadable or
            malformed input file.
    """
    t0 = time.time()
    fps = 24.0
    errors = 0

    input_path = os.path.join(OUTPUT_DIR, f"{UUID}.asr-1.json")
    with open(input_path, encoding="utf-8") as fh:
        d = json.load(fh)
    kept = d["kept"]
    corrections = d["corrections"]
    corrected_count = sum(len(c["corrected"]) for c in corrections)
    total = len(kept) + corrected_count
    print(f"Kept: {len(kept)}, Corrected chunks: {corrected_count}, Total: {total}\n")

    # Plan every statement up front (pure computation, no database access).
    batch = [
        _chunk_insert_sql(
            str(k["chunk_index"]), k["start_frame"], k["end_frame"],
            k["text_content"], fps,
        )
        for k in kept
    ]
    vector_moves = []  # (parent chunk_id, [child chunk_ids]) per correction
    for c in corrections:
        new_ids = []
        for si, child in enumerate(c["corrected"]):
            new_id = _child_chunk_id(c, si, child)
            new_ids.append(new_id)
            batch.append(
                _chunk_insert_sql(
                    new_id, child["start_frame"], child["end_frame"],
                    child["text_content"], fps,
                )
            )
        vector_moves.append((str(c["parent_chunk_index"]), new_ids))

    # Step 1: DELETE old sentence chunks
    if not DRY_RUN:
        _, err = psql(
            f"DELETE FROM dev.chunks WHERE file_uuid={esc(UUID)} AND chunk_type='sentence';"
        )
        if err is not None:
            print(f"  ERROR: could not delete old chunks: {err[:120]}")
            sys.exit(1)
    print(f"Step 1/4: Deleted old chunks (dry_run={DRY_RUN})")

    # Step 2: move chunk_vectors from each corrected parent id to its child ids.
    # Kept chunks retain their chunk_id, so their vectors need no action.
    vec_renamed = 0
    for old_id, new_ids in vector_moves:
        if DRY_RUN:
            vec_renamed += len(new_ids)  # assume the parent vector exists
            continue
        preserved, failures = _clone_vector(old_id, new_ids)
        vec_renamed += preserved
        errors += failures
    print(f"Step 2/4: chunk_vectors renamed: {vec_renamed} new entries (dry_run={DRY_RUN})")

    # Steps 3+4: execute the planned INSERTs, reporting progress every 100 rows.
    for bs in range(0, len(batch), 100):
        be = min(bs + 100, len(batch))
        if not DRY_RUN:
            for s in batch[bs:be]:
                _, err = psql(s)
                # psql() signals failure with a non-None error, which may be "".
                if err is not None:
                    errors += 1
                    if errors <= 3:
                        print(f"  ERROR: {err[:120]}")
        pct = be * 100 // len(batch)
        print(f"  Steps 3+4/4: [{be}/{len(batch)}] {pct}% err={errors} [{time.time()-t0:.0f}s]")

    # Verify
    if not DRY_RUN:
        sc = _count_or_unknown(
            f"SELECT count(*) FROM dev.chunks WHERE file_uuid={esc(UUID)} AND chunk_type='sentence'"
        )
        vc = _count_or_unknown(
            f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid={esc(UUID)}"
        )
        mc = _count_or_unknown(
            f"SELECT count(*) FROM dev.chunk_vectors cv "
            f"JOIN dev.chunks c ON c.file_uuid=cv.uuid AND c.chunk_id=cv.chunk_id "
            f"WHERE cv.uuid={esc(UUID)}"
        )
        print(f"\n  Verify: {sc} chunks, {vc} vectors, {mc} matched")

    print(f"\n{'='*50}")
    print("DRY RUN" if DRY_RUN else "APPLIED")
    print(f"  Total chunks: {len(batch)}")
    print(f"  Vectors renamed: {vec_renamed}")
    print(f"  Errors: {errors}")
    print(f"  Time: {time.time()-t0:.1f}s")
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()