Schema changes: dev.chunks -> dev.chunk; remove old_chunk_id/chunk_index.
Correction: asr-1.json format, generate/apply scripts.
API: 37/37 endpoints fixed and tested.
Docs: HANDOVER_V2.0.md for M4.
294 lines
11 KiB
Python
294 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Pipeline Status — checklist + health + timeline monitoring
|
|
Output: JSON for machine parsing, formatted table for human reading
|
|
"""
|
|
|
|
import json, os, subprocess, sys, time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# --- Filesystem locations ------------------------------------------------
# Parent of the directory that contains this script.
PROJECT = Path(__file__).resolve().parent.parent
# Where the per-file pipeline artifacts are read from; the
# MOMENTRY_OUTPUT_DIR environment variable overrides the default.
OUTPUT_DIR = Path(
    os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
)

# --- PostgreSQL ----------------------------------------------------------
# Directory holding the psql / pg_isready binaries used by this module.
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
# Database role: the current shell user, falling back to "accusys".
DB_USER = os.environ.get("USER", "accusys")
DB_NAME = "momentry"

# --- Qdrant --------------------------------------------------------------
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_COL = os.environ.get("QDRANT_COLLECTION", "momentry_dev_v1")

# Wall-clock timestamp captured once, when the module is loaded.
now = time.time()
# Short alias used for every external command this module runs.
proc = subprocess.run
|
|
|
|
|
|
def psql(sql: str) -> str:
    """Run *sql* through the ``psql`` command-line client and return its output.

    The client is started with ``-t -A`` (tuples only, unaligned), so a
    single-value query yields just that value as text.  Only stdout is
    returned, with surrounding whitespace removed; the exit status and stderr
    are not inspected.  The call is limited to 30 seconds.
    """
    command = [
        f"{PG_BIN}/psql",
        "-U", DB_USER,
        "-d", DB_NAME,
        "-t",
        "-A",
        "-c", sql,
    ]
    completed = proc(command, capture_output=True, text=True, timeout=30)
    return completed.stdout.strip()
|
|
|
|
|
|
def file_size(path: str) -> str:
    """Return a compact, human-readable size for the file at *path*.

    Returns:
        ``"missing"`` when the path does not exist, ``"<n>MB"`` (whole
        mebibytes, rounded down) for files of at least 1 MiB, and ``"<n>KB"``
        (whole kibibytes, rounded down) otherwise.
    """
    p = Path(path)
    if not p.exists():
        return "missing"
    try:
        size_bytes = p.stat().st_size
    except FileNotFoundError:
        # The file disappeared between the existence check and stat().
        return "missing"

    kb = size_bytes // 1024
    # ">=" so that exactly 1 MiB reads "1MB" rather than "1024KB".
    if kb >= 1024:
        return f"{kb // 1024}MB"
    return f"{kb}KB"
|
|
|
|
|
|
def fmt_secs(s: float) -> str:
    """Format a duration in seconds as ``"42s"``, ``"3m 7s"`` or ``"2h 15m"``.

    The value is rounded to whole seconds *before* it is split into units.
    Splitting first and rounding each part separately can produce impossible
    readings such as ``"1m 60s"`` for 119.6 seconds.

    Args:
        s: Duration in seconds (int or float).

    Returns:
        Seconds only below one minute, minutes and seconds below one hour,
        hours and whole minutes otherwise.
    """
    total = int(round(s))
    if total < 60:
        return f"{total}s"
    if total < 3600:
        minutes, seconds = divmod(total, 60)
        return f"{minutes}m {seconds}s"
    hours, remainder = divmod(total, 3600)
    return f"{hours}h {remainder // 60}m"
|
|
|
|
|
|
def health_check() -> dict:
    """Collect a best-effort snapshot of system health.

    Every probe is independent: a probe that cannot run simply leaves its
    entry at the "unknown / down" value instead of aborting the snapshot.

    Returns:
        A dict with these keys, in this order when present:
          * ``cpu_load_1m`` / ``cpu_load_5m`` - load averages rounded to one
            decimal, or ``-1`` when unavailable.
          * ``memory_used_mb`` - sum of the RSS column of ``ps -A`` in MB
            (omitted when ``ps`` fails or reports nothing).
          * ``disk_use_pct`` / ``disk_avail`` - columns 5 and 4 of the last
            ``df -h`` line for ``OUTPUT_DIR`` (``"?"`` when a column is
            missing; both omitted when ``df`` cannot run).
          * ``gpu_available`` - whether torch reports the MPS backend.
          * ``services`` - ``{"postgresql", "redis", "qdrant", "embedding"}``
            mapped to booleans.
    """
    h = {}

    # CPU
    try:
        load = os.getloadavg()
        h["cpu_load_1m"] = round(load[0], 1)
        h["cpu_load_5m"] = round(load[1], 1)
    except (OSError, AttributeError):
        # OSError: load average unobtainable; AttributeError: platform has no getloadavg.
        h["cpu_load_1m"] = h["cpu_load_5m"] = -1

    # Memory: total resident set size over all processes.  This is an
    # approximation - pages shared between processes are counted repeatedly.
    ps = _probe(["ps", "-A", "-o", "rss="])
    if ps is not None:
        rss_kb = sum(int(tok) for tok in ps.stdout.split() if tok.isdigit())
        if rss_kb:
            h["memory_used_mb"] = rss_kb // 1024

    # Disk
    df = _probe(["df", "-h", str(OUTPUT_DIR)])
    if df is not None:
        rows = df.stdout.strip().splitlines()
        cols = rows[-1].split() if rows else []
        h["disk_use_pct"] = cols[4] if len(cols) > 4 else "?"
        h["disk_avail"] = cols[3] if len(cols) > 3 else "?"

    # GPU (ANE/MPS): ask the Homebrew interpreter whether torch sees MPS.
    gpu_python = "/opt/homebrew/bin/python3.11"
    h["gpu_available"] = False
    if Path(gpu_python).exists():
        gpu = _probe([gpu_python, "-c",
                      "import torch; print(torch.backends.mps.is_available())"])
        h["gpu_available"] = gpu is not None and gpu.stdout.strip() == "True"

    h["services"] = _service_status()
    return h


# Errors meaning a probe command could not be started, timed out, or produced
# output that could not be decoded (UnicodeDecodeError is a ValueError).
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError)


def _probe(cmd: list, timeout: float = 5):
    """Run *cmd* in text mode; return its CompletedProcess, or None on failure."""
    try:
        return proc(cmd, capture_output=True, text=True, timeout=timeout)
    except _PROBE_ERRORS:
        return None


def _redis_alive() -> bool:
    """Return True when ``redis-cli ping`` answers PONG, with or without auth."""
    # NOTE(review): the password is hard-coded and visible in the process
    # list; consider moving it to configuration.
    attempts = (
        (["redis-cli", "-a", "accusys", "ping"], 5),
        (["redis-cli", "ping"], 3),
    )
    # Try the unauthenticated ping whenever the first attempt does not answer
    # PONG, not only when it raises: an auth failure exits without raising.
    for cmd, timeout in attempts:
        result = _probe(cmd, timeout)
        if result is not None and "PONG" in result.stdout:
            return True
    return False


def _service_status() -> dict:
    """Probe the backing services and return a name -> reachable mapping."""
    services = {"postgresql": False, "redis": False, "qdrant": False, "embedding": False}

    pg = _probe([f"{PG_BIN}/pg_isready"])
    services["postgresql"] = pg is not None and pg.returncode == 0

    services["redis"] = _redis_alive()

    # Probe the configured Qdrant instance rather than a fixed localhost URL.
    qdrant = _probe(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}",
                     "--connect-timeout", "3", f"{QDRANT_URL.rstrip('/')}/healthz"])
    services["qdrant"] = qdrant is not None and qdrant.stdout.strip() == "200"

    emb = _probe(["curl", "-s", "--connect-timeout", "3", "http://localhost:11436/health"])
    # Any body containing "ok" (quotes included) counts as healthy; this also
    # covers the compact form "status":"ok".
    services["embedding"] = emb is not None and '"ok"' in emb.stdout

    return services
|
|
|
|
|
|
def check_job(uuid: str) -> dict:
    """Run the pipeline checklist for *uuid* and return per-stage status and timing.

    Stages, in order: ASR, ASRX and ASR2 (JSON artifacts in ``OUTPUT_DIR``),
    Sentence, Vectorize, FaceTrace, TKG and TraceChunks (row counts in the
    ``dev`` schema, plus a Qdrant reachability probe for Vectorize) and
    Phase1 (release files under ``PROJECT/release/phase1/latest``).

    A database that cannot be queried makes the affected stages fail with a
    count of 0 instead of aborting the whole check.

    Args:
        uuid: File identifier used in artifact file names and SQL filters.

    Returns:
        ``{"uuid", "passed", "stages", "checked_at", "total_elapsed"}`` where
        ``stages`` is a list of ``{"name", "passed", "detail", "elapsed"}``
        dicts and ``passed`` is True only when every stage passed.
    """
    t0 = time.time()
    # Quoted once and reused: uuid comes from the command line, so it must
    # not be able to break out of the SQL string literal.
    uid = _sql_literal(uuid)

    stages = [
        # 1. ASR (pass 1: faster-whisper small)
        _artifact_stage("ASR", OUTPUT_DIR / f"{uuid}.asr.json",
                        "faster-whisper ({n})", _segment_count),
        # 2. ASRX (ECAPA-TDNN speaker diarization)
        _artifact_stage("ASRX", OUTPUT_DIR / f"{uuid}.asrx.json",
                        "ECAPA-TDNN ({n})", _segment_count),
        # 3. ASR2 (pass 2: correct split segments)
        _artifact_stage("ASR2", OUTPUT_DIR / f"{uuid}.asr-1.json",
                        "{n} chunks (asr-1.json)", _asr2_chunk_count),
    ]

    # 4. Sentence Chunks (DB)
    t = time.time()
    cnt = _db_count(f"SELECT count(*) FROM dev.chunk WHERE file_uuid={uid} AND chunk_type='sentence'")
    stages.append(_stage("Sentence", cnt > 0, f"{cnt} DB", t))

    # 5. Vectorization: rows in PostgreSQL plus a responding Qdrant collection.
    t = time.time()
    vec = _db_count(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid={uid}")
    qdrant_ok = _qdrant_collection_responds()
    stages.append(_stage("Vectorize", vec > 0 and qdrant_ok,
                         f"{vec} PG, Qdrant={'ok' if qdrant_ok else '?'}", t))

    # 6. Face Trace
    t = time.time()
    traces = _db_count(f"SELECT count(DISTINCT trace_id) FROM dev.face_detections WHERE file_uuid={uid} AND trace_id IS NOT NULL")
    faces = _db_count(f"SELECT count(*) FROM dev.face_detections WHERE file_uuid={uid} AND trace_id IS NOT NULL")
    stages.append(_stage("FaceTrace", traces > 0, f"{traces} traces, {faces} faces", t))

    # 7. TKG
    t = time.time()
    nodes = _db_count(f"SELECT count(*) FROM dev.tkg_nodes WHERE file_uuid={uid}")
    edges = _db_count(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid={uid}")
    stages.append(_stage("TKG", nodes > 0, f"{nodes} nodes, {edges} edges", t))

    # 8. Trace Chunks
    t = time.time()
    tc = _db_count(f"SELECT count(*) FROM dev.chunk WHERE file_uuid={uid} AND chunk_type='trace'")
    stages.append(_stage("TraceChunks", tc > 0, f"{tc} chunks", t))

    # 9. Phase 1 Release
    t = time.time()
    p1 = PROJECT / "release" / "phase1" / "latest"
    required = [p1 / "RELEASE_INFO.txt", p1 / "schema.sql", p1 / "snapshots"]
    p1_ok = all(item.exists() for item in required)
    p1_kb = (sum(entry.stat().st_size for entry in p1.rglob("*") if entry.is_file()) // 1024
             if p1.exists() else 0)
    # ">=" so that exactly 1 MiB reads "1MB" rather than "1024KB".
    p1_detail = f"{p1_kb // 1024}MB" if p1_kb >= 1024 else f"{p1_kb}KB"
    stages.append(_stage("Phase1", p1_ok, p1_detail, t))

    return {"uuid": uuid,
            "passed": all(s["passed"] for s in stages),
            "stages": stages,
            "checked_at": datetime.utcnow().isoformat() + "Z",
            "total_elapsed": round(time.time() - t0, 1)}


def _stage(name: str, passed: bool, detail: str, started: float) -> dict:
    """Build one checklist entry; ``elapsed`` is measured from *started*."""
    return {"name": name, "passed": bool(passed), "detail": detail,
            "elapsed": round(time.time() - started, 1)}


def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled.  This is sufficient while the server
    runs with ``standard_conforming_strings = on`` (backslashes are then
    ordinary characters).
    """
    return "'" + str(value).replace("'", "''") + "'"


