#!/opt/homebrew/bin/python3.11 """ Export a video's data to a self-contained SQLite database for offline app use. Uses sqlite-vec extension for native vector storage. The vec0.dylib must be in the script directory or /tmp/. Usage: python3 export_sqlite.py [output.sqlite] """ import sys, json, sqlite3, psycopg2, os UUID = sys.argv[1] if len(sys.argv) > 1 else "aeed71342a899fe4b4c57b7d41bcb692" OUT = sys.argv[2] if len(sys.argv) > 2 else f"/Users/accusys/momentry/output_dev/{UUID}.sqlite" SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # Find vec0.dylib VEC_DYLIB = None for path in [ os.path.join(SCRIPT_DIR, "vec0.dylib"), "/tmp/vec0.dylib", os.path.join(SCRIPT_DIR, "sqlite-vec", "vec0.dylib"), ]: if os.path.exists(path): VEC_DYLIB = path break print(f"Exporting {UUID} → {OUT}") if VEC_DYLIB: print(f" sqlite-vec: {VEC_DYLIB}") # Connect to PostgreSQL pg = psycopg2.connect("dbname=momentry user=accusys") pg_cur = pg.cursor() # Connect to SQLite if os.path.exists(OUT): os.remove(OUT) lite = sqlite3.connect(OUT) # Load sqlite-vec extension if available if VEC_DYLIB: lite.enable_load_extension(True) try: lite.load_extension(VEC_DYLIB) print(" sqlite-vec extension loaded") except Exception as e: print(f" WARNING: Could not load sqlite-vec: {e}") lite.enable_load_extension(False) lite_cur = lite.cursor() # ---- Helper ---- def pg_to_sqlite(pg_query, lite_table, lite_schema, params=None, transform=None): """Copy PostgreSQL query result to SQLite table.""" lite_cur.execute(lite_schema) pg_cur.execute(pg_query, params or []) rows = pg_cur.fetchall() if not rows: return 0 cols = [d[0] for d in pg_cur.description] placeholders = ",".join(["?" for _ in cols]) count = 0 for row in rows: d = dict(zip(cols, row)) if transform: d = transform(d) vals = [] for c in cols: v = d.get(c) vals.append(None if v is None else v) try: lite_cur.execute(f"INSERT INTO {lite_table} VALUES ({placeholders})", vals) count += 1 except Exception: pass lite.commit() return count # Create tables (skip WAL — Python sqlite3 may not support PRAGMA with extensions loaded) print("Creating tables...") # videos pg_to_sqlite( "SELECT file_uuid, file_name, file_path, duration, fps, width, height, probe_json::text, status FROM dev.videos WHERE file_uuid=%s", "videos", "CREATE TABLE IF NOT EXISTS videos (file_uuid TEXT PRIMARY KEY, file_name TEXT, file_path TEXT, duration REAL, fps REAL, width INTEGER, height INTEGER, probe_json TEXT, status TEXT)", [UUID]) # chunk pg_to_sqlite( "SELECT file_uuid, chunk_id, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, metadata->>'speaker_id' as speaker_id FROM dev.chunk WHERE file_uuid=%s ORDER BY chunk_id", "chunk", """CREATE TABLE IF NOT EXISTS chunk ( file_uuid TEXT, chunk_id TEXT, chunk_type TEXT, start_time REAL, end_time REAL, fps REAL, start_frame INTEGER, end_frame INTEGER, text_content TEXT, speaker_id TEXT, PRIMARY KEY(file_uuid, chunk_id))""", [UUID]) def parse_pg_array(text): """Parse PostgreSQL array format {0.1,0.2,...} to Python list.""" if not text or text == 'null': return None try: text = text.strip('{}') return [float(x) for x in text.split(',') if x.strip()] except: return None # chunk vectors → vec0 virtual table print(" Creating vec0 table: chunk_embeddings (768D)...") lite_cur.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS chunk_embeddings USING vec0( embedding float[768] ) """) pg_cur.execute("SELECT chunk_id, COALESCE(embedding::text, 'null'), uuid FROM dev.chunk_vectors WHERE uuid=%s", [UUID]) chunk_vecs = pg_cur.fetchall() if chunk_vecs: for chunk_id, emb_text, _ in chunk_vecs: # chunk_vectors uses JSONB format, not PG array format emb = None try: emb = json.loads(emb_text) if emb_text else None except: pass if not emb: emb = parse_pg_array(emb_text) # fallback if emb and len(emb) == 768: lite_cur.execute( "INSERT INTO chunk_embeddings (rowid, embedding) VALUES (?, ?)", [int(chunk_id) if chunk_id.isdigit() else hash(chunk_id) & 0x7fffffff, json.dumps(emb)]) lite.commit() print(f" chunk_embeddings: {len(chunk_vecs)} vectors") # face detections def transform_face(row): return row # embedding moved to vec0 table pg_to_sqlite( """SELECT file_uuid, face_id, frame_number, x, y, width, height, confidence, identity_id, trace_id, COALESCE(timestamp_secs, frame_number / 25.0) as timestamp_secs FROM dev.face_detections WHERE file_uuid=%s ORDER BY frame_number""", "face_detections", """CREATE TABLE IF NOT EXISTS face_detections ( file_uuid TEXT, face_id TEXT, frame_number INTEGER, x INTEGER, y INTEGER, width INTEGER, height INTEGER, confidence REAL, identity_id INTEGER, trace_id INTEGER, timestamp_secs REAL)""", [UUID], transform_face) # face embeddings → vec0 virtual table (512D) print(" Creating vec0 table: face_embeddings (512D)...") lite_cur.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS face_embeddings USING vec0( embedding float[512] ) """) pg_cur.execute("SELECT id, COALESCE(embedding::text, 'null') FROM dev.face_detections WHERE file_uuid=%s", [UUID]) face_vecs = pg_cur.fetchall() if face_vecs: batch = [] for db_id, emb_text in face_vecs: emb = parse_pg_array(emb_text) if emb and len(emb) == 512: batch.append((db_id, json.dumps(emb))) if len(batch) >= 500: lite_cur.executemany("INSERT INTO face_embeddings VALUES (?, ?)", batch) batch = [] if batch: lite_cur.executemany("INSERT INTO face_embeddings VALUES (?, ?)", batch) lite.commit() print(f" face_embeddings: {len(face_vecs)} vectors") # identities def transform_identity(row): return row pg_to_sqlite( """SELECT DISTINCT i.id, i.name, i.uuid, i.identity_type, i.source, i.status, i.tmdb_id, i.tmdb_profile, i.tmdb_poster FROM dev.identities i INNER JOIN dev.face_detections fd ON fd.identity_id = i.id WHERE fd.file_uuid=%s""", "identities", """CREATE TABLE IF NOT EXISTS identities ( id INTEGER PRIMARY KEY, name TEXT, uuid TEXT, identity_type TEXT, source TEXT, status TEXT, tmdb_id INTEGER, tmdb_profile TEXT, tmdb_poster TEXT)""", [UUID], transform_identity) # identity_bindings pg_to_sqlite( """SELECT DISTINCT ib.identity_id, ib.identity_type, ib.identity_value, ib.confidence FROM dev.identity_bindings ib INNER JOIN dev.face_detections fd ON fd.identity_id = ib.identity_id WHERE fd.file_uuid=%s""", "identity_bindings", "CREATE TABLE IF NOT EXISTS identity_bindings (identity_id INTEGER, identity_type TEXT, identity_value TEXT, confidence REAL)", [UUID]) # tkg_nodes pg_to_sqlite( "SELECT id, node_type, external_id, file_uuid, label, properties::text FROM dev.tkg_nodes WHERE file_uuid=%s", "tkg_nodes", "CREATE TABLE IF NOT EXISTS tkg_nodes (id INTEGER PRIMARY KEY, node_type TEXT, external_id TEXT, file_uuid TEXT, label TEXT, properties TEXT)", [UUID]) # tkg_edges pg_to_sqlite( "SELECT id, edge_type, source_node_id, target_node_id, file_uuid, properties::text FROM dev.tkg_edges WHERE file_uuid=%s", "tkg_edges", "CREATE TABLE IF NOT EXISTS tkg_edges (id INTEGER PRIMARY KEY, edge_type TEXT, source_node_id INTEGER, target_node_id INTEGER, file_uuid TEXT, properties TEXT)", [UUID]) # Voice vectors from Qdrant (ECAPA-TDNN speaker embeddings, 192D) print(" Exporting voice vectors from Qdrant...") try: from urllib.request import Request, urlopen lite_cur.execute("SELECT chunk_id FROM chunk") db_chunk_ids = set(r[0] for r in lite_cur.fetchall()) qdrant_chunks = {} offset_val = None while True: data = {"limit": 100, "with_vector": True, "with_payload": True} if offset_val: data["offset"] = offset_val req = Request("http://localhost:6333/collections/momentry_dev_voice/points/scroll", data=json.dumps(data).encode(), headers={"Content-Type": "application/json"}, method="POST") resp = json.loads(urlopen(req).read()) for pt in resp["result"].get("points", []): cid = pt["payload"].get("chunk_id", "") if cid in db_chunk_ids: qdrant_chunks[cid] = pt.get("vector", []) offset_val = resp["result"].get("next_page_offset") if offset_val is None: break if qdrant_chunks: dim = len(next(iter(qdrant_chunks.values()))) lite_cur.execute("CREATE VIRTUAL TABLE IF NOT EXISTS voice_embeddings USING vec0(embedding float[{}])".format(dim)) for chunk_id, vec in qdrant_chunks.items(): if len(vec) == dim: rid = int(chunk_id) if chunk_id.isdigit() else hash(chunk_id) & 0x7fffffff lite_cur.execute("INSERT INTO voice_embeddings (rowid, embedding) VALUES (?, ?)", [rid, json.dumps(vec)]) lite.commit() print(" voice_embeddings (vec0, {}D): {} vectors".format(dim, len(qdrant_chunks))) except Exception as e: print(" WARNING: Qdrant voice export skipped: {}".format(e)) # ---- Create indexes ---- print("Creating indexes...") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_fd_trace ON face_detections(trace_id)") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_fd_identity ON face_detections(identity_id)") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_fd_frame ON face_detections(frame_number)") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_chunk_chunkid ON chunk(chunk_id)") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_tkg_node_type ON tkg_nodes(node_type)") lite_cur.execute("CREATE INDEX IF NOT EXISTS idx_tkg_edge_type ON tkg_edges(edge_type)") lite.commit() # ---- Stats ---- pg_cur.close(); pg.close() lite_cur.close(); lite.close() size_mb = os.path.getsize(OUT) / 1024 / 1024 print(f"\n {OUT} ({size_mb:.0f}MB)") # Verify lite = sqlite3.connect(OUT) if VEC_DYLIB: lite.enable_load_extension(True) lite.load_extension(VEC_DYLIB) lite.enable_load_extension(False) c = lite.cursor() for tbl in ['videos', 'chunk', 'face_detections', 'identities', 'identity_bindings', 'tkg_nodes', 'tkg_edges']: c.execute(f"SELECT COUNT(*) FROM {tbl}") print(f" {tbl}: {c.fetchone()[0]} rows") # Check vec tables try: c.execute("SELECT COUNT(*) FROM chunk_embeddings") print(f" chunk_embeddings (vec0, 768D): {c.fetchone()[0]} vectors") except: print(" chunk_embeddings: N/A") try: c.execute("SELECT COUNT(*) FROM face_embeddings") print(f" face_embeddings (vec0, 512D): {c.fetchone()[0]} vectors") except: print(" face_embeddings: N/A") try: c.execute("SELECT COUNT(*) FROM voice_embeddings") print(f" voice_embeddings (vec0, 192D): {c.fetchone()[0]} vectors") except: print(" voice_embeddings: N/A") c.close(); lite.close()