From 70646871b96855f49bd48c8c7d3f8f9620d68f3d Mon Sep 17 00:00:00 2001 From: Accusys Date: Mon, 18 May 2026 00:50:33 +0800 Subject: [PATCH] fix: pipeline not complete until ingestion steps done --- src/worker/job_worker.rs | 44 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 511d5ca..2843488 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -19,6 +19,7 @@ use crate::worker::config::WorkerConfig; use crate::worker::processor::{ProcessorPool, ProcessorTask}; use crate::core::processor::heuristic_scene::generate_scene_meta; use crate::worker::resources::SystemResources; +use sqlx::PgPool; pub struct JobWorker { db: Arc, @@ -591,6 +592,39 @@ impl JobWorker { Ok(()) } + /// 檢查所有入庫步驟是否已完成(與 ingestion-status endpoint 同步邏輯) + async fn ingestion_complete(pool: &PgPool, uuid: &str) -> bool { + let chunk_t = schema::table_name("chunk"); + let fd_t = schema::table_name("face_detections"); + + macro_rules! check { + ($sql:expr) => { + sqlx::query_scalar::<_, i64>($sql) + .fetch_one(pool) + .await + .unwrap_or(0) + > 0 + }; + } + + let fu = uuid; + let rule1 = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'sentence' LIMIT 1")); + let vector = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'sentence' AND embedding IS NOT NULL LIMIT 1")); + let rule3 = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' LIMIT 1")); + let trace = check!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = '{fu}' AND trace_id IS NOT NULL")); + let tkg = check!(&format!("SELECT 1 FROM {} WHERE file_uuid = '{fu}' LIMIT 1", schema::table_name("tkg_nodes"))); + let scene_meta = std::path::Path::new(&format!("{}/{fu}.scene_meta.json", crate::core::config::OUTPUT_DIR.as_str())).exists(); + let five_w1h = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != '' LIMIT 1")); + + let all_ok = rule1 && vector && rule3 && trace && tkg && scene_meta && five_w1h; + if !all_ok { + tracing::info!( + "[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace} tkg={tkg} scene={scene_meta} 5w1h={five_w1h}" + ); + } + all_ok + } + async fn check_and_complete_job( &self, job_id: i32, @@ -956,6 +990,14 @@ impl JobWorker { }); } + if !Self::ingestion_complete(self.db.pool(), uuid).await { + info!( + "Job {}: all 10 processors done, waiting for ingestion...", + job_id + ); + return Ok(()); + } + self.db .update_job_status(job_id, MonitorJobStatus::Completed) .await?; @@ -978,7 +1020,7 @@ impl JobWorker { self.redis.delete_worker_job(uuid).await?; - info!("Job {} completed successfully", job_id); + info!("Job {} completed successfully (ingestion done)", job_id); } else if essential_completed && !all_completed && !any_pending && !any_skipped { // 必要 processor 完成但部分非必要失敗 → 仍算完成(但無 pending 者才觸發) info!(