fix: file-based source of truth for worker + backup protocol
- Worker: check {uuid}.{processor}.json existence before starting processor
- Worker: back up existing output files as timestamped copies before re-run (no delete, no overwrite)
- Executor: partial output saved as .json.partial (not .json) to avoid false completion
- Start script: removed `set -e`, changed log dir to momentry/logs, fixed Qdrant collection status
- docs: M4 release incident report + M4/M5 collaboration protocol
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::core::chunk::{rule1_ingest, rule3_ingest};
|
||||
use crate::core::config::OUTPUT_DIR;
|
||||
use crate::core::db::qdrant_db::QdrantDb;
|
||||
use crate::api::five_w1h_agent_api::run_5w1h_agent;
|
||||
use crate::api::identity_agent_api::run_identity_agent;
|
||||
@@ -63,6 +65,21 @@ impl JobWorker {
|
||||
let dynamic_max = resources.safe_max_concurrent(self.config.max_concurrent);
|
||||
self.processor_pool.sweep_stale().await;
|
||||
|
||||
// Reset stale running jobs: jobs stuck in 'running' with no active processor results
|
||||
if let Err(e) = sqlx::query(
|
||||
"UPDATE dev.monitor_jobs SET status = 'pending', updated_at = NOW()
|
||||
WHERE status = 'running'
|
||||
AND id NOT IN (
|
||||
SELECT DISTINCT job_id FROM dev.processor_results
|
||||
WHERE status IN ('pending', 'running')
|
||||
)"
|
||||
)
|
||||
.execute(self.db.pool())
|
||||
.await
|
||||
{
|
||||
error!("Failed to reset stale running jobs: {}", e);
|
||||
}
|
||||
|
||||
let health_key = format!("{}health", crate::core::config::REDIS_KEY_PREFIX.as_str());
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let running_count = self.processor_pool.get_running_count().await;
|
||||
@@ -311,6 +328,59 @@ impl JobWorker {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Check if output file already exists on disk (source of truth)
|
||||
let output_path =
|
||||
PathBuf::from(OUTPUT_DIR.as_str()).join(format!("{}.{}.json", job.uuid, processor_type.as_str()));
|
||||
if output_path.exists() {
|
||||
info!(
|
||||
"Processor {} output file exists, marking completed and skipping",
|
||||
processor_type.as_str()
|
||||
);
|
||||
self.db
|
||||
.update_processor_progress(
|
||||
&job.uuid,
|
||||
processor_type.as_str(),
|
||||
total_frames,
|
||||
total_frames,
|
||||
"completed",
|
||||
)
|
||||
.await?;
|
||||
let total = total_frames as i32;
|
||||
self.redis
|
||||
.update_worker_processor_status(
|
||||
&job.uuid,
|
||||
processor_type.as_str(),
|
||||
"completed",
|
||||
None,
|
||||
total,
|
||||
total,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
.await?;
|
||||
started_count += 1;
|
||||
// Overwrite the result_map entry so the dependency check evaluates this processor correctly
|
||||
result_map.insert(*processor_type, crate::core::db::ProcessorResult {
|
||||
id: 0,
|
||||
job_id: job.id,
|
||||
processor_type: *processor_type,
|
||||
status: ProcessorJobStatus::Completed,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_secs: None,
|
||||
chunks_produced: 0,
|
||||
frames_processed: total_frames as i32,
|
||||
output_size_bytes: 0,
|
||||
error_message: None,
|
||||
output_data: None,
|
||||
retry_count: 0,
|
||||
created_at: String::new(),
|
||||
updated_at: String::new(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if processor already in terminal state
|
||||
if let Some(result) = result_map.get(processor_type) {
|
||||
match result.status {
|
||||
@@ -432,6 +502,35 @@ impl JobWorker {
|
||||
break;
|
||||
}
|
||||
|
||||
// Back up existing output files (copy + timestamp suffix; the original files are not deleted)
|
||||
let uuid = &job.uuid;
|
||||
let proc = processor_type.as_str();
|
||||
let prefix = format!("{}.{}.", uuid, proc);
|
||||
if let Ok(dir) = std::fs::read_dir(OUTPUT_DIR.as_str()) {
|
||||
let ts = chrono::Utc::now().format("%Y%m%d_%H%M%S");
|
||||
for entry in dir.flatten() {
|
||||
let name = entry.file_name();
|
||||
let name = name.to_string_lossy();
|
||||
if !name.starts_with(&prefix) {
|
||||
continue;
|
||||
}
|
||||
let suffix = &name[prefix.len()..];
|
||||
if suffix.starts_with(|c: char| c.is_ascii_digit()) {
|
||||
continue;
|
||||
}
|
||||
let bak_name = format!("{}{}.{}", prefix, ts, &name[prefix.len()..]);
|
||||
let bak_path = PathBuf::from(OUTPUT_DIR.as_str()).join(&bak_name);
|
||||
if bak_path.exists() {
|
||||
info!("Backup already exists: {}, skipping", bak_path.display());
|
||||
} else {
|
||||
match std::fs::copy(entry.path(), &bak_path) {
|
||||
Ok(bytes) => info!("Backed up {} -> {} ({} bytes)", name, bak_path.display(), bytes),
|
||||
Err(e) => warn!("Failed to backup {}: {}", name, e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let processor_result_id = self
|
||||
.db
|
||||
.create_processor_result(job.id, *processor_type, &job.uuid)
|
||||
|
||||
Reference in New Issue
Block a user