#!/opt/homebrew/bin/python3.11
"""
momentry-import — import a file-history package.

Imports a tar.gz produced by export_file.py into the target Momentry system.

Usage:
    python3 scripts/import_file.py <package.tar.gz> [--schema <schema>]

Example:
    python3 scripts/import_file.py /tmp/charade_export.tar.gz --schema dev
"""
import sys
import os
import re
import json
import argparse
import tarfile
import io
import tempfile
import shutil
from pathlib import Path

import psycopg2
import psycopg2.extras

DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
SCHEMA = os.environ.get("MOMENTRY_DB_SCHEMA", "dev")
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")

# The schema name is interpolated into SQL text (identifiers cannot be bound as
# query parameters), so only plain unquoted PostgreSQL identifiers are accepted.
_SCHEMA_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")

# Print a progress line every this many rows during bulk inserts.
_PROGRESS_EVERY = 1000


def get_conn():
    """Open a new psycopg2 connection to DB_URL."""
    return psycopg2.connect(DB_URL)


def json_loads(data: bytes):
    """Decode *data* as UTF-8 and parse it as JSON."""
    return json.loads(data.decode())


def _validate_schema(schema: str) -> str:
    """Return *schema* if it is a plain SQL identifier; raise ValueError otherwise."""
    if not _SCHEMA_RE.fullmatch(schema):
        raise ValueError(f"invalid schema name: {schema!r}")
    return schema


def _read_json(tar: tarfile.TarFile, name: str):
    """Read the member *name* from *tar* and parse it as JSON."""
    return json_loads(tar.extractfile(name).read())


def _insert_row(cur, query: str, params: tuple) -> int:
    """Execute one INSERT inside a savepoint and return its affected row count.

    In PostgreSQL a failed statement aborts the whole transaction. Every later
    statement would then fail, and the final COMMIT would be turned into a
    ROLLBACK. Rolling back to a per-row savepoint keeps the transaction usable.
    The original exception is re-raised after the rollback.
    """
    cur.execute("SAVEPOINT import_row")
    try:
        cur.execute(query, params)
        affected = cur.rowcount  # read before RELEASE overwrites rowcount
    except Exception:
        cur.execute("ROLLBACK TO SAVEPOINT import_row")
        raise
    cur.execute("RELEASE SAVEPOINT import_row")
    return affected


def _insert_rows(cur, query: str, rows: list, to_params, label: str) -> tuple[int, int]:
    """Insert *rows* best-effort and return ``(inserted, failed)``.

    *to_params* maps one package record to the query's parameter tuple. A row
    that raises is skipped and counted rather than aborting the import. Rows
    ignored by ``ON CONFLICT DO NOTHING`` count as neither inserted nor failed.
    Connection-level errors are re-raised because nothing more can be imported.
    """
    inserted = failed = 0
    first_error = None
    total = len(rows)
    for done, row in enumerate(rows, 1):
        try:
            inserted += max(_insert_row(cur, query, to_params(row)), 0)
        except (psycopg2.OperationalError, psycopg2.InterfaceError):
            raise
        except Exception as exc:  # best-effort: keep importing the remaining rows
            failed += 1
            if first_error is None:
                first_error = exc
        if done % _PROGRESS_EVERY == 0:
            print(f"    ... {done}/{total}", end="\r")
    if failed:
        print(f"    ⚠️ {label}: {failed} rows 匯入失敗 (first error: {first_error})")
    return inserted, failed


def _merge_identities(cur, schema: str, identities: list) -> dict:
    """Map each package identity id to a target identity id, creating missing ones.

    Identities are matched by exact ``name``. If several target rows share the
    name, the lowest id is used so that the merge is deterministic.
    NOTE(review): an empty name merges every unnamed identity into one row, and a
    None name never matches (``= NULL``), so it always inserts. Confirm this is intended.
    """
    identity_map = {}  # old_id → new_id
    print(f"\n    ── Identity Merge ──")
    for ident in identities:
        old_id = ident["id"]
        name = ident.get("name", "")
        cur.execute(
            f"SELECT id FROM {schema}.identities WHERE name = %s ORDER BY id LIMIT 1",
            (name,),
        )
        row = cur.fetchone()
        if row:
            # Already present in the target: merge into it.
            identity_map[old_id] = row[0]
            print(f"      🔗 '{name}' → 已存在 (id={row[0]}), 合併")
        else:
            # Not present: create it.
            cur.execute(
                f"INSERT INTO {schema}.identities (name) VALUES (%s) RETURNING id",
                (name,),
            )
            new_id = cur.fetchone()[0]
            identity_map[old_id] = new_id
            print(f"      ✅ '{name}' → 新增 (id={new_id})")
    print(f"    ────────────────")
    return identity_map


