#!/opt/homebrew/bin/python3.11
"""
Pipeline Status — checklist + health + timeline monitoring
Output: JSON for machine parsing, formatted table for human reading
"""

import argparse
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

PROJECT = Path(__file__).resolve().parent.parent
OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev"))
PG_BIN = "/Users/accusys/pgsql/18.3/bin"
DB_USER = os.environ.get("USER", "accusys")
DB_NAME = "momentry"
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_COL = os.environ.get("QDRANT_COLLECTION", "momentry_dev_v1")

# Kept for backward compatibility with anything importing this module.
now = time.time()
proc = subprocess.run

# Identifiers end up inside SQL text and file names, so only a conservative
# character set is accepted (no quotes, slashes, dots or whitespace).
_SAFE_ID_RE = re.compile(r"[A-Za-z0-9_-]+")

# Used for the memory percentage only when the real total cannot be read.
_FALLBACK_TOTAL_MB = 49152

# Failures that mean "the external tool is missing, hung, or unusable".
_RUN_ERRORS = (OSError, subprocess.SubprocessError)


def _validate_uuid(uuid: str) -> str:
    """Return *uuid* unchanged, or raise ValueError if it is unsafe to embed."""
    if not isinstance(uuid, str) or not _SAFE_ID_RE.fullmatch(uuid):
        raise ValueError(f"invalid file uuid: {uuid!r}")
    return uuid


def _try_run(cmd: list, timeout: float, **kwargs):
    """Run *cmd* capturing output; return None instead of raising on failure."""
    try:
        return proc(cmd, capture_output=True, timeout=timeout, **kwargs)
    except _RUN_ERRORS:
        return None


def _fmt_kb(kb: int) -> str:
    """Render a size given in KiB as ``"<n>KB"`` or, from 1024 KiB up, ``"<n>MB"``."""
    if kb >= 1024:
        return f"{kb // 1024}MB"
    return f"{kb}KB"


def psql(sql: str) -> str:
    """Run *sql* through the psql CLI and return its stripped stdout.

    Output is unaligned and tuples-only (``-A -t``), so a single scalar
    result comes back as a bare string. The exit status is not checked:
    a failed query yields an empty string. Raises ``OSError`` or
    ``subprocess.TimeoutExpired`` if psql cannot be started or takes
    longer than 30 seconds.

    NOTE: ``-c`` does not interpolate psql variables, so callers must only
    embed values that have been validated first.
    """
    result = proc(
        [f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql],
        capture_output=True, text=True, timeout=30,
    )
    return result.stdout.strip()


def _query_count(sql: str) -> Optional[int]:
    """Return the integer produced by *sql*, or None when the query failed."""
    try:
        return int(psql(sql))
    except _RUN_ERRORS + (ValueError,):
        return None


def _n(count: Optional[int]) -> str:
    """Render a count for a detail column; ``"?"`` marks a failed query."""
    return "?" if count is None else str(count)


def file_size(path: str) -> str:
    """Return the size of *path* as ``"<n>KB"`` / ``"<n>MB"``, or ``"missing"``."""
    try:
        size = Path(path).stat().st_size
    except OSError:
        return "missing"
    return _fmt_kb(size // 1024)


def fmt_secs(s: float) -> str:
    """Format a duration in seconds as ``"45s"``, ``"2m 5s"`` or ``"1h 2m"``.

    The value is rounded to whole seconds once, before it is split into
    components, so a component can never be displayed as ``60``.
    """
    total = int(round(s))
    if total < 60:
        return f"{total}s"
    minutes, seconds = divmod(total, 60)
    if minutes < 60:
        return f"{minutes}m {seconds}s"
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes}m"


def _total_memory_mb() -> Optional[int]:
    """Return physical memory in MiB via sysconf, or None if unavailable."""
    try:
        total = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE")
    except (AttributeError, OSError, ValueError):
        return None
    return total // (1024 * 1024) if total > 0 else None


def _redis_alive() -> bool:
    """Ping Redis with the configured password first, then without auth."""
    # NOTE(review): the default password is kept from the original script;
    # prefer supplying it via REDIS_PASSWORD. `-a` exposes it in `ps` output.
    password = os.environ.get("REDIS_PASSWORD", "accusys")
    attempts = []
    if password:
        attempts.append((["redis-cli", "-a", password, "ping"], 5))
    attempts.append((["redis-cli", "ping"], 3))
    for cmd, timeout in attempts:
        reply = _try_run(cmd, timeout)
        if reply is not None and b"PONG" in reply.stdout:
            return True
    return False


