diff --git a/src/api/five_w1h_agent_api.rs b/src/api/five_w1h_agent_api.rs index e73ba5c..533156e 100644 --- a/src/api/five_w1h_agent_api.rs +++ b/src/api/five_w1h_agent_api.rs @@ -757,10 +757,11 @@ pub async fn run_5w1h_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Result< let qdrant = QdrantDb::new(); qdrant.init_collection(768).await?; + let chunk_table = schema::table_name("chunk"); let rows = sqlx::query_as::<_, (String, String, String, f64, f64)>( - r#"SELECT chunk_id, chunk_type, text_content, start_time, end_time - FROM dev.chunk WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL - AND (text_content IS NOT NULL AND text_content != '') ORDER BY id"#, + &format!("SELECT chunk_id, chunk_type, text_content, start_time, end_time \ + FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL \ + AND (text_content IS NOT NULL AND text_content != '') ORDER BY id", chunk_table), ) .bind(file_uuid) .fetch_all(db.pool()) @@ -776,7 +777,7 @@ pub async fn run_5w1h_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Result< match embedder.embed_document(text).await { Ok(vector) => { if let Err(e) = sqlx::query( - "UPDATE dev.chunk SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3" + &format!("UPDATE {} SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3", chunk_table) ) .bind(&vector as &[f32]) .bind(chunk_id) diff --git a/src/api/identity_agent_api.rs b/src/api/identity_agent_api.rs index d0bd3fe..81737b3 100644 --- a/src/api/identity_agent_api.rs +++ b/src/api/identity_agent_api.rs @@ -10,6 +10,7 @@ use sqlx::Row; use std::path::PathBuf; use crate::api::server::AppState; +use crate::core::db::schema; use crate::core::db::PostgresDb; pub fn identity_agent_routes() -> Router { @@ -204,7 +205,7 @@ async fn analyze_identity( }); let _ = sqlx::query( - "INSERT INTO dev.identities (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING" + &format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities")) ) .bind(&identity_name) .bind(&metadata) @@ -473,21 +474,21 @@ async fn suggest_clustering( None => String::new(), }; + let fd_table = schema::table_name("face_detections"); + let identities_table = schema::table_name("identities"); let query = format!( - r#" - SELECT trace_id, file_uuid, COUNT(*) as face_count - FROM dev.face_detections fd - WHERE fd.trace_id IS NOT NULL - AND NOT EXISTS ( - SELECT 1 FROM dev.identities i - WHERE i.metadata->>'trace_id' = fd.trace_id::text - ) - {} - GROUP BY trace_id, file_uuid - HAVING COUNT(*) >= $1 - ORDER BY face_count DESC - "#, - file_filter + "SELECT trace_id, file_uuid, COUNT(*) as face_count \ + FROM {} fd \ + WHERE fd.trace_id IS NOT NULL \ + AND NOT EXISTS ( \ + SELECT 1 FROM {} i \ + WHERE i.metadata->>'trace_id' = fd.trace_id::text \ + ) \ + {} \ + GROUP BY trace_id, file_uuid \ + HAVING COUNT(*) >= $1 \ + ORDER BY face_count DESC", + fd_table, identities_table, file_filter ); let pool = state.db.pool(); @@ -660,8 +661,9 @@ fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { /// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result { // Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding) + let identities_table = schema::table_name("identities"); let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec)>( - "SELECT id, name, face_embedding::real[] FROM dev.identities WHERE source='tmdb' AND face_embedding IS NOT NULL" + &format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", identities_table) ) .fetch_all(pool).await?; @@ -675,10 +677,11 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: ); // Step 2: 載入所有 face_detections,按 trace_id 分組 + let fd_table = schema::table_name("face_detections"); let fd_rows = sqlx::query_as::<_, (i32, Vec)>( - "SELECT trace_id, embedding FROM dev.face_detections \ + &format!("SELECT trace_id, embedding FROM {} \ WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \ - ORDER BY trace_id", + ORDER BY trace_id", fd_table), ) .bind(file_uuid) .fetch_all(pool) @@ -797,17 +800,19 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: } // Step 5: 寫入 DB + let identities_table = schema::table_name("identities"); + let fd_table = schema::table_name("face_detections"); let mut updated = 0usize; for (tid, name) in &matched { let id_opt = sqlx::query_scalar::<_, Option>( - "SELECT id FROM dev.identities WHERE name=$1 AND source='tmdb'", + &format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table), ) .bind(name) .fetch_optional(pool) .await?; if let Some(identity_id) = id_opt { let _ = sqlx::query( - "UPDATE dev.face_detections SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", + &format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table), ) .bind(identity_id) .bind(file_uuid) @@ -833,10 +838,11 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: /// and stores bindings in identity_bindings table. pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result { // Load face traces with identity_id and frame numbers + let fd_table = schema::table_name("face_detections"); let traces = sqlx::query_as::<_, (i32, Vec)>( - "SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \ - FROM dev.face_detections WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \ - GROUP BY trace_id" + &format!("SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \ + FROM {} WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \ + GROUP BY trace_id", fd_table) ) .bind(file_uuid) .fetch_all(pool).await?; @@ -903,8 +909,9 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu } // Get identity_id for this trace + let fd_table = schema::table_name("face_detections"); let identity_id: Option = sqlx::query_scalar( - "SELECT identity_id FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1" + &format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table) ) .bind(file_uuid).bind(trace_id) .fetch_optional(pool).await?.flatten(); @@ -945,10 +952,11 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu "overlap_ratio": overlap_ratio, }); + let ib_table = schema::table_name("identity_bindings"); let _ = sqlx::query( - "INSERT INTO dev.identity_bindings (identity_id, identity_type, identity_value, confidence, metadata) \ + &format!("INSERT INTO {} (identity_id, identity_type, identity_value, confidence, metadata) \ VALUES ($1, 'speaker', $2, $3, $4::jsonb) \ - ON CONFLICT (identity_id, identity_type, identity_value) DO UPDATE SET confidence = EXCLUDED.confidence, metadata = EXCLUDED.metadata" + ON CONFLICT (identity_id, identity_type, identity_value) DO UPDATE SET confidence = EXCLUDED.confidence, metadata = EXCLUDED.metadata", ib_table) ) .bind(identity_id) .bind(&best_speaker) @@ -1025,7 +1033,7 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res "reasoning": id_result.reasoning, }); let _ = sqlx::query( - "INSERT INTO dev.identities (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING" + &format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities")) ) .bind(&identity_name) .bind(&metadata) diff --git a/src/api/media_api.rs b/src/api/media_api.rs index 5799206..85f5139 100644 --- a/src/api/media_api.rs +++ b/src/api/media_api.rs @@ -324,9 +324,10 @@ async fn trace_video( let pad_frames = (padding * fps) as i32; // Query identity info for this trace + let identities_table = schema::table_name("identities"); let identity_name: String = sqlx::query_scalar(&format!( - "SELECT COALESCE(i.name, 'unknown') FROM {} fd LEFT JOIN dev.identities i ON i.id = fd.identity_id WHERE fd.file_uuid = $1 AND fd.trace_id = $2 LIMIT 1", - face_table + "SELECT COALESCE(i.name, 'unknown') FROM {} fd LEFT JOIN {} i ON i.id = fd.identity_id WHERE fd.file_uuid = $1 AND fd.trace_id = $2 LIMIT 1", + face_table, identities_table )) .bind(&file_uuid).bind(trace_id) .fetch_optional(state.db.pool()).await @@ -334,8 +335,9 @@ async fn trace_video( .unwrap_or_else(|| "unknown".to_string()); // Query cut_id for the first frame + let cut_table = schema::table_name("cut"); let cut_id: i32 = sqlx::query_scalar( - "SELECT scene_number FROM public.cut WHERE file_uuid = $1 AND start_frame <= $2 AND end_frame >= $2 LIMIT 1" + &format!("SELECT scene_number FROM {} WHERE file_uuid = $1 AND start_frame <= $2 AND end_frame >= $2 LIMIT 1", cut_table) ) .bind(&file_uuid).bind(first_frame) .fetch_optional(state.db.pool()).await diff --git a/src/api/server.rs b/src/api/server.rs index 83095ed..22e6835 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -658,8 +658,9 @@ async fn register_single_file( } }; + let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( - "SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1" + &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) .bind(&file_name) .fetch_optional(db.pool()) @@ -900,8 +901,9 @@ async fn register_single_file( } } } + let videos_table = schema::table_name("videos"); let _ = sqlx::query( - "UPDATE dev.videos SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6" + &format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table) ) .bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid) .execute(db.pool()).await; diff --git a/src/api/universal_search.rs b/src/api/universal_search.rs index 20b96c6..acdeb90 100644 --- a/src/api/universal_search.rs +++ b/src/api/universal_search.rs @@ -10,7 +10,7 @@ use axum::{ }; use serde::{Deserialize, Serialize}; -use crate::core::db::{Database, PostgresDb}; +use crate::core::db::{schema, Database, PostgresDb}; #[derive(Debug, Deserialize)] pub struct UniversalSearchRequest { @@ -326,9 +326,10 @@ async fn search_chunks( None => return Err(anyhow::anyhow!("uuid is required for chunk search")), }; + let chunk_table = schema::table_name("chunk"); let mut sql = format!( - "SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM dev.chunk WHERE file_uuid = '{}'", - uuid + "SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM {} WHERE file_uuid = '{}'", + chunk_table, uuid ); if let Some(tr) = &req.time_range { sql.push_str(&format!( diff --git a/src/api/visual_chunk_search.rs b/src/api/visual_chunk_search.rs index df44ceb..4e0ff7c 100644 --- a/src/api/visual_chunk_search.rs +++ b/src/api/visual_chunk_search.rs @@ -8,7 +8,7 @@ //! - Object relationships use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType}; -use crate::core::db::PostgresDb; +use crate::core::db::{schema, PostgresDb}; use anyhow::Result; use serde_json::Value; use std::collections::HashMap; @@ -176,9 +176,10 @@ pub async fn search_visual_chunks( /// Get all visual chunks for a video UUID async fn get_visual_chunks_by_uuid(db: &PostgresDb, uuid: &str) -> Result> { + let chunk_table = schema::table_name("chunk"); let sql = format!( - "SELECT file_id, file_uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, visual_stats FROM dev.chunk WHERE file_uuid = '{}' AND chunk_type = 'visual' ORDER BY start_frame ASC", - uuid.replace('\'', "''") + "SELECT file_id, file_uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, visual_stats FROM {} WHERE file_uuid = '{}' AND chunk_type = 'visual' ORDER BY start_frame ASC", + chunk_table, uuid.replace('\'', "''") ); let rows: Vec<( @@ -373,6 +374,7 @@ pub async fn get_visual_chunk_statistics( db: &PostgresDb, uuid: &str, ) -> Result> { + let chunk_table = schema::table_name("chunk"); let sql = format!( "SELECT COUNT(*) as total_chunks, @@ -381,9 +383,10 @@ pub async fn get_visual_chunk_statistics( MAX((content->'metadata'->>'avg_confidence')::float) as max_confidence, SUM((content->'metadata'->>'object_count')::int) as total_objects, AVG((content->'metadata'->>'spatial_density')::float) as avg_density - FROM dev.chunk + FROM {} WHERE file_uuid = '{}' AND chunk_type = 'visual'", + chunk_table, uuid.replace('\'', "''") ); diff --git a/src/core/chunk/rule3_ingest.rs b/src/core/chunk/rule3_ingest.rs index 8f9c237..093cdd1 100644 --- a/src/core/chunk/rule3_ingest.rs +++ b/src/core/chunk/rule3_ingest.rs @@ -1,4 +1,5 @@ use crate::core::config::OUTPUT_DIR; +use crate::core::db::schema; use crate::core::llm::client::generate_5w1h_summary; use anyhow::{Context, Result}; use serde::Deserialize; @@ -71,13 +72,15 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result { } // Query chunks table for Rule 1 sentence chunks + let chunk_table = schema::table_name("chunk"); let rule1_rows: Vec<(String,)> = sqlx::query_as( - r#" - SELECT chunk_id FROM dev.chunk - WHERE file_uuid = $1 AND chunk_type = 'sentence' - AND start_frame >= $2 - AND end_frame <= $3 - "#, + &format!( + "SELECT chunk_id FROM {} \ + WHERE file_uuid = $1 AND chunk_type = 'sentence' \ + AND start_frame >= $2 \ + AND end_frame <= $3", + chunk_table + ), ) .bind(file_uuid) .bind(scene.start_frame as i64) @@ -97,13 +100,14 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result { } let texts: Vec = sqlx::query_scalar( - r#" - SELECT text_content FROM dev.chunk - WHERE file_uuid = $1 AND chunk_type = 'sentence' - AND start_frame >= $2 - AND end_frame <= $3 - ORDER BY start_frame ASC - "#, + &format!( + "SELECT text_content FROM {} \ + WHERE file_uuid = $1 AND chunk_type = 'sentence' \ + AND start_frame >= $2 \ + AND end_frame <= $3 \ + ORDER BY start_frame ASC", + chunk_table + ), ) .bind(file_uuid) .bind(scene.start_frame as i64) @@ -149,14 +153,14 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result { }); sqlx::query( - r#" - INSERT INTO dev.chunk ( - file_uuid, chunk_id, chunk_type, - start_time, end_time, fps, start_frame, end_frame, - content, text_content, summary_text, metadata, child_chunk_ids - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) - ON CONFLICT (file_uuid, chunk_id) DO NOTHING - "#, + &format!( + "INSERT INTO {} (file_uuid, chunk_id, chunk_type, \ + start_time, end_time, fps, start_frame, end_frame, \ + content, text_content, summary_text, metadata, child_chunk_ids) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \ + ON CONFLICT (file_uuid, chunk_id) DO NOTHING", + chunk_table + ), ) .bind(file_uuid) .bind(&chunk_id) diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index e024b57..7d038b9 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -1233,7 +1233,7 @@ impl PostgresDb { let tx = self.pool.begin().await?; let chunk_vectors = schema::table_name("chunk_vectors"); - let chunks = "dev.chunk"; + let chunks = schema::table_name("chunk"); let processor_results = schema::table_name("processor_results"); let videos = schema::table_name("videos"); @@ -1255,7 +1255,8 @@ impl PostgresDb { .execute(&self.pool) .await?; - sqlx::query(&format!("DELETE FROM dev.pre_chunks WHERE file_uuid = $1")) + let pre_chunks = schema::table_name("pre_chunks"); + sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", pre_chunks)) .bind(uuid) .execute(&self.pool) .await?; @@ -1283,7 +1284,7 @@ impl PostgresDb { } pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> { - let chunks = "dev.chunk"; + let chunks = schema::table_name("chunk"); let sentence_count: i64 = sqlx::query_scalar(&format!( "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence'", chunks @@ -2508,21 +2509,23 @@ impl PostgresDb { limit: i32, offset: i64, ) -> Result> { - let query = r#" - SELECT c.id, c.file_uuid, c.chunk_id, c.chunk_type, - c.start_time, c.end_time, c.text_content, c.content - FROM dev.chunk c - WHERE c.file_uuid IN ( - SELECT DISTINCT fd.file_uuid - FROM face_detections fd - JOIN identities i ON fd.identity_id = i.id - WHERE i.uuid = $1 - ) - ORDER BY c.start_time ASC - LIMIT $2 OFFSET $3 - "#; + let chunk_table = schema::table_name("chunk"); + let query = format!( + "SELECT c.id, c.file_uuid, c.chunk_id, c.chunk_type, \ + c.start_time, c.end_time, c.text_content, c.content \ + FROM {} c \ + WHERE c.file_uuid IN ( \ + SELECT DISTINCT fd.file_uuid \ + FROM face_detections fd \ + JOIN identities i ON fd.identity_id = i.id \ + WHERE i.uuid = $1 \ + ) \ + ORDER BY c.start_time ASC \ + LIMIT $2 OFFSET $3", + chunk_table + ); - let rows = sqlx::query_as(query) + let rows = sqlx::query_as(&query) .bind(identity_id) .bind(limit) .bind(offset) @@ -2552,7 +2555,7 @@ impl PostgresDb { } pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let content_with_rule = serde_json::json!({ "rule": chunk.rule.as_str(), "data": chunk.content @@ -2629,7 +2632,7 @@ impl PostgresDb { chunk: &Chunk, tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<()> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let content_with_rule = serde_json::json!({ "rule": chunk.rule.as_str(), "data": chunk.content @@ -2699,7 +2702,7 @@ impl PostgresDb { } pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let rows = sqlx::query(&format!( "SELECT COALESCE(file_id, 0) as file_id, file_uuid as uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE file_uuid = $1 ORDER BY id", table @@ -2779,7 +2782,7 @@ impl PostgresDb { chunk_id: &str, uuid: &str, ) -> Result> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let row = sqlx::query(&format!( "SELECT COALESCE(file_id, 0) as file_id, file_uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE chunk_id = $1 AND file_uuid = $2", table @@ -3006,7 +3009,7 @@ impl PostgresDb { start_time: f64, end_time: f64, ) -> Result> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let rows = sqlx::query(&format!( "SELECT file_id, uuid, chunk_id, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids FROM {} @@ -3091,7 +3094,7 @@ impl PostgresDb { return Ok(vec![]); } - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let rows = sqlx::query(&format!( "SELECT file_id, uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = ANY($1) ORDER BY id", table @@ -3200,7 +3203,7 @@ impl PostgresDb { } pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); sqlx::query(&format!( "UPDATE {} SET vector_id = $1 WHERE chunk_id = $2", table @@ -3222,7 +3225,7 @@ impl PostgresDb { } pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let query_pattern = format!("%{}%", query); let sql = match chunk_type { @@ -3327,7 +3330,7 @@ impl PostgresDb { uuid: Option<&str>, limit: usize, ) -> Result> { - let table = "dev.chunk"; + let table = schema::table_name("chunk"); let tsquery = self.prepare_tsquery(query).await?; let sql = match uuid { @@ -4447,7 +4450,7 @@ impl PostgresDb { total_frames: u64, ) -> Result<()> { let table = schema::table_name("videos"); - let chunks_table = "dev.chunk"; + let chunks_table = schema::table_name("chunk"); let pre_chunks_table = schema::table_name("pre_chunks"); // Query chunks count and frames @@ -4623,18 +4626,20 @@ impl PostgresDb { let vector_json = serde_json::to_string(query_vector) .map_err(|e| anyhow::anyhow!("Vector serialize error: {}", e))?; + let chunk_table = schema::table_name("chunk"); let results = sqlx::query_as::<_, SemanticSearchResult>( - r#" - SELECT - id as scene_order, start_time, end_time, - COALESCE(summary_text, text_content, '') as summary, - metadata, - (1 - (embedding <=> $1::vector)) as similarity - FROM dev.chunk - WHERE file_uuid = $2 AND chunk_type = 'cut' AND embedding IS NOT NULL - ORDER BY embedding <=> $1::vector - LIMIT $3 - "#, + &format!( + "SELECT \ + id as scene_order, start_time, end_time, \ + COALESCE(summary_text, text_content, '') as summary, \ + metadata, \ + (1 - (embedding <=> $1::vector)) as similarity \ + FROM {} \ + WHERE file_uuid = $2 AND chunk_type = 'cut' AND embedding IS NOT NULL \ + ORDER BY embedding <=> $1::vector \ + LIMIT $3", + chunk_table + ), ) .bind(&vector_json) .bind(uuid) diff --git a/src/core/ingestion.rs b/src/core/ingestion.rs index cc88073..18c6674 100644 --- a/src/core/ingestion.rs +++ b/src/core/ingestion.rs @@ -4,7 +4,7 @@ use sqlx; use std::path::Path; use tracing::{info, warn}; -use crate::core::db::{PostgresDb, VideoRecord, VideoStatus}; +use crate::core::db::{schema, PostgresDb, VideoRecord, VideoStatus}; use crate::core::probe; use crate::core::storage::uuid as uuid_utils; use crate::core::storage::FileManager; @@ -37,8 +37,9 @@ impl IngestionService { // 1. Look for existing Birthday (Identity Anchor) // If the file (by name) was registered before, use its original birth time. + let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( - "SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1" + &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) .bind(&filename) .fetch_optional(self.db.pool()) diff --git a/src/core/processor/heuristic_scene.rs b/src/core/processor/heuristic_scene.rs index 9a385da..18732f2 100644 --- a/src/core/processor/heuristic_scene.rs +++ b/src/core/processor/heuristic_scene.rs @@ -4,6 +4,8 @@ use sqlx::PgPool; use std::path::Path; use tracing::info; +use crate::core::db::schema; + /// Heuristic scene metadata derived from YOLO + Face + luminance data. /// Runs as a post-processing trigger, not a standalone processor. /// Replaces the removed Places365 Scene classifier. @@ -110,12 +112,13 @@ pub async fn build_heuristic_scene_meta( } // Get face counts grouped by frame + let fd_table = schema::table_name("face_detections"); let face_rows: Vec<(i64, i64)> = sqlx::query_as( - "SELECT frame_number, COUNT(*) as fc \ - FROM dev.face_detections \ + &format!("SELECT frame_number, COUNT(*) as fc \ + FROM {} \ WHERE file_uuid = $1 AND frame_number IS NOT NULL \ GROUP BY frame_number \ - ORDER BY frame_number", + ORDER BY frame_number", fd_table), ) .bind(file_uuid) .fetch_all(pool) @@ -255,8 +258,9 @@ pub async fn generate_scene_meta(db: &crate::core::db::PostgresDb, file_uuid: &s .collect() } else { // Fallback: query DB for video duration, make one segment + let videos_table = schema::table_name("videos"); let (total_frames, duration): (Option, Option) = sqlx::query_as( - "SELECT total_frames, duration FROM dev.videos WHERE file_uuid = $1", + &format!("SELECT total_frames, duration FROM {} WHERE file_uuid = $1", videos_table), ) .bind(file_uuid) .fetch_optional(pool) diff --git a/src/core/tmdb/face_agent.rs b/src/core/tmdb/face_agent.rs index ffb53c0..cd9a67b 100644 --- a/src/core/tmdb/face_agent.rs +++ b/src/core/tmdb/face_agent.rs @@ -3,7 +3,7 @@ use serde::Deserialize; use std::collections::HashMap; use tracing::{error, info, warn}; -use crate::core::db::PostgresDb; +use crate::core::db::{schema, PostgresDb}; #[derive(Debug, Deserialize)] struct TmdbIdentity { @@ -34,7 +34,7 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul // Step 1: Load TMDb identities with face embeddings let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec)>( - "SELECT id, name, face_embedding::real[] FROM dev.identities WHERE source='tmdb' AND face_embedding IS NOT NULL" + &format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", schema::table_name("identities")) ) .fetch_all(pool).await?; @@ -45,10 +45,11 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul info!("[TKG-MATCH] {} TMDb seeds loaded", tmdb_rows.len()); // Step 2: Load face_detections grouped by trace_id + let fd_table = schema::table_name("face_detections"); let fd_rows = sqlx::query_as::<_, (i32, Vec)>( - "SELECT trace_id, embedding FROM dev.face_detections \ + &format!("SELECT trace_id, embedding FROM {} \ WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \ - ORDER BY trace_id", + ORDER BY trace_id", fd_table), ) .bind(file_uuid) .fetch_all(pool) @@ -152,10 +153,11 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul // Step 4: Quality control // 4a: Remove low-confidence traces (fewer than 4 face detections) + let fd_table = schema::table_name("face_detections"); let mut after_qc = HashMap::new(); for (&tid, &(id, ref name)) in &matched { let cnt: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2", + &format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2", fd_table), ) .bind(file_uuid) .bind(tid) @@ -193,7 +195,7 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul let mut updated = 0usize; for (&tid, &(id, _)) in &matched { let r = sqlx::query( - "UPDATE dev.face_detections SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", + &format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table), ) .bind(id) .bind(file_uuid) @@ -219,20 +221,22 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul /// Unbind the lower-confidence trace from the conflicting pair. /// RCA reference: docs_v1.0/API_V1.0.0/INTERNAL/RCA_TRACE39_TRACE45_COLLISION_V1.0.0.md async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str) -> Result { + let fd_table = schema::table_name("face_detections"); // Find all collision pairs: same identity, same frame, different trace let collisions = sqlx::query_as::<_, (i32, i32, i32, i32)>( - r#" - SELECT a.identity_id, a.trace_id, b.trace_id, a.frame_number - FROM dev.face_detections a - JOIN dev.face_detections b - ON a.file_uuid = b.file_uuid - AND a.frame_number = b.frame_number - AND a.trace_id < b.trace_id - WHERE a.file_uuid = $1 - AND a.identity_id IS NOT NULL - AND a.identity_id = b.identity_id - ORDER BY a.identity_id, a.frame_number - "#, + &format!( + "SELECT a.identity_id, a.trace_id, b.trace_id, a.frame_number \ + FROM {} a \ + JOIN {} b \ + ON a.file_uuid = b.file_uuid \ + AND a.frame_number = b.frame_number \ + AND a.trace_id < b.trace_id \ + WHERE a.file_uuid = $1 \ + AND a.identity_id IS NOT NULL \ + AND a.identity_id = b.identity_id \ + ORDER BY a.identity_id, a.frame_number", + fd_table, fd_table + ), ) .bind(file_uuid) .fetch_all(pool) @@ -253,13 +257,13 @@ async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str) for ((id, ta, tb), overlap_frames) in &collision_groups { // Get face detection count for each trace let cnt_a: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3" + &format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3", fd_table) ) .bind(file_uuid).bind(ta).bind(id) .fetch_one(pool).await.unwrap_or(0); let cnt_b: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3" + &format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3", fd_table) ) .bind(file_uuid).bind(tb).bind(id) .fetch_one(pool).await.unwrap_or(0); @@ -269,7 +273,7 @@ async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str) let victim_cnt = if cnt_a <= cnt_b { cnt_a } else { cnt_b }; sqlx::query( - "UPDATE dev.face_detections SET identity_id=NULL WHERE file_uuid=$1 AND trace_id=$2", + &format!("UPDATE {} SET identity_id=NULL WHERE file_uuid=$1 AND trace_id=$2", fd_table), ) .bind(file_uuid) .bind(victim) diff --git a/src/playground.rs b/src/playground.rs index 89288d0..a0cd194 100644 --- a/src/playground.rs +++ b/src/playground.rs @@ -11,6 +11,7 @@ use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType}; use momentry_core::core::db::Database; use momentry_core::core::time::FrameTime; use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi}; +use momentry_core::core::db::schema; use momentry_core::{ Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus, }; @@ -824,8 +825,11 @@ enum N8nAction { #[tokio::main] async fn main() -> Result<()> { - // Load development environment first - dotenv::from_filename(".env.development").ok(); + // Load development environment — try absolute path first + if dotenv::from_filename("/Users/accusys/momentry_core_0.1/.env.development").is_err() { + // Fallback to relative path (for development) + let _ = dotenv::from_filename(".env.development"); + } tracing_subscriber::fmt::init(); @@ -851,8 +855,9 @@ async fn main() -> Result<()> { .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); + let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( - "SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1" + &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) .bind(&filename) .fetch_optional(db.pool()) @@ -1095,8 +1100,9 @@ async fn main() -> Result<()> { .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); + let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( - "SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1" + &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) .bind(&filename) .fetch_optional(db.pool()) diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index c836589..c97d874 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -12,7 +12,7 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest}; use crate::core::config::OUTPUT_DIR; use crate::core::db::qdrant_db::QdrantDb; use crate::core::db::{ - MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, VideoStatus, + schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, VideoStatus, }; use crate::core::embedding::Embedder; use crate::worker::config::WorkerConfig; @@ -67,13 +67,15 @@ impl JobWorker { self.processor_pool.sweep_stale().await; // Reset stale running jobs: jobs stuck in 'running' with no active processor results + let monitor_jobs_table = schema::table_name("monitor_jobs"); + let processor_results_table = schema::table_name("processor_results"); if let Err(e) = sqlx::query( - "UPDATE dev.monitor_jobs SET status = 'pending', updated_at = NOW() + &format!("UPDATE {} SET status = 'pending', updated_at = NOW() WHERE status = 'running' AND id NOT IN ( - SELECT DISTINCT job_id FROM dev.processor_results + SELECT DISTINCT job_id FROM {} WHERE status IN ('pending', 'running') - )", + )", monitor_jobs_table, processor_results_table), ) .execute(self.db.pool()) .await @@ -1022,8 +1024,9 @@ impl JobWorker { let qdrant = QdrantDb::new(); let pool = db.pool(); + let chunk_table = schema::table_name("chunk"); let rows = sqlx::query_as::<_, (String, String, String, f64, f64, String)>( - "SELECT chunk_id, chunk_type, text_content, start_time, end_time, content::text FROM dev.chunk WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '') ORDER BY id", + &format!("SELECT chunk_id, chunk_type, text_content, start_time, end_time, content::text FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '') ORDER BY id", chunk_table), ) .bind(uuid) .fetch_all(pool) diff --git a/src/worker/processor.rs b/src/worker/processor.rs index 9127a99..2859aff 100644 --- a/src/worker/processor.rs +++ b/src/worker/processor.rs @@ -1079,8 +1079,9 @@ impl ProcessorPool { "confidence": scene.confidence, "top_5": scene.top_5, }); + let chunk_table = crate::core::db::schema::table_name("chunk"); let _ = sqlx::query( - "UPDATE dev.chunk SET metadata = metadata || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3" + &format!("UPDATE {} SET metadata = metadata || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3", chunk_table) ) .bind(&meta) .bind(uuid)