#!/usr/bin/env python3
"""
Release packaging — two non-overlapping phases.

Phase 1: ASR + ASRX + Rule 1 sentence chunks complete
Phase 2: Full pipeline + Rule 3 + 5W1H complete

Output: release/phase{N}/{VERSION}_{TIMESTAMP}/
"""

import argparse
import json
import os
import shutil
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

PROJECT = Path(__file__).resolve().parent.parent
OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", PROJECT / "output_dev"))
RELEASE_DIR = PROJECT / "release"
VERSION = "v1.0.0"
DB_USER = os.environ.get("USER", "accusys")
DB_NAME = "momentry"
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION", "momentry_dev_rule1_v2")
# Directory holding psql / pg_dump; overridable so the script is not tied to
# one machine's install path.
PG_BIN = os.environ.get("PG_BIN", "/Users/accusys/pgsql/18.3/bin")

# Page size used when scrolling through the Qdrant collection.
QDRANT_PAGE_SIZE = 1000


def ts():
    """Return the current UTC time formatted as ``YYYYMMDD_HHMMSS``."""
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")


def psql_cmd() -> list:
    """Return the base psql argv (binary, user and database)."""
    return [f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME]


def run_sql(sql: str) -> str:
    """Run one SQL statement or psql meta-command and return its stdout.

    The command runs with ``-t -A`` (tuples only, unaligned) and a 30 second
    timeout. A non-zero psql exit status is reported on stderr but does not
    raise, so packaging stays best-effort; the stripped stdout is returned
    either way.
    """
    proc = subprocess.run(
        psql_cmd() + ["-t", "-A", "-c", sql],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        print(
            f"[RELEASE] psql failed (exit {proc.returncode}): {proc.stderr.strip()}",
            file=sys.stderr,
        )
    return proc.stdout.strip()


def _sql_literal(value: str) -> str:
    """Quote *value* as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def _export_csv(query: str, dest: Path) -> None:
    """Write the rows returned by *query* to *dest* as CSV with a header row."""
    # Lowercase is the documented spelling of the psql meta-command.
    run_sql(f"\\copy ({query}) TO {_sql_literal(str(dest))} CSV HEADER")


def _copy_processor_outputs(file_uuid: str, out_dir: Path) -> None:
    """Copy every ``{file_uuid}.*.json`` file from OUTPUT_DIR into *out_dir*."""
    for src in OUTPUT_DIR.glob(f"{file_uuid}.*.json"):
        if src.is_file():
            shutil.copy2(src, out_dir / src.name)


def _dump_schema(dest: Path) -> None:
    """Write a schema-only pg_dump of the ``dev`` schema to *dest*."""
    with open(dest, "w") as fh:
        proc = subprocess.run(
            [
                f"{PG_BIN}/pg_dump", "-U", DB_USER, "-d", DB_NAME,
                "--schema=dev", "--schema-only",
                "-T", "dev.monitor_jobs", "-T", "dev.processor_results",
            ],
            stdout=fh,
            text=True,
            timeout=60,
        )
    if proc.returncode != 0:
        print(
            f"[RELEASE] pg_dump failed (exit {proc.returncode}); "
            f"{dest.name} may be incomplete",
            file=sys.stderr,
        )


def _export_qdrant_points(dest: Path) -> int:
    """Scroll through QDRANT_COLLECTION and write one JSON point per line.

    Returns the number of points written. Network, HTTP and JSON errors
    propagate to the caller.
    """
    # Qdrant's scroll endpoint is a POST that takes its options as a JSON
    # body; a GET with query-string parameters is rejected.
    endpoint = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/scroll"
    written = 0
    offset = None
    # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII payload text.
    with open(dest, "w", encoding="utf-8") as out:
        while True:
            body = {
                "limit": QDRANT_PAGE_SIZE,
                "with_payload": True,
                "with_vector": True,
            }
            if offset is not None:
                body["offset"] = offset
            request = urllib.request.Request(
                endpoint,
                data=json.dumps(body).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=30) as resp:
                result = json.loads(resp.read()).get("result") or {}
            points = result.get("points") or []
            if not points:
                break
            out.writelines(
                json.dumps(point, ensure_ascii=False) + "\n" for point in points
            )
            written += len(points)
            # The response carries the offset of the next page; None means
            # the collection has been exhausted.
            offset = result.get("next_page_offset")
            if offset is None:
                break
    return written


def _git_commit() -> str:
    """Return the HEAD commit hash of PROJECT, or "" when it cannot be read."""
    try:
        proc = subprocess.run(
            ["git", "-C", str(PROJECT), "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError) as exc:
        # A missing or hung git must not abort a package that is otherwise
        # already written.
        print(f"[RELEASE] git commit lookup failed: {exc}", file=sys.stderr)
        return ""
    return proc.stdout.strip()


def _write_release_info(pkg_dir: Path, file_uuid: str, phase: int, stamp: str) -> None:
    """Write RELEASE_INFO.txt describing this package into *pkg_dir*."""
    model_name = f"{file_uuid}_v1" if phase == 1 else f"{file_uuid}_v2"
    lines = [
        f"Model: {model_name}\n",
        f"Phase: {phase}\n",
        f"Version: {VERSION}\n",
        f"Timestamp: {stamp}\n",
        f"File UUID: {file_uuid}\n",
        f"Qdrant Collection: {QDRANT_COLLECTION}\n",
        f"Git Commit: {_git_commit()}\n",
        f"Packaged at: {datetime.now(timezone.utc).isoformat()}\n",
    ]
    with open(pkg_dir / "RELEASE_INFO.txt", "w", encoding="utf-8") as fh:
        fh.writelines(lines)


def _update_latest_symlink(phase_dir: Path, pkg_dir: Path) -> None:
    """Point ``phase_dir/latest`` at *pkg_dir* using a relative symlink.

    An existing symlink is replaced; anything else already occupying the
    name (a real file or directory) is left untouched.
    """
    latest = phase_dir / "latest"
    if latest.is_symlink():
        latest.unlink()
    if not latest.exists():
        latest.symlink_to(pkg_dir.name, target_is_directory=True)


def pack_phase(file_uuid: str, phase: int) -> Path:
    """Package deliverables for phase 1 or 2.

    Creates ``release/phase{phase}/{VERSION}_{timestamp}/`` and fills it with
    the processor output JSON files, a schema dump, CSV exports of the
    database tables, a best-effort Qdrant point export and RELEASE_INFO.txt,
    then repoints the phase's ``latest`` symlink at the new package.

    Args:
        file_uuid: Identifier used to select output files and table rows.
        phase: Release phase; face detections are exported only when >= 2.

    Returns:
        Path of the package directory that was created.
    """
    phase_dir = RELEASE_DIR / f"phase{phase}"
    stamp = ts()
    pkg_dir = phase_dir / f"{VERSION}_{stamp}"
    out_dir = pkg_dir / "output_json"
    out_dir.mkdir(parents=True, exist_ok=True)

    # Processor output .json files.
    _copy_processor_outputs(file_uuid, out_dir)

    # Database schema.
    _dump_schema(pkg_dir / "schema.sql")

    # Table exports. file_uuid is quoted because it comes from the CLI.
    uuid_lit = _sql_literal(file_uuid)
    _export_csv(
        f"SELECT * FROM dev.chunks WHERE file_uuid={uuid_lit}",
        pkg_dir / "chunks.csv",
    )
    # NOTE(review): this table is filtered on "uuid" while the others use
    # "file_uuid" — kept as in the original; confirm against the schema.
    _export_csv(
        f"SELECT * FROM dev.chunk_vectors WHERE uuid={uuid_lit}",
        pkg_dir / "vectors.csv",
    )
    _export_csv("SELECT * FROM dev.identities", pkg_dir / "identities.csv")
    if phase >= 2:
        _export_csv(
            f"SELECT * FROM dev.face_detections WHERE file_uuid={uuid_lit}",
            pkg_dir / "face_detections.csv",
        )

    # Qdrant collection snapshot. Deliberately best-effort: any failure is
    # reported and the partial file removed, but packaging continues.
    qdrant_path = pkg_dir / "qdrant_points.jsonl"
    try:
        n_points = _export_qdrant_points(qdrant_path)
    except Exception as exc:
        print(f"[RELEASE] Qdrant export skipped: {exc}")
        if qdrant_path.exists():
            qdrant_path.unlink()
    else:
        print(f"[RELEASE] Qdrant: {n_points} points exported from '{QDRANT_COLLECTION}'")

    _write_release_info(pkg_dir, file_uuid, phase, stamp)
    _update_latest_symlink(phase_dir, pkg_dir)

    size = sum(f.stat().st_size for f in pkg_dir.rglob("*") if f.is_file())
    print(f"[RELEASE] Phase {phase} packaged: {pkg_dir} ({size / 1024:.0f} KB)")
    return pkg_dir


def main():
    """Parse the command line and package the requested phase."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--phase", type=int, required=True, choices=[1, 2])
    parser.add_argument("--file-uuid", required=True)
    # Accepted so the executor can pass it; not used by the packaging itself.
    parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)")
    args = parser.parse_args()
    pack_phase(args.file_uuid, args.phase)


if __name__ == "__main__":
    main()