#!/opt/homebrew/bin/python3.11
"""
momentry-export — package a file's processing history.

Bundles everything produced for a single file_uuid (database rows, processor
output JSON files and, optionally, the original video) into a portable tar.gz.

Usage:
    python3 scripts/export_file.py UUID [--output PATH] [--include-video]

Example:
    python3 scripts/export_file.py fa182e9c26145b2c1a932f73d1d484e5 --output /tmp/test_export.tar.gz
"""

import argparse
import io
import json
import os
import sys
import tarfile
import time
from datetime import datetime
from pathlib import Path

import psycopg2
import psycopg2.extras

DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")

# Tables dumped verbatim by fetch_table(). A table is only exported when it
# has a "file_uuid" or "uuid" column; otherwise it is reported as empty.
# NOTE(review): "api_keys" is in this list — confirm that shipping it inside a
# portable archive is intended.
TABLES = [
    "pre_chunks",
    "chunks",
    "face_detections",
    "processor_results",
    "processor_versions",
    "videos",
    "api_keys",
]


def get_conn():
    """Open a new psycopg2 connection to DB_URL."""
    return psycopg2.connect(DB_URL)


def fetch_table(conn, table: str, uuid: str) -> list[dict]:
    """Fetch the rows of ``table`` that reference ``uuid``.

    The table's columns are looked up in information_schema; the first column
    (in table-definition order) named ``file_uuid`` or ``uuid`` is used as the
    filter column.

    Returns:
        A list of row dicts, or an empty list when the table has no such
        column (which is also the case when the table does not exist).
    """
    uuid_columns = {"file_uuid", "uuid"}
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # ORDER BY makes the column choice deterministic when a table has
        # both candidate columns.
        cur.execute(
            "SELECT column_name, data_type FROM information_schema.columns "
            "WHERE table_schema = %s AND table_name = %s "
            "ORDER BY ordinal_position",
            (SCHEMA, table),
        )
        uuid_col = next(
            (
                col["column_name"]
                for col in cur.fetchall()
                if col["column_name"] in uuid_columns
            ),
            None,
        )
        if not uuid_col:
            return []

        # SCHEMA/table/uuid_col are identifiers taken from configuration and
        # the catalog, not from user input; only the value is parameterized.
        cur.execute(
            f"SELECT * FROM {SCHEMA}.{table} WHERE {uuid_col} = %s",
            (uuid,),
        )
        return [dict(r) for r in cur.fetchall()]


