refactor: remove all dev.* and public.* schema hardcodes from runtime code
14 files updated to use schema::table_name() instead of hardcoded schema prefixes. Only src/bin/release.rs intentionally retains dev.* references.
This commit is contained in:
@@ -757,10 +757,11 @@ pub async fn run_5w1h_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Result<
|
|||||||
let qdrant = QdrantDb::new();
|
let qdrant = QdrantDb::new();
|
||||||
qdrant.init_collection(768).await?;
|
qdrant.init_collection(768).await?;
|
||||||
|
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let rows = sqlx::query_as::<_, (String, String, String, f64, f64)>(
|
let rows = sqlx::query_as::<_, (String, String, String, f64, f64)>(
|
||||||
r#"SELECT chunk_id, chunk_type, text_content, start_time, end_time
|
&format!("SELECT chunk_id, chunk_type, text_content, start_time, end_time \
|
||||||
FROM dev.chunk WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL
|
FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL \
|
||||||
AND (text_content IS NOT NULL AND text_content != '') ORDER BY id"#,
|
AND (text_content IS NOT NULL AND text_content != '') ORDER BY id", chunk_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(db.pool())
|
.fetch_all(db.pool())
|
||||||
@@ -776,7 +777,7 @@ pub async fn run_5w1h_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Result<
|
|||||||
match embedder.embed_document(text).await {
|
match embedder.embed_document(text).await {
|
||||||
Ok(vector) => {
|
Ok(vector) => {
|
||||||
if let Err(e) = sqlx::query(
|
if let Err(e) = sqlx::query(
|
||||||
"UPDATE dev.chunk SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3"
|
&format!("UPDATE {} SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3", chunk_table)
|
||||||
)
|
)
|
||||||
.bind(&vector as &[f32])
|
.bind(&vector as &[f32])
|
||||||
.bind(chunk_id)
|
.bind(chunk_id)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use sqlx::Row;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use crate::api::server::AppState;
|
use crate::api::server::AppState;
|
||||||
|
use crate::core::db::schema;
|
||||||
use crate::core::db::PostgresDb;
|
use crate::core::db::PostgresDb;
|
||||||
|
|
||||||
pub fn identity_agent_routes() -> Router<AppState> {
|
pub fn identity_agent_routes() -> Router<AppState> {
|
||||||
@@ -204,7 +205,7 @@ async fn analyze_identity(
|
|||||||
});
|
});
|
||||||
|
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"INSERT INTO dev.identities (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING"
|
&format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities"))
|
||||||
)
|
)
|
||||||
.bind(&identity_name)
|
.bind(&identity_name)
|
||||||
.bind(&metadata)
|
.bind(&metadata)
|
||||||
@@ -473,21 +474,21 @@ async fn suggest_clustering(
|
|||||||
None => String::new(),
|
None => String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
|
let identities_table = schema::table_name("identities");
|
||||||
let query = format!(
|
let query = format!(
|
||||||
r#"
|
"SELECT trace_id, file_uuid, COUNT(*) as face_count \
|
||||||
SELECT trace_id, file_uuid, COUNT(*) as face_count
|
FROM {} fd \
|
||||||
FROM dev.face_detections fd
|
WHERE fd.trace_id IS NOT NULL \
|
||||||
WHERE fd.trace_id IS NOT NULL
|
AND NOT EXISTS ( \
|
||||||
AND NOT EXISTS (
|
SELECT 1 FROM {} i \
|
||||||
SELECT 1 FROM dev.identities i
|
WHERE i.metadata->>'trace_id' = fd.trace_id::text \
|
||||||
WHERE i.metadata->>'trace_id' = fd.trace_id::text
|
) \
|
||||||
)
|
{} \
|
||||||
{}
|
GROUP BY trace_id, file_uuid \
|
||||||
GROUP BY trace_id, file_uuid
|
HAVING COUNT(*) >= $1 \
|
||||||
HAVING COUNT(*) >= $1
|
ORDER BY face_count DESC",
|
||||||
ORDER BY face_count DESC
|
fd_table, identities_table, file_filter
|
||||||
"#,
|
|
||||||
file_filter
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let pool = state.db.pool();
|
let pool = state.db.pool();
|
||||||
@@ -660,8 +661,9 @@ fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
|||||||
/// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace
|
/// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace
|
||||||
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
||||||
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
||||||
|
let identities_table = schema::table_name("identities");
|
||||||
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
||||||
"SELECT id, name, face_embedding::real[] FROM dev.identities WHERE source='tmdb' AND face_embedding IS NOT NULL"
|
&format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", identities_table)
|
||||||
)
|
)
|
||||||
.fetch_all(pool).await?;
|
.fetch_all(pool).await?;
|
||||||
|
|
||||||
@@ -675,10 +677,11 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Step 2: 載入所有 face_detections,按 trace_id 分組
|
// Step 2: 載入所有 face_detections,按 trace_id 分組
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let fd_rows = sqlx::query_as::<_, (i32, Vec<f32>)>(
|
let fd_rows = sqlx::query_as::<_, (i32, Vec<f32>)>(
|
||||||
"SELECT trace_id, embedding FROM dev.face_detections \
|
&format!("SELECT trace_id, embedding FROM {} \
|
||||||
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
|
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
|
||||||
ORDER BY trace_id",
|
ORDER BY trace_id", fd_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
@@ -797,17 +800,19 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Step 5: 寫入 DB
|
// Step 5: 寫入 DB
|
||||||
|
let identities_table = schema::table_name("identities");
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let mut updated = 0usize;
|
let mut updated = 0usize;
|
||||||
for (tid, name) in &matched {
|
for (tid, name) in &matched {
|
||||||
let id_opt = sqlx::query_scalar::<_, Option<i32>>(
|
let id_opt = sqlx::query_scalar::<_, Option<i32>>(
|
||||||
"SELECT id FROM dev.identities WHERE name=$1 AND source='tmdb'",
|
&format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table),
|
||||||
)
|
)
|
||||||
.bind(name)
|
.bind(name)
|
||||||
.fetch_optional(pool)
|
.fetch_optional(pool)
|
||||||
.await?;
|
.await?;
|
||||||
if let Some(identity_id) = id_opt {
|
if let Some(identity_id) = id_opt {
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"UPDATE dev.face_detections SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3",
|
&format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table),
|
||||||
)
|
)
|
||||||
.bind(identity_id)
|
.bind(identity_id)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
@@ -833,10 +838,11 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
/// and stores bindings in identity_bindings table.
|
/// and stores bindings in identity_bindings table.
|
||||||
pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
||||||
// Load face traces with identity_id and frame numbers
|
// Load face traces with identity_id and frame numbers
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let traces = sqlx::query_as::<_, (i32, Vec<i32>)>(
|
let traces = sqlx::query_as::<_, (i32, Vec<i32>)>(
|
||||||
"SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \
|
&format!("SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \
|
||||||
FROM dev.face_detections WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \
|
FROM {} WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \
|
||||||
GROUP BY trace_id"
|
GROUP BY trace_id", fd_table)
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(pool).await?;
|
.fetch_all(pool).await?;
|
||||||
@@ -903,8 +909,9 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get identity_id for this trace
|
// Get identity_id for this trace
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let identity_id: Option<i32> = sqlx::query_scalar(
|
let identity_id: Option<i32> = sqlx::query_scalar(
|
||||||
"SELECT identity_id FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1"
|
&format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table)
|
||||||
)
|
)
|
||||||
.bind(file_uuid).bind(trace_id)
|
.bind(file_uuid).bind(trace_id)
|
||||||
.fetch_optional(pool).await?.flatten();
|
.fetch_optional(pool).await?.flatten();
|
||||||
@@ -945,10 +952,11 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu
|
|||||||
"overlap_ratio": overlap_ratio,
|
"overlap_ratio": overlap_ratio,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let ib_table = schema::table_name("identity_bindings");
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"INSERT INTO dev.identity_bindings (identity_id, identity_type, identity_value, confidence, metadata) \
|
&format!("INSERT INTO {} (identity_id, identity_type, identity_value, confidence, metadata) \
|
||||||
VALUES ($1, 'speaker', $2, $3, $4::jsonb) \
|
VALUES ($1, 'speaker', $2, $3, $4::jsonb) \
|
||||||
ON CONFLICT (identity_id, identity_type, identity_value) DO UPDATE SET confidence = EXCLUDED.confidence, metadata = EXCLUDED.metadata"
|
ON CONFLICT (identity_id, identity_type, identity_value) DO UPDATE SET confidence = EXCLUDED.confidence, metadata = EXCLUDED.metadata", ib_table)
|
||||||
)
|
)
|
||||||
.bind(identity_id)
|
.bind(identity_id)
|
||||||
.bind(&best_speaker)
|
.bind(&best_speaker)
|
||||||
@@ -1025,7 +1033,7 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res
|
|||||||
"reasoning": id_result.reasoning,
|
"reasoning": id_result.reasoning,
|
||||||
});
|
});
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"INSERT INTO dev.identities (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING"
|
&format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities"))
|
||||||
)
|
)
|
||||||
.bind(&identity_name)
|
.bind(&identity_name)
|
||||||
.bind(&metadata)
|
.bind(&metadata)
|
||||||
|
|||||||
@@ -324,9 +324,10 @@ async fn trace_video(
|
|||||||
let pad_frames = (padding * fps) as i32;
|
let pad_frames = (padding * fps) as i32;
|
||||||
|
|
||||||
// Query identity info for this trace
|
// Query identity info for this trace
|
||||||
|
let identities_table = schema::table_name("identities");
|
||||||
let identity_name: String = sqlx::query_scalar(&format!(
|
let identity_name: String = sqlx::query_scalar(&format!(
|
||||||
"SELECT COALESCE(i.name, 'unknown') FROM {} fd LEFT JOIN dev.identities i ON i.id = fd.identity_id WHERE fd.file_uuid = $1 AND fd.trace_id = $2 LIMIT 1",
|
"SELECT COALESCE(i.name, 'unknown') FROM {} fd LEFT JOIN {} i ON i.id = fd.identity_id WHERE fd.file_uuid = $1 AND fd.trace_id = $2 LIMIT 1",
|
||||||
face_table
|
face_table, identities_table
|
||||||
))
|
))
|
||||||
.bind(&file_uuid).bind(trace_id)
|
.bind(&file_uuid).bind(trace_id)
|
||||||
.fetch_optional(state.db.pool()).await
|
.fetch_optional(state.db.pool()).await
|
||||||
@@ -334,8 +335,9 @@ async fn trace_video(
|
|||||||
.unwrap_or_else(|| "unknown".to_string());
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
|
||||||
// Query cut_id for the first frame
|
// Query cut_id for the first frame
|
||||||
|
let cut_table = schema::table_name("cut");
|
||||||
let cut_id: i32 = sqlx::query_scalar(
|
let cut_id: i32 = sqlx::query_scalar(
|
||||||
"SELECT scene_number FROM public.cut WHERE file_uuid = $1 AND start_frame <= $2 AND end_frame >= $2 LIMIT 1"
|
&format!("SELECT scene_number FROM {} WHERE file_uuid = $1 AND start_frame <= $2 AND end_frame >= $2 LIMIT 1", cut_table)
|
||||||
)
|
)
|
||||||
.bind(&file_uuid).bind(first_frame)
|
.bind(&file_uuid).bind(first_frame)
|
||||||
.fetch_optional(state.db.pool()).await
|
.fetch_optional(state.db.pool()).await
|
||||||
|
|||||||
@@ -658,8 +658,9 @@ async fn register_single_file(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
||||||
"SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1"
|
&format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table)
|
||||||
)
|
)
|
||||||
.bind(&file_name)
|
.bind(&file_name)
|
||||||
.fetch_optional(db.pool())
|
.fetch_optional(db.pool())
|
||||||
@@ -900,8 +901,9 @@ async fn register_single_file(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"UPDATE dev.videos SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6"
|
&format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
|
||||||
)
|
)
|
||||||
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
|
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
|
||||||
.execute(db.pool()).await;
|
.execute(db.pool()).await;
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use axum::{
|
|||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::core::db::{Database, PostgresDb};
|
use crate::core::db::{schema, Database, PostgresDb};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
pub struct UniversalSearchRequest {
|
pub struct UniversalSearchRequest {
|
||||||
@@ -326,9 +326,10 @@ async fn search_chunks(
|
|||||||
None => return Err(anyhow::anyhow!("uuid is required for chunk search")),
|
None => return Err(anyhow::anyhow!("uuid is required for chunk search")),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let mut sql = format!(
|
let mut sql = format!(
|
||||||
"SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM dev.chunk WHERE file_uuid = '{}'",
|
"SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM {} WHERE file_uuid = '{}'",
|
||||||
uuid
|
chunk_table, uuid
|
||||||
);
|
);
|
||||||
if let Some(tr) = &req.time_range {
|
if let Some(tr) = &req.time_range {
|
||||||
sql.push_str(&format!(
|
sql.push_str(&format!(
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
//! - Object relationships
|
//! - Object relationships
|
||||||
|
|
||||||
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
|
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
|
||||||
use crate::core::db::PostgresDb;
|
use crate::core::db::{schema, PostgresDb};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -176,9 +176,10 @@ pub async fn search_visual_chunks(
|
|||||||
|
|
||||||
/// Get all visual chunks for a video UUID
|
/// Get all visual chunks for a video UUID
|
||||||
async fn get_visual_chunks_by_uuid(db: &PostgresDb, uuid: &str) -> Result<Vec<Chunk>> {
|
async fn get_visual_chunks_by_uuid(db: &PostgresDb, uuid: &str) -> Result<Vec<Chunk>> {
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let sql = format!(
|
let sql = format!(
|
||||||
"SELECT file_id, file_uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, visual_stats FROM dev.chunk WHERE file_uuid = '{}' AND chunk_type = 'visual' ORDER BY start_frame ASC",
|
"SELECT file_id, file_uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, visual_stats FROM {} WHERE file_uuid = '{}' AND chunk_type = 'visual' ORDER BY start_frame ASC",
|
||||||
uuid.replace('\'', "''")
|
chunk_table, uuid.replace('\'', "''")
|
||||||
);
|
);
|
||||||
|
|
||||||
let rows: Vec<(
|
let rows: Vec<(
|
||||||
@@ -373,6 +374,7 @@ pub async fn get_visual_chunk_statistics(
|
|||||||
db: &PostgresDb,
|
db: &PostgresDb,
|
||||||
uuid: &str,
|
uuid: &str,
|
||||||
) -> Result<HashMap<String, Value>> {
|
) -> Result<HashMap<String, Value>> {
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let sql = format!(
|
let sql = format!(
|
||||||
"SELECT
|
"SELECT
|
||||||
COUNT(*) as total_chunks,
|
COUNT(*) as total_chunks,
|
||||||
@@ -381,9 +383,10 @@ pub async fn get_visual_chunk_statistics(
|
|||||||
MAX((content->'metadata'->>'avg_confidence')::float) as max_confidence,
|
MAX((content->'metadata'->>'avg_confidence')::float) as max_confidence,
|
||||||
SUM((content->'metadata'->>'object_count')::int) as total_objects,
|
SUM((content->'metadata'->>'object_count')::int) as total_objects,
|
||||||
AVG((content->'metadata'->>'spatial_density')::float) as avg_density
|
AVG((content->'metadata'->>'spatial_density')::float) as avg_density
|
||||||
FROM dev.chunk
|
FROM {}
|
||||||
WHERE file_uuid = '{}'
|
WHERE file_uuid = '{}'
|
||||||
AND chunk_type = 'visual'",
|
AND chunk_type = 'visual'",
|
||||||
|
chunk_table,
|
||||||
uuid.replace('\'', "''")
|
uuid.replace('\'', "''")
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use crate::core::config::OUTPUT_DIR;
|
use crate::core::config::OUTPUT_DIR;
|
||||||
|
use crate::core::db::schema;
|
||||||
use crate::core::llm::client::generate_5w1h_summary;
|
use crate::core::llm::client::generate_5w1h_summary;
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
@@ -71,13 +72,15 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result<usize> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Query chunks table for Rule 1 sentence chunks
|
// Query chunks table for Rule 1 sentence chunks
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let rule1_rows: Vec<(String,)> = sqlx::query_as(
|
let rule1_rows: Vec<(String,)> = sqlx::query_as(
|
||||||
r#"
|
&format!(
|
||||||
SELECT chunk_id FROM dev.chunk
|
"SELECT chunk_id FROM {} \
|
||||||
WHERE file_uuid = $1 AND chunk_type = 'sentence'
|
WHERE file_uuid = $1 AND chunk_type = 'sentence' \
|
||||||
AND start_frame >= $2
|
AND start_frame >= $2 \
|
||||||
AND end_frame <= $3
|
AND end_frame <= $3",
|
||||||
"#,
|
chunk_table
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.bind(scene.start_frame as i64)
|
.bind(scene.start_frame as i64)
|
||||||
@@ -97,13 +100,14 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result<usize> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let texts: Vec<String> = sqlx::query_scalar(
|
let texts: Vec<String> = sqlx::query_scalar(
|
||||||
r#"
|
&format!(
|
||||||
SELECT text_content FROM dev.chunk
|
"SELECT text_content FROM {} \
|
||||||
WHERE file_uuid = $1 AND chunk_type = 'sentence'
|
WHERE file_uuid = $1 AND chunk_type = 'sentence' \
|
||||||
AND start_frame >= $2
|
AND start_frame >= $2 \
|
||||||
AND end_frame <= $3
|
AND end_frame <= $3 \
|
||||||
ORDER BY start_frame ASC
|
ORDER BY start_frame ASC",
|
||||||
"#,
|
chunk_table
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.bind(scene.start_frame as i64)
|
.bind(scene.start_frame as i64)
|
||||||
@@ -149,14 +153,14 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result<usize> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
r#"
|
&format!(
|
||||||
INSERT INTO dev.chunk (
|
"INSERT INTO {} (file_uuid, chunk_id, chunk_type, \
|
||||||
file_uuid, chunk_id, chunk_type,
|
start_time, end_time, fps, start_frame, end_frame, \
|
||||||
start_time, end_time, fps, start_frame, end_frame,
|
content, text_content, summary_text, metadata, child_chunk_ids) \
|
||||||
content, text_content, summary_text, metadata, child_chunk_ids
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \
|
||||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
ON CONFLICT (file_uuid, chunk_id) DO NOTHING",
|
||||||
ON CONFLICT (file_uuid, chunk_id) DO NOTHING
|
chunk_table
|
||||||
"#,
|
),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.bind(&chunk_id)
|
.bind(&chunk_id)
|
||||||
|
|||||||
@@ -1233,7 +1233,7 @@ impl PostgresDb {
|
|||||||
let tx = self.pool.begin().await?;
|
let tx = self.pool.begin().await?;
|
||||||
|
|
||||||
let chunk_vectors = schema::table_name("chunk_vectors");
|
let chunk_vectors = schema::table_name("chunk_vectors");
|
||||||
let chunks = "dev.chunk";
|
let chunks = schema::table_name("chunk");
|
||||||
let processor_results = schema::table_name("processor_results");
|
let processor_results = schema::table_name("processor_results");
|
||||||
let videos = schema::table_name("videos");
|
let videos = schema::table_name("videos");
|
||||||
|
|
||||||
@@ -1255,7 +1255,8 @@ impl PostgresDb {
|
|||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query(&format!("DELETE FROM dev.pre_chunks WHERE file_uuid = $1"))
|
let pre_chunks = schema::table_name("pre_chunks");
|
||||||
|
sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", pre_chunks))
|
||||||
.bind(uuid)
|
.bind(uuid)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -1283,7 +1284,7 @@ impl PostgresDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> {
|
pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> {
|
||||||
let chunks = "dev.chunk";
|
let chunks = schema::table_name("chunk");
|
||||||
let sentence_count: i64 = sqlx::query_scalar(&format!(
|
let sentence_count: i64 = sqlx::query_scalar(&format!(
|
||||||
"SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence'",
|
"SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence'",
|
||||||
chunks
|
chunks
|
||||||
@@ -2508,21 +2509,23 @@ impl PostgresDb {
|
|||||||
limit: i32,
|
limit: i32,
|
||||||
offset: i64,
|
offset: i64,
|
||||||
) -> Result<Vec<IdentityChunkRecord>> {
|
) -> Result<Vec<IdentityChunkRecord>> {
|
||||||
let query = r#"
|
let chunk_table = schema::table_name("chunk");
|
||||||
SELECT c.id, c.file_uuid, c.chunk_id, c.chunk_type,
|
let query = format!(
|
||||||
c.start_time, c.end_time, c.text_content, c.content
|
"SELECT c.id, c.file_uuid, c.chunk_id, c.chunk_type, \
|
||||||
FROM dev.chunk c
|
c.start_time, c.end_time, c.text_content, c.content \
|
||||||
WHERE c.file_uuid IN (
|
FROM {} c \
|
||||||
SELECT DISTINCT fd.file_uuid
|
WHERE c.file_uuid IN ( \
|
||||||
FROM face_detections fd
|
SELECT DISTINCT fd.file_uuid \
|
||||||
JOIN identities i ON fd.identity_id = i.id
|
FROM face_detections fd \
|
||||||
WHERE i.uuid = $1
|
JOIN identities i ON fd.identity_id = i.id \
|
||||||
)
|
WHERE i.uuid = $1 \
|
||||||
ORDER BY c.start_time ASC
|
) \
|
||||||
LIMIT $2 OFFSET $3
|
ORDER BY c.start_time ASC \
|
||||||
"#;
|
LIMIT $2 OFFSET $3",
|
||||||
|
chunk_table
|
||||||
|
);
|
||||||
|
|
||||||
let rows = sqlx::query_as(query)
|
let rows = sqlx::query_as(&query)
|
||||||
.bind(identity_id)
|
.bind(identity_id)
|
||||||
.bind(limit)
|
.bind(limit)
|
||||||
.bind(offset)
|
.bind(offset)
|
||||||
@@ -2552,7 +2555,7 @@ impl PostgresDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
|
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let content_with_rule = serde_json::json!({
|
let content_with_rule = serde_json::json!({
|
||||||
"rule": chunk.rule.as_str(),
|
"rule": chunk.rule.as_str(),
|
||||||
"data": chunk.content
|
"data": chunk.content
|
||||||
@@ -2629,7 +2632,7 @@ impl PostgresDb {
|
|||||||
chunk: &Chunk,
|
chunk: &Chunk,
|
||||||
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let content_with_rule = serde_json::json!({
|
let content_with_rule = serde_json::json!({
|
||||||
"rule": chunk.rule.as_str(),
|
"rule": chunk.rule.as_str(),
|
||||||
"data": chunk.content
|
"data": chunk.content
|
||||||
@@ -2699,7 +2702,7 @@ impl PostgresDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>> {
|
pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let rows = sqlx::query(&format!(
|
let rows = sqlx::query(&format!(
|
||||||
"SELECT COALESCE(file_id, 0) as file_id, file_uuid as uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE file_uuid = $1 ORDER BY id",
|
"SELECT COALESCE(file_id, 0) as file_id, file_uuid as uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE file_uuid = $1 ORDER BY id",
|
||||||
table
|
table
|
||||||
@@ -2779,7 +2782,7 @@ impl PostgresDb {
|
|||||||
chunk_id: &str,
|
chunk_id: &str,
|
||||||
uuid: &str,
|
uuid: &str,
|
||||||
) -> Result<Option<Chunk>> {
|
) -> Result<Option<Chunk>> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let row = sqlx::query(&format!(
|
let row = sqlx::query(&format!(
|
||||||
"SELECT COALESCE(file_id, 0) as file_id, file_uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE chunk_id = $1 AND file_uuid = $2",
|
"SELECT COALESCE(file_id, 0) as file_id, file_uuid, chunk_id, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids, visual_stats FROM {} WHERE chunk_id = $1 AND file_uuid = $2",
|
||||||
table
|
table
|
||||||
@@ -3006,7 +3009,7 @@ impl PostgresDb {
|
|||||||
start_time: f64,
|
start_time: f64,
|
||||||
end_time: f64,
|
end_time: f64,
|
||||||
) -> Result<Vec<Chunk>> {
|
) -> Result<Vec<Chunk>> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let rows = sqlx::query(&format!(
|
let rows = sqlx::query(&format!(
|
||||||
"SELECT file_id, uuid, chunk_id, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids
|
"SELECT file_id, uuid, chunk_id, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids
|
||||||
FROM {}
|
FROM {}
|
||||||
@@ -3091,7 +3094,7 @@ impl PostgresDb {
|
|||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let rows = sqlx::query(&format!(
|
let rows = sqlx::query(&format!(
|
||||||
"SELECT file_id, uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = ANY($1) ORDER BY id",
|
"SELECT file_id, uuid, chunk_id, chunk_type, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id::text as parent_chunk_id, child_chunk_ids FROM {} WHERE chunk_id = ANY($1) ORDER BY id",
|
||||||
table
|
table
|
||||||
@@ -3200,7 +3203,7 @@ impl PostgresDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> {
|
pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
sqlx::query(&format!(
|
sqlx::query(&format!(
|
||||||
"UPDATE {} SET vector_id = $1 WHERE chunk_id = $2",
|
"UPDATE {} SET vector_id = $1 WHERE chunk_id = $2",
|
||||||
table
|
table
|
||||||
@@ -3222,7 +3225,7 @@ impl PostgresDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result<Vec<Chunk>> {
|
pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result<Vec<Chunk>> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let query_pattern = format!("%{}%", query);
|
let query_pattern = format!("%{}%", query);
|
||||||
|
|
||||||
let sql = match chunk_type {
|
let sql = match chunk_type {
|
||||||
@@ -3327,7 +3330,7 @@ impl PostgresDb {
|
|||||||
uuid: Option<&str>,
|
uuid: Option<&str>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<Vec<Bm25Result>> {
|
) -> Result<Vec<Bm25Result>> {
|
||||||
let table = "dev.chunk";
|
let table = schema::table_name("chunk");
|
||||||
let tsquery = self.prepare_tsquery(query).await?;
|
let tsquery = self.prepare_tsquery(query).await?;
|
||||||
|
|
||||||
let sql = match uuid {
|
let sql = match uuid {
|
||||||
@@ -4447,7 +4450,7 @@ impl PostgresDb {
|
|||||||
total_frames: u64,
|
total_frames: u64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let table = schema::table_name("videos");
|
let table = schema::table_name("videos");
|
||||||
let chunks_table = "dev.chunk";
|
let chunks_table = schema::table_name("chunk");
|
||||||
let pre_chunks_table = schema::table_name("pre_chunks");
|
let pre_chunks_table = schema::table_name("pre_chunks");
|
||||||
|
|
||||||
// Query chunks count and frames
|
// Query chunks count and frames
|
||||||
@@ -4623,18 +4626,20 @@ impl PostgresDb {
|
|||||||
let vector_json = serde_json::to_string(query_vector)
|
let vector_json = serde_json::to_string(query_vector)
|
||||||
.map_err(|e| anyhow::anyhow!("Vector serialize error: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Vector serialize error: {}", e))?;
|
||||||
|
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let results = sqlx::query_as::<_, SemanticSearchResult>(
|
let results = sqlx::query_as::<_, SemanticSearchResult>(
|
||||||
r#"
|
&format!(
|
||||||
SELECT
|
"SELECT \
|
||||||
id as scene_order, start_time, end_time,
|
id as scene_order, start_time, end_time, \
|
||||||
COALESCE(summary_text, text_content, '') as summary,
|
COALESCE(summary_text, text_content, '') as summary, \
|
||||||
metadata,
|
metadata, \
|
||||||
(1 - (embedding <=> $1::vector)) as similarity
|
(1 - (embedding <=> $1::vector)) as similarity \
|
||||||
FROM dev.chunk
|
FROM {} \
|
||||||
WHERE file_uuid = $2 AND chunk_type = 'cut' AND embedding IS NOT NULL
|
WHERE file_uuid = $2 AND chunk_type = 'cut' AND embedding IS NOT NULL \
|
||||||
ORDER BY embedding <=> $1::vector
|
ORDER BY embedding <=> $1::vector \
|
||||||
LIMIT $3
|
LIMIT $3",
|
||||||
"#,
|
chunk_table
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.bind(&vector_json)
|
.bind(&vector_json)
|
||||||
.bind(uuid)
|
.bind(uuid)
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use sqlx;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::core::db::{PostgresDb, VideoRecord, VideoStatus};
|
use crate::core::db::{schema, PostgresDb, VideoRecord, VideoStatus};
|
||||||
use crate::core::probe;
|
use crate::core::probe;
|
||||||
use crate::core::storage::uuid as uuid_utils;
|
use crate::core::storage::uuid as uuid_utils;
|
||||||
use crate::core::storage::FileManager;
|
use crate::core::storage::FileManager;
|
||||||
@@ -37,8 +37,9 @@ impl IngestionService {
|
|||||||
|
|
||||||
// 1. Look for existing Birthday (Identity Anchor)
|
// 1. Look for existing Birthday (Identity Anchor)
|
||||||
// If the file (by name) was registered before, use its original birth time.
|
// If the file (by name) was registered before, use its original birth time.
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
||||||
"SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1"
|
&format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table)
|
||||||
)
|
)
|
||||||
.bind(&filename)
|
.bind(&filename)
|
||||||
.fetch_optional(self.db.pool())
|
.fetch_optional(self.db.pool())
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ use sqlx::PgPool;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::core::db::schema;
|
||||||
|
|
||||||
/// Heuristic scene metadata derived from YOLO + Face + luminance data.
|
/// Heuristic scene metadata derived from YOLO + Face + luminance data.
|
||||||
/// Runs as a post-processing trigger, not a standalone processor.
|
/// Runs as a post-processing trigger, not a standalone processor.
|
||||||
/// Replaces the removed Places365 Scene classifier.
|
/// Replaces the removed Places365 Scene classifier.
|
||||||
@@ -110,12 +112,13 @@ pub async fn build_heuristic_scene_meta(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get face counts grouped by frame
|
// Get face counts grouped by frame
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let face_rows: Vec<(i64, i64)> = sqlx::query_as(
|
let face_rows: Vec<(i64, i64)> = sqlx::query_as(
|
||||||
"SELECT frame_number, COUNT(*) as fc \
|
&format!("SELECT frame_number, COUNT(*) as fc \
|
||||||
FROM dev.face_detections \
|
FROM {} \
|
||||||
WHERE file_uuid = $1 AND frame_number IS NOT NULL \
|
WHERE file_uuid = $1 AND frame_number IS NOT NULL \
|
||||||
GROUP BY frame_number \
|
GROUP BY frame_number \
|
||||||
ORDER BY frame_number",
|
ORDER BY frame_number", fd_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
@@ -255,8 +258,9 @@ pub async fn generate_scene_meta(db: &crate::core::db::PostgresDb, file_uuid: &s
|
|||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
// Fallback: query DB for video duration, make one segment
|
// Fallback: query DB for video duration, make one segment
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let (total_frames, duration): (Option<i64>, Option<f64>) = sqlx::query_as(
|
let (total_frames, duration): (Option<i64>, Option<f64>) = sqlx::query_as(
|
||||||
"SELECT total_frames, duration FROM dev.videos WHERE file_uuid = $1",
|
&format!("SELECT total_frames, duration FROM {} WHERE file_uuid = $1", videos_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_optional(pool)
|
.fetch_optional(pool)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use serde::Deserialize;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
use crate::core::db::PostgresDb;
|
use crate::core::db::{schema, PostgresDb};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct TmdbIdentity {
|
struct TmdbIdentity {
|
||||||
@@ -34,7 +34,7 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul
|
|||||||
|
|
||||||
// Step 1: Load TMDb identities with face embeddings
|
// Step 1: Load TMDb identities with face embeddings
|
||||||
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
||||||
"SELECT id, name, face_embedding::real[] FROM dev.identities WHERE source='tmdb' AND face_embedding IS NOT NULL"
|
&format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", schema::table_name("identities"))
|
||||||
)
|
)
|
||||||
.fetch_all(pool).await?;
|
.fetch_all(pool).await?;
|
||||||
|
|
||||||
@@ -45,10 +45,11 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul
|
|||||||
info!("[TKG-MATCH] {} TMDb seeds loaded", tmdb_rows.len());
|
info!("[TKG-MATCH] {} TMDb seeds loaded", tmdb_rows.len());
|
||||||
|
|
||||||
// Step 2: Load face_detections grouped by trace_id
|
// Step 2: Load face_detections grouped by trace_id
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let fd_rows = sqlx::query_as::<_, (i32, Vec<f32>)>(
|
let fd_rows = sqlx::query_as::<_, (i32, Vec<f32>)>(
|
||||||
"SELECT trace_id, embedding FROM dev.face_detections \
|
&format!("SELECT trace_id, embedding FROM {} \
|
||||||
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
|
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
|
||||||
ORDER BY trace_id",
|
ORDER BY trace_id", fd_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
@@ -152,10 +153,11 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul
|
|||||||
|
|
||||||
// Step 4: Quality control
|
// Step 4: Quality control
|
||||||
// 4a: Remove low-confidence traces (fewer than 4 face detections)
|
// 4a: Remove low-confidence traces (fewer than 4 face detections)
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
let mut after_qc = HashMap::new();
|
let mut after_qc = HashMap::new();
|
||||||
for (&tid, &(id, ref name)) in &matched {
|
for (&tid, &(id, ref name)) in &matched {
|
||||||
let cnt: i64 = sqlx::query_scalar(
|
let cnt: i64 = sqlx::query_scalar(
|
||||||
"SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2",
|
&format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2", fd_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.bind(tid)
|
.bind(tid)
|
||||||
@@ -193,7 +195,7 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul
|
|||||||
let mut updated = 0usize;
|
let mut updated = 0usize;
|
||||||
for (&tid, &(id, _)) in &matched {
|
for (&tid, &(id, _)) in &matched {
|
||||||
let r = sqlx::query(
|
let r = sqlx::query(
|
||||||
"UPDATE dev.face_detections SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3",
|
&format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table),
|
||||||
)
|
)
|
||||||
.bind(id)
|
.bind(id)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
@@ -219,20 +221,22 @@ pub async fn match_faces_against_tmdb(db: &PostgresDb, file_uuid: &str) -> Resul
|
|||||||
/// Unbind the lower-confidence trace from the conflicting pair.
|
/// Unbind the lower-confidence trace from the conflicting pair.
|
||||||
/// RCA reference: docs_v1.0/API_V1.0.0/INTERNAL/RCA_TRACE39_TRACE45_COLLISION_V1.0.0.md
|
/// RCA reference: docs_v1.0/API_V1.0.0/INTERNAL/RCA_TRACE39_TRACE45_COLLISION_V1.0.0.md
|
||||||
async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str) -> Result<usize> {
|
async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str) -> Result<usize> {
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
// Find all collision pairs: same identity, same frame, different trace
|
// Find all collision pairs: same identity, same frame, different trace
|
||||||
let collisions = sqlx::query_as::<_, (i32, i32, i32, i32)>(
|
let collisions = sqlx::query_as::<_, (i32, i32, i32, i32)>(
|
||||||
r#"
|
&format!(
|
||||||
SELECT a.identity_id, a.trace_id, b.trace_id, a.frame_number
|
"SELECT a.identity_id, a.trace_id, b.trace_id, a.frame_number \
|
||||||
FROM dev.face_detections a
|
FROM {} a \
|
||||||
JOIN dev.face_detections b
|
JOIN {} b \
|
||||||
ON a.file_uuid = b.file_uuid
|
ON a.file_uuid = b.file_uuid \
|
||||||
AND a.frame_number = b.frame_number
|
AND a.frame_number = b.frame_number \
|
||||||
AND a.trace_id < b.trace_id
|
AND a.trace_id < b.trace_id \
|
||||||
WHERE a.file_uuid = $1
|
WHERE a.file_uuid = $1 \
|
||||||
AND a.identity_id IS NOT NULL
|
AND a.identity_id IS NOT NULL \
|
||||||
AND a.identity_id = b.identity_id
|
AND a.identity_id = b.identity_id \
|
||||||
ORDER BY a.identity_id, a.frame_number
|
ORDER BY a.identity_id, a.frame_number",
|
||||||
"#,
|
fd_table, fd_table
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
@@ -253,13 +257,13 @@ async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str)
|
|||||||
for ((id, ta, tb), overlap_frames) in &collision_groups {
|
for ((id, ta, tb), overlap_frames) in &collision_groups {
|
||||||
// Get face detection count for each trace
|
// Get face detection count for each trace
|
||||||
let cnt_a: i64 = sqlx::query_scalar(
|
let cnt_a: i64 = sqlx::query_scalar(
|
||||||
"SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3"
|
&format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3", fd_table)
|
||||||
)
|
)
|
||||||
.bind(file_uuid).bind(ta).bind(id)
|
.bind(file_uuid).bind(ta).bind(id)
|
||||||
.fetch_one(pool).await.unwrap_or(0);
|
.fetch_one(pool).await.unwrap_or(0);
|
||||||
|
|
||||||
let cnt_b: i64 = sqlx::query_scalar(
|
let cnt_b: i64 = sqlx::query_scalar(
|
||||||
"SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3"
|
&format!("SELECT COUNT(*) FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id=$3", fd_table)
|
||||||
)
|
)
|
||||||
.bind(file_uuid).bind(tb).bind(id)
|
.bind(file_uuid).bind(tb).bind(id)
|
||||||
.fetch_one(pool).await.unwrap_or(0);
|
.fetch_one(pool).await.unwrap_or(0);
|
||||||
@@ -269,7 +273,7 @@ async fn quality_check_temporal_collisions(pool: &sqlx::PgPool, file_uuid: &str)
|
|||||||
let victim_cnt = if cnt_a <= cnt_b { cnt_a } else { cnt_b };
|
let victim_cnt = if cnt_a <= cnt_b { cnt_a } else { cnt_b };
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"UPDATE dev.face_detections SET identity_id=NULL WHERE file_uuid=$1 AND trace_id=$2",
|
&format!("UPDATE {} SET identity_id=NULL WHERE file_uuid=$1 AND trace_id=$2", fd_table),
|
||||||
)
|
)
|
||||||
.bind(file_uuid)
|
.bind(file_uuid)
|
||||||
.bind(victim)
|
.bind(victim)
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType};
|
|||||||
use momentry_core::core::db::Database;
|
use momentry_core::core::db::Database;
|
||||||
use momentry_core::core::time::FrameTime;
|
use momentry_core::core::time::FrameTime;
|
||||||
use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi};
|
use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi};
|
||||||
|
use momentry_core::core::db::schema;
|
||||||
use momentry_core::{
|
use momentry_core::{
|
||||||
Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus,
|
Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus,
|
||||||
};
|
};
|
||||||
@@ -824,8 +825,11 @@ enum N8nAction {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
// Load development environment first
|
// Load development environment — try absolute path first
|
||||||
dotenv::from_filename(".env.development").ok();
|
if dotenv::from_filename("/Users/accusys/momentry_core_0.1/.env.development").is_err() {
|
||||||
|
// Fallback to relative path (for development)
|
||||||
|
let _ = dotenv::from_filename(".env.development");
|
||||||
|
}
|
||||||
|
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
@@ -851,8 +855,9 @@ async fn main() -> Result<()> {
|
|||||||
.map(|n| n.to_string_lossy().to_string())
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
||||||
"SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1"
|
&format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table)
|
||||||
)
|
)
|
||||||
.bind(&filename)
|
.bind(&filename)
|
||||||
.fetch_optional(db.pool())
|
.fetch_optional(db.pool())
|
||||||
@@ -1095,8 +1100,9 @@ async fn main() -> Result<()> {
|
|||||||
.map(|n| n.to_string_lossy().to_string())
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let videos_table = schema::table_name("videos");
|
||||||
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
|
||||||
"SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1"
|
&format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table)
|
||||||
)
|
)
|
||||||
.bind(&filename)
|
.bind(&filename)
|
||||||
.fetch_optional(db.pool())
|
.fetch_optional(db.pool())
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest};
|
|||||||
use crate::core::config::OUTPUT_DIR;
|
use crate::core::config::OUTPUT_DIR;
|
||||||
use crate::core::db::qdrant_db::QdrantDb;
|
use crate::core::db::qdrant_db::QdrantDb;
|
||||||
use crate::core::db::{
|
use crate::core::db::{
|
||||||
MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, VideoStatus,
|
schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, VideoStatus,
|
||||||
};
|
};
|
||||||
use crate::core::embedding::Embedder;
|
use crate::core::embedding::Embedder;
|
||||||
use crate::worker::config::WorkerConfig;
|
use crate::worker::config::WorkerConfig;
|
||||||
@@ -67,13 +67,15 @@ impl JobWorker {
|
|||||||
self.processor_pool.sweep_stale().await;
|
self.processor_pool.sweep_stale().await;
|
||||||
|
|
||||||
// Reset stale running jobs: jobs stuck in 'running' with no active processor results
|
// Reset stale running jobs: jobs stuck in 'running' with no active processor results
|
||||||
|
let monitor_jobs_table = schema::table_name("monitor_jobs");
|
||||||
|
let processor_results_table = schema::table_name("processor_results");
|
||||||
if let Err(e) = sqlx::query(
|
if let Err(e) = sqlx::query(
|
||||||
"UPDATE dev.monitor_jobs SET status = 'pending', updated_at = NOW()
|
&format!("UPDATE {} SET status = 'pending', updated_at = NOW()
|
||||||
WHERE status = 'running'
|
WHERE status = 'running'
|
||||||
AND id NOT IN (
|
AND id NOT IN (
|
||||||
SELECT DISTINCT job_id FROM dev.processor_results
|
SELECT DISTINCT job_id FROM {}
|
||||||
WHERE status IN ('pending', 'running')
|
WHERE status IN ('pending', 'running')
|
||||||
)",
|
)", monitor_jobs_table, processor_results_table),
|
||||||
)
|
)
|
||||||
.execute(self.db.pool())
|
.execute(self.db.pool())
|
||||||
.await
|
.await
|
||||||
@@ -1022,8 +1024,9 @@ impl JobWorker {
|
|||||||
let qdrant = QdrantDb::new();
|
let qdrant = QdrantDb::new();
|
||||||
let pool = db.pool();
|
let pool = db.pool();
|
||||||
|
|
||||||
|
let chunk_table = schema::table_name("chunk");
|
||||||
let rows = sqlx::query_as::<_, (String, String, String, f64, f64, String)>(
|
let rows = sqlx::query_as::<_, (String, String, String, f64, f64, String)>(
|
||||||
"SELECT chunk_id, chunk_type, text_content, start_time, end_time, content::text FROM dev.chunk WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '') ORDER BY id",
|
&format!("SELECT chunk_id, chunk_type, text_content, start_time, end_time, content::text FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '') ORDER BY id", chunk_table),
|
||||||
)
|
)
|
||||||
.bind(uuid)
|
.bind(uuid)
|
||||||
.fetch_all(pool)
|
.fetch_all(pool)
|
||||||
|
|||||||
@@ -1079,8 +1079,9 @@ impl ProcessorPool {
|
|||||||
"confidence": scene.confidence,
|
"confidence": scene.confidence,
|
||||||
"top_5": scene.top_5,
|
"top_5": scene.top_5,
|
||||||
});
|
});
|
||||||
|
let chunk_table = crate::core::db::schema::table_name("chunk");
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"UPDATE dev.chunk SET metadata = metadata || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3"
|
&format!("UPDATE {} SET metadata = metadata || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3", chunk_table)
|
||||||
)
|
)
|
||||||
.bind(&meta)
|
.bind(&meta)
|
||||||
.bind(uuid)
|
.bind(uuid)
|
||||||
|
|||||||
Reference in New Issue
Block a user