#!/opt/homebrew/bin/python3.11
"""Momentry Dashboard — Flask web app.

Reads pipeline status + Redis + system health on demand.

Every ``/api/*`` request shells out to ``pipeline_status.py``, ``redis-cli``
and/or ``psql``; nothing is cached between requests.
"""

import json
import os
import platform
import subprocess
import sys
from pathlib import Path

from flask import Flask, jsonify, render_template_string

app = Flask(__name__)
PROJECT = Path(__file__).resolve().parent.parent

# System role detection
HOSTNAME = platform.node()
IS_M5 = "MacBook" in HOSTNAME or "M5" in HOSTNAME
SYSTEM_ROLE = "M5 (MacBook Pro)" if IS_M5 else "M4 (Mac Mini)"
SYSTEM_COLOR = "#58a6ff" if IS_M5 else "#f0883e"

# NOTE(review): the Redis password, the DB user and the psql path are
# hard-coded here; consider moving them to configuration.
_REDIS_CLI = ["redis-cli", "-a", "accusys"]
_PSQL = [
    "/Users/accusys/pgsql/18.3/bin/psql",
    "-U", "accusys", "-d", "momentry", "-t", "-A",
]

# Fields of `INFO all` that are forwarded to the dashboard.
_REDIS_INFO_FIELDS = frozenset({
    "total_system_memory_human",
    "used_memory_human",
    "used_memory_peak_human",
    "total_connections_received",
    "total_commands_processed",
    "keyspace_hits",
    "keyspace_misses",
    "connected_clients",
    "uptime_in_seconds",
})

# One "label|count" row per table (psql -t -A output).
_COUNTS_SQL = (
    " SELECT 'videos', count(*) FROM dev.videos"
    " UNION ALL SELECT 'chunks', count(*) FROM dev.chunks"
    " UNION ALL SELECT 'face_detections', count(*) FROM dev.face_detections"
    " UNION ALL SELECT 'identities', count(*) FROM dev.identities"
    " UNION ALL SELECT 'tkg_nodes', count(*) FROM dev.tkg_nodes"
    " UNION ALL SELECT 'tkg_edges', count(*) FROM dev.tkg_edges "
)

# Pipeline progress for all files (deduplicated by file name, keeping the
# most recent row per name).
# NOTE(review): DISTINCT ON forces ORDER BY to start with file_name, so
# LIMIT 20 keeps the first 20 names alphabetically, not the 20 most recently
# updated files — confirm this is intended.
_FILES_SQL = (
    " SELECT DISTINCT ON (v.file_name)"
    " v.file_uuid, v.file_name, v.status,"
    " COALESCE(v.processing_status::text, '{}') as pstatus,"
    " m.status as job_status"
    " FROM dev.videos v"
    " LEFT JOIN dev.monitor_jobs m ON m.uuid = v.file_uuid"
    " WHERE v.status IN ('completed', 'processing') OR m.status IS NOT NULL"
    " ORDER BY v.file_name, GREATEST("
    " COALESCE(v.registration_time::timestamp, '1970-01-01'),"
    " COALESCE(m.updated_at, '1970-01-01')"
    " ) DESC"
    " LIMIT 20 "
)


def run_status_json():
    """Run pipeline_status.py and return parsed JSON.

    Returns:
        Whatever JSON document the script prints on stdout. If the script
        cannot be started, times out (30 s) or prints something that is not
        JSON, ``{"error": <message>}`` is returned instead of raising, so a
        broken status script does not take down ``/api/all`` together with
        the Redis and DB sections.
    """
    try:
        proc = subprocess.run(
            [sys.executable, str(PROJECT / "scripts/pipeline_status.py"), "--json"],
            capture_output=True, text=True, timeout=30,
        )
    except (subprocess.SubprocessError, OSError) as e:
        app.logger.warning("pipeline_status.py could not be run: %s", e)
        return {"error": str(e)}

    try:
        return json.loads(proc.stdout)
    except ValueError as e:
        # stderr usually says more than "Expecting value: line 1 column 1".
        detail = proc.stderr.strip() or str(e)
        app.logger.warning("pipeline_status.py returned invalid JSON: %s", detail)
        return {"error": detail}


def _parse_redis_info(text):
    """Extract the whitelisted metrics from `redis-cli INFO` output.

    Hit/miss counters are converted to ``int`` (defaulting to 0 when absent)
    and ``hit_rate_pct`` is derived from them; all other values stay strings.
    """
    metrics = {}
    for raw in text.split("\n"):
        line = raw.strip()  # also drops the trailing "\r" of each INFO line
        if ":" not in line or line.startswith("#"):
            continue
        key, value = line.split(":", 1)
        if key in _REDIS_INFO_FIELDS:
            metrics[key] = value

    hits = int(metrics.get("keyspace_hits", 0))
    misses = int(metrics.get("keyspace_misses", 0))
    metrics["keyspace_hits"] = hits
    metrics["keyspace_misses"] = misses
    # max(..., 1) avoids division by zero on a fresh instance.
    metrics["hit_rate_pct"] = round(hits / max(hits + misses, 1) * 100, 1)
    return metrics