def fetch_video_row(conn, uuid: str) -> dict | None:
    """Return the ``videos`` row for ``uuid`` as a dict, or None if absent."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(f"SELECT * FROM {SCHEMA}.videos WHERE file_uuid = %s", (uuid,))
        row = cur.fetchone()
    return dict(row) if row else None


def serialize_value(v):
    """Convert a DB value to something JSON-serializable.

    datetimes become ISO-8601 strings and bytes become a list of ints; every
    other value (including lists) is returned unchanged.
    """
    if isinstance(v, datetime):
        return v.isoformat()
    if isinstance(v, bytes):
        return list(v)  # bytea -> list of ints
    return v


def _fetch_job_status(conn, uuid: str) -> str:
    """Return the status of the newest monitor_jobs row, or "unknown"."""
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT status FROM {SCHEMA}.monitor_jobs WHERE uuid = %s ORDER BY id DESC LIMIT 1",
            (uuid,),
        )
        row = cur.fetchone()
    return row[0] if row else "unknown"


def _count_rows(conn, table: str, uuid: str) -> int:
    """Count the rows of ``table`` whose file_uuid equals ``uuid``."""
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT count(*) FROM {SCHEMA}.{table} WHERE file_uuid = %s",
            (uuid,),
        )
        return cur.fetchone()[0]


def _check_completeness(conn, uuid: str, job_status: str, json_files: list) -> dict:
    """Build the completeness report (booleans, counts and an "n/m" string)."""
    completeness = {"job": job_status == "completed"}

    # Processors: every expected processor should be "completed".
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT processor, status FROM {SCHEMA}.processor_results "
            f"WHERE file_uuid = %s ORDER BY processor",
            (uuid,),
        )
        procs = {r[0]: r[1] for r in cur.fetchall()}
    expected = ["asr", "asrx", "cut", "face", "ocr", "pose", "yolo"]
    for p in expected:
        completeness[f"proc_{p}"] = procs.get(p, "missing") == "completed"
    done = sum(1 for p in expected if procs.get(p) == "completed")
    completeness["processors"] = f"{done}/{len(expected)}"

    completeness["output_jsons"] = len(json_files)
    completeness["face_detections"] = _count_rows(conn, "face_detections", uuid)
    completeness["chunks"] = _count_rows(conn, "chunks", uuid)
    return completeness


def _export_identities(conn, tar: tarfile.TarFile, uuid: str) -> None:
    """Add identities, identity_bindings and file_identities for ``uuid``."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # Collect every identity_id referenced by this file's face detections.
        cur.execute(
            f"SELECT DISTINCT identity_id FROM {SCHEMA}.face_detections "
            f"WHERE file_uuid = %s AND identity_id IS NOT NULL",
            (uuid,),
        )
        identity_ids = [r["identity_id"] for r in cur.fetchall()]
        if not identity_ids:
            print(" [EXPORT] identities: (none bound to this file)")
            return

        placeholders = ",".join(["%s"] * len(identity_ids))

        cur.execute(
            f"SELECT * FROM {SCHEMA}.identities WHERE id IN ({placeholders})",
            identity_ids,
        )
        ident_rows = [dict(r) for r in cur.fetchall()]
        _add_json(tar, "data/identities.json", ident_rows)
        print(f" [EXPORT] identities: {len(ident_rows)} rows")

        cur.execute(
            f"SELECT * FROM {SCHEMA}.identity_bindings "
            f"WHERE identity_id IN ({placeholders})",
            identity_ids,
        )
        bind_rows = [dict(r) for r in cur.fetchall()]
        if bind_rows:
            _add_json(tar, "data/identity_bindings.json", bind_rows)
            print(f" [EXPORT] identity_bindings: {len(bind_rows)} rows")

        # file_identities is optional (the table may not exist). A failed
        # statement leaves the transaction aborted, so roll back to keep the
        # connection usable instead of silently swallowing the error.
        try:
            cur.execute(
                f"SELECT * FROM {SCHEMA}.file_identities WHERE file_uuid = %s",
                (uuid,),
            )
            fi_rows = [dict(r) for r in cur.fetchall()]
        except psycopg2.Error as exc:
            conn.rollback()
            print(f" [WARN] file_identities skipped: {str(exc).strip()}")
            fi_rows = []
        if fi_rows:
            _add_json(tar, "data/file_identities.json", fi_rows)
            print(f" [EXPORT] file_identities: {len(fi_rows)} rows")


def _write_archive(conn, tar: tarfile.TarFile, uuid: str, video: dict,
                   completeness: dict, json_files: list, include_video: bool) -> None:
    """Write the manifest, DB dumps, output JSONs and optional video to ``tar``."""
    # 1. Manifest
    manifest = {
        "exported_at": datetime.now().isoformat(),
        "version": "1.0",
        "file_uuid": uuid,
        "file_name": video.get("file_name"),
        "duration": video.get("duration"),
        "fps": float(video.get("fps") or 0),
        "width": video.get("width"),
        "height": video.get("height"),
        "total_frames": video.get("total_frames"),
        "include_video": include_video,
        "completeness": {
            k: str(v) if not isinstance(v, (bool, int, str)) else v
            for k, v in completeness.items()
        },
        "merge_policy": {
            "identities": "merge_by_name",
            "description": "匯入時 identity 依名稱比對,已存在則合併(保留 target 的 identity_id),不存在則新增",
        },
    }
    _add_json(tar, "manifest.json", manifest)

    # 2. Video metadata (videos table row)
    _add_json(tar, "data/video.json", video)

    # 3. DB tables
    for table in TABLES:
        rows = fetch_table(conn, table, uuid)
        if rows:
            _add_json(tar, f"data/{table}.json", rows)
            print(f" [EXPORT] {table}: {len(rows)} rows")
        else:
            print(f" [EXPORT] {table}: (empty)")

    # 4. Face detection metadata (explicit column list; no embedding column)
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            f"SELECT id, file_uuid, frame_number, trace_id, x, y, width, height, "
            f"confidence, identity_id FROM {SCHEMA}.face_detections WHERE file_uuid = %s",
            (uuid,),
        )
        fd_rows = [dict(r) for r in cur.fetchall()]
    if fd_rows:
        _add_json(tar, "data/face_detections_meta.json", fd_rows)
        print(f" [EXPORT] face_detections (meta): {len(fd_rows)} rows")
    else:
        print(" [EXPORT] face_detections: (empty)")

    # 5. Identity-related data
    _export_identities(conn, tar, uuid)

    # 6. Output JSON files (the same list the completeness check counted)
    for jf in json_files:
        tar.add(str(jf), arcname=f"output/{jf.name}")
        print(f" [EXPORT] output/{jf.name} ({jf.stat().st_size / 1024:.0f}KB)")
    print(f" [EXPORT] output JSONs: {len(json_files)} files")

    # 7. Original video file (optional)
    if include_video and video.get("file_path"):
        src = video["file_path"]
        if os.path.exists(src):
            tar.add(src, arcname="original/" + os.path.basename(src))
            print(f" [EXPORT] original video: {src}")
        else:
            print(f" [WARN] Video file not found: {src}")


