#!/opt/homebrew/bin/python3.11
"""
Migrate Identity Files — one-time: DB identities → filesystem identity.json

Reads all identities from PostgreSQL, queries their file bindings from
face_detections, and writes:

    {OUTPUT_DIR}/identities/{uuid without dashes}/identity.json   (one per identity)
    {OUTPUT_DIR}/identities/_index.json                           (uuid → name map)

Usage:
    python3 scripts/migrate_identity_files.py
    python3 scripts/migrate_identity_files.py --db "dbname=momentry user=accusys"
    python3 scripts/migrate_identity_files.py --output /path/to/output
"""

import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import psycopg2
import psycopg2.extras

# All identities that have a UUID; uuid/metadata are cast to text so they
# arrive as plain strings regardless of registered psycopg2 adapters.
_IDENTITIES_SQL = """
    SELECT id, uuid::text, name, identity_type, source, status,
           tmdb_id, tmdb_profile, metadata::text, created_at, updated_at
    FROM identities
    WHERE uuid IS NOT NULL
    ORDER BY id
"""

# Per-file bindings of one identity: distinct non-null trace ids and the
# number of face detections in that file.
_BINDINGS_SQL = """
    SELECT fd.file_uuid,
           COALESCE(array_agg(DISTINCT fd.trace_id)
                    FILTER (WHERE fd.trace_id IS NOT NULL), '{}') AS trace_ids,
           COUNT(*)::bigint AS face_count
    FROM face_detections fd
    WHERE fd.identity_id = %s
    GROUP BY fd.file_uuid
    ORDER BY fd.file_uuid
"""


def _parse_args() -> argparse.Namespace:
    """Parse the command line; defaults come from DATABASE_URL / MOMENTRY_OUTPUT_DIR."""
    parser = argparse.ArgumentParser(description="Migrate identities to filesystem")
    parser.add_argument(
        "--db",
        default=os.getenv("DATABASE_URL", "dbname=momentry user=accusys host=localhost"),
    )
    parser.add_argument(
        "--output",
        default=os.getenv("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output"),
    )
    return parser.parse_args()


def _fetch_file_bindings(conn, identity_id) -> list[dict[str, Any]]:
    """Return the file bindings of one identity as JSON-ready dicts."""
    # The cursor context manager closes the cursor even if the query raises.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(_BINDINGS_SQL, (identity_id,))
        binding_rows = cur.fetchall()

    file_bindings = []
    for binding in binding_rows:
        trace_ids = binding["trace_ids"]
        # Only normalise when the driver handed back a real list; any other
        # representation is passed through untouched.
        if isinstance(trace_ids, list):
            trace_ids = [int(t) for t in trace_ids if t is not None]
        file_bindings.append({
            "file_uuid": binding["file_uuid"],
            "trace_ids": trace_ids,
            "face_count": int(binding["face_count"]),
        })
    return file_bindings


def _normalize_metadata(raw: Any) -> Any:
    """Decode the metadata column: NULL/empty text → {}, JSON text → parsed value."""
    if raw is None:
        return {}
    if isinstance(raw, str):
        return json.loads(raw) if raw else {}
    return raw


def _format_timestamp(value: Any) -> str:
    """ISO-8601 string for *value*; a missing timestamp falls back to the current UTC time."""
    return value.isoformat() if value else datetime.now(timezone.utc).isoformat()


def _build_identity_document(
    row: dict[str, Any],
    uuid_clean: str,
    name: str,
    file_bindings: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the identity.json payload for one identities row."""
    return {
        "version": 1,
        "identity_uuid": uuid_clean,
        "name": name,
        "identity_type": row.get("identity_type"),
        "source": row.get("source"),
        "status": row.get("status"),
        "tmdb_id": row.get("tmdb_id"),
        "tmdb_profile": row.get("tmdb_profile"),
        "metadata": _normalize_metadata(row.get("metadata")),
        "file_bindings": file_bindings,
        "created_at": _format_timestamp(row.get("created_at")),
        "updated_at": _format_timestamp(row.get("updated_at")),
    }


def _write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON (non-ASCII kept verbatim)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def main():
    """Export every identity with a UUID to identity.json and write _index.json.

    When the identities query returns no rows, a message is printed and
    nothing is written (the identities/ directory itself is still created).
    The database connection is closed on every exit path.
    """
    args = _parse_args()

    conn = psycopg2.connect(args.db)
    try:
        identities_root = Path(args.output) / "identities"
        identities_root.mkdir(parents=True, exist_ok=True)

        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(_IDENTITIES_SQL)
            rows = cur.fetchall()

        if not rows:
            print("No identities found in DB.")
            return

        index = {}
        migrated = 0

        for row in rows:
            # Directory names and index keys use the UUID without dashes.
            uuid_clean = row["uuid"].replace("-", "")
            name = row["name"] or ""

            dir_path = identities_root / uuid_clean
            dir_path.mkdir(parents=True, exist_ok=True)

            file_bindings = _fetch_file_bindings(conn, row["id"])
            identity_file = _build_identity_document(row, uuid_clean, name, file_bindings)
            _write_json(dir_path / "identity.json", identity_file)

            index[uuid_clean] = name
            migrated += 1
            print(f" [{migrated:5d}] {name} ({uuid_clean})")
    finally:
        # `with conn:` in psycopg2 only scopes a transaction, so close explicitly.
        conn.close()

    # Write _index.json
    index_file = {
        "version": 1,
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "entries": index,
    }
    _write_json(identities_root / "_index.json", index_file)

    print(f"\nDone: {migrated} identities migrated")
    print(f"Index: {identities_root / '_index.json'} ({len(index)} entries)")


if __name__ == "__main__":
    main()