#!/opt/homebrew/bin/python3.11 """ Release Manager - Deploy / Undeploy video release packages. Usage: python3 release_manager.py deploy python3 release_manager.py undeploy python3 release_manager.py list python3 release_manager.py package # Create new release package """ import json, os, sys, shutil, subprocess, tarfile, tempfile, argparse, time import psycopg2 from urllib.request import Request, urlopen PG_BIN = "/Users/accusys/pgsql/18.3/bin" DB = "dbname=momentry user=accusys" QDRANT = "http://localhost:6333" DEMO_DIR = "/Users/accusys/momentry/var/sftpgo/data/demo" OUTPUT_DIR = "/Users/accusys/momentry/output_dev" RELEASE_DIR = "/Users/accusys/momentry_core_0.1/release/files" # ---- Helpers ---- def psql_cmd(sql, db=DB): """Run a SQL command via psql.""" r = subprocess.run( [f"{PG_BIN}/psql", "-U", "accusys", "-d", "momentry", "-t", "-A", "-c", sql], capture_output=True, text=True, timeout=30) return r.stdout.strip() def pg_execute(sql, params=None): """Execute SQL via psycopg2.""" conn = psycopg2.connect(DB) cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) conn.commit() cur.close() conn.close() def pg_query(sql, params=None): """Query via psycopg2.""" conn = psycopg2.connect(DB) cur = conn.cursor() if params: cur.execute(sql, params) else: cur.execute(sql) rows = cur.fetchall() cur.close() conn.close() return rows def qdrant_delete_points(uuid, collection): """Delete points from Qdrant collection by payload filter.""" try: req = Request(f"{QDRANT}/collections/{collection}/points/delete", data=json.dumps({ "filter": {"must": [{"key": "file_uuid", "match": {"value": uuid}}]} }).encode(), headers={"Content-Type": "application/json"}, method="POST") urlopen(req) return True except: return False # ---- Deploy ---- def cmd_deploy(tarball_path): """Deploy a release package.""" if not os.path.exists(tarball_path): print(f"ERROR: {tarball_path} not found") return 1 t0 = time.time() print(f"=== Deploy: {os.path.basename(tarball_path)} ===") # 1. Extract tmpdir = tempfile.mkdtemp(prefix="release_deploy_") print(f"Extracting to {tmpdir}...") with tarfile.open(tarball_path) as tar: tar.extractall(tmpdir) # Find UUID from directory name or file_info.json uuid = None for item in os.listdir(tmpdir): info_path = os.path.join(tmpdir, item, "file_info.json") if os.path.exists(info_path): with open(info_path) as f: info = json.load(f) uuid = info.get("file_uuid", "") break if not uuid: print("ERROR: Could not find file_info.json with UUID") return 1 pkg_dir = os.path.join(tmpdir, uuid) print(f"UUID: {uuid}") # 2. Import data.sql sql_path = os.path.join(pkg_dir, "data.sql") if os.path.exists(sql_path): print(f"Importing data.sql ({os.path.getsize(sql_path)/1024/1024:.0f} MB)...") r = subprocess.run([f"{PG_BIN}/psql", "-U", "accusys", "-d", "momentry", "-f", sql_path], capture_output=True, text=True, timeout=300) if r.returncode != 0: print(f"WARNING: SQL import may have issues") print(r.stderr[-500:] if r.stderr else "") else: print("WARNING: data.sql not found in package") # 3. Copy video to demo dir for fname in os.listdir(pkg_dir): fpath = os.path.join(pkg_dir, fname) if fname.endswith(('.mp4', '.mov', '.avi', '.mkv')): dest = os.path.join(DEMO_DIR, fname) if not os.path.exists(dest): shutil.copy2(fpath, dest) print(f"Video: {fname} → {DEMO_DIR}/") else: print(f"Video: {fname} already exists in demo dir") # 4. Copy JSON outputs for fname in os.listdir(pkg_dir): if fname.endswith('.json'): src = os.path.join(pkg_dir, fname) dest = os.path.join(OUTPUT_DIR, fname) shutil.copy2(src, dest) print(f"Output files copied to {OUTPUT_DIR}/") # 5. Verify deployment rows = pg_query("SELECT COUNT(*) FROM dev.chunk WHERE file_uuid = %s", (uuid,)) chunks = rows[0][0] if rows else 0 rows = pg_query("SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid = %s", (uuid,)) faces = rows[0][0] if rows else 0 rows = pg_query("SELECT file_name, duration FROM dev.videos WHERE file_uuid = %s", (uuid,)) video_info = rows[0] if rows else ("?", "?") elapsed = time.time() - t0 print(f"\n=== Deploy Complete ({elapsed:.0f}s) ===") print(f" Video: {video_info[0]} ({float(video_info[1]):.0f}s)") print(f" Chunks: {chunks}") print(f" Face detections: {faces}") shutil.rmtree(tmpdir, ignore_errors=True) return 0 # ---- Undeploy ---- def cmd_undeploy(uuid): """Undeploy: remove all trace of a UUID from the system.""" print(f"=== Undeploy: {uuid} ===") # Confirm rows = pg_query("SELECT file_name FROM dev.videos WHERE file_uuid = %s", (uuid,)) if not rows: print(f"ERROR: UUID {uuid} not found in DB") return 1 filename = rows[0][0] print(f"Video: {filename}") print("This will DELETE all data for this video. Are you sure? (y/N): ", end="") confirm = sys.stdin.readline().strip().lower() if confirm != 'y': print("Cancelled") return 0 t0 = time.time() # Get video path before deleting rows = pg_query("SELECT file_path FROM dev.videos WHERE file_uuid = %s", (uuid,)) video_path = rows[0][0] if rows else "" # 1. Delete DB data tables = [ ("dev.chunk", "file_uuid"), ("dev.chunk_vectors", "uuid"), ("dev.face_detections", "file_uuid"), ("dev.processor_results", "file_uuid"), ("dev.monitor_jobs", "uuid"), ("dev.pre_chunks", "file_uuid"), ] for tbl, col in tables: pg_execute(f"DELETE FROM {tbl} WHERE {col} = %s", (uuid,)) print(f" {tbl}: cleared") pg_execute("DELETE FROM dev.videos WHERE file_uuid = %s", (uuid,)) print(f" dev.videos: removed") # Clean orphaned identity bindings pg_execute("DELETE FROM dev.identity_bindings WHERE identity_value NOT IN (SELECT face_id FROM dev.face_detections)") # 2. Delete output files for f in os.listdir(OUTPUT_DIR): if f.startswith(uuid): os.remove(os.path.join(OUTPUT_DIR, f)) print(f" Output files: removed") # 3. Delete video from demo dir if video_path and os.path.exists(video_path): os.remove(video_path) print(f" Video file: removed ({os.path.basename(video_path)})") # 4. Clean Qdrant (skip - Qdrant points don't have easy UUID filter) # Instead rely on upsert behavior # 5. Delete release package pkg_path = os.path.join(RELEASE_DIR, uuid) if os.path.exists(pkg_path): shutil.rmtree(pkg_path) print(f" Release dir: removed") for f in os.listdir(RELEASE_DIR): if f.startswith(uuid): os.remove(os.path.join(RELEASE_DIR, f)) print(f" Release file: {f} removed") elapsed = time.time() - t0 print(f"\n=== Undeploy Complete ({elapsed:.0f}s) ===") return 0 # ---- List ---- def cmd_list(): """List deployed videos.""" rows = pg_query(""" SELECT file_uuid, file_name, TO_CHAR((duration/60)::int, 'FM999"min"') as dur, status, (SELECT COUNT(*) FROM dev.chunk WHERE file_uuid = v.file_uuid) as chunks, (SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid = v.file_uuid) as faces FROM dev.videos v ORDER BY id DESC """) print(f"{'UUID':36s} {'Name':40s} {'Duration':8s} {'Status':10s} {'Chunks':>6s} {'Faces':>6s}") print("-" * 120) for r in rows: uuid, name, dur, status, chunks, faces = r short_name = (name or "")[:38] + ".." if len(name or "") > 40 else (name or "") print(f"{uuid:36s} {short_name:40s} {dur or '?':8s} {status or '?':10s} {chunks or 0:>6d} {faces or 0:>6d}") # ---- Package ---- def cmd_package(uuid): """Create a release package for a deployed video.""" print(f"=== Package: {uuid} ===") # Check video exists rows = pg_query("SELECT file_uuid, file_name, file_path FROM dev.videos WHERE file_uuid = %s", (uuid,)) if not rows: print(f"ERROR: UUID {uuid} not found") return 1 outdir = os.path.join(RELEASE_DIR, uuid) shutil.rmtree(outdir, ignore_errors=True) os.makedirs(outdir, exist_ok=True) # Export data.sql r = subprocess.run([f"{PG_BIN}/psql", "-U", "accusys", "-d", "momentry", "-t", "-A", "-c", f"SELECT json_build_object('file_uuid', file_uuid, 'file_name', file_name, 'duration', duration, 'fps', fps, 'width', width, 'height', height, 'total_frames', total_frames, 'status', status) FROM dev.videos WHERE file_uuid='{uuid}'"], capture_output=True, text=True, timeout=15) if r.stdout.strip(): info = json.loads(r.stdout.strip()) with open(os.path.join(outdir, "file_info.json"), "w") as f: json.dump(info, f, indent=2) # Export SQL sql_path = os.path.join(outdir, "data.sql") with open(sql_path, "w") as f: f.write(f"-- Release package: {uuid}\nBEGIN;\n\n") for tbl, col in [("dev.videos", "file_uuid"), ("dev.chunk", "file_uuid"), ("dev.chunk_vectors", "uuid"), ("dev.face_detections", "file_uuid")]: r = subprocess.run([f"{PG_BIN}/psql", "-U", "accusys", "-d", "momentry", "-c", f"COPY (SELECT * FROM {tbl} WHERE {col} = '{uuid}') TO STDOUT WITH CSV HEADER"], capture_output=True, text=True, timeout=60) if r.stdout.strip(): # Get column names schema, table = tbl.split(".") r2 = subprocess.run([f"{PG_BIN}/psql", "-U", "accusys", "-d", "momentry", "-t", "-A", "-c", f"SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) FROM information_schema.columns WHERE table_schema='{schema}' AND table_name='{table}' AND is_updatable='YES'"], capture_output=True, text=True, timeout=15) cols = r2.stdout.strip() f.write(f"COPY {tbl} ({cols}) FROM STDIN WITH CSV HEADER;\n") f.write(r.stdout) if not r.stdout.endswith("\n"): f.write("\n") f.write("\\.\n\n") f.write("COMMIT;\n") size = os.path.getsize(sql_path) print(f" data.sql ({size/1024/1024:.0f} MB)") # Copy video video_path = rows[0][2] if video_path and os.path.exists(video_path): dest = os.path.join(outdir, os.path.basename(video_path)) shutil.copy2(video_path, dest) print(f" {os.path.basename(video_path)} ({os.path.getsize(dest)/1024/1024:.0f} MB)") # Copy output JSONs for fname in os.listdir(OUTPUT_DIR): if fname.startswith(uuid) and fname.endswith('.json'): shutil.copy2(os.path.join(OUTPUT_DIR, fname), os.path.join(outdir, fname)) # tar.gz tarball = os.path.join(RELEASE_DIR, f"{uuid}_v{int(time.time())}.tar.gz") subprocess.run(["tar", "-czf", tarball, "-C", RELEASE_DIR, uuid], check=True, timeout=300) tsize = os.path.getsize(tarball) print(f" Package: {tarball} ({tsize/1024/1024:.0f} MB)") return 0 # ---- Main ---- def main(): parser = argparse.ArgumentParser(description="Release Manager — deploy/undeploy/list video packages") sub = parser.add_subparsers(dest="cmd") p_deploy = sub.add_parser("deploy", help="Deploy a release package") p_deploy.add_argument("tarball", help="Path to .tar.gz package") p_undeploy = sub.add_parser("undeploy", help="Undeploy (remove all data for a UUID)") p_undeploy.add_argument("uuid", help="File UUID") p_list = sub.add_parser("list", help="List deployed videos") p_package = sub.add_parser("package", help="Create release package for deployed video") p_package.add_argument("uuid", help="File UUID") args = parser.parse_args() if args.cmd == "deploy": sys.exit(cmd_deploy(args.tarball)) elif args.cmd == "undeploy": sys.exit(cmd_undeploy(args.uuid)) elif args.cmd == "list": cmd_list() elif args.cmd == "package": sys.exit(cmd_package(args.uuid)) else: parser.print_help() if __name__ == "__main__": main()