Files
momentry_core/scripts/import_file.py
Accusys 39ba5ddf76 feat: Phase 1 handover - schema migration, correction mechanism, API fixes
Schema changes: dev.chunks->dev.chunk, remove old_chunk_id/chunk_index
Correction: asr-1.json format, generate/apply scripts
API: 37/37 endpoints fixed and tested
Docs: HANDOVER_V2.0.md for M4
2026-05-11 07:03:22 +08:00

260 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
momentry-import — 匯入檔案歷程封包
將 export_file.py 產出的 tar.gz 匯入到目標 Momentry 系統
Usage:
python3 scripts/import_file.py <package.tar.gz> [--schema <schema>]
Example:
python3 scripts/import_file.py /tmp/charade_export.tar.gz --schema dev
"""
import sys, os, json, argparse, tarfile, io, tempfile, shutil
from pathlib import Path
import psycopg2
import psycopg2.extras
# PostgreSQL connection string; override with the DATABASE_URL env variable.
DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
# Default target schema; override with MOMENTRY_DB_SCHEMA (also the --schema default).
SCHEMA = os.getenv("MOMENTRY_DB_SCHEMA", "dev")
# Directory that receives the package's output/ files; override with MOMENTRY_OUTPUT_DIR.
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")
def get_conn():
    """Open and return a new psycopg2 connection using the module-level DB_URL."""
    connection = psycopg2.connect(DB_URL)
    return connection
def json_loads(data: bytes):
    """Decode *data* with the default (UTF-8) codec and parse the text as JSON."""
    text = data.decode()
    return json.loads(text)
def _read_member_json(tar, name):
    """Return the parsed JSON document stored in archive member *name*."""
    return json_loads(tar.extractfile(name).read())


def _execute_isolated(cur, query, params):
    """Run one statement inside a savepoint; return the error, or None on success.

    PostgreSQL aborts the whole transaction after any failed statement, so
    without a savepoint a single bad row would make every later row in the
    same batch fail too.
    """
    cur.execute("SAVEPOINT import_row")
    try:
        cur.execute(query, params)
    except psycopg2.Error as exc:
        cur.execute("ROLLBACK TO SAVEPOINT import_row")
        failure = exc
    else:
        failure = None
    cur.execute("RELEASE SAVEPOINT import_row")
    return failure


def _insert_rows(cur, query, rows, to_params, show_progress=True):
    """Insert *rows* best-effort; return ``(ok_count, failed_count, first_error)``."""
    total = len(rows)
    ok_count = 0
    failed_count = 0
    first_error = None
    for row in rows:
        failure = _execute_isolated(cur, query, to_params(row))
        if failure is not None:
            failed_count += 1
            if first_error is None:
                first_error = failure
            continue
        ok_count += 1
        if show_progress and ok_count % 1000 == 0:
            print(f" ... {ok_count}/{total}", end="\r")
    return ok_count, failed_count, first_error


def _report_failures(table, failed_count, first_error):
    """Print a warning when some rows of *table* were rejected by the database."""
    if failed_count:
        print(f" ⚠️ {table}: {failed_count} rows failed (first error: {first_error})")


def _resolve_output_path(root, member_name):
    """Map an ``output/...`` member onto resolved *root*; None if it escapes *root*."""
    relative = member_name[len("output/"):]
    target = (root / relative).resolve()
    if target == root or root not in target.parents:
        return None
    return target


def _merge_identities(tar, member_names, conn, cur, schema):
    """Match identities by name or insert them; return the old-id -> new-id map."""
    identity_map = {}  # old_id -> new_id
    if "data/identities.json" not in member_names:
        print(" [IMPORT] identities: (package 無 identity 資料)")
        return identity_map

    identities = _read_member_json(tar, "data/identities.json")
    print("\n ── Identity Merge ──")
    for ident in identities:
        old_id = ident["id"]
        name = ident.get("name", "")
        # Identities are matched by name only.
        cur.execute(
            f"SELECT id FROM {schema}.identities WHERE name = %s",
            (name,),
        )
        row = cur.fetchone()
        if row:
            # Already present in the target: reuse its id.
            identity_map[old_id] = row[0]
            print(f" 🔗 '{name}' → 已存在 (id={row[0]}), 合併")
        else:
            # Not present: create it and remember the generated id.
            cur.execute(
                f"INSERT INTO {schema}.identities (name) VALUES (%s) RETURNING id",
                (name,),
            )
            new_id = cur.fetchone()[0]
            identity_map[old_id] = new_id
            print(f"'{name}' → 新增 (id={new_id})")
    conn.commit()
    print(" ────────────────")
    return identity_map


def _import_identity_bindings(tar, member_names, conn, cur, schema, identity_map):
    """Insert identity_bindings rows, remapping identity ids through *identity_map*."""
    if "data/identity_bindings.json" not in member_names:
        return
    bindings = _read_member_json(tar, "data/identity_bindings.json")
    query = (
        f"INSERT INTO {schema}.identity_bindings "
        "(identity_id, identity_type, identity_value, metadata, confidence) "
        "VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    )
    for b in bindings:
        b["identity_id"] = identity_map.get(b["identity_id"], b["identity_id"])
        try:
            params = (
                b["identity_id"], b["identity_type"], b["identity_value"],
                json.dumps(b.get("metadata", {})), b.get("confidence", 1.0),
            )
        except KeyError as exc:
            # A binding missing a required key is reported and skipped.
            print(f" ⚠️ binding 匯入失敗: {exc}")
            continue
        failure = _execute_isolated(cur, query, params)
        if failure is not None:
            print(f" ⚠️ binding 匯入失敗: {failure}")
    conn.commit()
    print(f" [IMPORT] identity_bindings: {len(bindings)} rows")


def _upsert_video(tar, conn, cur, schema, uuid):
    """Insert the videos row for *uuid*, or refresh path/name/status if it exists."""
    video_data = _read_member_json(tar, "data/video.json")
    cur.execute(
        f"""
        INSERT INTO {schema}.videos
        (file_uuid, file_path, file_name, file_type, duration, width, height,
        fps, total_frames, probe_json, status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'completed')
        ON CONFLICT (file_uuid) DO UPDATE SET
        file_path = EXCLUDED.file_path,
        file_name = EXCLUDED.file_name,
        status = 'completed'
        """,
        (
            uuid,
            video_data.get("file_path", ""),
            video_data.get("file_name", ""),
            video_data.get("file_type", "video"),
            video_data.get("duration"),
            video_data.get("width"),
            video_data.get("height"),
            float(video_data.get("fps") or 0),
            video_data.get("total_frames"),
            json.dumps(video_data.get("probe_json", {})),
        ),
    )
    conn.commit()
    print(" [IMPORT] videos: ✅")


def _extract_output_files(tar, members):
    """Copy every regular ``output/`` member of the archive into OUTPUT_DIR."""
    root = Path(OUTPUT_DIR).resolve()
    for member in members:
        if not (member.name.startswith("output/") and member.isfile()):
            continue
        dst = _resolve_output_path(root, member.name)
        if dst is None:
            # Member names come from the package; never write outside OUTPUT_DIR.
            print(f" ⚠️ skipping unsafe output path: {member.name}")
            continue
        dst.parent.mkdir(parents=True, exist_ok=True)
        with tar.extractfile(member) as src_f, open(dst, "wb") as dst_f:
            shutil.copyfileobj(src_f, dst_f)
        fname = member.name[len("output/"):]
        print(f" [IMPORT] output/{fname} ({member.size // 1024}KB)")
    print(" [IMPORT] output files: 完成")


def _import_pre_chunks(tar, member_names, conn, cur, schema, uuid):
    """Insert pre_chunks rows linked to the videos row of *uuid*."""
    if "data/pre_chunks.json" not in member_names:
        return
    pre_chunks = _read_member_json(tar, "data/pre_chunks.json")
    # pre_chunks rows reference the videos table by its numeric id.
    cur.execute(f"SELECT id FROM {schema}.videos WHERE file_uuid = %s", (uuid,))
    file_row = cur.fetchone()
    if not file_row:
        print(" [IMPORT] pre_chunks: 無法取得 file_id")
        return
    file_id = file_row[0]
    query = (
        f"INSERT INTO {schema}.pre_chunks "
        "(file_id, file_uuid, processor_type, coordinate_type, "
        "coordinate_index, start_frame, end_frame, start_time, end_time, "
        "fps, data) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
        "ON CONFLICT DO NOTHING"
    )

    def to_params(pc):
        return (
            file_id, uuid,
            pc.get("processor_type"), pc.get("coordinate_type"),
            pc.get("coordinate_index"),
            pc.get("start_frame"), pc.get("end_frame"),
            pc.get("start_time"), pc.get("end_time"),
            pc.get("fps"), json.dumps(pc.get("data", {})),
        )

    inserted, failed, first_error = _insert_rows(cur, query, pre_chunks, to_params)
    conn.commit()
    print(f" [IMPORT] pre_chunks: {inserted} rows \n")
    _report_failures("pre_chunks", failed, first_error)


def _import_processor_results(tar, member_names, conn, cur, schema, uuid):
    """Insert processor_results rows for *uuid* (job_id is always stored as 0)."""
    if "data/processor_results.json" not in member_names:
        return
    results = _read_member_json(tar, "data/processor_results.json")
    query = (
        f"INSERT INTO {schema}.processor_results "
        "(job_id, file_uuid, processor, status, chunks_produced, frames_processed) "
        "VALUES (0, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    )

    def to_params(r):
        return (
            uuid, r.get("processor"), r.get("status"),
            r.get("chunks_produced", 0), r.get("frames_processed", 0),
        )

    _, failed, first_error = _insert_rows(
        cur, query, results, to_params, show_progress=False
    )
    conn.commit()
    print(f" [IMPORT] processor_results: {len(results)} rows")
    _report_failures("processor_results", failed, first_error)


def _import_face_detections(tar, member_names, conn, cur, schema, uuid, identity_map):
    """Insert face_detections rows, remapping identity ids through *identity_map*."""
    # The first candidate present in the package wins.
    face_detections_src = None
    for candidate in ("data/face_detections.json", "data/face_detections_meta.json"):
        if candidate in member_names:
            face_detections_src = candidate
            break
    if not face_detections_src:
        return
    fds = _read_member_json(tar, face_detections_src)
    query = (
        f"INSERT INTO {schema}.face_detections "
        "(file_uuid, face_id, frame_number, x, y, width, height, "
        "confidence, identity_id, trace_id) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
        "ON CONFLICT DO NOTHING"
    )

    def to_params(fd):
        return (
            uuid,
            fd.get("face_id"),
            fd.get("frame_number"),
            fd.get("x"), fd.get("y"),
            fd.get("width"), fd.get("height"),
            fd.get("confidence"),
            identity_map.get(fd.get("identity_id"), fd.get("identity_id")),
            fd.get("trace_id"),
        )

    inserted, failed, first_error = _insert_rows(cur, query, fds, to_params)
    conn.commit()
    print(f" [IMPORT] face_detections: {inserted} rows \n")
    _report_failures("face_detections", failed, first_error)


def import_package(package_path: str, schema: str):
    """Import an exported file-history package into the target database.

    Reads ``manifest.json`` and the ``data/*.json`` members of the gzip'd tar
    at *package_path*, writes their rows into the tables of *schema*, and
    copies every ``output/`` member into OUTPUT_DIR.

    Args:
        package_path: Path to the ``.tar.gz`` package.
        schema: Target database schema. It is interpolated directly into the
            SQL text, so it must be a trusted, valid identifier.

    Raises:
        KeyError: If a required member (``manifest.json``, ``data/video.json``)
            or a required key such as ``file_uuid`` is missing.
        psycopg2.Error: If a non-best-effort statement fails.

    Row inserts for identity_bindings, pre_chunks, processor_results and
    face_detections are best-effort: a rejected row is skipped and reported
    while the remaining rows are still imported.
    """
    print(f"[IMPORT] Opening {package_path}...")
    with tarfile.open(package_path, "r:gz") as tar:
        # Index the archive once; every step below tests membership against it.
        members = tar.getmembers()
        member_names = {m.name for m in members}

        # Read the manifest.
        manifest = _read_member_json(tar, "manifest.json")
        uuid = manifest["file_uuid"]
        print(f"[IMPORT] File: {manifest.get('file_name','?')} ({uuid})")
        print(f"[IMPORT] Exported at: {manifest.get('exported_at','?')}")
        print(f"[IMPORT] Completeness: {manifest.get('completeness',{})}")
        print(f"[IMPORT] Merge policy: {manifest.get('merge_policy',{})}")

        conn = get_conn()
        try:
            with conn.cursor() as cur:
                # Step 1: check whether the target already has this file_uuid.
                cur.execute(
                    f"SELECT file_uuid FROM {schema}.videos WHERE file_uuid = %s",
                    (uuid,),
                )
                if cur.fetchone():
                    print(f" ⚠️ UUID {uuid} 已存在於目標系統")
                    # TODO: support overwrite or skip

                # Step 2: identities (merged first so later steps can remap ids).
                identity_map = _merge_identities(tar, member_names, conn, cur, schema)
                # Step 3: identity_bindings, if present.
                _import_identity_bindings(
                    tar, member_names, conn, cur, schema, identity_map
                )
                # Step 4: the videos row.
                _upsert_video(tar, conn, cur, schema, uuid)
                # Step 5: output JSON files.
                _extract_output_files(tar, members)
                # Step 6: pre_chunks.
                _import_pre_chunks(tar, member_names, conn, cur, schema, uuid)
                # Step 7: processor_results.
                _import_processor_results(tar, member_names, conn, cur, schema, uuid)
                # Step 8: face_detections.
                _import_face_detections(
                    tar, member_names, conn, cur, schema, uuid, identity_map
                )
        finally:
            # Always release the connection, even when a step raises.
            conn.close()
    print(f"\n[IMPORT] ✅ 完成: {manifest.get('file_name','?')} 已匯入 (file_uuid={uuid})")
def main():
    """Parse the command line and import the requested package.

    Exits with status 1 when the package path does not exist.
    """
    cli = argparse.ArgumentParser(description="Import file processing history package")
    cli.add_argument("package", help="Path to .tar.gz package")
    cli.add_argument("--schema", default=SCHEMA, help="Target DB schema")
    options = cli.parse_args()

    package_path = options.package
    if os.path.exists(package_path):
        import_package(package_path, options.schema)
        return
    print(f"[IMPORT] ❌ Package not found: {package_path}")
    sys.exit(1)


# Run the importer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()