From 64f29d614bceb9f587d8dadc0e020edc4f71c6e1 Mon Sep 17 00:00:00 2001 From: Accusys Date: Thu, 2 Jul 2026 13:31:38 +0800 Subject: [PATCH] fix: Rule 1/TKG trigger conditions + essential_failed guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rule 1 trigger: has_asr_or_asrx → has_asrx (wait for ASRX pre_chunks) - P3/P4 triggers: has_asr_or_asrx → has_asrx (need ASRX data) - Add essential_failed check: job only fails if essential processor fails - P2/P3/P4 triggers: all_completed → has_face && has_asrx - Add publish_pipeline_progress calls at each pipeline stage --- src/worker/job_worker.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index d5bdb2e..df3d917 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -1237,6 +1237,13 @@ impl JobWorker { // 定義必要 processor(必須完成的才算 job 成功) let essential_processors = ["cut", "asr", "asrx", "yolo"]; + let essential_failed = essential_processors.iter().any(|ep| { + results.iter().any(|r| { + r.processor_type.as_str() == *ep + && matches!(r.status, crate::core::db::ProcessorJobStatus::Failed) + }) + }); + let essential_completed = essential_processors.iter().all(|ep| { results.iter().any(|r| { r.processor_type.as_str() == *ep @@ -1436,6 +1443,7 @@ impl JobWorker { let has_asr_or_asrx = completed_processors .iter() .any(|p| p == "asrx" || p == "asr"); + let has_asrx = completed_processors.iter().any(|p| p == "asrx"); let has_cut = completed_processors.iter().any(|p| p == "cut"); let has_face = completed_processors.iter().any(|p| p == "face"); let has_yolo = completed_processors.iter().any(|p| p == "yolo"); @@ -1444,7 +1452,7 @@ impl JobWorker { .update_job_processors_arrays(job_id, completed_processors, failed_processors.clone()) .await?; - if has_asr_or_asrx { + if has_asrx { // Guard: only spawn Rule 1 if sentence chunks don't exist yet let chunk_t = schema::table_name("chunk"); let already_spawned: bool = sqlx::query_scalar::<_, i32>(&format!( @@ -1532,7 +1540,7 @@ impl JobWorker { } } - if all_completed { + if has_face && has_asrx { let mut pp = PipelineProgress::new(uuid); pp.update_stage("processors", 1.0, "completed", None); publish_pipeline_progress(self.redis.as_ref(), uuid, &pp).await; @@ -1723,7 +1731,7 @@ impl JobWorker { } // 🚀 P3 Trigger: Identity Agent (Face + ASRX) - if has_face && has_asr_or_asrx { + if has_face && has_asrx { info!("📝 Prerequisites met for Identity Agent. Starting analysis..."); let db_clone = self.db.clone(); let redis_clone = self.redis.clone(); @@ -1742,7 +1750,7 @@ impl JobWorker { } // 🚀 P4 Trigger: TKG Build (Face + ASRX) → then Rule2 ingestion - if has_face && has_asr_or_asrx { + if has_face && has_asrx { info!("📝 Prerequisites met for TKG Build. Starting graph construction..."); let db_clone = self.db.clone(); let redis_clone = self.redis.clone(); @@ -1845,7 +1853,7 @@ impl JobWorker { job_id, failed_processors.len() ); - } else if any_failed { + } else if essential_failed { self.db .update_job_status(job_id, MonitorJobStatus::Failed) .await?;