fix: M4 Phase 1 bugs - dev.chunks refs, search_path, uuid column

Bug fixes from M4 report:
- 4 remaining dev.chunks → dev.chunk in SQL queries
- search_path includes public for pgvector extension
- get_chunk_by_chunk_id_and_uuid: uuid → file_uuid
- New endpoint: GET /api/v1/file/:uuid/chunk/:chunk_id
This commit is contained in:
Accusys
2026-05-11 10:21:06 +08:00
parent 39ba5ddf76
commit cac60c6093
17 changed files with 25156 additions and 8 deletions

View File

@@ -0,0 +1,72 @@
# M4 Handover Package — Complete
## Contents
| File | Size | Description |
|------|:----:|-------------|
| `HANDOVER_V2.0.md` | 9.6K | Main handover document |
| `api_test.sh` | 8.7K | API smoke test (37 endpoints) |
| `M4_RESPONSE.md` | 1.0K | M4 response (this file) |
### Source Code (choose one)
| File | Size | Description |
|------|:----:|-------------|
| `momentry_core_v1.0.1_source.tar.gz` | 204M | Git archive (latest commit) |
| `momentry_core.bundle` | 150M | Git bundle (full repo, `git clone momentry_core.bundle`) |
### DB Backup (pre-migration)
| File | Size | Description |
|------|:----:|-------------|
| `dev.chunks.sql` | 20M | `dev.chunks` table (old schema, pre-migration) |
| `dev.chunk_vectors.sql` | 56M | `dev.chunk_vectors` table (pre-migration) |
### Scripts
| File | Description |
|------|-------------|
| `generate_asr1.py` | Generate correction record from DB + asr.json |
| `apply_asr_corrections.py` | Apply corrections, preserve chunk_vectors |
| `clean_sentence_text.py` | LLM cleaning + Qdrant re-embedding |
| `pipeline_status.py` | Pipeline health check (9 stages) |
| `split_asr_segments.py` | Sub-window speaker change detection |
## Quick Start (on M4 machine)
```bash
# 1. Restore DB
psql -U accusys -d momentry < dev.chunks.sql
psql -U accusys -d momentry < dev.chunk_vectors.sql
# 2. Apply schema migration
psql -U accusys -d momentry -c "
ALTER TABLE dev.chunks RENAME TO dev.chunk;
ALTER TABLE dev.chunk DROP COLUMN IF EXISTS old_chunk_id;
ALTER TABLE dev.chunk DROP COLUMN IF EXISTS chunk_index;
"
psql -U accusys -d momentry -c "
UPDATE dev.chunk SET chunk_id = substring(chunk_id from 34)
WHERE chunk_id LIKE (file_uuid || '_%');
UPDATE dev.chunk_vectors cv SET chunk_id = substring(cv.chunk_id from 34)
FROM dev.chunk c WHERE c.file_uuid = cv.uuid AND cv.chunk_id LIKE (c.file_uuid || '_%');
"
# 3. Get source code
git clone momentry_core.bundle momentry_core_0.1
# or: tar xzf momentry_core_v1.0.1_source.tar.gz
# 4. Apply corrections
python3 generate_asr1.py
python3 apply_asr_corrections.py
# 5. Rebuild Qdrant
python3 clean_sentence_text.py
# 6. Build and run
cargo build --bin momentry_playground
DATABASE_SCHEMA=dev ./target/debug/momentry_playground server --port 3003
# 7. Run API test
bash api_test.sh
```

View File

@@ -0,0 +1,53 @@
# M4 Response — All Deliverables Ready
**Date:** 2026-05-11
**From:** M5
**To:** M4
## Status
| # | Item | Ref | Status |
|:-:|------|:---:|:------:|
| 1 | Source code (git bundle + tar.gz) | §8 | ✅ `momentry_core.bundle` (150M), `momentry_core_v1.0.1_source.tar.gz` (204M) |
| 2 | DB backup (pre-migration) | §5 #8 | ✅ `dev.chunks.sql` + `dev.chunk_vectors.sql` (76M total) |
| 3 | Scripts (generate, apply, clean, pipeline) | §2, §9 | ✅ 5 scripts in this directory |
| 4 | Handover document | §1 | ✅ `HANDOVER_V2.0.md` |
| 5 | API test script | §4 | ✅ `api_test.sh` (37/37 ✅) |
| 6 | INDEX.md | — | ✅ Complete contents + quick start |
## Migration Steps (on M4 machine)
```bash
# 1. Restore DB from backup
psql -U accusys -d momentry < dev.chunks.sql
psql -U accusys -d momentry < dev.chunk_vectors.sql
# 2. Schema migration
psql -U accusys -d momentry -c "
ALTER TABLE dev.chunks RENAME TO dev.chunk;
ALTER TABLE dev.chunk DROP COLUMN IF EXISTS old_chunk_id;
ALTER TABLE dev.chunk DROP COLUMN IF EXISTS chunk_index;
"
psql -U accusys -d momentry -c "
UPDATE dev.chunk SET chunk_id = substring(chunk_id from 34)
WHERE chunk_id LIKE (file_uuid || '_%');
UPDATE dev.chunk_vectors cv SET chunk_id = substring(cv.chunk_id from 34)
FROM dev.chunk c WHERE c.file_uuid = cv.uuid AND cv.chunk_id LIKE (c.file_uuid || '_%');
"
# 3. Clone source
git clone momentry_core.bundle momentry_core_0.1
# or: tar xzf momentry_core_v1.0.1_source.tar.gz
# 4. Apply corrections
python3 generate_asr1.py
python3 apply_asr_corrections.py
# 5. LLM cleanup + Qdrant rebuild
python3 clean_sentence_text.py
# 6. Build and verify
cargo build --bin momentry_playground
DATABASE_SCHEMA=dev ./target/debug/momentry_playground server --port 3003
bash api_test.sh
```

