#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Migrate Identity Files — one-time: DB identities → filesystem identity.json

Reads all identities from PostgreSQL, queries their file bindings, and writes
{OUTPUT_DIR}/identities/{uuid}/identity.json for each identity (uuid with the
hyphens removed), plus a single {OUTPUT_DIR}/identities/_index.json that maps
each uuid to its name.

Usage:
    python3 scripts/migrate_identity_files.py
    python3 scripts/migrate_identity_files.py --db "dbname=momentry user=accusys"
    python3 scripts/migrate_identity_files.py --output /path/to/output
|
|
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
|
|
# All identities that can be addressed on disk (a uuid is required for the
# directory name). metadata is cast to text so it is decoded here, explicitly.
_IDENTITIES_SQL = """
    SELECT id, uuid::text, name, identity_type, source, status,
           tmdb_id, tmdb_profile, metadata::text, created_at, updated_at
    FROM identities
    WHERE uuid IS NOT NULL
    ORDER BY id
"""

# Per-file bindings of one identity: distinct non-null trace ids and the
# number of face detections, grouped by file.
_BINDINGS_SQL = """
    SELECT fd.file_uuid,
           COALESCE(array_agg(DISTINCT fd.trace_id) FILTER (WHERE fd.trace_id IS NOT NULL), '{}') AS trace_ids,
           COUNT(*)::bigint AS face_count
    FROM face_detections fd
    WHERE fd.identity_id = %s
    GROUP BY fd.file_uuid
    ORDER BY fd.file_uuid
"""


def _format_timestamp(value) -> str:
    """Return *value* as an ISO-8601 string, or the current UTC time if it is missing."""
    if value:
        return value.isoformat()
    return datetime.now(timezone.utc).isoformat()


def _parse_metadata(raw):
    """Decode the metadata column into a JSON-serialisable value, defaulting to ``{}``."""
    if raw is None:
        return {}
    if isinstance(raw, str):
        if not raw:
            return {}
        parsed = json.loads(raw)
        # A stored JSON ``null`` means "no metadata", same as SQL NULL.
        return {} if parsed is None else parsed
    return raw


def _fetch_rows(conn, sql, params=None) -> list:
    """Run *sql* on a dict cursor and return all rows; the cursor is always closed."""
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        cur.execute(sql, params)
        return cur.fetchall()
    finally:
        cur.close()


def _fetch_file_bindings(conn, identity_id) -> list:
    """Return the ``file_bindings`` list for one identity, ordered by file uuid."""
    bindings = []
    for binding in _fetch_rows(conn, _BINDINGS_SQL, (identity_id,)):
        trace_ids = binding["trace_ids"]
        if isinstance(trace_ids, list):
            trace_ids = [int(t) for t in trace_ids if t is not None]
        bindings.append({
            "file_uuid": binding["file_uuid"],
            "trace_ids": trace_ids,
            "face_count": int(binding["face_count"]),
        })
    return bindings


def _write_json(path, payload) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON, keeping non-ASCII text readable."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def main(argv=None):
    """Export every identity in the database to ``identity.json`` files plus an index.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, as before.

    For each identity row with a uuid, writes
    ``<output>/identities/<uuid-without-hyphens>/identity.json``; afterwards
    writes ``<output>/identities/_index.json`` mapping uuid to name. If the
    query returns no rows, prints a message and returns without writing an
    index.
    """
    parser = argparse.ArgumentParser(description="Migrate identities to filesystem")
    parser.add_argument("--db", default=os.getenv("DATABASE_URL", "dbname=momentry user=accusys host=localhost"))
    parser.add_argument("--output", default=os.getenv("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output"))
    args = parser.parse_args(argv)

    conn = psycopg2.connect(args.db)
    # try/finally (not ``with conn``): psycopg2's connection context manager
    # ends the transaction but does not close the connection.
    try:
        identities_root = Path(args.output) / "identities"
        identities_root.mkdir(parents=True, exist_ok=True)

        rows = _fetch_rows(conn, _IDENTITIES_SQL)
        if not rows:
            print("No identities found in DB.")
            return

        index = {}
        migrated = 0
        for migrated, row in enumerate(rows, start=1):
            uuid_clean = row["uuid"].replace("-", "")
            name = row["name"] or ""

            dir_path = identities_root / uuid_clean
            dir_path.mkdir(parents=True, exist_ok=True)

            identity_file = {
                "version": 1,
                "identity_uuid": uuid_clean,
                "name": name,
                "identity_type": row.get("identity_type"),
                "source": row.get("source"),
                "status": row.get("status"),
                "tmdb_id": row.get("tmdb_id"),
                "tmdb_profile": row.get("tmdb_profile"),
                "metadata": _parse_metadata(row.get("metadata")),
                "file_bindings": _fetch_file_bindings(conn, row["id"]),
                "created_at": _format_timestamp(row.get("created_at")),
                "updated_at": _format_timestamp(row.get("updated_at")),
            }
            _write_json(dir_path / "identity.json", identity_file)

            index[uuid_clean] = name
            print(f" [{migrated:5d}] {name} ({uuid_clean})")
    finally:
        conn.close()

    # Write _index.json
    index_file = {
        "version": 1,
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "entries": index,
    }
    _write_json(identities_root / "_index.json", index_file)

    print(f"\nDone: {migrated} identities migrated")
    print(f"Index: {identities_root / '_index.json'} ({len(index)} entries)")
|
|
|
|
|
|
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|