From 96e13e40cb7b2f3819e0afd9bddf91b98ea451ef Mon Sep 17 00:00:00 2001 From: Accusys Date: Sat, 4 Jul 2026 22:09:38 +0800 Subject: [PATCH] fix: all_completed now checks ALL expected processors have results Bug: all_completed only checked existing results, not missing processors. If a processor (like pose) never created a result row, all_completed would still return true and mark the job as completed. Fix: all_completed now checks that every processor in job_processors has a corresponding completed result. Added logging for missing processors. Also fixed: - any_pending now checks all expected processors, not just existing results - Added missing_processors detection and logging --- src/worker/job_worker.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 4b5cb34..2c04c05 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -1304,20 +1304,36 @@ impl JobWorker { }) }); - let all_completed = results - .iter() - .filter(|r| job_processors.contains(&r.processor_type.as_str().to_string())) - .all(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Completed)); + let all_completed = job_processors.iter().all(|p| { + results.iter().any(|r| { + r.processor_type.as_str() == p + && matches!(r.status, crate::core::db::ProcessorJobStatus::Completed) + }) + }); let any_failed = results .iter() .filter(|r| job_processors.contains(&r.processor_type.as_str().to_string())) .any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Failed)); - let any_pending = results - .iter() - .filter(|r| job_processors.contains(&r.processor_type.as_str().to_string())) - .any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Pending)); + let any_pending = job_processors.iter().any(|p| { + results.iter().any(|r| { + r.processor_type.as_str() == p + && matches!(r.status, crate::core::db::ProcessorJobStatus::Pending) + }) + }); + + // Check for missing processors (in job_processors but not in results) + let missing_processors: Vec = job_processors.iter().filter(|p| { + !results.iter().any(|r| r.processor_type.as_str() == *p) + }).cloned().collect(); + + if !missing_processors.is_empty() { + info!( + "check_and_complete_job: {} missing processor results: {:?}", + uuid, missing_processors + ); + } const MAX_RETRIES: i32 = 3;