#!/opt/homebrew/bin/python3.11 """ Momentry Dashboard — Flask web app Reads pipeline status + Redis + system health on demand """ import json, os, subprocess, sys from pathlib import Path from flask import Flask, jsonify, render_template_string app = Flask(__name__) PROJECT = Path(__file__).resolve().parent.parent def run_status_json(): """Run pipeline_status.py and return parsed JSON""" r = subprocess.run( [sys.executable, str(PROJECT / "scripts/pipeline_status.py"), "--json"], capture_output=True, text=True, timeout=30, ) return json.loads(r.stdout) def run_redis_info(): """Fetch key Redis metrics""" result = {} try: r = subprocess.run( ["redis-cli", "-a", "accusys", "INFO", "all"], capture_output=True, text=True, timeout=5, ) for line in r.stdout.split("\n"): line = line.strip() if ":" not in line or line.startswith("#"): continue k, v = line.split(":", 1) if k in ("total_system_memory_human", "used_memory_human", "used_memory_peak_human", "total_connections_received", "total_commands_processed", "keyspace_hits", "keyspace_misses", "connected_clients", "uptime_in_seconds"): result[k] = v if not v.endswith("_human") else v result["keyspace_hits"] = int(result.get("keyspace_hits", 0)) result["keyspace_misses"] = int(result.get("keyspace_misses", 0)) hit_rate = result["keyspace_hits"] / max(result["keyspace_hits"] + result["keyspace_misses"], 1) * 100 result["hit_rate_pct"] = round(hit_rate, 1) except Exception as e: result["error"] = str(e) # Get momentry keys try: r = subprocess.run( ["redis-cli", "-a", "accusys", "KEYS", "momentry_dev:*"], capture_output=True, text=True, timeout=5, ) keys = [k for k in r.stdout.strip().split("\n") if k] result["momentry_keys"] = len(keys) # Sample a few interesting keys sample = {} for k in keys: if k.endswith(":health") or k.endswith(":job:") or ":processor:" in k: pass if len(sample) >= 5: break result["key_sample"] = keys[:10] except: result["momentry_keys"] = 0 result["key_sample"] = [] return result def run_db_info(): """Fetch DB metrics""" psql = "/Users/accusys/pgsql/18.3/bin/psql" cmd = [psql, "-U", "accusys", "-d", "momentry", "-t", "-A"] result = {} try: r = subprocess.run(cmd + ["-c", """ SELECT 'videos', count(*) FROM dev.videos UNION ALL SELECT 'chunks', count(*) FROM dev.chunks UNION ALL SELECT 'face_detections', count(*) FROM dev.face_detections UNION ALL SELECT 'identities', count(*) FROM dev.identities UNION ALL SELECT 'tkg_nodes', count(*) FROM dev.tkg_nodes UNION ALL SELECT 'tkg_edges', count(*) FROM dev.tkg_edges """], capture_output=True, text=True, timeout=10) for line in r.stdout.strip().split("\n"): if not line.strip() or "|" not in line: continue parts = line.split("|") result[parts[0].strip()] = int(parts[1]) except: pass return result @app.route("/") def index(): return render_template_string(TEMPLATE) @app.route("/api/status") def api_status(): return jsonify(run_status_json()) @app.route("/api/redis") def api_redis(): return jsonify(run_redis_info()) @app.route("/api/db") def api_db(): return jsonify(run_db_info()) @app.route("/api/all") def api_all(): return jsonify({ "status": run_status_json(), "redis": run_redis_info(), "db": run_db_info(), }) TEMPLATE = """ Momentry Dashboard

Momentry Dashboard

✅ Pipeline Checklist

Loading...

💻 System Health

Loading...

🛠 Services

Loading...

⚡ Redis

Loading...

🗄 Database

Loading...

⏱ Processor Timing

Loading...
""" if __name__ == "__main__": port = int(os.environ.get("DASHBOARD_PORT", 5050)) print(f"Momentry Dashboard: http://0.0.0.0:{port}") app.run(host="0.0.0.0", port=port, debug=False)