def _export_with_conn(conn, uuid: str, output_path: str, include_video: bool) -> bool:
    """Run the checks and write the archive; return False when export is aborted."""
    # First check whether processing has finished.
    job_status = _fetch_job_status(conn, uuid)
    if job_status == "completed":
        print(f" [EXPORT] Job status: ✅ {job_status}")
    elif job_status == "failed":
        print(f" [EXPORT] ⚠️ Job status: ❌ {job_status} (仍可匯出部分資料)")
    elif job_status == "running":
        print(f" [EXPORT] ⚠️ Job status: ⏳ {job_status} (處理中,產出不完全)")
    else:
        print(f" [EXPORT] ⚠️ Job status: {job_status}")

    video = fetch_video_row(conn, uuid)
    if not video:
        print(f"[EXPORT] UUID {uuid} not found in videos table")
        return False

    # History completeness check
    print("\n ── 歷程完整性檢查 ──")
    json_files = sorted(Path(OUTPUT_DIR).glob(f"{uuid}.*.json"))
    completeness = _check_completeness(conn, uuid, job_status, json_files)
    for k, v in completeness.items():
        icon = "✅" if v is True else ("❌" if v is False else "ℹ️")
        print(f" {icon} {k}: {v}")

    # Abort only when there is nothing to export AND the job did not complete.
    has_core_data = (
        completeness["output_jsons"] > 0
        or completeness["face_detections"] > 0
        or completeness["chunks"] > 0
    )
    if not has_core_data and job_status != "completed":
        print("\n ⛔ 歷程不完整,無核心產出,中止匯出")
        return False
    print(" ─────────────────\n")

    # Stream the archive to a sibling temp file instead of buffering it in
    # memory (the original video can be very large), then move it into place
    # so output_path only ever holds a complete archive.
    tmp_path = f"{output_path}.part"
    finished = False
    try:
        with tarfile.open(tmp_path, mode="w:gz") as tar:
            _write_archive(conn, tar, uuid, video, completeness, json_files, include_video)
        os.replace(tmp_path, output_path)
        finished = True
    finally:
        if not finished and os.path.exists(tmp_path):
            os.remove(tmp_path)
    return True


def export_file(uuid: str, output_path: str, include_video: bool = False):
    """Export all data for a UUID into a tar.gz.

    Args:
        uuid: file UUID whose history is exported.
        output_path: destination path of the tar.gz archive.
        include_video: also add the original video (the ``file_path`` of the
            videos row) when that file exists on disk.

    Returns:
        True when the archive was written; False when the UUID is not in the
        videos table, or when there is no core data and the job is not
        completed.

    Raises:
        psycopg2.Error / OSError: database or file-system failures are not
        caught here; the connection is closed and any partial archive removed
        before they propagate.
    """
    t0 = time.time()
    print(f"[EXPORT] Exporting {uuid}...")

    conn = get_conn()
    try:
        exported = _export_with_conn(conn, uuid, output_path, include_video)
    finally:
        conn.close()
    if not exported:
        return False

    size_mb = os.path.getsize(output_path) / 1e6
    elapsed = time.time() - t0
    print(f"\n[EXPORT] Done: {output_path} ({size_mb:.1f}MB, {elapsed:.1f}s)")
    return True


def _add_json(tar: tarfile.TarFile, arcname: str, data):
    """Add ``data`` to the archive as a UTF-8 JSON member named ``arcname``.

    Values json cannot encode natively are stringified via ``default=str``.
    """
    raw = json.dumps(data, ensure_ascii=False, default=str, indent=2).encode()
    info = tarfile.TarInfo(name=arcname)
    info.size = len(raw)
    info.mtime = int(time.time())
    tar.addfile(info, io.BytesIO(raw))


def main():
    """Parse command-line arguments, run the export and exit 0/1."""
    parser = argparse.ArgumentParser(description="Export file processing history")
    parser.add_argument("uuid", help="File UUID to export")
    parser.add_argument("--output", "-o", default=None,
                        help="Output tar.gz path (default: {uuid}.tar.gz)")
    parser.add_argument("--include-video", action="store_true",
                        help="Include original video file in export")
    args = parser.parse_args()

    output = args.output or f"{args.uuid}.tar.gz"
    success = export_file(args.uuid, output, args.include_video)
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()