From 7237a1811ecd055f3c17385d15f4bca21d1b0d60 Mon Sep 17 00:00:00 2001 From: Accusys Date: Sat, 9 May 2026 13:30:00 +0800 Subject: [PATCH] feat: verification agent for processor output validation - New src/verification/ module: verify_output() checks JSON structure/completeness per processor type - Worker: after processor succeeds, verification agent gates the result - Passed -> mark completed + cleanup_temp_files (remove .tmp/.partial/.err/timestamp backups) - Failed -> mark failed with verification details, preserve files for inspection - cleanup_temp_files() keeps only the canonical {uuid}.{proc}.json --- docs/PROCESSOR_MECHANISMS_REVIEW.md | 134 +++++++++++++++++++++++++ src/lib.rs | 2 + src/verification/mod.rs | 3 + src/verification/verifier.rs | 148 ++++++++++++++++++++++++++++ src/worker/processor.rs | 111 +++++++++++++++------ 5 files changed, 368 insertions(+), 30 deletions(-) create mode 100644 docs/PROCESSOR_MECHANISMS_REVIEW.md create mode 100644 src/verification/mod.rs create mode 100644 src/verification/verifier.rs diff --git a/docs/PROCESSOR_MECHANISMS_REVIEW.md b/docs/PROCESSOR_MECHANISMS_REVIEW.md new file mode 100644 index 0000000..af4d7c0 --- /dev/null +++ b/docs/PROCESSOR_MECHANISMS_REVIEW.md @@ -0,0 +1,134 @@ +# Processor 產出機制檢討 + +## 三層機制定義 + +### 1. 中斷接續(Interruption Resume) +Process 被殺掉後,重啟時能接續進度。 +**現狀**: 大部分 processor 有 `.tmp` → `.partial` 保護,但重跑時從頭開始。 + +### 2. 補充機制(Supplement) +完成度不足時,只補沒做完的部分,不重跑整個。 +**現狀**: 全部從頭跑,無補充。 + +### 3. 糾錯機制(Error Correction) +輸出檔損毀時能自動偵測並修復。 +**現狀**: file-existence check 只檢查檔案存在,不檢查內容是否有效。 + +--- + +## Processor 逐一檢討 + +### ASR +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ `.tmp` → `.partial`(executor) | ✅ OK | +| 補充機制 | ❌ 每次從頭跑 | 若跑到 50% 被殺,下次從 0% 開始 | +| 糾錯機制 | ❌ 不驗證內容 | file-existence check 看到 `.json` 存在就跳過,不管內容 | +| Pipe | ✅ executor.run() | ✅ | +| Timeout | ✅ 已移除(None) | ✅ | + +**改善方案**: +- 補充:ASR 重跑時掃描 existing `.json` 或 `.partial`,找出最後 segment 的 `end_time`,傳入 `--resume-from` 給 Python script +- 糾錯:file-existence check 對 `.json` 做 `serde_json::from_str` 驗證,無效 → 視為不存在 + +### ASRX +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ❌ **不用 executor**,直接寫 `.json` | 被殺掉時留下壞檔 | +| 補充機制 | ❌ 同 ASR | 依賴 ASR,ASR 不完整 ASRX 也不能跑 | +| 糾錯機制 | ❌ 不驗證內容 | 同上 | +| Pipe | ❌ **raw Command**,沒有 `.tmp` 保護 | 緊急 | +| Timeout | ⚠️ 7200s hardcode | 應改為 None(同 ASR) | + +**改善方案**: +- **最優先**: 改為使用 `executor.run()`,獲得 `.tmp` 保護 +- 其他同 ASR + +### YOLO +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ executor `.tmp` | ✅ | +| 補充機制 | ❌ 從頭跑 | 若跑到 frame 100,000 被殺,下次從 frame 0 | +| 糾錯機制 | ❌ 不驗證內容 | yolo.json 之前就是壞的但 file check 跳過 | + +**改善方案**: +- 補充:掃描 `.partial` 的最後 frame,傳入 `--resume-frame` 給 Python script +- 糾錯:file-existence check 對 `.json` 做 JSON parse 驗證 + +### FACE / POSE / OCR +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ executor `.tmp` | ✅ | +| 補充機制 | ❌ 從頭跑 | 同 YOLO | +| 糾錯機制 | ❌ 不驗證內容 | 同 YOLO | + +**改善方案**: 同 YOLO + +### CUT +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ executor `.tmp` | ✅ | +| 補充機制 | ✅ register 階段已完成,直接載入 | ✅ | +| 糾錯機制 | ❌ 不驗證內容 | 同 YOLO | + +**改善方案**: 糾錯即可 + +### SCENE +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ **最完整**:檢查 `.err`/`.json`/`.tmp` 三種狀態 | ✅ | +| 補充機制 | ❌ 從頭跑 | ✅(scene 很快) | +| 糾錯機制 | ⚠️ 有檢查 `.err` | ✅ | + +### VISUAL_CHUNK +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ executor `.tmp` | ✅ | +| 補充機制 | ❌ | ❌ | +| 糾錯機制 | ❌ **錯誤被吞掉**(回傳空結果) | 應回報 error 而非靜默失敗 | + +**改善方案**: 不要吞錯誤,讓 error 往上傳 + +### STORY +| 面向 | 現狀 | 問題 | +|------|------|------| +| 中斷接續 | ✅ executor `.tmp` | ✅ | +| 補充機制 | ❌ | ❌ | +| 糾錯機制 | ❌ | ❌ | + +--- + +## 優先級 + +### P0 — 立即修復 + +1. **ASRX 改用 executor.run()** + - 檔案:`src/core/processor/asrx.rs` + - 獲得 `.tmp` 保護、SIGKILL process group、`.partial` 保留 + - 移除 hardcode timeout + +### P1 — 糾錯機制 + +2. **File-existence check 加入 JSON 驗證** + - 檔案:`src/worker/job_worker.rs` + - 在 `output_path.exists()` 之後,對 `.json` 做 `serde_json::from_str::` + - 若 parse 失敗 → 不 skip,當作檔案不存在繼續跑 + - 若 parse 成功但內容空(無 segments/frames)→ 當不完整 + +### P2 — 補充機制 + +3. **ASR resume-from 補充** + - 檔案:`src/core/processor/asr.rs` + `scripts/asr_processor.py` + - Rust 端發現 `.partial` 存在,讀取最後 segment 的 end_time + - 傳入 `--resume-from {time}` 給 Python script + - Python script 跳過 `--resume-from` 之前的音訊 + +4. **YOLO/Face/Pose resume-frame 補充** + - 檔案:各 processor.rs + 對應 Python script + - 掃描 `.partial` 中的最後 frame_number + - 傳入 `--resume-frame {frame}` 給 Python script + +### P3 — 其他 + +5. **VisualChunk 不吞錯誤** +6. **Executor SIGTERM → SIGKILL 兩段式關閉** diff --git a/src/lib.rs b/src/lib.rs index c00e5cf..a9f93b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ pub mod api; pub mod ui; +pub mod verification; + pub mod watcher; pub mod worker; diff --git a/src/verification/mod.rs b/src/verification/mod.rs new file mode 100644 index 0000000..8ff20e3 --- /dev/null +++ b/src/verification/mod.rs @@ -0,0 +1,3 @@ +pub mod verifier; + +pub use verifier::{verify_output, VerificationResult, VerifierError}; diff --git a/src/verification/verifier.rs b/src/verification/verifier.rs new file mode 100644 index 0000000..12aeab3 --- /dev/null +++ b/src/verification/verifier.rs @@ -0,0 +1,148 @@ +use crate::core::config::OUTPUT_DIR; +use crate::core::db::ProcessorType; +use anyhow::Result; +use std::path::PathBuf; +use tracing::info; + +#[derive(Debug)] +pub struct VerificationResult { + pub passed: bool, + pub processor: String, + pub file_uuid: String, + pub details: Vec, +} + +impl VerificationResult { + pub fn ok(processor: &str, file_uuid: &str) -> Self { + Self { + passed: true, + processor: processor.to_string(), + file_uuid: file_uuid.to_string(), + details: vec!["verification passed".to_string()], + } + } + + pub fn fail(processor: &str, file_uuid: &str, reason: &str) -> Self { + Self { + passed: false, + processor: processor.to_string(), + file_uuid: file_uuid.to_string(), + details: vec![reason.to_string()], + } + } +} + +#[derive(Debug)] +pub struct VerifierError { + pub reason: String, +} + +pub fn verify_output(processor: &ProcessorType, file_uuid: &str) -> VerificationResult { + let proc_name = processor.as_str(); + let output_path = + PathBuf::from(OUTPUT_DIR.as_str()).join(format!("{}.{}.json", file_uuid, proc_name)); + + if !output_path.exists() { + return VerificationResult::fail(proc_name, file_uuid, "output file not found"); + } + + let json_str = match std::fs::read_to_string(&output_path) { + Ok(s) => s, + Err(e) => return VerificationResult::fail(proc_name, file_uuid, &format!("unreadable: {}", e)), + }; + + let value: serde_json::Value = match serde_json::from_str(&json_str) { + Ok(v) => v, + Err(e) => return VerificationResult::fail(proc_name, file_uuid, &format!("invalid JSON: {}", e)), + }; + + match processor { + ProcessorType::Asr | ProcessorType::Asrx => { + let segs = value.get("segments").and_then(|v| v.as_array()); + match segs { + Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 segments"), + Some(s) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'segments' field"), + } + } + ProcessorType::Cut => { + let scenes = value.get("scenes").and_then(|v| v.as_array()); + match scenes { + Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 scenes"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'scenes' field"), + } + } + ProcessorType::Yolo => { + let frames = value.get("frames").and_then(|v| v.as_object()); + match frames { + Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames' field"), + } + } + ProcessorType::Face => { + let faces = value.get("faces").or_else(|| value.get("frames")).and_then(|v| v.as_array()); + match faces { + Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 faces"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'faces'/'frames'"), + } + } + ProcessorType::Ocr => { + let frames = value.get("frames").and_then(|v| v.as_array()); + match frames { + Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames'"), + } + } + ProcessorType::Pose => { + let frames = value.get("frames").and_then(|v| v.as_array()); + match frames { + Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames'"), + } + } + ProcessorType::Scene => { + let scenes = value.get("scenes").and_then(|v| v.as_array()); + match scenes { + Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 scenes"), + Some(_) => VerificationResult::ok(proc_name, file_uuid), + None => VerificationResult::ok(proc_name, file_uuid), + } + } + ProcessorType::VisualChunk => VerificationResult::ok(proc_name, file_uuid), + ProcessorType::Story => VerificationResult::ok(proc_name, file_uuid), + } +} + +/// 清理通過驗收的 processor 暫存檔,只保留最終 .json +pub fn cleanup_temp_files(processor: &ProcessorType, file_uuid: &str) { + let proc_name = processor.as_str(); + let prefix = format!("{}.{}.", file_uuid, proc_name); + let canonical = format!("{}.{}.json", file_uuid, proc_name); + + if let Ok(dir) = std::fs::read_dir(OUTPUT_DIR.as_str()) { + let mut removed = 0u32; + for entry in dir.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy().to_string(); + if !name.starts_with(&prefix) { + continue; + } + if name == canonical { + continue; + } + if let Err(e) = std::fs::remove_file(entry.path()) { + tracing::warn!("Failed to cleanup {}: {}", name, e); + } else { + removed += 1; + } + } + if removed > 0 { + info!("Cleaned up {} temp files for {}.{}", removed, file_uuid, proc_name); + } + } +} diff --git a/src/worker/processor.rs b/src/worker/processor.rs index 584f0a4..be0924c 100644 --- a/src/worker/processor.rs +++ b/src/worker/processor.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use libc; use std::collections::HashMap; +use std::fs; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; @@ -229,39 +230,89 @@ impl ProcessorPool { match result { Ok(output) => { - info!( - "Processor {} completed for job {} ({} chunks, {} frames)", - processor_name, job.uuid, output.chunks_produced, output.frames_processed + // 驗收 agent 檢查產出內容 + let verification = crate::verification::verifier::verify_output( + &processor_type, + &job.uuid, ); - if let Err(e) = db - .update_processor_result_with_stats( - processor_result_id, - ProcessorJobStatus::Completed, - None, - Some(&output.data), - output.chunks_produced, - output.frames_processed, - ) - .await - { - error!("Failed to update processor result to completed: {}", e); - } - if let Err(e) = redis - .update_worker_processor_status( + if verification.passed { + info!( + "Processor {} completed and verified for job {} ({} chunks, {} frames)", + processor_name, job.uuid, output.chunks_produced, output.frames_processed + ); + + // 清理暫存備份 + crate::verification::verifier::cleanup_temp_files( + &processor_type, &job.uuid, - &processor_name, - "completed", - None, - output.frames_processed, - output.chunks_produced, - output.total_frames, - output.retry_count, - output.pid, - ) - .await - { - error!("Failed to update Redis processor status: {}", e); + ); + + if let Err(e) = db + .update_processor_result_with_stats( + processor_result_id, + ProcessorJobStatus::Completed, + None, + Some(&output.data), + output.chunks_produced, + output.frames_processed, + ) + .await + { + error!("Failed to update processor result to completed: {}", e); + } + + if let Err(e) = redis + .update_worker_processor_status( + &job.uuid, + &processor_name, + "completed", + None, + output.frames_processed, + output.chunks_produced, + output.total_frames, + output.retry_count, + output.pid, + ) + .await + { + error!("Failed to update Redis processor status: {}", e); + } + } else { + error!( + "Processor {} output failed verification for job {}: {:?}", + processor_name, job.uuid, verification.details + ); + if let Err(db_err) = db + .update_processor_result_with_stats( + processor_result_id, + ProcessorJobStatus::Failed, + Some(&format!("verification failed: {:?}", verification.details)), + None, + 0, + 0, + ) + .await + { + error!("Failed to update processor result to failed: {}", db_err); + } + + if let Err(redis_err) = redis + .update_worker_processor_status( + &job.uuid, + &processor_name, + "failed", + Some(&format!("verification failed: {:?}", verification.details)), + 0, + 0, + 0, + 0, + 0, + ) + .await + { + error!("Failed to update Redis processor status: {}", redis_err); + } } } Err(e) => {