def _service_status() -> dict:
    """Probe each backing service and return ``{name: reachable}``."""
    services = {"postgresql": False, "redis": False, "qdrant": False, "embedding": False}

    ready = _try_run([f"{PG_BIN}/pg_isready"], 5)
    services["postgresql"] = ready is not None and ready.returncode == 0

    services["redis"] = _redis_alive()

    # Probe the configured Qdrant instance, not a hard-coded localhost.
    code = _try_run(
        ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", "--connect-timeout", "3",
         f"{QDRANT_URL.rstrip('/')}/healthz"],
        5,
    )
    services["qdrant"] = code is not None and code.stdout.strip() == b"200"

    reply = _try_run(["curl", "-s", "--connect-timeout", "3", "http://localhost:11436/health"], 5)
    services["embedding"] = reply is not None and b'"ok"' in reply.stdout
    return services


def health_check() -> dict:
    """Collect a best-effort snapshot of system health.

    Every probe is independent: a probe that fails leaves its keys out
    (or at their "unknown" value) instead of aborting the snapshot.

    Returns a dict that may contain ``cpu_load_1m``/``cpu_load_5m`` (-1 when
    unavailable), ``memory_used_mb``, ``memory_total_mb``, ``disk_use_pct``,
    ``disk_avail``, and always contains ``gpu_available`` and ``services``.
    """
    h = {}

    # CPU
    try:
        load = os.getloadavg()
        h["cpu_load_1m"] = round(load[0], 1)
        h["cpu_load_5m"] = round(load[1], 1)
    except (AttributeError, OSError):
        h["cpu_load_1m"] = h["cpu_load_5m"] = -1

    # Memory: sum of the RSS column (KiB) over all processes.
    # NOTE: shared pages are counted once per process, so this is an upper bound.
    listing = _try_run(["ps", "-A", "-o", "rss="], 5, text=True)
    if listing is not None:
        try:
            rss_kb = sum(int(tok) for tok in listing.stdout.split())
        except ValueError:
            rss_kb = 0
        if rss_kb:
            h["memory_used_mb"] = rss_kb // 1024
    total_mb = _total_memory_mb()
    if total_mb is not None:
        h["memory_total_mb"] = total_mb

    # Disk: last line of `df -h` describes the filesystem holding OUTPUT_DIR.
    usage = _try_run(["df", "-h", str(OUTPUT_DIR)], 5, text=True)
    if usage is not None:
        fields = usage.stdout.strip().split("\n")[-1].split()
        h["disk_use_pct"] = fields[4] if len(fields) > 4 else "?"
        h["disk_avail"] = fields[3] if len(fields) > 3 else "?"

    # GPU (ANE/MPS): ask the interpreter that has torch installed.
    h["gpu_available"] = False
    torch_python = "/opt/homebrew/bin/python3.11"
    if Path(torch_python).exists():
        probe = _try_run(
            [torch_python, "-c", "import torch; print(torch.backends.mps.is_available())"],
            5, text=True,
        )
        h["gpu_available"] = probe is not None and probe.stdout.strip() == "True"

    # Services
    h["services"] = _service_status()
    return h


def _stage(name: str, passed: bool, detail: str, started: float) -> dict:
    """Build one checklist row, timing it from *started* until now."""
    return {"name": name, "passed": passed, "detail": detail,
            "elapsed": round(time.time() - started, 1)}


def _segments_stage(name: str, path: Path) -> dict:
    """Check that *path* is a non-empty JSON file with at least one segment."""
    started = time.time()
    segs = 0
    try:
        ok = path.stat().st_size > 0
    except OSError:
        ok = False
    if ok:
        try:
            with open(path, encoding="utf-8") as fh:
                data = json.load(fh)
            segs = len(data.get("segments", []))
        except (OSError, ValueError, AttributeError, TypeError):
            # Unreadable, not JSON, or not the expected object shape.
            ok = False
    detail = f"{segs} seg" if ok else file_size(str(path))
    return _stage(name, ok and segs > 0, detail, started)