def _db_count(sql: str) -> int:
    """Run a COUNT query through psql(); return 0 when it fails or yields no integer."""
    try:
        return int(psql(sql))
    except (ValueError, OSError, subprocess.SubprocessError):
        return 0


def _segment_count(doc: dict) -> int:
    """Number of entries under the ``segments`` key of an ASR/ASRX document."""
    return len(doc.get("segments", []))


def _asr2_chunk_count(doc: dict) -> int:
    """Kept entries plus every corrected entry of an ``asr-1.json`` document."""
    return len(doc.get("kept", [])) + sum(
        len(c["corrected"]) for c in doc.get("corrections", []))


def _artifact_stage(name: str, path: Path, detail_fmt: str, count_items) -> dict:
    """Check one JSON artifact: it must exist, be non-empty, parse, and contain items.

    *detail_fmt* is formatted with ``n=<item count>`` when the file could be
    read; otherwise the detail is the file's size (or "missing").
    """
    started = time.time()
    count = 0
    ok = path.exists() and path.stat().st_size > 0
    if ok:
        try:
            # Explicit UTF-8: transcripts may contain non-ASCII text and must
            # not depend on the locale's default encoding.
            with open(path, encoding="utf-8") as fh:
                count = count_items(json.load(fh))
        except (OSError, ValueError, TypeError, KeyError, AttributeError):
            # Unreadable, malformed JSON, or an unexpected document shape.
            ok = False
    detail = detail_fmt.format(n=count) if ok else file_size(str(path))
    return _stage(name, ok and count > 0, detail, started)


