#!/opt/homebrew/bin/python3.11 """ Pipeline Status — checklist + health + timeline monitoring Output: JSON for machine parsing, formatted table for human reading """ import json, os, subprocess, sys, time from datetime import datetime from pathlib import Path PROJECT = Path(__file__).resolve().parent.parent OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")) PG_BIN = "/Users/accusys/pgsql/18.3/bin" DB_USER = os.environ.get("USER", "accusys") DB_NAME = "momentry" QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") QDRANT_COL = os.environ.get("QDRANT_COLLECTION", "momentry_dev_v1") now = time.time() proc = subprocess.run def psql(sql: str) -> str: r = proc([f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql], capture_output=True, text=True, timeout=30) return r.stdout.strip() def file_size(path: str) -> str: p = Path(path) if not p.exists(): return "missing" kb = p.stat().st_size // 1024 if kb > 1024: return f"{kb//1024}MB" return f"{kb}KB" def fmt_secs(s: float) -> str: if s < 60: return f"{s:.0f}s" if s < 3600: return f"{s//60:.0f}m {s%60:.0f}s" return f"{s//3600:.0f}h {(s%3600)//60:.0f}m" def health_check() -> dict: """System health""" h = {} # CPU try: load = os.getloadavg() h["cpu_load_1m"] = round(load[0], 1) h["cpu_load_5m"] = round(load[1], 1) except: h["cpu_load_1m"] = h["cpu_load_5m"] = -1 # Memory try: m = proc(["vm_stat"], capture_output=True, text=True).stdout # Use ps for a simpler reading rss = None for line in proc(["ps", "-A", "-o", "rss="], capture_output=True, text=True).stdout.strip().split('\n'): if line.strip(): if rss is None: rss = 0 rss += int(line.strip()) if rss: h["memory_used_mb"] = rss // 1024 except: pass # Disk try: d = proc(["df", "-h", str(OUTPUT_DIR)], capture_output=True, text=True).stdout.strip().split('\n')[-1].split() h["disk_use_pct"] = d[4] if len(d) > 4 else "?" h["disk_avail"] = d[3] if len(d) > 3 else "?" except: pass # GPU (ANE/MPS) try: if Path("/opt/homebrew/bin/python3.11").exists(): g = proc(["/opt/homebrew/bin/python3.11", "-c", "import torch; print(torch.backends.mps.is_available())"], capture_output=True, text=True, timeout=5) h["gpu_available"] = g.stdout.strip() == "True" except: h["gpu_available"] = False # Services services = {"postgresql": False, "redis": False, "qdrant": False, "embedding": False} try: services["postgresql"] = proc([f"{PG_BIN}/pg_isready"], capture_output=True, timeout=5).returncode == 0 except: pass try: r = proc(["redis-cli", "-a", "accusys", "ping"], capture_output=True, timeout=5) services["redis"] = "PONG" in r.stdout.decode() except: try: r = proc(["redis-cli", "ping"], capture_output=True, timeout=3) services["redis"] = "PONG" in r.stdout.decode() except: pass try: r = proc(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", "--connect-timeout", "3", "http://localhost:6333/healthz"], capture_output=True, timeout=5) services["qdrant"] = r.stdout.decode().strip() == "200" except: pass try: r = proc(["curl", "-s", "--connect-timeout", "3", "http://localhost:11436/health"], capture_output=True, timeout=5) out = r.stdout.decode() services["embedding"] = '"ok"' in out or '"status":"ok"' in out except: pass h["services"] = services return h def check_job(uuid: str) -> dict: """Run checklist for a file_uuid and return status + timing""" stages = [] t0 = time.time() # 1. ASR (pass 1: faster-whisper small) t = time.time() f = OUTPUT_DIR / f"{uuid}.asr.json" ok = f.exists() and f.stat().st_size > 0 segs = 0 if ok: try: with open(f) as fh: d = json.load(fh) segs = len(d.get("segments", [])) except: ok = False stages.append({"name": "ASR", "passed": ok and segs > 0, "detail": f"faster-whisper ({segs})" if ok else file_size(str(f)), "elapsed": round(time.time() - t, 1)}) # 2. ASRX (ECAPA-TDNN speaker diarization) t = time.time() f = OUTPUT_DIR / f"{uuid}.asrx.json" ok = f.exists() and f.stat().st_size > 0 segs = 0 if ok: try: with open(f) as fh: d = json.load(fh) segs = len(d.get("segments", [])) except: ok = False stages.append({"name": "ASRX", "passed": ok and segs > 0, "detail": f"ECAPA-TDNN ({segs})" if ok else file_size(str(f)), "elapsed": round(time.time() - t, 1)}) # 3. ASR2 (pass 2: correct split segments) t = time.time() f2 = OUTPUT_DIR / f"{uuid}.asr-1.json" ok2 = f2.exists() and f2.stat().st_size > 0 cnt2 = 0 if ok2: try: with open(f2) as fh: d2 = json.load(fh) cnt2 = len(d2.get("kept", [])) + sum(len(c["corrected"]) for c in d2.get("corrections", [])) except: ok2 = False stages.append({"name": "ASR2", "passed": ok2 and cnt2 > 0, "detail": f"{cnt2} chunks (asr-1.json)" if ok2 else file_size(str(f2)), "elapsed": round(time.time() - t, 1)}) # 4. Sentence Chunks (DB) t = time.time() cnt = int(psql(f"SELECT count(*) FROM dev.chunk WHERE file_uuid='{uuid}' AND chunk_type='sentence'")) stages.append({"name": "Sentence", "passed": cnt > 0, "detail": f"{cnt} DB", "elapsed": round(time.time() - t, 1)}) # 5. Vectorization t = time.time() vec = int(psql(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid='{uuid}'")) qdrant_ok = False try: r = proc(["curl", "-s", "--connect-timeout", "3", "-X", "POST", f"{QDRANT_URL}/collections/{QDRANT_COL}/points/count", "-H", "Content-Type: application/json", "-d", '{"exact": true}'], capture_output=True, timeout=5) qdrant_ok = b'"count"' in r.stdout except: pass if not qdrant_ok: try: r = proc(["curl", "-s", "--connect-timeout", "3", f"{QDRANT_URL}/collections/{QDRANT_COL}/points/scroll?limit=1&with_payload=false"], capture_output=True, timeout=5) qdrant_ok = b'"points"' in r.stdout except: pass stages.append({"name": "Vectorize", "passed": vec > 0 and qdrant_ok, "detail": f"{vec} PG, Qdrant={'ok' if qdrant_ok else '?'}", "elapsed": round(time.time() - t, 1)}) # 6. Face Trace t = time.time() traces = int(psql(f"SELECT count(DISTINCT trace_id) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")) faces = int(psql(f"SELECT count(*) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")) stages.append({"name": "FaceTrace", "passed": traces > 0, "detail": f"{traces} traces, {faces} faces", "elapsed": round(time.time() - t, 1)}) # 7. TKG t = time.time() nodes = int(psql(f"SELECT count(*) FROM dev.tkg_nodes WHERE file_uuid='{uuid}'")) edges = int(psql(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid='{uuid}'")) stages.append({"name": "TKG", "passed": nodes > 0, "detail": f"{nodes} nodes, {edges} edges", "elapsed": round(time.time() - t, 1)}) # 8. Trace Chunks t = time.time() tc = int(psql(f"SELECT count(*) FROM dev.chunk WHERE file_uuid='{uuid}' AND chunk_type='trace'")) stages.append({"name": "TraceChunks", "passed": tc > 0, "detail": f"{tc} chunks", "elapsed": round(time.time() - t, 1)}) # 9. Phase 1 Release t = time.time() p1 = PROJECT / "release" / "phase1" / "latest" p1_files = [p1 / "RELEASE_INFO.txt", p1 / "schema.sql", p1 / "snapshots"] p1_ok = all(f.exists() for f in p1_files) p1_size = sum(f.stat().st_size for f in p1.rglob("*") if f.is_file()) // 1024 if p1.exists() else 0 stages.append({"name": "Phase1", "passed": p1_ok, "detail": f"{p1_size//1024}MB" if p1_size > 1024 else f"{p1_size}KB", "elapsed": round(time.time() - t, 1)}) all_passed = all(s["passed"] for s in stages) return {"uuid": uuid, "passed": all_passed, "stages": stages, "checked_at": datetime.utcnow().isoformat() + "Z", "total_elapsed": round(time.time() - t0, 1)} def format_report(job: dict, health: dict) -> str: """Pretty-print the status report""" lines = [] lines.append(f"{'='*70}") lines.append(f" Pipeline Status — {job['uuid'][:16]}... {job['checked_at']}") lines.append(f"{'='*70}") # Checklist lines.append(f"\n {'Stage':<15} {'Status':<9} {'Detail':<25} {'Time':<8}") lines.append(f" {'-'*57}") for s in job["stages"]: st = "✅" if s["passed"] else "❌" lines.append(f" {s['name']:<15} {st:<9} {s['detail']:<25} {s['elapsed']:.1f}s") lines.append(f" {'-'*57}") lines.append(f" {'TOTAL':<15} {'✅' if job['passed'] else '❌':<9} {'':<25} {job['total_elapsed']:.1f}s") # Health lines.append(f"\n{'─'*70}") lines.append(" SYSTEM HEALTH") lines.append(f"{'─'*70}") h = health lines.append(f" CPU Load: {h.get('cpu_load_1m','?')} (1m) {h.get('cpu_load_5m','?')} (5m)") if 'memory_used_mb' in h: total_mb = 49152 pct = round(h['memory_used_mb'] / total_mb * 100, 1) lines.append(f" Memory: {h['memory_used_mb']}MB / {total_mb}MB ({pct}%)") if 'disk_use_pct' in h: lines.append(f" Disk: {h['disk_use_pct']} used, {h['disk_avail']} avail") lines.append(f" GPU (MPS): {'✅' if h.get('gpu_available') else '❌'}") svc = h.get("services", {}) svc_str = " ".join(f"{k}={chr(10003) if v else chr(10007)}" for k, v in svc.items()) lines.append(f" Services: {svc_str}") # Processor Timing (from DB) try: proc_data = psql(f"""SELECT processor, extract(epoch from (completed_at - created_at))::int as duration_secs FROM dev.processor_results WHERE job_id IN (SELECT id FROM dev.monitor_jobs WHERE uuid='{job['uuid']}') AND completed_at IS NOT NULL ORDER BY created_at""") processors = [] for line in proc_data.split('\n'): if not line.strip() or '|' not in line: continue p = line.split('|') processors.append({"name": p[0], "duration_secs": int(p[1]) if p[1] else 0}) health["processors"] = processors except: pass if "processors" in health: lines.append(f"\n{'─'*70}") lines.append(" PROCESSOR TIMING") lines.append(f"{'─'*70}") for p in health.get("processors", []): dur = p.get("duration_secs", 0) lines.append(f" {p['name']:<25} {fmt_secs(dur) if dur else 'running'}") lines.append(f"\n{'='*70}\n") return "\n".join(lines) def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument("--uuid", default="aeed71342a899fe4b4c57b7d41bcb692") parser.add_argument("--json", action="store_true", help="Output JSON only") args = parser.parse_args() job = check_job(args.uuid) health = health_check() if args.json: print(json.dumps({"job": job, "health": health, "timestamp": job["checked_at"]}, indent=2)) else: print(format_report(job, health)) if __name__ == "__main__": main()