def _qdrant_collection_ok() -> bool:
    """Return True if the configured Qdrant collection answers a points query.

    NOTE: this only proves the collection is reachable; it does not check
    that points for a particular file exist.
    """
    base = f"{QDRANT_URL.rstrip('/')}/collections/{QDRANT_COL}/points"
    # Both endpoints are POST in the Qdrant REST API.
    probes = (
        (f"{base}/count", '{"exact": true}', b'"count"'),
        (f"{base}/scroll", '{"limit": 1, "with_payload": false}', b'"points"'),
    )
    for url, body, marker in probes:
        reply = _try_run(
            ["curl", "-s", "--connect-timeout", "3", "-X", "POST", url,
             "-H", "Content-Type: application/json", "-d", body],
            5,
        )
        if reply is not None and marker in reply.stdout:
            return True
    return False


def check_job(uuid: str) -> dict:
    """Run checklist for a file_uuid and return status + timing.

    Args:
        uuid: File identifier; letters, digits, ``_`` and ``-`` only, because
            it is embedded in SQL text and in file names.

    Returns:
        ``{"uuid", "passed", "stages", "checked_at", "total_elapsed"}`` where
        each stage is ``{"name", "passed", "detail", "elapsed"}``. A database
        query that fails marks its stage as not passed and shows ``?`` in the
        detail instead of aborting the whole check.

    Raises:
        ValueError: if *uuid* contains characters outside the allowed set.
    """
    _validate_uuid(uuid)
    t0 = time.time()
    stages = []

    # 1. ASR / 2. ASRX
    stages.append(_segments_stage("ASR", OUTPUT_DIR / f"{uuid}.asr.json"))
    stages.append(_segments_stage("ASRX", OUTPUT_DIR / f"{uuid}.asrx.json"))

    # 3. Sentence Chunks
    t = time.time()
    cnt = _query_count(
        f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{uuid}' AND chunk_type='sentence'")
    stages.append(_stage("Sentence", (cnt or 0) > 0, f"{_n(cnt)} chunks", t))

    # 4. Vectorization
    t = time.time()
    vec = _query_count(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid='{uuid}'")
    qdrant_ok = _qdrant_collection_ok()
    stages.append(_stage("Vectorize", (vec or 0) > 0 and qdrant_ok,
                         f"{_n(vec)} PG, Qdrant={'ok' if qdrant_ok else '?'}", t))

    # 5. Face Trace
    t = time.time()
    traces = _query_count(
        f"SELECT count(DISTINCT trace_id) FROM dev.face_detections "
        f"WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")
    faces = _query_count(
        f"SELECT count(*) FROM dev.face_detections "
        f"WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")
    stages.append(_stage("FaceTrace", (traces or 0) > 0,
                         f"{_n(traces)} traces, {_n(faces)} faces", t))

    # 6. TKG
    t = time.time()
    nodes = _query_count(f"SELECT count(*) FROM dev.tkg_nodes WHERE file_uuid='{uuid}'")
    edges = _query_count(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid='{uuid}'")
    stages.append(_stage("TKG", (nodes or 0) > 0,
                         f"{_n(nodes)} nodes, {_n(edges)} edges", t))

    # 7. Trace Chunks
    t = time.time()
    tc = _query_count(
        f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{uuid}' AND chunk_type='trace'")
    stages.append(_stage("TraceChunks", (tc or 0) > 0, f"{_n(tc)} chunks", t))

    # 8. Phase 1 Release (project-wide artifact, not specific to this uuid)
    t = time.time()
    release = PROJECT / "release" / "phase1" / "latest"
    required = [release / name for name in ("RELEASE_INFO.txt", "chunks.csv", "vectors.csv")]
    release_ok = all(p.exists() for p in required)
    try:
        size_kb = sum(p.stat().st_size for p in release.rglob("*") if p.is_file()) // 1024
    except OSError:
        # Directory missing or a file vanished while it was being scanned.
        size_kb = 0
    stages.append(_stage("Phase1", release_ok, _fmt_kb(size_kb), t))

    # Same textual format as the former utcnow().isoformat() + "Z".
    checked_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return {"uuid": uuid, "passed": all(s["passed"] for s in stages), "stages": stages,
            "checked_at": checked_at,
            "total_elapsed": round(time.time() - t0, 1)}


def processor_timings(uuid: str) -> Optional[list]:
    """Fetch per-processor durations for *uuid* from the database.

    Returns a list of ``{"name", "duration_secs"}`` dicts in creation order
    (``duration_secs`` is None when the database returned no value), or None
    when the uuid is invalid or the query could not be run.
    """
    try:
        _validate_uuid(uuid)
        raw = psql(
            "SELECT processor, "
            "extract(epoch from (completed_at - created_at))::int as duration_secs "
            "FROM dev.processor_results WHERE job_id IN "
            f"(SELECT id FROM dev.monitor_jobs WHERE uuid='{uuid}') "
            "AND completed_at IS NOT NULL "
            "ORDER BY created_at")
    except _RUN_ERRORS + (ValueError,):
        return None

    timings = []
    for row in raw.split("\n"):
        # Rows look like "name|secs"; split on the last separator.
        name, sep, secs = row.strip().rpartition("|")
        if not sep:
            continue
        try:
            duration = int(secs) if secs else None
        except ValueError:
            continue
        timings.append({"name": name, "duration_secs": duration})
    return timings


def format_report(job: dict, health: dict) -> str:
    """Pretty-print the status report.

    Args:
        job: Result of :func:`check_job`.
        health: Result of :func:`health_check`. If it already carries a
            ``"processors"`` list that list is used; otherwise the timings
            are fetched from the database. *health* is never modified.

    Returns:
        The multi-line report as a single string.
    """
    rule = "=" * 70
    thin = "─" * 70
    lines = [
        rule,
        f" Pipeline Status — {job['uuid'][:16]}... {job['checked_at']}",
        rule,
    ]

    # Checklist
    lines.append(f"\n {'Stage':<15} {'Status':<9} {'Detail':<25} {'Time':<8}")
    lines.append(f" {'-'*57}")
    for s in job["stages"]:
        st = "✅" if s["passed"] else "❌"
        lines.append(f" {s['name']:<15} {st:<9} {s['detail']:<25} {s['elapsed']:.1f}s")
    lines.append(f" {'-'*57}")
    lines.append(f" {'TOTAL':<15} {'✅' if job['passed'] else '❌':<9} {'':<25} {job['total_elapsed']:.1f}s")

    # Health
    lines.append(f"\n{thin}")
    lines.append(" SYSTEM HEALTH")
    lines.append(thin)
    h = health
    lines.append(f" CPU Load: {h.get('cpu_load_1m','?')} (1m) {h.get('cpu_load_5m','?')} (5m)")
    if 'memory_used_mb' in h:
        total_mb = h.get("memory_total_mb") or _FALLBACK_TOTAL_MB
        pct = round(h['memory_used_mb'] / total_mb * 100, 1)
        lines.append(f" Memory: {h['memory_used_mb']}MB / {total_mb}MB ({pct}%)")
    if 'disk_use_pct' in h:
        lines.append(f" Disk: {h['disk_use_pct']} used, {h.get('disk_avail', '?')} avail")
    lines.append(f" GPU (MPS): {'✅' if h.get('gpu_available') else '❌'}")
    svc = h.get("services", {})
    svc_str = " ".join(f"{k}={chr(10003) if v else chr(10007)}" for k, v in svc.items())
    lines.append(f" Services: {svc_str}")

    # Processor Timing (from DB unless the caller already supplied it)
    processors = health.get("processors")
    if processors is None:
        processors = processor_timings(job["uuid"])
    if processors is not None:
        lines.append(f"\n{thin}")
        lines.append(" PROCESSOR TIMING")
        lines.append(thin)
        for p in processors:
            dur = p.get("duration_secs")
            # Only a missing duration means "not finished"; 0 is a real value.
            lines.append(f" {p['name']:<25} {fmt_secs(dur) if dur is not None else 'running'}")

    lines.append(f"\n{rule}\n")
    return "\n".join(lines)


def main():
    """CLI entry point: run the checklist and print a table or JSON."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--uuid", default="aeed71342a899fe4b4c57b7d41bcb692")
    parser.add_argument("--json", action="store_true", help="Output JSON only")
    args = parser.parse_args()

    try:
        job = check_job(args.uuid)
    except ValueError as exc:
        parser.error(str(exc))
    health = health_check()

    # Fetch timings once so both output modes include them.
    timings = processor_timings(args.uuid)
    if timings is not None:
        health["processors"] = timings

    if args.json:
        print(json.dumps({"job": job, "health": health, "timestamp": job["checked_at"]}, indent=2))
    else:
        print(format_report(job, health))


if __name__ == "__main__":
    main()