def _qdrant_collection_responds() -> bool:
    """Return True when the configured Qdrant collection answers a points query.

    This only shows that the collection responds; the request is not
    filtered by file uuid.
    """
    points = f"{QDRANT_URL}/collections/{QDRANT_COL}/points"
    attempts = (
        (["-X", "POST", f"{points}/count",
          "-H", "Content-Type: application/json", "-d", '{"exact": true}'],
         b'"count"'),
        # NOTE(review): the fallback issues a GET against the scroll
        # endpoint - confirm the server accepts that method.
        ([f"{points}/scroll?limit=1&with_payload=false"], b'"points"'),
    )
    for extra_args, marker in attempts:
        try:
            r = proc(["curl", "-s", "--connect-timeout", "3", *extra_args],
                     capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError):
            continue
        if marker in r.stdout:
            return True
    return False
|
|
|
|
|
|
def format_report(job: dict, health: dict) -> str:
    """Render the checklist result and the health snapshot as a text report.

    The report has three parts: the per-stage checklist table, the system
    health summary, and - when timings are available - a per-processor
    duration list read from ``dev.processor_results`` for this job's uuid.
    If that lookup cannot be performed, a ``"processors"`` list already
    present in *health* is used instead.

    Args:
        job: Result of ``check_job`` (uses ``uuid``, ``checked_at``,
            ``stages``, ``passed`` and ``total_elapsed``).
        health: Result of ``health_check``.  It is only read, never modified.

    Returns:
        The report as a single newline-joined string.
    """
    lines = []
    lines.append(f"{'='*70}")
    lines.append(f" Pipeline Status — {job['uuid'][:16]}... {job['checked_at']}")
    lines.append(f"{'='*70}")

    # Checklist
    lines.append(f"\n {'Stage':<15} {'Status':<9} {'Detail':<25} {'Time':<8}")
    lines.append(f" {'-'*57}")
    for s in job["stages"]:
        st = "✅" if s["passed"] else "❌"
        lines.append(f" {s['name']:<15} {st:<9} {s['detail']:<25} {s['elapsed']:.1f}s")
    lines.append(f" {'-'*57}")
    lines.append(f" {'TOTAL':<15} {'✅' if job['passed'] else '❌':<9} {'':<25} {job['total_elapsed']:.1f}s")

    # Health
    lines.append(f"\n{'─'*70}")
    lines.append(" SYSTEM HEALTH")
    lines.append(f"{'─'*70}")
    h = health
    lines.append(f" CPU Load: {h.get('cpu_load_1m','?')} (1m) {h.get('cpu_load_5m','?')} (5m)")
    if 'memory_used_mb' in h:
        total_mb = _total_memory_mb()
        pct = round(h['memory_used_mb'] / total_mb * 100, 1)
        lines.append(f" Memory: {h['memory_used_mb']}MB / {total_mb}MB ({pct}%)")
    if 'disk_use_pct' in h:
        lines.append(f" Disk: {h['disk_use_pct']} used, {h.get('disk_avail', '?')} avail")
    lines.append(f" GPU (MPS): {'✅' if h.get('gpu_available') else '❌'}")
    svc = h.get("services", {})
    svc_str = " ".join(f"{k}={chr(10003) if v else chr(10007)}" for k, v in svc.items())
    lines.append(f" Services: {svc_str}")

    # Processor Timing (from DB).  Kept in a local so the caller's health
    # dict is not altered as a side effect of formatting.
    processors = _processor_timings(job["uuid"])
    if processors is None:
        processors = health.get("processors")

    if processors is not None:
        lines.append(f"\n{'─'*70}")
        lines.append(" PROCESSOR TIMING")
        lines.append(f"{'─'*70}")
        for p in processors:
            dur = p.get("duration_secs")
            # Only a missing duration means "still running"; a processor that
            # completed in under a second legitimately reports 0.
            shown = "running" if dur is None else fmt_secs(dur)
            lines.append(f" {p['name']:<25} {shown}")

    lines.append(f"\n{'='*70}\n")
    return "\n".join(lines)


