fix: pipeline timeline log, chunk lookup, face processor no fallback, Qdrant UUID script, delete safety rules

This commit is contained in:
Accusys
2026-05-18 00:36:14 +08:00
parent a880c80556
commit 088aefdac7
7 changed files with 503 additions and 29 deletions

View File

@@ -3765,10 +3765,18 @@ struct IngestionStep {
detail: Option<String>,
}
/// Serializable reference to one identity, used in the ingestion status
/// response to list the identities linked to a file.
#[derive(Debug, Serialize)]
struct IdentityRef {
/// Identity UUID; `get_ingestion_status` removes the hyphens before storing it here.
uuid: String,
/// Identity name, read from the `name` column of the identities table.
name: String,
}
/// JSON body returned by `get_ingestion_status`: the per-step ingestion
/// progress for one file plus the identities and unidentified traces found in it.
#[derive(Debug, Serialize)]
struct IngestionStatusResponse {
/// UUID of the file the status was computed for.
file_uuid: String,
/// Ingestion steps reported by the handler (e.g. `rule1_sentence`, `face_trace`, `5w1h`).
steps: Vec<IngestionStep>,
/// Distinct identities joined to this file's face detections, ordered by name.
related_identities: Vec<IdentityRef>,
/// Number of distinct trace IDs in this file's face detections that have no identity assigned.
strangers: i64,
}
async fn get_ingestion_status(
@@ -3778,6 +3786,7 @@ async fn get_ingestion_status(
let pool = state.db.pool();
let chunk = schema::table_name("chunk");
let fd = schema::table_name("face_detections");
let identities = schema::table_name("identities");
let scene_meta_path = format!("{}/{}.scene_meta.json",
crate::core::config::OUTPUT_DIR.as_str(),
@@ -3796,13 +3805,30 @@ async fn get_ingestion_status(
let sentence_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence'"));
let sentence_embedded = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence' AND embedding IS NOT NULL"));
let scene_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut'"));
let face_total = count_sql!(&format!("SELECT COUNT(*) FROM {fd} WHERE file_uuid = '{file_uuid}'"));
let trace_count = count_sql!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL"));
let trace_chunks = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'trace'"));
let identities = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL"));
let identity_count = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL"));
let tkg_nodes = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_nodes")));
let tkg_edges = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_edges")));
let scene_5w1h = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != ''"));
let related_identities: Vec<IdentityRef> = sqlx::query_as::<_, (String, String)>(&format!(
"SELECT DISTINCT i.uuid, i.name FROM {identities} i \
JOIN {fd} fd ON fd.identity_id = i.id \
WHERE fd.file_uuid = '{file_uuid}' AND fd.identity_id IS NOT NULL \
ORDER BY i.name"
)).fetch_all(pool).await.unwrap_or_default().into_iter()
.map(|(uuid, name)| {
let uuid = uuid.replace('-', "");
IdentityRef { uuid, name }
}).collect();
let strangers = count_sql!(&format!(
"SELECT COUNT(DISTINCT trace_id) FROM {fd} \
WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL AND identity_id IS NULL"
));
macro_rules! step {
($name:expr, $done:expr, $detail:expr) => {
IngestionStep {
@@ -3817,15 +3843,15 @@ async fn get_ingestion_status(
step!("rule1_sentence", sentence_count > 0, Some(format!("{sentence_count} sentence chunks"))),
step!("auto_vectorize", sentence_embedded > 0, Some(format!("{sentence_embedded} embedded"))),
step!("rule3_scene", scene_count > 0, Some(format!("{scene_count} scene chunks"))),
step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces"))),
step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces / {face_total} detections"))),
step!("trace_chunks", trace_chunks > 0, Some(format!("{trace_chunks} trace chunks"))),
step!("tkg", tkg_nodes > 0 || tkg_edges > 0, Some(format!("{tkg_nodes} nodes, {tkg_edges} edges"))),
step!("identity_match", identities > 0, Some(format!("{identities} identities matched"))),
step!("identity_match", identity_count > 0, Some(format!("{identity_count} identities matched"))),
step!("scene_metadata", scene_meta_ok, None),
step!("5w1h", scene_5w1h > 0, Some(format!("{scene_5w1h} scenes with 5W1H"))),
];
Ok(Json(IngestionStatusResponse { file_uuid, steps }))
Ok(Json(IngestionStatusResponse { file_uuid, steps, related_identities, strangers }))
}
#[derive(Debug, Deserialize)]

View File

@@ -2236,9 +2236,36 @@ impl PostgresDb {
Ok(())
}
pub async fn get_chunk_by_chunk_id_and_uuid(&self, chunk_id: &str, _uuid: &str) -> Result<Option<crate::core::chunk::types::Chunk>> {
// Returns a minimal stub. The full Chunk struct is complex to reconstruct from DB.
Ok(None)
/// Looks up a single chunk row by file UUID and chunk id and rebuilds a
/// `Chunk` from the selected columns.
///
/// # Returns
/// `Ok(Some(chunk))` when a matching row exists, `Ok(None)` when it does not.
///
/// # Errors
/// Propagates any error raised while running the query.
pub async fn get_chunk_by_chunk_id_and_uuid(&self, chunk_id: &str, uuid: &str) -> Result<Option<crate::core::chunk::types::Chunk>> {
    use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};

    // Column order must match the SELECT list below.
    type ChunkRow = (String, f64, f64, f64, String, Option<String>, Option<serde_json::Value>);

    let table = schema::table_name("chunk");
    let sql = format!("SELECT chunk_type, start_time, end_time, fps, content::text, text_content, metadata FROM {} WHERE file_uuid = $1 AND chunk_id = $2 LIMIT 1", table);
    let fetched = sqlx::query_as::<_, ChunkRow>(&sql)
        .bind(uuid)
        .bind(chunk_id)
        .fetch_optional(&self.pool)
        .await?;

    let (kind, start_secs, end_secs, fps, content_json, text_content, metadata) = match fetched {
        Some(columns) => columns,
        None => return Ok(None),
    };

    // `content` is selected as text; if it does not parse as JSON the default
    // `serde_json::Value` (null) is used instead of failing the lookup.
    let content = serde_json::from_str::<serde_json::Value>(&content_json).unwrap_or_default();

    let chunk_type = match kind.as_str() {
        "time" => ChunkType::TimeBased,
        "sentence" => ChunkType::Sentence,
        "cut" => ChunkType::Cut,
        "trace" => ChunkType::Trace,
        "visual" => ChunkType::Visual,
        // "story", "story_parent", "story_child" and every unrecognised
        // value all resolve to `Story`.
        _ => ChunkType::Story,
    };

    // Frame indices are derived from the stored times and fps, rounded to the
    // nearest whole frame.
    let to_frame = |secs: f64| (secs * fps).round() as i64;
    let start_frame = to_frame(start_secs);
    let end_frame = to_frame(end_secs);

    // NOTE(review): the first argument is always 0 and the rule is always
    // `Rule1`, whatever the chunk type is — confirm callers do not rely on them.
    let mut rebuilt = Chunk::new(
        0,
        uuid.to_string(),
        chunk_id.to_string(),
        chunk_type,
        ChunkRule::Rule1,
        start_frame,
        end_frame,
        fps,
        content,
    );
    rebuilt.text_content = text_content;
    rebuilt.metadata = metadata;
    Ok(Some(rebuilt))
}
pub async fn get_running_jobs_with_all_processors_done(&self, _limit: i32) -> Result<Vec<MonitorJob>> {
@@ -2278,6 +2305,22 @@ impl PostgresDb {
Ok(id)
}
/// Appends one JSON line describing a processor status change to the
/// per-file timeline log `pipeline_<uuid>.log` inside `OUTPUT_DIR`.
///
/// Each line is a JSON object with the keys `ts` (current UTC time in
/// RFC 3339 format), `file_uuid`, `processor` and `status`.
///
/// Logging is best-effort: a failure to open or write the log file is
/// ignored so it can never fail the caller.
fn write_pipeline_timeline(uuid: &str, processor: &str, status: &str) {
    use std::io::Write;

    let ts = chrono::Utc::now().to_rfc3339();
    let entry = serde_json::json!({
        "ts": ts,
        "file_uuid": uuid,
        "processor": processor,
        "status": status,
    });
    // Render the complete line first and hand it to the file in one write.
    // Formatting straight into the unbuffered `File` would emit the JSON as
    // many small writes, which concurrent appenders to the same log could
    // interleave into corrupted lines.
    let line = format!("{entry}\n");
    let path = std::path::Path::new(crate::core::config::OUTPUT_DIR.as_str())
        .join(format!("pipeline_{}.log", uuid));
    if let Ok(mut file) = std::fs::OpenOptions::new().create(true).append(true).open(&path) {
        let _ = file.write_all(line.as_bytes());
    }
}
pub async fn upsert_processor_result(
&self, job_id: i32, processor_type: crate::core::db::ProcessorType, uuid: &str, status: &str
) -> Result<i32> {
@@ -2291,6 +2334,9 @@ impl PostgresDb {
))
.bind(job_id).bind(ptype).bind(uuid).bind(status)
.fetch_one(&self.pool).await?;
Self::write_pipeline_timeline(uuid, ptype, status);
Ok(id)
}