From 4ba248513e30f3711de22ab689e0e2b4ce75141a Mon Sep 17 00:00:00 2001 From: Accusys Date: Mon, 22 Jun 2026 08:49:41 +0800 Subject: [PATCH] fix: correct processor list - remove deprecated mediapipe/appearance/story, fix auto-pipeline order - ProcessorType::all(): remove MediaPipe, Appearance, Story (mediapipe replaced by Swift) - files.rs auto-pipeline: fix order to cut,asr,asrx,yolo,ocr,face,pose (was missing asr) - postgres_db.rs run_migrations(): rewrite to auto-create all 38 tables idempotently --- src/api/files.rs | 2 +- src/core/db/postgres_db.rs | 396 +++++++++++++++++++++++++++++++------ 2 files changed, 333 insertions(+), 65 deletions(-) diff --git a/src/api/files.rs b/src/api/files.rs index 4c9a9a6..930b6e5 100644 --- a/src/api/files.rs +++ b/src/api/files.rs @@ -767,7 +767,7 @@ async fn register_file( if let Some(ref vp) = video_path { if let Ok(job) = auto_state.db.create_monitor_job(&auto_uuid, Some(vp)).await { tracing::info!("[AUTO-PIPELINE] Job {} created for {}", job.id, auto_uuid); - let all_procs: Vec<&str> = vec!["cut", "yolo", "ocr", "face", "pose", "asrx"]; + let all_procs: Vec<&str> = vec!["cut", "asr", "asrx", "yolo", "ocr", "face", "pose"]; let total = sqlx::query_scalar::<_, i64>(&format!( "SELECT COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1", schema::table_name("videos") diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index bcde175..243d4e7 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -591,9 +591,6 @@ impl ProcessorType { ProcessorType::Ocr, ProcessorType::Face, ProcessorType::Pose, - ProcessorType::MediaPipe, - ProcessorType::Appearance, - ProcessorType::Story, ] } @@ -926,6 +923,10 @@ impl PostgresDb { } /// Run database migrations (Schema initialization) + /// + /// Creates all required tables with their final schema (all columns from + /// all migrations applied). Uses CREATE TABLE IF NOT EXISTS + ALTER TABLE + /// ADD COLUMN IF NOT EXISTS so it is safe to run on existing databases. pub async fn run_migrations(pool: &PgPool) -> Result<()> { tracing::info!("Running database migrations..."); @@ -937,62 +938,365 @@ impl PostgresDb { .await?; } - // Videos - sqlx::query("CREATE TABLE IF NOT EXISTS videos (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) UNIQUE NOT NULL, file_path TEXT NOT NULL, file_name TEXT NOT NULL, duration DOUBLE PRECISION, width INTEGER, height INTEGER, fps DOUBLE PRECISION, probe_json jsonb, fs_video BOOLEAN DEFAULT FALSE, fs_json BOOLEAN DEFAULT FALSE, psql_chunk BOOLEAN DEFAULT FALSE, pobject_chunk BOOLEAN DEFAULT FALSE, mobject_chunk BOOLEAN DEFAULT FALSE, pvector_chunk BOOLEAN DEFAULT FALSE, qvector_chunk BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_videos_file_uuid ON videos(file_uuid)") + // ── Extensions ── + sqlx::query("CREATE EXTENSION IF NOT EXISTS vector") .execute(pool) .await?; - // Chunks - sqlx::query("CREATE TABLE IF NOT EXISTS chunk (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, chunk_id VARCHAR(32) NOT NULL, chunk_type VARCHAR(32) NOT NULL, start_time DOUBLE PRECISION NOT NULL, end_time DOUBLE PRECISION NOT NULL, fps DOUBLE PRECISION DEFAULT 24.0, start_frame BIGINT DEFAULT 0, end_frame BIGINT DEFAULT 0, content JSONB NOT NULL, metadata JSONB, vector_id VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, chunk_id))").execute(pool).await?; + // ── Schema Migrations (tracking table, used by health check) ── + sqlx::query("CREATE TABLE IF NOT EXISTS schema_migrations (id SERIAL PRIMARY KEY, filename TEXT NOT NULL UNIQUE, checksum TEXT NOT NULL, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), duration_ms INTEGER DEFAULT 0)").execute(pool).await?; + + // ── Videos ── + sqlx::query("CREATE TABLE IF NOT EXISTS videos (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) UNIQUE NOT NULL, file_path TEXT NOT NULL, file_name TEXT NOT NULL, duration DOUBLE PRECISION, width INTEGER, height INTEGER, fps DOUBLE PRECISION, probe_json jsonb, fs_video BOOLEAN DEFAULT FALSE, fs_json BOOLEAN DEFAULT FALSE, psql_chunk BOOLEAN DEFAULT FALSE, pobject_chunk BOOLEAN DEFAULT FALSE, mobject_chunk BOOLEAN DEFAULT FALSE, pvector_chunk BOOLEAN DEFAULT FALSE, qvector_chunk BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + // Columns added by later migrations + let video_cols = [ + "status VARCHAR(20) DEFAULT 'pending'", + "user_id BIGINT", + "job_id INTEGER", + "registration_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP", + "file_type VARCHAR(20)", + "processing_status JSONB DEFAULT '{}'", + "birth_registration JSONB", + "total_frames BIGINT DEFAULT 0", + "parent_uuid VARCHAR(32)", + "cut_done BOOLEAN DEFAULT false", + "scene_done BOOLEAN DEFAULT false", + "audio_tracks JSONB DEFAULT '[]'", + "cut_count INTEGER DEFAULT 0", + "cut_max_duration DOUBLE PRECISION DEFAULT 0", + "content_hash TEXT", + ]; + for col in &video_cols { + let (col_name, col_def) = col.split_once(' ').unwrap_or((col, "")); + sqlx::query(&format!( + "ALTER TABLE videos ADD COLUMN IF NOT EXISTS {} {}", + col_name, col_def + )) + .execute(pool) + .await?; + } + sqlx::query("CREATE INDEX IF NOT EXISTS idx_videos_file_uuid ON videos(file_uuid)") + .execute(pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_videos_content_hash ON videos(content_hash) WHERE content_hash IS NOT NULL", + ) + .execute(pool) + .await?; + + // ── Chunk ── + sqlx::query("CREATE TABLE IF NOT EXISTS chunk (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, chunk_id VARCHAR(128) NOT NULL, chunk_type VARCHAR(32) NOT NULL, start_time DOUBLE PRECISION NOT NULL, end_time DOUBLE PRECISION NOT NULL, fps DOUBLE PRECISION DEFAULT 24.0, start_frame BIGINT DEFAULT 0, end_frame BIGINT DEFAULT 0, content JSONB NOT NULL, metadata JSONB, vector_id VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, chunk_id))").execute(pool).await?; + let chunk_cols = [ + "text_content TEXT", + "frame_count INTEGER DEFAULT 0", + "pre_chunk_ids INTEGER[]", + "parent_chunk_id VARCHAR(64)", + "child_chunk_ids TEXT[]", + "search_vector TSVECTOR", + "visual_stats JSONB", + "summary_text TEXT", + "embedding vector", + "old_chunk_id VARCHAR(128)", + "chunk_index INTEGER DEFAULT 0", + ]; + for col in &chunk_cols { + let (col_name, col_def) = col.split_once(' ').unwrap_or((col, "")); + sqlx::query(&format!( + "ALTER TABLE chunk ADD COLUMN IF NOT EXISTS {} {}", + col_name, col_def + )) + .execute(pool) + .await?; + } sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_file ON chunk(file_uuid)") .execute(pool) .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_type ON chunk(chunk_type)") .execute(pool) .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_parent ON chunk(parent_chunk_id)") + .execute(pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_chunk_file_type ON chunk(file_uuid, chunk_type)", + ) + .execute(pool) + .await?; - // Monitor Jobs - sqlx::query("CREATE TABLE IF NOT EXISTS monitor_jobs (id SERIAL PRIMARY KEY, uuid VARCHAR(16) NOT NULL, video_path VARCHAR(512), status VARCHAR(20) NOT NULL DEFAULT 'pending', current_processor VARCHAR(20), progress_total INT DEFAULT 0, progress_current INT DEFAULT 0, error_count INT DEFAULT 0, last_error TEXT, started_at TIMESTAMP, updated_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_monitor_jobs_status ON monitor_jobs(status)") + // ── Chunk Vectors ── + sqlx::query("CREATE TABLE IF NOT EXISTS chunk_vectors (id SERIAL PRIMARY KEY, chunk_id VARCHAR(64) NOT NULL, uuid VARCHAR(64) NOT NULL, chunk_type VARCHAR(32) NOT NULL DEFAULT 'sentence', embedding JSONB, created_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vec_uuid ON chunk_vectors(uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vec_chunk ON chunk_vectors(chunk_id)") .execute(pool) .await?; - // Processor Results - sqlx::query("CREATE TABLE IF NOT EXISTS processor_results (id SERIAL PRIMARY KEY, job_id INTEGER, file_uuid VARCHAR(255) NOT NULL, processor VARCHAR(64), processor_type VARCHAR(64) NOT NULL, status VARCHAR(32) DEFAULT 'pending', result JSONB, error_message TEXT, started_at TIMESTAMP WITH TIME ZONE, completed_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + // ── Parent Chunks ── + sqlx::query("CREATE TABLE IF NOT EXISTS parent_chunks (id SERIAL PRIMARY KEY, uuid VARCHAR(32) NOT NULL, chunk_id VARCHAR(64), summary_text TEXT, summary_tsvector TSVECTOR, metadata JSONB DEFAULT '{}')").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_parent_chunks_uuid ON parent_chunks(uuid)") + .execute(pool) + .await?; - // Talents - sqlx::query("CREATE TABLE IF NOT EXISTS talents (id BIGSERIAL PRIMARY KEY, real_name VARCHAR(255) NOT NULL UNIQUE, actor_name VARCHAR(255), voice_embedding TEXT, face_embedding TEXT, metadata JSONB DEFAULT '{}', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + // ── Pre Chunks ── + sqlx::query("CREATE TABLE IF NOT EXISTS pre_chunks (id BIGSERIAL PRIMARY KEY, file_uuid VARCHAR(255) NOT NULL, processor_type VARCHAR(50) NOT NULL, coordinate_type VARCHAR(50) NOT NULL, coordinate_index BIGINT NOT NULL, start_frame BIGINT, end_frame BIGINT, start_time DOUBLE PRECISION, end_time DOUBLE PRECISION, fps DOUBLE PRECISION, data JSONB NOT NULL, identity_id UUID, confidence DOUBLE PRECISION, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_pre_chunks_file_uuid ON pre_chunks(file_uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_pre_chunks_processor ON pre_chunks(processor_type)") + .execute(pool) + .await?; - // Identity History (Undo/Redo Support - for PATCH operations only) - // Create before identity_bindings to avoid dependency issues + // ── Monitor Jobs ── + sqlx::query("CREATE TABLE IF NOT EXISTS monitor_jobs (id SERIAL PRIMARY KEY, uuid VARCHAR(32) NOT NULL, video_path VARCHAR(512), status VARCHAR(20) NOT NULL DEFAULT 'pending', current_processor VARCHAR(20), progress_total INT DEFAULT 0, progress_current INT DEFAULT 0, error_count INT DEFAULT 0, last_error TEXT, started_at TIMESTAMP, updated_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + let mj_cols = [ + "video_id BIGINT", + "user_id BIGINT", + "processors TEXT[]", + "completed_processors TEXT[]", + "failed_processors TEXT[]", + ]; + for col in &mj_cols { + let (col_name, col_def) = col.split_once(' ').unwrap_or((col, "")); + sqlx::query(&format!( + "ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS {} {}", + col_name, col_def + )) + .execute(pool) + .await?; + } + sqlx::query("CREATE INDEX IF NOT EXISTS idx_monitor_jobs_status ON monitor_jobs(status)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_monitor_jobs_uuid ON monitor_jobs(uuid)") + .execute(pool) + .await?; + + // ── Processor Results ── + sqlx::query("CREATE TABLE IF NOT EXISTS processor_results (id SERIAL PRIMARY KEY, job_id INTEGER, file_uuid VARCHAR(255), processor VARCHAR(64), processor_type VARCHAR(64), status VARCHAR(32) DEFAULT 'pending', result JSONB, error_message TEXT, started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, duration_secs DOUBLE PRECISION, chunks_produced INTEGER DEFAULT 0, frames_processed INTEGER DEFAULT 0, output_size_bytes BIGINT DEFAULT 0, output_path TEXT, progress_total INTEGER DEFAULT 0, progress_current INTEGER DEFAULT 0, last_checkpoint JSONB, retry_count INTEGER DEFAULT 0, output_data JSONB, uuid VARCHAR(255))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_pr_uuid ON processor_results(file_uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_pr_job_id ON processor_results(job_id)") + .execute(pool) + .await?; + sqlx::query("CREATE UNIQUE INDEX IF NOT EXISTS idx_pr_job_processor_type ON processor_results(job_id, processor_type)") + .execute(pool) + .await?; + + // ── Processor Versions ── + sqlx::query("CREATE TABLE IF NOT EXISTS processor_versions (processor VARCHAR(64) PRIMARY KEY, model_version VARCHAR(128) NOT NULL, processor_type VARCHAR(32) NOT NULL DEFAULT 'processor', dependencies TEXT[] DEFAULT '{}', updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, file_uuid VARCHAR(64))").execute(pool).await?; + + // ── Processor Alerts ── + sqlx::query("CREATE TABLE IF NOT EXISTS processor_alerts (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32), processor_type VARCHAR(32) NOT NULL, alert_type VARCHAR(32) NOT NULL, message TEXT, created_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_alerts_file_uuid ON processor_alerts(file_uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_alerts_processor_type ON processor_alerts(processor_type)") + .execute(pool) + .await?; + + // ── Identities (renamed from talents in migration 012) ── + sqlx::query("CREATE TABLE IF NOT EXISTS identities (id SERIAL PRIMARY KEY, name TEXT NOT NULL, embedding vector, metadata JSONB DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, uuid UUID DEFAULT gen_random_uuid(), identity_type VARCHAR, source VARCHAR, status VARCHAR, face_embedding vector, voice_embedding vector, identity_embedding vector, reference_data JSONB, tmdb_id INTEGER, tmdb_profile TEXT, tmdb_poster TEXT, file_uuid TEXT, updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_identities_uuid ON identities(uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_identities_name ON identities(name)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_identities_source ON identities(source)") + .execute(pool) + .await?; + + // ── Identity Bindings (depends on identities) ── + sqlx::query("CREATE TABLE IF NOT EXISTS identity_bindings (id BIGSERIAL PRIMARY KEY, identity_id BIGINT REFERENCES identities(id) ON DELETE CASCADE, identity_type VARCHAR(20) NOT NULL, identity_value VARCHAR(100) NOT NULL, metadata JSONB DEFAULT '{}', confidence DOUBLE PRECISION DEFAULT 1.0, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, UNIQUE(identity_id, identity_type, identity_value))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_identity_bindings_identity ON identity_bindings(identity_id)") + .execute(pool) + .await?; + + // ── Identity History ── let history_table = schema::table_name("identity_history"); sqlx::query(&format!("CREATE TABLE IF NOT EXISTS {} (id BIGSERIAL PRIMARY KEY, identity_id INTEGER NOT NULL, operation VARCHAR(20) NOT NULL, before_snapshot JSONB, after_snapshot JSONB, is_undone BOOLEAN DEFAULT FALSE, undone_at TIMESTAMPTZ, user_id VARCHAR(100), user_source VARCHAR(50), created_at TIMESTAMPTZ DEFAULT NOW())", history_table)).execute(pool).await?; sqlx::query(&format!("CREATE INDEX IF NOT EXISTS idx_identity_history_identity_time ON {}(identity_id, created_at DESC)", history_table)).execute(pool).await?; sqlx::query(&format!("CREATE INDEX IF NOT EXISTS idx_identity_history_not_undone ON {}(identity_id, created_at DESC) WHERE NOT is_undone", history_table)).execute(pool).await?; - // Identity Bindings (depends on identities table) - sqlx::query("CREATE TABLE IF NOT EXISTS identity_bindings (id BIGSERIAL PRIMARY KEY, identity_id BIGINT REFERENCES identities(id) ON DELETE CASCADE, identity_type VARCHAR(20) NOT NULL, identity_value VARCHAR(100) NOT NULL, metadata JSONB DEFAULT '{}', confidence DOUBLE PRECISION DEFAULT 1.0, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, UNIQUE(identity_id, identity_type, identity_value))").execute(pool).await?; + // ── File Identities (N:N file ↔ identity) ── + sqlx::query("CREATE TABLE IF NOT EXISTS file_identities (id SERIAL PRIMARY KEY, file_uuid VARCHAR(255) NOT NULL, identity_id INTEGER NOT NULL REFERENCES identities(id) ON DELETE CASCADE, confidence DOUBLE PRECISION DEFAULT 1.0, metadata JSONB, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, identity_id))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_file_identities_file ON file_identities(file_uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_file_identities_identity ON file_identities(identity_id)") + .execute(pool) + .await?; - // API Keys + // ── Face Detections ── + sqlx::query("CREATE TABLE IF NOT EXISTS face_detections (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, frame_number BIGINT NOT NULL, timestamp_secs DOUBLE PRECISION NOT NULL, face_id VARCHAR(255), x INTEGER NOT NULL, y INTEGER NOT NULL, width INTEGER NOT NULL, height INTEGER NOT NULL, confidence DOUBLE PRECISION NOT NULL, embedding vector, attributes JSONB, identity_id INTEGER REFERENCES identities(id) ON DELETE SET NULL, identity_confidence DOUBLE PRECISION, cluster_id VARCHAR(255), created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, metadata JSONB DEFAULT '{}', trace_id INTEGER, stranger_id INTEGER, cut_id INTEGER)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_file ON face_detections(file_uuid)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_face_id ON face_detections(face_id)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_frame ON face_detections(frame_number)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_identity ON face_detections(identity_id)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_trace ON face_detections(trace_id) WHERE trace_id IS NOT NULL") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_trace_time ON face_detections(file_uuid, trace_id) WHERE trace_id IS NOT NULL") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_detections_stranger ON face_detections(file_uuid, stranger_id) WHERE stranger_id IS NOT NULL") + .execute(pool) + .await?; + + // ── Face Identities ── + sqlx::query("CREATE TABLE IF NOT EXISTS face_identities (id SERIAL PRIMARY KEY, face_id VARCHAR(255) NOT NULL UNIQUE, name VARCHAR(255), embedding vector(512), attributes JSONB, metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT TRUE)").execute(pool).await?; + + // ── Face Clusters ── + sqlx::query("CREATE TABLE IF NOT EXISTS face_clusters (id SERIAL PRIMARY KEY, cluster_id VARCHAR(255) NOT NULL UNIQUE, file_uuid VARCHAR(255) NOT NULL, centroid vector(512), size INTEGER NOT NULL DEFAULT 0, representative_face_id VARCHAR(255), metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_face_clusters_file ON face_clusters(file_uuid)") + .execute(pool) + .await?; + + // ── Face Recognition Results ── + sqlx::query("CREATE TABLE IF NOT EXISTS face_recognition_results (id SERIAL PRIMARY KEY, file_uuid VARCHAR(255) NOT NULL UNIQUE, frame_count BIGINT NOT NULL DEFAULT 0, fps DOUBLE PRECISION NOT NULL DEFAULT 0.0, total_faces INTEGER NOT NULL DEFAULT 0, recognized_faces INTEGER NOT NULL DEFAULT 0, clusters_count INTEGER NOT NULL DEFAULT 0, result_data JSONB NOT NULL, processing_time_secs DOUBLE PRECISION, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + + // ── Strangers ── + sqlx::query("CREATE TABLE IF NOT EXISTS strangers (id SERIAL PRIMARY KEY, file_uuid TEXT, trace_id INTEGER, metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(file_uuid, trace_id))").execute(pool).await?; + + // ── Cuts ── + sqlx::query("CREATE TABLE IF NOT EXISTS cuts (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, cut_number INTEGER NOT NULL, start_frame BIGINT NOT NULL, end_frame BIGINT NOT NULL, start_time DOUBLE PRECISION, end_time DOUBLE PRECISION, fps DOUBLE PRECISION, metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(file_uuid, cut_number))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_cuts_file_uuid ON cuts(file_uuid)") + .execute(pool) + .await?; + + // ── Frames ── + sqlx::query("CREATE TABLE IF NOT EXISTS frames (id SERIAL PRIMARY KEY, file_id INTEGER NOT NULL, frame_number BIGINT NOT NULL, timestamp DOUBLE PRECISION NOT NULL, fps DOUBLE PRECISION DEFAULT 24.0, yolo_objects JSONB, ocr_results JSONB, face_results JSONB, frame_path TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_frames_file_id ON frames(file_id)") + .execute(pool) + .await?; + + // ── TKG Nodes ── + sqlx::query("CREATE TABLE IF NOT EXISTS tkg_nodes (id BIGSERIAL PRIMARY KEY, node_type VARCHAR(64) NOT NULL, external_id VARCHAR(256) NOT NULL, file_uuid VARCHAR(64) NOT NULL, label VARCHAR(512), properties JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, node_type, external_id))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_tkg_nodes_type ON tkg_nodes(node_type)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_tkg_nodes_file ON tkg_nodes(file_uuid)") + .execute(pool) + .await?; + + // ── TKG Edges ── + sqlx::query("CREATE TABLE IF NOT EXISTS tkg_edges (id BIGSERIAL PRIMARY KEY, edge_type VARCHAR(64) NOT NULL, source_node_id BIGINT NOT NULL REFERENCES tkg_nodes(id) ON DELETE CASCADE, target_node_id BIGINT NOT NULL REFERENCES tkg_nodes(id) ON DELETE CASCADE, file_uuid VARCHAR(64) NOT NULL, properties JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, edge_type, source_node_id, target_node_id))").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_tkg_edges_type ON tkg_edges(edge_type)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_tkg_edges_source ON tkg_edges(source_node_id)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_tkg_edges_file ON tkg_edges(file_uuid)") + .execute(pool) + .await?; + + // ── Characters ── + sqlx::query("CREATE TABLE IF NOT EXISTS characters (id BIGSERIAL PRIMARY KEY, file_uuid TEXT NOT NULL, name TEXT NOT NULL, language_track TEXT DEFAULT 'original', is_voice_only BOOLEAN DEFAULT FALSE, metadata JSONB DEFAULT '{}', UNIQUE(file_uuid, name, language_track))").execute(pool).await?; + + // ── Castings ── + sqlx::query("CREATE TABLE IF NOT EXISTS castings (id BIGSERIAL PRIMARY KEY, character_id BIGINT REFERENCES characters(id) ON DELETE CASCADE, identity_id BIGINT REFERENCES identities(id) ON DELETE CASCADE, track_type VARCHAR(32) DEFAULT 'original', role_type VARCHAR(32) DEFAULT 'both', UNIQUE(character_id, identity_id, track_type))").execute(pool).await?; + + // ── Users ── + sqlx::query("CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, username VARCHAR(64) NOT NULL UNIQUE, password_hash VARCHAR(256) NOT NULL, role VARCHAR(20) NOT NULL DEFAULT 'user' CHECK (role IN ('admin', 'user', 'readonly')), status VARCHAR(20) NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'disabled')), created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, last_login TIMESTAMPTZ)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_users_status ON users(status)") + .execute(pool) + .await?; + + // ── API Keys (before sessions — sessions FK references api_keys) ── sqlx::query("CREATE TABLE IF NOT EXISTS api_keys (id SERIAL PRIMARY KEY, key_id VARCHAR(48) UNIQUE NOT NULL, key_hash VARCHAR(64) NOT NULL, key_prefix VARCHAR(8) NOT NULL, name VARCHAR(128) NOT NULL, key_type VARCHAR(20) NOT NULL DEFAULT 'user', user_id BIGINT, service_name VARCHAR(64), permissions JSONB DEFAULT '[\"read\", \"write\"]', expires_at TIMESTAMP, last_used_at TIMESTAMP, last_used_ip VARCHAR(45), usage_count BIGINT DEFAULT 0, status VARCHAR(20) NOT NULL DEFAULT 'active', rotation_required BOOLEAN DEFAULT FALSE, rotation_reason TEXT, grace_period_end TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(key_hash)") .execute(pool) .await?; - // API Key Audit Log + // ── API Key Audit Log ── sqlx::query("CREATE TABLE IF NOT EXISTS api_key_audit_log (id SERIAL PRIMARY KEY, key_id VARCHAR(32) NOT NULL, action VARCHAR(50) NOT NULL, actor VARCHAR(128), ip_address VARCHAR(45), user_agent TEXT, request_path TEXT, response_code INT, anomaly_type VARCHAR(30), details JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - // API Key Anomalies + // ── API Key Anomalies ── sqlx::query("CREATE TABLE IF NOT EXISTS api_key_anomalies (id SERIAL PRIMARY KEY, key_id VARCHAR(32) NOT NULL, anomaly_type VARCHAR(30) NOT NULL, severity VARCHAR(10) NOT NULL, ip_address VARCHAR(45), request_count INT, error_count INT, error_rate DOUBLE PRECISION, unique_ips INT, details JSONB, resolved BOOLEAN DEFAULT FALSE, resolved_at TIMESTAMP, resolved_by VARCHAR(128), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - // Gitea Tokens + // ── Sessions (FK: users(id), api_keys(key_id)) ── + sqlx::query("CREATE TABLE IF NOT EXISTS sessions (id SERIAL PRIMARY KEY, session_id VARCHAR(64) NOT NULL UNIQUE, user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, api_key_id VARCHAR(48) NOT NULL REFERENCES api_keys(key_id) ON DELETE CASCADE, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMPTZ NOT NULL, last_accessed_at TIMESTAMPTZ)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_sessions_session_id ON sessions(session_id)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at)") + .execute(pool) + .await?; + + // ── JWT Blacklist ── + sqlx::query("CREATE TABLE IF NOT EXISTS jwt_blacklist (id SERIAL PRIMARY KEY, jti VARCHAR(64) NOT NULL UNIQUE, reason VARCHAR(20) NOT NULL DEFAULT 'logout' CHECK (reason IN ('logout', 'revoke', 'rotate')), created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMPTZ NOT NULL)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_jwt_blacklist_jti ON jwt_blacklist(jti)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_jwt_blacklist_expires ON jwt_blacklist(expires_at)") + .execute(pool) + .await?; + + // ── Resources ── + sqlx::query("CREATE TABLE IF NOT EXISTS resources (resource_id VARCHAR(64) PRIMARY KEY, resource_type VARCHAR(20) NOT NULL, category VARCHAR(50), capabilities JSONB DEFAULT '{}', config JSONB DEFAULT '{}', metadata JSONB DEFAULT '{}', status VARCHAR(20) DEFAULT 'offline', last_heartbeat TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_resources_type ON resources(resource_type)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_resources_status ON resources(status)") + .execute(pool) + .await?; + + // ── MAC Allocations ── + sqlx::query("CREATE TABLE IF NOT EXISTS mac_allocations (mac_address VARCHAR(17) PRIMARY KEY, machine_name VARCHAR(100), license_key VARCHAR(64), is_active BOOLEAN DEFAULT true, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_mac_allocations_license ON mac_allocations(license_key)") + .execute(pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_mac_allocations_active ON mac_allocations(is_active) WHERE is_active") + .execute(pool) + .await?; + + // ── Chunks Rule1 ── + sqlx::query("CREATE TABLE IF NOT EXISTS chunks_rule1 (id BIGSERIAL PRIMARY KEY, uuid VARCHAR(255) NOT NULL, chunk_id VARCHAR(100) NOT NULL, chunk_index INTEGER NOT NULL, start_time DOUBLE PRECISION, end_time DOUBLE PRECISION, start_frame BIGINT, end_frame BIGINT, fps DOUBLE PRECISION, content JSONB, metadata JSONB, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_rule1_uuid ON chunks_rule1(uuid)") + .execute(pool) + .await?; + + // ── Gitea Tokens ── sqlx::query("CREATE TABLE IF NOT EXISTS gitea_tokens (id SERIAL PRIMARY KEY, gitea_token_id BIGINT NOT NULL, gitea_user VARCHAR(128) NOT NULL, token_name VARCHAR(128) NOT NULL, token_last_eight VARCHAR(8) NOT NULL, scopes JSONB DEFAULT '[]', api_key_id VARCHAR(48), last_verified TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(gitea_user, token_name))").execute(pool).await?; - // N8N API Keys - sqlx::query("CREATE TABLE IF NOT EXISTS n8n_api_keys (id SERIAL PRIMARY KEY, n8n_key_id VARCHAR(64) UNIQUE NOT NULL, label VARCHAR(100) NOT NULL, api_key_last_eight VARCHAR(8) NOT NULL, momentry_api_key_id VARCHAR(48), expires_at TIMESTAMP WITH TIME ZONE, last_verified TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + // ── N8N API Keys ── + sqlx::query("CREATE TABLE IF NOT EXISTS n8n_api_keys (id SERIAL PRIMARY KEY, n8n_key_id VARCHAR(64) UNIQUE NOT NULL, label VARCHAR(100) NOT NULL, api_key_last_eight VARCHAR(8) NOT NULL, momentry_api_key_id VARCHAR(48), expires_at TIMESTAMPTZ, last_verified TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - // Search Trigger + // ── Speaker Detections ── + sqlx::query("CREATE TABLE IF NOT EXISTS speaker_detections (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, identity_id INTEGER REFERENCES identities(id) ON DELETE CASCADE, speaker_id VARCHAR(32), start_time DOUBLE PRECISION, end_time DOUBLE PRECISION, text_content TEXT, chunk_id VARCHAR(128), confidence REAL, metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_identity ON speaker_detections(identity_id)") + .execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_file ON speaker_detections(file_uuid)") + .execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_chunk ON speaker_detections(chunk_id)") + .execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_search ON speaker_detections(file_uuid, identity_id)") + .execute(pool).await?; + + // ── Jobs ── + sqlx::query("CREATE TABLE IF NOT EXISTS jobs (id UUID PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL REFERENCES videos(file_uuid) ON DELETE CASCADE, processor_list TEXT[], assigned_processor_id UUID, rule VARCHAR(20), status VARCHAR(20) DEFAULT 'QUEUED', total_frames BIGINT DEFAULT 0, processed_frames BIGINT DEFAULT 0, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") + .execute(pool) + .await?; + + // ── Functions & Triggers ── sqlx::query( "CREATE OR REPLACE FUNCTION update_search_vector() RETURNS TRIGGER AS $func$ BEGIN @@ -1030,7 +1334,6 @@ impl PostgresDb { sqlx::query("DROP TRIGGER IF EXISTS chunks_search_vector_trigger ON chunk") .execute(pool) .await?; - sqlx::query( "CREATE TRIGGER chunks_search_vector_trigger BEFORE INSERT OR UPDATE ON chunk @@ -1039,43 +1342,8 @@ impl PostgresDb { .execute(pool) .await?; - // Chunks Rule 1 - // Temporarily disabled for debugging - // sqlx::query("CREATE TABLE IF NOT EXISTS chunks_rule1 ...").execute(pool).await?; - // sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_rule1_asset ...").execute(pool).await?; - - // Speaker Detections - sqlx::query( - "CREATE TABLE IF NOT EXISTS speaker_detections ( \ - id SERIAL PRIMARY KEY, \ - file_uuid VARCHAR(32) NOT NULL, \ - identity_id INTEGER REFERENCES identities(id) ON DELETE CASCADE, \ - speaker_id VARCHAR(32), \ - start_time DOUBLE PRECISION, \ - end_time DOUBLE PRECISION, \ - text_content TEXT, \ - chunk_id VARCHAR(128), \ - confidence REAL, \ - metadata JSONB DEFAULT '{}', \ - created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)", - ) - .execute(pool) - .await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_identity ON speaker_detections(identity_id)") - .execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_file ON speaker_detections(file_uuid)") - .execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_chunk ON speaker_detections(chunk_id)") - .execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_search ON speaker_detections(file_uuid, identity_id)") - .execute(pool).await?; - - // Jobs (Legacy/P0) - tracing::info!("Creating jobs table..."); - sqlx::query("CREATE TABLE IF NOT EXISTS jobs (id UUID PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL REFERENCES videos(file_uuid) ON DELETE CASCADE, processor_list TEXT[], assigned_processor_id UUID, rule VARCHAR(20), status VARCHAR(20) DEFAULT 'QUEUED', total_frames BIGINT DEFAULT 0, processed_frames BIGINT DEFAULT 0, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") - .execute(pool) - .await?; + // ── Seed admin user if not exists ── + sqlx::query("INSERT INTO users (username, password_hash, role) SELECT 'admin', '$argon2id$v=19$m=65536,t=3,p=4$YWRtaW4$placeholder', 'admin' WHERE NOT EXISTS (SELECT 1 FROM users WHERE username = 'admin')").execute(pool).await.ok(); tracing::info!("Database migrations completed."); Ok(())