def _total_memory_mb(default: int = 49152) -> int:
    """Return physical memory in MB, or *default* when it cannot be determined."""
    try:
        total = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE") // (1024 * 1024)
    except (AttributeError, ValueError, OSError):
        # sysconf is missing on this platform or does not know these names.
        return default
    return total if total > 0 else default


def _processor_timings(uuid: str):
    """Fetch completed processor durations for *uuid*.

    Returns a list of ``{"name", "duration_secs"}`` dicts (``duration_secs``
    is None when the database reports no value), or None when psql could not
    be run at all.
    """
    # Double embedded quotes so the uuid cannot escape the SQL string literal.
    quoted = "'" + str(uuid).replace("'", "''") + "'"
    sql = f"""SELECT processor,
extract(epoch from (completed_at - created_at))::int as duration_secs
FROM dev.processor_results WHERE job_id IN
(SELECT id FROM dev.monitor_jobs WHERE uuid={quoted})
AND completed_at IS NOT NULL
ORDER BY created_at"""
    try:
        raw = psql(sql)
    except (OSError, subprocess.SubprocessError):
        return None

    rows = []
    for line in raw.split('\n'):
        if not line.strip() or '|' not in line:
            continue
        # Split on the LAST separator so a processor name containing '|'
        # does not corrupt the duration column.
        name, _, secs = line.rpartition('|')
        try:
            duration = int(secs) if secs.strip() else None
        except ValueError:
            # Not a data row (e.g. a diagnostic line); skip it.
            continue
        rows.append({"name": name, "duration_secs": duration})
    return rows
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Runs the checklist for ``--uuid`` and the system health probes, then
    prints either a JSON document (``--json``) or the formatted text report.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--uuid", default="aeed71342a899fe4b4c57b7d41bcb692")
    cli.add_argument("--json", action="store_true", help="Output JSON only")
    options = cli.parse_args()

    job_status = check_job(options.uuid)
    system_health = health_check()

    if not options.json:
        print(format_report(job_status, system_health))
        return

    payload = {
        "job": job_status,
        "health": system_health,
        "timestamp": job_status["checked_at"],
    }
    print(json.dumps(payload, indent=2))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|