fix: pipeline not complete until ingestion steps done

This commit is contained in:
Accusys
2026-05-18 00:50:33 +08:00
parent 01bebb645a
commit 70646871b9

View File

@@ -19,6 +19,7 @@ use crate::worker::config::WorkerConfig;
use crate::worker::processor::{ProcessorPool, ProcessorTask};
use crate::core::processor::heuristic_scene::generate_scene_meta;
use crate::worker::resources::SystemResources;
use sqlx::PgPool;
pub struct JobWorker {
db: Arc<PostgresDb>,
@@ -591,6 +592,39 @@ impl JobWorker {
Ok(())
}
/// 檢查所有入庫步驟是否已完成(與 ingestion-status endpoint 同步邏輯)
async fn ingestion_complete(pool: &PgPool, uuid: &str) -> bool {
let chunk_t = schema::table_name("chunk");
let fd_t = schema::table_name("face_detections");
macro_rules! check {
($sql:expr) => {
sqlx::query_scalar::<_, i64>($sql)
.fetch_one(pool)
.await
.unwrap_or(0)
> 0
};
}
let fu = uuid;
let rule1 = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'sentence' LIMIT 1"));
let vector = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'sentence' AND embedding IS NOT NULL LIMIT 1"));
let rule3 = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' LIMIT 1"));
let trace = check!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = '{fu}' AND trace_id IS NOT NULL"));
let tkg = check!(&format!("SELECT 1 FROM {} WHERE file_uuid = '{fu}' LIMIT 1", schema::table_name("tkg_nodes")));
let scene_meta = std::path::Path::new(&format!("{}/{fu}.scene_meta.json", crate::core::config::OUTPUT_DIR.as_str())).exists();
let five_w1h = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != '' LIMIT 1"));
let all_ok = rule1 && vector && rule3 && trace && tkg && scene_meta && five_w1h;
if !all_ok {
tracing::info!(
"[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace} tkg={tkg} scene={scene_meta} 5w1h={five_w1h}"
);
}
all_ok
}
async fn check_and_complete_job(
&self,
job_id: i32,
@@ -956,6 +990,14 @@ impl JobWorker {
});
}
if !Self::ingestion_complete(self.db.pool(), uuid).await {
info!(
"Job {}: all 10 processors done, waiting for ingestion...",
job_id
);
return Ok(());
}
self.db
.update_job_status(job_id, MonitorJobStatus::Completed)
.await?;
@@ -978,7 +1020,7 @@ impl JobWorker {
self.redis.delete_worker_job(uuid).await?;
info!("Job {} completed successfully", job_id);
info!("Job {} completed successfully (ingestion done)", job_id);
} else if essential_completed && !all_completed && !any_pending && !any_skipped {
// 必要 processor 完成但部分非必要失敗 → 仍算完成(但無 pending 者才觸發)
info!(