def _import_identity_bindings(cur, schema: str, bindings: list, identity_map: dict) -> None:
    """Insert identity_bindings, remapping identity ids through *identity_map*."""
    query = (
        f"INSERT INTO {schema}.identity_bindings "
        f"(identity_id, identity_type, identity_value, metadata, confidence) "
        f"VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    )

    def to_params(b):
        # NOTE(review): ids missing from identity_map are passed through
        # unchanged, so they point at whatever row has that id in the target.
        return (
            identity_map.get(b["identity_id"], b["identity_id"]),
            b["identity_type"],
            b["identity_value"],
            json.dumps(b.get("metadata", {})),
            b.get("confidence", 1.0),
        )

    inserted, _ = _insert_rows(cur, query, bindings, to_params, "identity_bindings")
    print(f"  [IMPORT] identity_bindings: {inserted}/{len(bindings)} rows")


def _upsert_video(cur, schema: str, file_uuid: str, video_data: dict) -> None:
    """Insert the videos row for *file_uuid*, or update it, and mark it 'completed'.

    On conflict only file_path, file_name and status are overwritten. The other
    columns keep the target system's values.
    """
    cur.execute(
        f"""
        INSERT INTO {schema}.videos
            (file_uuid, file_path, file_name, file_type, duration,
             width, height, fps, total_frames, probe_json, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'completed')
        ON CONFLICT (file_uuid) DO UPDATE SET
            file_path = EXCLUDED.file_path,
            file_name = EXCLUDED.file_name,
            status = 'completed'
        """,
        (
            file_uuid,
            video_data.get("file_path", ""),
            video_data.get("file_name", ""),
            video_data.get("file_type", "video"),
            video_data.get("duration"),
            video_data.get("width"),
            video_data.get("height"),
            float(video_data.get("fps") or 0),
            video_data.get("total_frames"),
            json.dumps(video_data.get("probe_json", {})),
        ),
    )


def _extract_output_files(members: list, tar: tarfile.TarFile, output_dir: Path) -> None:
    """Copy every regular file under ``output/`` in the package into *output_dir*.

    The package is external input, so any member whose resolved destination
    falls outside *output_dir* is skipped with a warning. This covers absolute
    names, ``..`` components and existing symlinks.
    """
    root = output_dir.resolve()
    for member in members:
        if not (member.name.startswith("output/") and member.isfile()):
            continue
        # removeprefix (not replace) so nested "output/" path parts survive.
        fname = member.name.removeprefix("output/")
        dst = (root / fname).resolve()
        if dst == root or not dst.is_relative_to(root):
            print(f"  ⚠️ 略過不安全路徑: {member.name}")
            continue
        dst.parent.mkdir(parents=True, exist_ok=True)
        with tar.extractfile(member) as src_f, open(dst, "wb") as dst_f:
            shutil.copyfileobj(src_f, dst_f)
        print(f"  [IMPORT] output/{fname} ({member.size // 1024}KB)")
    print(f"  [IMPORT] output files: 完成")


def _import_pre_chunks(cur, schema: str, file_uuid: str, pre_chunks: list) -> None:
    """Insert pre_chunks rows linked to the videos row of *file_uuid*."""
    # pre_chunks references the videos table's numeric id as well as the uuid.
    cur.execute(f"SELECT id FROM {schema}.videos WHERE file_uuid = %s", (file_uuid,))
    file_row = cur.fetchone()
    if not file_row:
        print(f"  [IMPORT] pre_chunks: 無法取得 file_id")
        return
    file_id = file_row[0]
    query = (
        f"INSERT INTO {schema}.pre_chunks "
        f"(file_id, file_uuid, processor_type, coordinate_type, "
        f"coordinate_index, start_frame, end_frame, start_time, end_time, "
        f"fps, data) "
        f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
        f"ON CONFLICT DO NOTHING"
    )

    def to_params(pc):
        return (
            file_id,
            file_uuid,
            pc.get("processor_type"),
            pc.get("coordinate_type"),
            pc.get("coordinate_index"),
            pc.get("start_frame"),
            pc.get("end_frame"),
            pc.get("start_time"),
            pc.get("end_time"),
            pc.get("fps"),
            json.dumps(pc.get("data", {})),
        )

    inserted, _ = _insert_rows(cur, query, pre_chunks, to_params, "pre_chunks")
    # Trailing spaces overwrite the "\r" progress line.
    print(f"  [IMPORT] pre_chunks: {inserted} rows          \n")


def _import_processor_results(cur, schema: str, file_uuid: str, results: list) -> None:
    """Insert processor_results rows for *file_uuid*.

    NOTE(review): job_id is fixed to 0. Presumably this is a placeholder because
    the source job does not exist in the target system; confirm.
    """
    query = (
        f"INSERT INTO {schema}.processor_results "
        f"(job_id, file_uuid, processor, status, chunks_produced, frames_processed) "
        f"VALUES (0, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    )

    def to_params(r):
        return (
            file_uuid,
            r.get("processor"),
            r.get("status"),
            r.get("chunks_produced", 0),
            r.get("frames_processed", 0),
        )

    inserted, _ = _insert_rows(cur, query, results, to_params, "processor_results")
    print(f"  [IMPORT] processor_results: {inserted}/{len(results)} rows")


def _import_face_detections(cur, schema: str, file_uuid: str, fds: list, identity_map: dict) -> None:
    """Insert face_detections rows, remapping identity ids through *identity_map*."""
    query = (
        f"INSERT INTO {schema}.face_detections "
        f"(file_uuid, face_id, frame_number, x, y, width, height, "
        f"confidence, identity_id, trace_id) "
        f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
        f"ON CONFLICT DO NOTHING"
    )

    def to_params(fd):
        return (
            file_uuid,
            fd.get("face_id"),
            fd.get("frame_number"),
            fd.get("x"),
            fd.get("y"),
            fd.get("width"),
            fd.get("height"),
            fd.get("confidence"),
            identity_map.get(fd.get("identity_id"), fd.get("identity_id")),
            fd.get("trace_id"),
        )

    inserted, _ = _insert_rows(cur, query, fds, to_params, "face_detections")
    # Trailing spaces overwrite the "\r" progress line.
    print(f"  [IMPORT] face_detections: {inserted} rows          \n")


def import_package(package_path: str, schema: str):
    """Import the export package at *package_path* into *schema*.

    Each step commits separately. A failing row is skipped and reported rather
    than aborting the import. Any exception outside those per-row inserts closes
    the connection without committing, which discards that step's pending work.

    Raises:
        ValueError: if *schema* is not a plain SQL identifier.
        KeyError: if the package lacks ``manifest.json`` or ``data/video.json``.
    """
    schema = _validate_schema(schema)
    print(f"[IMPORT] Opening {package_path}...")
    with tarfile.open(package_path, "r:gz") as tar:
        members = tar.getmembers()
        names = {m.name for m in members}  # built once; checked by several steps

        manifest = _read_json(tar, "manifest.json")
        file_uuid = manifest["file_uuid"]
        print(f"[IMPORT] File: {manifest.get('file_name','?')} ({file_uuid})")
        print(f"[IMPORT] Exported at: {manifest.get('exported_at','?')}")
        print(f"[IMPORT] Completeness: {manifest.get('completeness',{})}")
        print(f"[IMPORT] Merge policy: {manifest.get('merge_policy',{})}")

        conn = get_conn()
        try:
            with conn.cursor() as cur:
                # Step 1: check whether the target already has this file_uuid.
                cur.execute(
                    f"SELECT file_uuid FROM {schema}.videos WHERE file_uuid = %s",
                    (file_uuid,),
                )
                if cur.fetchone():
                    print(f"  ⚠️ UUID {file_uuid} 已存在於目標系統")
                    # TODO: support overwrite or skip.

                # Step 2: identities (merged first so later steps can remap ids).
                if "data/identities.json" in names:
                    identity_map = _merge_identities(
                        cur, schema, _read_json(tar, "data/identities.json")
                    )
                    conn.commit()
                else:
                    identity_map = {}
                    print(f"  [IMPORT] identities: (package 無 identity 資料)")

                # Step 3: identity_bindings (optional).
                if "data/identity_bindings.json" in names:
                    _import_identity_bindings(
                        cur, schema, _read_json(tar, "data/identity_bindings.json"), identity_map
                    )
                    conn.commit()

                # Step 4: videos row.
                _upsert_video(cur, schema, file_uuid, _read_json(tar, "data/video.json"))
                conn.commit()
                print(f"  [IMPORT] videos: ✅")

                # Step 5: output JSON files.
                _extract_output_files(members, tar, Path(OUTPUT_DIR))

                # Step 6: pre_chunks.
                if "data/pre_chunks.json" in names:
                    _import_pre_chunks(cur, schema, file_uuid, _read_json(tar, "data/pre_chunks.json"))
                    conn.commit()

                # Step 7: processor_results.
                if "data/processor_results.json" in names:
                    _import_processor_results(
                        cur, schema, file_uuid, _read_json(tar, "data/processor_results.json")
                    )
                    conn.commit()

                # Step 8: face_detections (the embedding-free meta file is the fallback).
                fd_src = next(
                    (c for c in ("data/face_detections.json", "data/face_detections_meta.json")
                     if c in names),
                    None,
                )
                if fd_src:
                    _import_face_detections(cur, schema, file_uuid, _read_json(tar, fd_src), identity_map)
                    conn.commit()
        finally:
            conn.close()

        print(f"\n[IMPORT] ✅ 完成: {manifest.get('file_name','?')} 已匯入 (file_uuid={file_uuid})")


def main():
    """Parse CLI arguments and run import_package; exit 1 on bad input."""
    parser = argparse.ArgumentParser(description="Import file processing history package")
    parser.add_argument("package", help="Path to .tar.gz package")
    parser.add_argument("--schema", default=SCHEMA, help="Target DB schema")
    args = parser.parse_args()

    if not os.path.exists(args.package):
        print(f"[IMPORT] ❌ Package not found: {args.package}")
        sys.exit(1)
    if not _SCHEMA_RE.fullmatch(args.schema):
        print(f"[IMPORT] ❌ Invalid schema name: {args.schema}")
        sys.exit(1)

    import_package(args.package, args.schema)


if __name__ == "__main__":
    main()