def run_redis_info():
    """Fetch key Redis metrics.

    Returns:
        dict with the whitelisted ``INFO`` fields, ``hit_rate_pct``,
        ``momentry_keys`` (number of ``momentry_dev:*`` keys) and
        ``key_sample`` (up to 10 of those keys). If the ``INFO`` call fails,
        ``error`` holds the message; if the key listing fails,
        ``momentry_keys`` is 0 and ``key_sample`` is empty.
    """
    result = {}
    try:
        proc = subprocess.run(
            _REDIS_CLI + ["INFO", "all"],
            capture_output=True, text=True, timeout=5,
        )
        result.update(_parse_redis_info(proc.stdout))
    except Exception as e:  # best-effort collector: report, never raise
        app.logger.warning("redis INFO failed: %s", e)
        result["error"] = str(e)

    # Get momentry keys
    # NOTE(review): KEYS walks the whole keyspace in one blocking call;
    # `redis-cli --scan --pattern` is the usual alternative if the instance
    # grows large.
    try:
        proc = subprocess.run(
            _REDIS_CLI + ["KEYS", "momentry_dev:*"],
            capture_output=True, text=True, timeout=5,
        )
        keys = [k for k in proc.stdout.strip().split("\n") if k]
        result["momentry_keys"] = len(keys)
        result["key_sample"] = keys[:10]
    except Exception as e:  # best-effort collector: report, never raise
        app.logger.warning("redis KEYS failed: %s", e)
        result["momentry_keys"] = 0
        result["key_sample"] = []
    return result


def _parse_table_counts(text):
    """Turn ``label|count`` rows (psql -t -A output) into ``{label: int}``."""
    counts = {}
    for line in text.strip().split("\n"):
        if not line.strip() or "|" not in line:
            continue
        parts = line.split("|")
        counts[parts[0].strip()] = int(parts[1])
    return counts


def _extract_progress(pstatus):
    """Return the ``progress`` object of a processing_status JSON string.

    Anything that is empty, ``{}``, not valid JSON or not a JSON object
    yields ``{}``.
    """
    if not pstatus or pstatus == "{}":
        return {}
    try:
        parsed = json.loads(pstatus)
    except ValueError:
        return {}
    if not isinstance(parsed, dict):
        return {}
    return parsed.get("progress", {})


def _parse_file_rows(text):
    """Parse ``uuid|name|status|pstatus|job_status`` rows into file dicts.

    Rows with fewer than five fields are skipped, and only the first row per
    (stripped) file name is kept.
    """
    files = []
    seen_names = set()
    for line in text.strip().split("\n"):
        if not line.strip() or "|" not in line:
            continue
        head = line.split("|", 3)
        if len(head) < 4 or "|" not in head[3]:
            continue
        # job_status is the last column, so split it off from the right:
        # the JSON text in pstatus may itself contain "|" characters.
        pstatus, job_status = head[3].rsplit("|", 1)
        name = head[1].strip()
        if name in seen_names:
            continue
        seen_names.add(name)
        files.append({
            "uuid": head[0].strip(),
            "name": name,
            "status": head[2].strip(),
            "job_status": job_status.strip(),
            "progress": _extract_progress(pstatus),
        })
    return files


def run_db_info():
    """Fetch DB metrics + current processing file.

    Returns:
        dict with one integer row count per table (``videos``, ``chunks``,
        ``face_detections``, ``identities``, ``tkg_nodes``, ``tkg_edges``)
        and ``files``, a list of per-file dicts (``uuid``, ``name``,
        ``status``, ``job_status``, ``progress``). If the counts query fails
        the count keys are simply absent (the failure is logged); if the
        files query fails, ``files_error`` holds the message.
    """
    result = {}
    try:
        proc = subprocess.run(
            _PSQL + ["-c", _COUNTS_SQL],
            capture_output=True, text=True, timeout=10,
        )
        result.update(_parse_table_counts(proc.stdout))
    except Exception as e:  # best-effort collector: log, never raise
        app.logger.warning("table count query failed: %s", e)

    try:
        proc = subprocess.run(
            _PSQL + ["-c", _FILES_SQL],
            capture_output=True, text=True, timeout=10,
        )
        result["files"] = _parse_file_rows(proc.stdout)
    except Exception as e:  # best-effort collector: report, never raise
        app.logger.warning("file progress query failed: %s", e)
        result["files_error"] = str(e)
    return result


@app.route("/")
def index():
    """Render the dashboard page."""
    # The template references {{ SYSTEM_ROLE }}; without an explicit context
    # Jinja renders undefined names as empty strings. SYSTEM_COLOR is passed
    # as well in case the template uses it.
    return render_template_string(
        TEMPLATE, SYSTEM_ROLE=SYSTEM_ROLE, SYSTEM_COLOR=SYSTEM_COLOR,
    )


@app.route("/api/status")
def api_status():
    """Return the pipeline status as JSON."""
    return jsonify(run_status_json())


@app.route("/api/redis")
def api_redis():
    """Return the Redis metrics as JSON."""
    return jsonify(run_redis_info())


@app.route("/api/db")
def api_db():
    """Return the database metrics as JSON."""
    return jsonify(run_db_info())


@app.route("/api/all")
def api_all():
    """Return system info plus all three collectors in one JSON document."""
    return jsonify({
        "system": {"hostname": HOSTNAME, "role": SYSTEM_ROLE, "is_m5": IS_M5},
        "status": run_status_json(),
        "redis": run_redis_info(),
        "db": run_db_info(),
    })


TEMPLATE = """ Momentry Dashboard

Momentry Dashboard 🤖 {{ SYSTEM_ROLE }}

✅ Pipeline Checklist

Loading...

💻 System Health

Loading...

🛠 Services

Loading...

📁 Pipeline Progress

Loading...

⚡ Redis

Loading...

🗄 Database

Loading...

⏱ Processor Timing

Loading...
"""


if __name__ == "__main__":
    port = int(os.environ.get("DASHBOARD_PORT", 5050))
    print(f"Momentry Dashboard: http://0.0.0.0:{port}")
    app.run(host="0.0.0.0", port=port, debug=False)