View File

@@ -0,0 +1,163 @@
#!/opt/homebrew/bin/python3.11
"""
Apply asr-1.json corrections to dev.chunks.
DELETE old chunks, INSERT corrected chunks.
PRESERVE chunk_vectors by renaming old chunk_id to new corrected IDs.
"""
import json, os, subprocess, sys, time
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
DB_USER = "accusys"
DB_NAME = "momentry"
OUTPUT_DIR = "/Users/accusys/momentry/output_dev"
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
DRY_RUN = "--dry-run" in sys.argv
def psql(sql, raw=False):
args = [f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME]
if not raw:
args += ["-t", "-A"]
args += ["-c", sql]
r = subprocess.run(args, capture_output=True, text=True, timeout=15)
if r.returncode != 0: return None, r.stderr[:200]
return r.stdout.strip(), None
def esc(val):
if val is None: return "NULL"
return "'" + str(val).replace("'", "''") + "'"
def main():
t0 = time.time()
fps = 24.0
errors = 0
d = json.load(open(os.path.join(OUTPUT_DIR, f"{UUID}.asr-1.json")))
kept = d["kept"]
corrections = d["corrections"]
total = len(kept) + sum(len(c["corrected"]) for c in corrections)
print(f"Kept: {len(kept)}, Corrected chunks: {sum(len(c['corrected']) for c in corrections)}, Total: {total}\n")
# Step 1: DELETE old sentence chunks
if not DRY_RUN:
psql(f"DELETE FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type='sentence';")
print(f"Step 1/4: Deleted old chunks (dry_run={DRY_RUN})")
# Step 2: RENAME chunk_vectors: old chunk_id → new corrected IDs
# For kept chunks: chunk_id unchanged → no action needed
# For corrections: clone the vector to each new child ID
vec_renamed = 0
batch_sql = []
for c in corrections:
old_id = str(c["parent_chunk_index"])
new_ids = []
for si, child in enumerate(c["corrected"]):
new_id = child.get("new_chunk_id", f"{c['parent_chunk_index']}-{si+1:02d}")
new_ids.append(new_id)
# Check if old_id has a vector in chunk_vectors
if not DRY_RUN:
out, err = psql(
f"SELECT count(*) FROM dev.chunk_vectors "
f"WHERE uuid='{UUID}' AND chunk_id='{old_id}'"
)
count = int(out.strip()) if out and out.strip().isdigit() else 0
else:
count = 1 # assume exists for dry-run
if count > 0:
# Delete old row, insert new rows for each child (cloning the embedding)
if not DRY_RUN:
# Get the embedding data
out, err = psql(
f"SELECT embedding FROM dev.chunk_vectors "
f"WHERE uuid='{UUID}' AND chunk_id='{old_id}'"
)
embedding = out.strip() if out and out.strip() else "NULL"
# Delete old
psql(f"DELETE FROM dev.chunk_vectors WHERE uuid='{UUID}' AND chunk_id='{old_id}'")
# Insert new rows
for new_id in new_ids:
psql(
f"INSERT INTO dev.chunk_vectors (chunk_id, uuid, chunk_type, embedding) "
f"VALUES ('{new_id}', '{UUID}', 'sentence', '{embedding}'::jsonb)"
)
vec_renamed += len(new_ids)
print(f"Step 2/4: chunk_vectors renamed: {vec_renamed} new entries (dry_run={DRY_RUN})")
# Step 3: INSERT kept chunks
batch = []
for k in kept:
child_id = str(k["chunk_index"])
sf = k["start_frame"]
ef = k["end_frame"]
text = k["text_content"]
st = round(sf / fps, 3)
et = round(ef / fps, 3)
batch.append(
f"INSERT INTO dev.chunks "
f"(file_uuid, chunk_id, old_chunk_id, chunk_index, chunk_type, "
f"start_time, end_time, start_frame, end_frame, text_content, fps, content) "
f"VALUES ("
f"'{UUID}', '{child_id}', '{child_id}', 0, 'sentence', "
f"{esc(st)}, {esc(et)}, {sf}, {ef}, {esc(text)}, {fps}, "
f"'{{\"source\": \"asr-1\"}}'::jsonb"
f");"
)
# Step 4: INSERT corrected chunks
for c in corrections:
for si, child in enumerate(c["corrected"]):
child_id = child.get("new_chunk_id", f"{c['parent_chunk_index']}-{si+1:02d}")
sf = child["start_frame"]
ef = child["end_frame"]
text = child["text_content"]
st = round(sf / fps, 3)
et = round(ef / fps, 3)
batch.append(
f"INSERT INTO dev.chunks "
f"(file_uuid, chunk_id, old_chunk_id, chunk_index, chunk_type, "
f"start_time, end_time, start_frame, end_frame, text_content, fps, content) "
f"VALUES ("
f"'{UUID}', '{child_id}', '{child_id}', 0, 'sentence', "
f"{esc(st)}, {esc(et)}, {sf}, {ef}, {esc(text)}, {fps}, "
f"'{{\"source\": \"asr-1\"}}'::jsonb"
f");"
)
# Execute batch
for bs in range(0, len(batch), 100):
be = min(bs + 100, len(batch))
if not DRY_RUN:
for s in batch[bs:be]:
out, err = psql(s)
if err:
errors += 1
if errors <= 3: print(f" ERROR: {err[:120]}")
pct = be * 100 // len(batch)
print(f" Steps 3+4/4: [{be}/{len(batch)}] {pct}% err={errors} [{time.time()-t0:.0f}s]")
# Verify
if not DRY_RUN:
sc = psql(f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type='sentence'")
vc = psql(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid='{UUID}'")
mc = psql(
f"SELECT count(*) FROM dev.chunk_vectors cv "
f"JOIN dev.chunks c ON c.file_uuid=cv.uuid AND c.chunk_id=cv.chunk_id "
f"WHERE cv.uuid='{UUID}'"
)
print(f"\n Verify: {sc[0].strip()} chunks, {vc[0].strip()} vectors, {mc[0].strip()} matched")
print(f"\n{'='*50}")
print("DRY RUN" if DRY_RUN else "APPLIED")
print(f" Total chunks: {len(batch)}")
print(f" Vectors renamed: {vec_renamed}")
print(f" Errors: {errors}")
print(f" Time: {time.time()-t0:.1f}s")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,173 @@
#!/opt/homebrew/bin/python3.11
"""
LLM-clean all 4188 sentence texts, re-embed, update momentry_dev_v1 + sentence_story.
"""
import json, time, os
from urllib.request import Request, urlopen
import psycopg2
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
QDRANT_URL = "http://localhost:6333"
LLM_URL = "http://localhost:8082/v1/chat/completions"
EMBED_URL = "http://localhost:11436/v1/embeddings"
CHECKPOINT = f"/tmp/sentence_clean_{UUID}.json"
def call_llm(prompt):
body = json.dumps({"model": "google_gemma-4-26B-A4B-it-Q5_K_M.gguf",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1, "max_tokens": 80}).encode()
req = Request(LLM_URL, data=body, headers={"Content-Type": "application/json"})
resp = urlopen(req, timeout=30)
return json.loads(resp.read())["choices"][0]["message"]["content"].strip()
def call_embed(text):
body = json.dumps({"input": text}).encode()
req = Request(EMBED_URL, data=body, headers={"Content-Type": "application/json"})
resp = urlopen(req, timeout=30)
return json.loads(resp.read())["data"][0]["embedding"]
print("=== Step 1: Load all sentences ===")
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
cur.execute("""
SELECT id, chunk_id, text_content
FROM dev.chunks
WHERE file_uuid = %s AND chunk_type = 'sentence'
ORDER BY id
""", (UUID,))
rows = cur.fetchall()
conn.close()
print(f"Loaded {len(rows)} sentences")
# Reset checkpoint (incompatible with old chunk_index format)
if os.path.exists(CHECKPOINT):
os.remove(CHECKPOINT)
print("Old checkpoint removed (format changed)")
results = []
errors = 0
print("\n=== Step 2: LLM clean + embed ===")
for i, (cid, chunk_id, text_content) in enumerate(rows):
input_text = text_content
prompt = f"""Clean this movie dialogue line. Fix truncated words, capitalize, add punctuation.
Return: SPEAKER: "clean text"
Input: [Cary Grant] can't you do something constructive like start
Return: Cary Grant: "Can't you do something constructive like start?"
Input: [Audrey Hepburn] qui se présente influence d'une manière vitale la proposition l
Return: Audrey Hepburn: "Qui se présente influence d'une manière vitale la proposition..."
Input: {input_text}
Return:"""
try:
cleaned = call_llm(prompt)
embedding = call_embed(cleaned)
time.sleep(0.1)
except Exception as e:
print(f" [{i+1}/{len(rows)}] id={cid} chunk={chunk_id} ERROR: {e}")
cleaned = input_text
embedding = [0.0] * 768
errors += 1
entry = {
"index": i,
"chunk_id": chunk_id,
"original": input_text,
"cleaned": cleaned,
"embedding": embedding,
}
results.append(entry)
json.dump({"last": i}, open(CHECKPOINT, "w"))
if (i + 1) % 50 == 0:
print(f" [{i+1}/{len(rows)}] chunk={chunk_id} errors={errors}")
results.sort(key=lambda x: x["index"])
print(f"\nDone: {len(results)} cleaned, {errors} errors")
print("\n=== Step 3: Rebuild momentry_dev_v1 ===")
# Delete old
req = Request(f"{QDRANT_URL}/collections/momentry_dev_v1", method="DELETE")
try: urlopen(req); time.sleep(0.5)
except: pass
req = Request(f"{QDRANT_URL}/collections/momentry_dev_v1",
data=json.dumps({"vectors": {"size": 768, "distance": "Cosine"}}).encode(),
headers={"Content-Type": "application/json"}, method="PUT")
urlopen(req); time.sleep(0.5)
batch_size = 100
points = []
for pi, r in enumerate(results):
points.append({
"id": pi + 1,
"vector": r["embedding"],
"payload": {
"chunk_type": "sentence",
"uuid": UUID,
"chunk_id": r["chunk_id"],
"text": r["cleaned"],
"original": r["original"],
}
})
for start in range(0, len(points), batch_size):
batch = points[start:start+batch_size]
req = Request(f"{QDRANT_URL}/collections/momentry_dev_v1/points?wait=true",
data=json.dumps({"points": batch}).encode(),
headers={"Content-Type": "application/json"}, method="PUT")
try: urlopen(req)
except Exception as e: print(f" batch {start}: {e}")
if (start // batch_size) % 5 == 0:
print(f" momentry_dev_v1: {start+len(batch)}/{len(points)}")
print(" momentry_dev_v1 done")
print("\n=== Step 4: Rebuild sentence_story ===")
req = Request(f"{QDRANT_URL}/collections/sentence_story", method="DELETE")
try: urlopen(req); time.sleep(0.5)
except: pass
req = Request(f"{QDRANT_URL}/collections/sentence_story",
data=json.dumps({"vectors": {"size": 768, "distance": "Cosine"}}).encode(),
headers={"Content-Type": "application/json"}, method="PUT")
urlopen(req); time.sleep(0.5)
story_points = []
for pi, r in enumerate(results):
story_points.append({
"id": pi + 1,
"vector": r["embedding"],
"payload": {
"chunk_type": "sentence",
"uuid": UUID,
"chunk_id": r["chunk_id"],
"text": r["cleaned"],
}
})
for start in range(0, len(story_points), batch_size):
batch = story_points[start:start+batch_size]
req = Request(f"{QDRANT_URL}/collections/sentence_story/points?wait=true",
data=json.dumps({"points": batch}).encode(),
headers={"Content-Type": "application/json"}, method="PUT")
try: urlopen(req)
except Exception as e: print(f" batch {start}: {e}")
if (start // batch_size) % 5 == 0:
print(f" sentence_story: {start+len(batch)}/{len(story_points)}")
print(" sentence_story done")
# Verify
for col in ["momentry_dev_v1", "sentence_story"]:
resp = json.loads(urlopen(f"{QDRANT_URL}/collections/{col}").read())
info = resp["result"]
print(f"Verified {col}: {info['points_count']} pts, {info['config']['params']['vectors'].get('size','?')}D")
print("\n=== Done ===")

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,155 @@
#!/opt/homebrew/bin/python3.11
"""
Generate {uuid}.asr-1.json by comparing asr.json (3417) with DB chunks (4188).
Identifies which ASR segments were split and records corrections.
"""
import json, os, subprocess, sys, time
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
DB_USER = "accusys"
DB_NAME = "momentry"
OUTPUT_DIR = "/Users/accusys/momentry/output_dev"
UUID = "aeed71342a899fe4b4c57b7d41bcb692"
def psql(sql):
r = subprocess.run([f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-F", chr(31), "-c", sql],
capture_output=True, text=True, timeout=30)
return r.stdout.strip()
def main():
t0 = time.time()
print(f"Loading ASR segments from {UUID}.asr.json...")
asr_path = os.path.join(OUTPUT_DIR, f"{UUID}.asr.json")
with open(asr_path) as f:
asr_data = json.load(f)
asr_segs = asr_data["segments"]
print(f" {len(asr_segs)} ASR segments")
print("Loading DB sentence chunks...")
rows = []
raw = psql(
f"SELECT chunk_index, start_frame, end_frame, start_time, end_time, chunk_id, text_content "
f"FROM dev.chunks WHERE file_uuid='{UUID}' AND chunk_type='sentence' "
f"ORDER BY chunk_index"
)
for line in raw.split("\n"):
if not line.strip():
continue
parts = line.split(chr(31))
rows.append(parts)
db_chunks = []
for r in rows:
db_chunks.append({
"chunk_index": int(r[0]),
"start_frame": int(r[1]),
"end_frame": int(r[2]),
"start_time": float(r[3]),
"end_time": float(r[4]),
"chunk_id": r[5],
"text_content": r[6] if len(r) > 6 and r[6] else "",
})
print(f" {len(db_chunks)} DB chunks")
# For each DB chunk, find the best-matching ASR segment.
# A DB chunk belongs to ASR segment i if chunk's time range
# falls WITHIN ASR segment i's time range.
asr_of_chunk = {} # chunk_index -> asr_idx
for dc in db_chunks:
ct_mid = (dc["start_time"] + dc["end_time"]) / 2
best_asr = None
for ai, a in enumerate(asr_segs):
if a["start"] - 0.1 <= dc["start_time"] and dc["end_time"] <= a["end"] + 0.1:
if best_asr is None:
best_asr = ai
else:
prev_a = asr_segs[best_asr]
prev_mid = (prev_a["start"] + prev_a["end"]) / 2
if abs(ct_mid - prev_mid) > abs(ct_mid - (a["start"] + a["end"]) / 2):
best_asr = ai
if best_asr is not None:
asr_of_chunk[dc["chunk_index"]] = best_asr
print(f" Mapped: {len(asr_of_chunk)} / {len(db_chunks)} chunks to ASR segments")
# Group DB chunks by ASR index
from collections import defaultdict
chunks_by_asr = defaultdict(list)
for ci, ai in asr_of_chunk.items():
chunks_by_asr[ai].append(ci)
# Build kept + corrections
corrections = []
kept = []
for ai, child_indices in sorted(chunks_by_asr.items()):
if len(child_indices) < 2:
dc = db_chunks[child_indices[0]]
kept.append({
"chunk_index": ai,
"start_frame": dc["start_frame"],
"end_frame": dc["end_frame"],
"text_content": dc["text_content"],
})
continue
a = asr_segs[ai]
children = []
for ci in child_indices:
dc = db_chunks[ci]
children.append({
"chunk_id": dc["chunk_id"],
"start_frame": dc["start_frame"],
"end_frame": dc["end_frame"],
"text_content": dc["text_content"],
})
children_sorted = sorted(children, key=lambda x: x["start_frame"])
# Assign new chunk_id format based on chunk_index
# The first child of parent ASR idx N gets "N-01", second "N-02", etc.
for si, child in enumerate(children_sorted):
child["new_chunk_id"] = f"{ai}-{si+1:02d}"
corrections.append({
"parent_chunk_index": ai,
"reason": "split",
"original": {
"start_frame": int(a["start"] * 24),
"end_frame": int(a["end"] * 24),
"text_content": a["text"],
},
"corrected": children_sorted
})
total_corrected = sum(len(c["corrected"]) for c in corrections)
print(f" Kept chunks: {len(kept)}")
print(f" Corrected chunks: {total_corrected}")
print(f" Total: {len(kept) + total_corrected} (should be {len(db_chunks)})\n")
# Write output
output = {
"file_uuid": UUID,
"asr_version": 1,
"kept": kept,
"corrections": corrections
}
output_path = os.path.join(OUTPUT_DIR, f"{UUID}.asr-1.json")
with open(output_path, "w") as f:
json.dump(output, f, indent=2, ensure_ascii=False)
print(f"\nSaved: {output_path} ({os.path.getsize(output_path) / 1024:.0f} KB)")
# Stats
split_sizes = {}
for c in corrections:
n = len(c["corrected"])
split_sizes[n] = split_sizes.get(n, 0) + 1
print(f"\nSplit distribution:")
for n in sorted(split_sizes):
print(f" {n} children: {split_sizes[n]} ASR segments → {n * split_sizes[n]} chunks")
elapsed = time.time() - t0
print(f"\nElapsed: {elapsed:.1f}s")
if __name__ == "__main__":
main()

Binary file not shown.

View File

@@ -0,0 +1,293 @@
#!/opt/homebrew/bin/python3.11
"""
Pipeline Status — checklist + health + timeline monitoring
Output: JSON for machine parsing, formatted table for human reading
"""
import json, os, subprocess, sys, time
from datetime import datetime
from pathlib import Path
PROJECT = Path(__file__).resolve().parent.parent
OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev"))
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
DB_USER = os.environ.get("USER", "accusys")
DB_NAME = "momentry"
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_COL = os.environ.get("QDRANT_COLLECTION", "momentry_dev_v1")
now = time.time()
proc = subprocess.run
def psql(sql: str) -> str:
r = proc([f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql],
capture_output=True, text=True, timeout=30)
return r.stdout.strip()
def file_size(path: str) -> str:
p = Path(path)
if not p.exists(): return "missing"
kb = p.stat().st_size // 1024
if kb > 1024: return f"{kb//1024}MB"
return f"{kb}KB"
def fmt_secs(s: float) -> str:
if s < 60: return f"{s:.0f}s"
if s < 3600: return f"{s//60:.0f}m {s%60:.0f}s"
return f"{s//3600:.0f}h {(s%3600)//60:.0f}m"
def health_check() -> dict:
"""System health"""
h = {}
# CPU
try:
load = os.getloadavg()
h["cpu_load_1m"] = round(load[0], 1)
h["cpu_load_5m"] = round(load[1], 1)
except: h["cpu_load_1m"] = h["cpu_load_5m"] = -1
# Memory
try:
m = proc(["vm_stat"], capture_output=True, text=True).stdout
# Use ps for a simpler reading
rss = None
for line in proc(["ps", "-A", "-o", "rss="], capture_output=True, text=True).stdout.strip().split('\n'):
if line.strip():
if rss is None: rss = 0
rss += int(line.strip())
if rss:
h["memory_used_mb"] = rss // 1024
except: pass
# Disk
try:
d = proc(["df", "-h", str(OUTPUT_DIR)], capture_output=True, text=True).stdout.strip().split('\n')[-1].split()
h["disk_use_pct"] = d[4] if len(d) > 4 else "?"
h["disk_avail"] = d[3] if len(d) > 3 else "?"
except: pass
# GPU (ANE/MPS)
try:
if Path("/opt/homebrew/bin/python3.11").exists():
g = proc(["/opt/homebrew/bin/python3.11", "-c",
"import torch; print(torch.backends.mps.is_available())"],
capture_output=True, text=True, timeout=5)
h["gpu_available"] = g.stdout.strip() == "True"
except: h["gpu_available"] = False
# Services
services = {"postgresql": False, "redis": False, "qdrant": False, "embedding": False}
try:
services["postgresql"] = proc([f"{PG_BIN}/pg_isready"], capture_output=True, timeout=5).returncode == 0
except: pass
try:
r = proc(["redis-cli", "-a", "accusys", "ping"], capture_output=True, timeout=5)
services["redis"] = "PONG" in r.stdout.decode()
except:
try:
r = proc(["redis-cli", "ping"], capture_output=True, timeout=3)
services["redis"] = "PONG" in r.stdout.decode()
except: pass
try:
r = proc(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", "--connect-timeout", "3",
"http://localhost:6333/healthz"], capture_output=True, timeout=5)
services["qdrant"] = r.stdout.decode().strip() == "200"
except: pass
try:
r = proc(["curl", "-s", "--connect-timeout", "3", "http://localhost:11436/health"],
capture_output=True, timeout=5)
out = r.stdout.decode()
services["embedding"] = '"ok"' in out or '"status":"ok"' in out
except: pass
h["services"] = services
return h
def check_job(uuid: str) -> dict:
"""Run checklist for a file_uuid and return status + timing"""
stages = []
t0 = time.time()
# 1. ASR (pass 1: faster-whisper small)
t = time.time()
f = OUTPUT_DIR / f"{uuid}.asr.json"
ok = f.exists() and f.stat().st_size > 0
segs = 0
if ok:
try:
with open(f) as fh: d = json.load(fh)
segs = len(d.get("segments", []))
except: ok = False
stages.append({"name": "ASR", "passed": ok and segs > 0, "detail": f"faster-whisper ({segs})" if ok else file_size(str(f)),
"elapsed": round(time.time() - t, 1)})
# 2. ASRX (ECAPA-TDNN speaker diarization)
t = time.time()
f = OUTPUT_DIR / f"{uuid}.asrx.json"
ok = f.exists() and f.stat().st_size > 0
segs = 0
if ok:
try:
with open(f) as fh: d = json.load(fh)
segs = len(d.get("segments", []))
except: ok = False
stages.append({"name": "ASRX", "passed": ok and segs > 0, "detail": f"ECAPA-TDNN ({segs})" if ok else file_size(str(f)),
"elapsed": round(time.time() - t, 1)})
# 3. ASR2 (pass 2: correct split segments)
t = time.time()
f2 = OUTPUT_DIR / f"{uuid}.asr-1.json"
ok2 = f2.exists() and f2.stat().st_size > 0
cnt2 = 0
if ok2:
try:
with open(f2) as fh: d2 = json.load(fh)
cnt2 = len(d2.get("kept", [])) + sum(len(c["corrected"]) for c in d2.get("corrections", []))
except: ok2 = False
stages.append({"name": "ASR2", "passed": ok2 and cnt2 > 0, "detail": f"{cnt2} chunks (asr-1.json)" if ok2 else file_size(str(f2)),
"elapsed": round(time.time() - t, 1)})
# 4. Sentence Chunks (DB)
t = time.time()
cnt = int(psql(f"SELECT count(*) FROM dev.chunk WHERE file_uuid='{uuid}' AND chunk_type='sentence'"))
stages.append({"name": "Sentence", "passed": cnt > 0, "detail": f"{cnt} DB", "elapsed": round(time.time() - t, 1)})
# 5. Vectorization
t = time.time()
vec = int(psql(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid='{uuid}'"))
qdrant_ok = False
try:
r = proc(["curl", "-s", "--connect-timeout", "3", "-X", "POST",
f"{QDRANT_URL}/collections/{QDRANT_COL}/points/count",
"-H", "Content-Type: application/json", "-d", '{"exact": true}'],
capture_output=True, timeout=5)
qdrant_ok = b'"count"' in r.stdout
except: pass
if not qdrant_ok:
try:
r = proc(["curl", "-s", "--connect-timeout", "3",
f"{QDRANT_URL}/collections/{QDRANT_COL}/points/scroll?limit=1&with_payload=false"],
capture_output=True, timeout=5)
qdrant_ok = b'"points"' in r.stdout
except: pass
stages.append({"name": "Vectorize", "passed": vec > 0 and qdrant_ok,
"detail": f"{vec} PG, Qdrant={'ok' if qdrant_ok else '?'}",
"elapsed": round(time.time() - t, 1)})
# 6. Face Trace
t = time.time()
traces = int(psql(f"SELECT count(DISTINCT trace_id) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL"))
faces = int(psql(f"SELECT count(*) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL"))
stages.append({"name": "FaceTrace", "passed": traces > 0,
"detail": f"{traces} traces, {faces} faces",
"elapsed": round(time.time() - t, 1)})
# 7. TKG
t = time.time()
nodes = int(psql(f"SELECT count(*) FROM dev.tkg_nodes WHERE file_uuid='{uuid}'"))
edges = int(psql(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid='{uuid}'"))
stages.append({"name": "TKG", "passed": nodes > 0,
"detail": f"{nodes} nodes, {edges} edges",
"elapsed": round(time.time() - t, 1)})
# 8. Trace Chunks
t = time.time()
tc = int(psql(f"SELECT count(*) FROM dev.chunk WHERE file_uuid='{uuid}' AND chunk_type='trace'"))
stages.append({"name": "TraceChunks", "passed": tc > 0, "detail": f"{tc} chunks",
"elapsed": round(time.time() - t, 1)})
# 9. Phase 1 Release
t = time.time()
p1 = PROJECT / "release" / "phase1" / "latest"
p1_files = [p1 / "RELEASE_INFO.txt", p1 / "schema.sql", p1 / "snapshots"]
p1_ok = all(f.exists() for f in p1_files)
p1_size = sum(f.stat().st_size for f in p1.rglob("*") if f.is_file()) // 1024 if p1.exists() else 0
stages.append({"name": "Phase1", "passed": p1_ok,
"detail": f"{p1_size//1024}MB" if p1_size > 1024 else f"{p1_size}KB",
"elapsed": round(time.time() - t, 1)})
all_passed = all(s["passed"] for s in stages)
return {"uuid": uuid, "passed": all_passed, "stages": stages,
"checked_at": datetime.utcnow().isoformat() + "Z",
"total_elapsed": round(time.time() - t0, 1)}
def format_report(job: dict, health: dict) -> str:
"""Pretty-print the status report"""
lines = []
lines.append(f"{'='*70}")
lines.append(f" Pipeline Status — {job['uuid'][:16]}... {job['checked_at']}")
lines.append(f"{'='*70}")
# Checklist
lines.append(f"\n {'Stage':<15} {'Status':<9} {'Detail':<25} {'Time':<8}")
lines.append(f" {'-'*57}")
for s in job["stages"]:
st = "" if s["passed"] else ""
lines.append(f" {s['name']:<15} {st:<9} {s['detail']:<25} {s['elapsed']:.1f}s")
lines.append(f" {'-'*57}")
lines.append(f" {'TOTAL':<15} {'' if job['passed'] else '':<9} {'':<25} {job['total_elapsed']:.1f}s")
# Health
lines.append(f"\n{''*70}")
lines.append(" SYSTEM HEALTH")
lines.append(f"{''*70}")
h = health
lines.append(f" CPU Load: {h.get('cpu_load_1m','?')} (1m) {h.get('cpu_load_5m','?')} (5m)")
if 'memory_used_mb' in h:
total_mb = 49152
pct = round(h['memory_used_mb'] / total_mb * 100, 1)
lines.append(f" Memory: {h['memory_used_mb']}MB / {total_mb}MB ({pct}%)")
if 'disk_use_pct' in h:
lines.append(f" Disk: {h['disk_use_pct']} used, {h['disk_avail']} avail")
lines.append(f" GPU (MPS): {'' if h.get('gpu_available') else ''}")
svc = h.get("services", {})
svc_str = " ".join(f"{k}={chr(10003) if v else chr(10007)}" for k, v in svc.items())
lines.append(f" Services: {svc_str}")
# Processor Timing (from DB)
try:
proc_data = psql(f"""SELECT processor,
extract(epoch from (completed_at - created_at))::int as duration_secs
FROM dev.processor_results WHERE job_id IN
(SELECT id FROM dev.monitor_jobs WHERE uuid='{job['uuid']}')
AND completed_at IS NOT NULL
ORDER BY created_at""")
processors = []
for line in proc_data.split('\n'):
if not line.strip() or '|' not in line: continue
p = line.split('|')
processors.append({"name": p[0], "duration_secs": int(p[1]) if p[1] else 0})
health["processors"] = processors
except: pass
if "processors" in health:
lines.append(f"\n{''*70}")
lines.append(" PROCESSOR TIMING")
lines.append(f"{''*70}")
for p in health.get("processors", []):
dur = p.get("duration_secs", 0)
lines.append(f" {p['name']:<25} {fmt_secs(dur) if dur else 'running'}")
lines.append(f"\n{'='*70}\n")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--uuid", default="aeed71342a899fe4b4c57b7d41bcb692")
parser.add_argument("--json", action="store_true", help="Output JSON only")
args = parser.parse_args()
job = check_job(args.uuid)
health = health_check()
if args.json:
print(json.dumps({"job": job, "health": health, "timestamp": job["checked_at"]}, indent=2))
else:
print(format_report(job, health))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,204 @@
#!/opt/homebrew/bin/python3.11
"""
Split ASR segments at detected speaker change points.
Uses ECAPA-TDNN sub-window classification against reference centroids.
Output: new asrx_fine.json with fine-grained segments + parent_asr_idx reference.
"""
import json, sys, os, time, argparse, subprocess, tempfile, shutil
import numpy as np
from collections import Counter
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "asrx_self"))
from main_fixed import SelfASRXFixed
from speaker_encoder import extract_speaker_embedding, normalize_embeddings
import torchaudio, psycopg2
SUB_WIN = 0.5
SUB_STRIDE = 0.25
CHANGE_CONFIRM = 2
MIN_DUR = 0.7
BATCH_SIZE = 500
def load_reference(uuid, db_url):
conn = psycopg2.connect(db_url)
cur = conn.cursor()
cur.execute("SELECT chunk_index, metadata->>'new_speaker_name' FROM dev.chunks WHERE file_uuid=%s AND chunk_type='sentence' ORDER BY chunk_index", (uuid,))
name_by_idx = dict(cur.fetchall())
conn.close()
asrx_path = f"/Users/accusys/momentry/output_dev/{uuid}.asrx.json"
asrx_full = json.load(open(asrx_path))
ref = {"Cary Grant": [], "Audrey Hepburn": [], "Unknown": []}
for i, seg in enumerate(asrx_full["segments"]):
name = name_by_idx.get(i, "Unknown")
if name in ref and i < len(asrx_full.get("embeddings", [])):
ref[name].append(np.array(asrx_full["embeddings"][i]))
centroids = {}
for name, el in ref.items():
if el:
c = np.mean(el, axis=0)
centroids[name] = c / (np.linalg.norm(c) + 1e-10)
name_to_speaker = {}
for i, seg in enumerate(asrx_full["segments"]):
name = name_by_idx.get(i, "Unknown")
sid = seg["speaker_id"]
name_to_speaker.setdefault(name, sid)
return centroids, name_to_speaker
def extract_audio(video_path, sr=16000):
tmp = tempfile.mkdtemp(prefix="asr_split_")
wav = os.path.join(tmp, "audio.wav")
subprocess.run(["ffmpeg", "-y", "-v", "quiet", "-i", video_path,
"-ar", str(sr), "-ac", "1", "-sample_fmt", "s16", wav], check=True, capture_output=True, timeout=300)
wav_data, sr_actual = torchaudio.load(wav)
if wav_data.shape[0] > 1:
wav_data = wav_data.mean(dim=0, keepdim=True)
return wav_data, sr_actual, tmp
def classify(emb, centroids):
return max(centroids, key=lambda n: float(np.dot(emb, centroids[n])))
def process_batch(asr_segs, wav, sr, centroids, encoder, offset_start=0):
ws = int(SUB_WIN * sr)
sw = int(SUB_STRIDE * sr)
results = []
for si, s in enumerate(asr_segs):
st = s["start"] - offset_start
et = s["end"] - offset_start
dur = et - st
if dur < 1.0:
a = wav[:, int(st*sr):int(et*sr)]
e = extract_speaker_embedding(encoder, a.numpy(), sr)
e /= np.linalg.norm(e) + 1e-10
results.append((s["start"], s["end"], classify(e, centroids), si))
continue
ss = int(st*sr); se = int(et*sr)
sub_e, sub_t = [], []
for wpos in range(ss, se-ws+1, sw):
chunk = wav[:, wpos:wpos+ws]
sub_e.append(extract_speaker_embedding(encoder, chunk.numpy(), sr))
sub_t.append(wpos/sr + offset_start)
if len(sub_e) < 3:
a = wav[:, ss:se]
e = extract_speaker_embedding(encoder, a.numpy(), sr)
e /= np.linalg.norm(e) + 1e-10
results.append((s["start"], s["end"], classify(e, centroids), si))
continue
sub_e = normalize_embeddings(np.array(sub_e))
names = []
for i in range(len(sub_e)):
names.append(classify(sub_e[i], centroids))
# Smooth
sm = list(names)
for i in range(1, len(names)-1):
sm[i] = Counter(names[max(0,i-1):min(len(names),i+2)]).most_common(1)[0][0]
# Find splits
splits = []
prev = sm[0]
for i in range(1, len(sm)):
if sm[i] != prev:
if i+CHANGE_CONFIRM < len(sm) and all(sm[i]==sm[j] for j in range(i, i+CHANGE_CONFIRM+1)):
splits.append(sub_t[i]); prev = sm[i]
elif i+CHANGE_CONFIRM >= len(sm):
splits.append(sub_t[i]); prev = sm[i]
if not splits:
results.append((s["start"], s["end"], Counter(names).most_common(1)[0][0], si))
else:
boundaries = [s["start"]] + splits + [s["end"]]
for pi in range(len(boundaries)-1):
ps, pe = boundaries[pi], boundaries[pi+1]
if pe-ps < MIN_DUR: continue
sub_i = [i for i, t in enumerate(sub_t) if ps <= t < pe]
lbl = Counter([names[i] for i in sub_i]).most_common(1)[0][0] if sub_i else Counter(names).most_common(1)[0][0]
results.append((round(ps,2), round(pe,2), lbl, si))
return results
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--uuid", default="aeed71342a899fe4b4c57b7d41bcb692")
parser.add_argument("--output", help="Output path for fine ASRX JSON")
args = parser.parse_args()
UUID = args.uuid
BASE = "/Users/accusys/momentry/output_dev"
DB_URL = "postgresql://accusys@localhost:5432/momentry?host=/tmp"
VIDEO = "/Users/accusys/momentry/var/sftpgo/data/demo/Charade (1963) Cary Grant & Audrey Hepburn \uff5c Comedy Mystery Romance Thriller \uff5c Full Movie.mp4"
print(f"Processing {UUID}")
centroids, name_to_speaker = load_reference(UUID, DB_URL)
print(f"Centroids: {list(centroids.keys())}")
asr = json.load(open(f"{BASE}/{UUID}.asr.json"))
asr_segs = asr["segments"]
print(f"ASR segments: {len(asr_segs)}")
print("Extracting audio...")
wav, sr, tmp_dir = extract_audio(VIDEO)
print(f"Audio: {wav.shape[1]/sr:.0f}s")
inst = SelfASRXFixed()
encoder = inst.speaker_encoder
all_results = []
t0 = time.time()
for batch_start in range(0, len(asr_segs), BATCH_SIZE):
batch = asr_segs[batch_start:batch_start + BATCH_SIZE]
segs = process_batch(batch, wav, sr, centroids, encoder)
all_results.extend(segs)
pct = (batch_start + len(batch)) * 100 // len(asr_segs)
print(f" {batch_start+len(batch)}/{len(asr_segs)} ({pct}%) -> {len(all_results)} segments [{time.time()-t0:.0f}s]")
shutil.rmtree(tmp_dir, ignore_errors=True)
# Build output
spk_stats = {}
out_segs = []
# Assign sequential SPEAKER_X IDs based on name order
name_order = {name: i for i, name in enumerate(sorted(set(s[2] for s in all_results)))}
for start, end, name, asr_idx in all_results:
sid = f"SPEAKER_{name_order[name]}"
dur = end - start
spk_stats.setdefault(sid, {"count": 0, "duration": 0})
spk_stats[sid]["count"] += 1
spk_stats[sid]["duration"] += dur
out_segs.append({
"start_time": start,
"end_time": end,
"speaker_id": sid,
"speaker_name": name,
"parent_asr_idx": asr_idx,
})
output = {
"uuid": UUID,
"language": "en",
"segments": out_segs,
"speaker_stats": spk_stats,
"total_asr_segments": len(asr_segs),
"total_fine_segments": len(out_segs),
}
output_path = args.output or f"{BASE}/{UUID}.asrx_fine.json"
json.dump(output, open(output_path, "w"), indent=2)
print(f"\nSaved: {output_path}")
print(f"Segments: {len(out_segs)} (was {len(asr_segs)}, +{len(out_segs)-len(asr_segs)})")
print(f"Speakers: {len(spk_stats)}")
for sid, st in sorted(spk_stats.items()):
print(f" {sid}: {st['count']} segs, {st['duration']:.0f}s")
if __name__ == "__main__":
main()