feat: verification agent for processor output validation

- New src/verification/ module: verify_output() checks JSON structure/completeness per processor type
- Worker: after processor succeeds, verification agent gates the result
- Passed -> mark completed + cleanup_temp_files (remove .tmp/.partial/.err/timestamp backups)
- Failed -> mark failed with verification details, preserve files for inspection
- cleanup_temp_files() keeps only the canonical {uuid}.{proc}.json
This commit is contained in:
Accusys
2026-05-09 13:30:00 +08:00
parent e068b70777
commit 7237a1811e
5 changed files with 368 additions and 30 deletions

View File

@@ -0,0 +1,134 @@
# Processor 產出機制檢討
## 三層機制定義
### 1. 中斷接續Interruption Resume
Process 被殺掉後,重啟時能接續進度。
**現狀**: 大部分 processor 有 `.tmp``.partial` 保護,但重跑時從頭開始。
### 2. 補充機制Supplement
完成度不足時,只補沒做完的部分,不重跑整個。
**現狀**: 全部從頭跑,無補充。
### 3. 糾錯機制Error Correction
輸出檔損毀時能自動偵測並修復。
**現狀**: file-existence check 只檢查檔案存在,不檢查內容是否有效。
---
## Processor 逐一檢討
### ASR
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ `.tmp``.partial`executor | ✅ OK |
| 補充機制 | ❌ 每次從頭跑 | 若跑到 50% 被殺,下次從 0% 開始 |
| 糾錯機制 | ❌ 不驗證內容 | file-existence check 看到 `.json` 存在就跳過,不管內容 |
| Pipe | ✅ executor.run() | ✅ |
| Timeout | ✅ 已移除None | ✅ |
**改善方案**:
- 補充ASR 重跑時掃描 existing `.json``.partial`,找出最後 segment 的 `end_time`,傳入 `--resume-from` 給 Python script
- 糾錯file-existence check 對 `.json``serde_json::from_str` 驗證,無效 → 視為不存在
### ASRX
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ❌ **不用 executor**,直接寫 `.json` | 被殺掉時留下壞檔 |
| 補充機制 | ❌ 同 ASR | 依賴 ASRASR 不完整 ASRX 也不能跑 |
| 糾錯機制 | ❌ 不驗證內容 | 同上 |
| Pipe | ❌ **raw Command**,沒有 `.tmp` 保護 | 緊急 |
| Timeout | ⚠️ 7200s hardcode | 應改為 None同 ASR |
**改善方案**:
- **最優先**: 改為使用 `executor.run()`,獲得 `.tmp` 保護
- 其他同 ASR
### YOLO
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ executor `.tmp` | ✅ |
| 補充機制 | ❌ 從頭跑 | 若跑到 frame 100,000 被殺,下次從 frame 0 |
| 糾錯機制 | ❌ 不驗證內容 | yolo.json 之前就是壞的但 file check 跳過 |
**改善方案**:
- 補充:掃描 `.partial` 的最後 frame傳入 `--resume-frame` 給 Python script
- 糾錯file-existence check 對 `.json` 做 JSON parse 驗證
### FACE / POSE / OCR
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ executor `.tmp` | ✅ |
| 補充機制 | ❌ 從頭跑 | 同 YOLO |
| 糾錯機制 | ❌ 不驗證內容 | 同 YOLO |
**改善方案**: 同 YOLO
### CUT
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ executor `.tmp` | ✅ |
| 補充機制 | ✅ register 階段已完成,直接載入 | ✅ |
| 糾錯機制 | ❌ 不驗證內容 | 同 YOLO |
**改善方案**: 糾錯即可
### SCENE
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ **最完整**:檢查 `.err`/`.json`/`.tmp` 三種狀態 | ✅ |
| 補充機制 | ❌ 從頭跑 | ✅scene 很快) |
| 糾錯機制 | ⚠️ 有檢查 `.err` | ✅ |
### VISUAL_CHUNK
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ executor `.tmp` | ✅ |
| 補充機制 | ❌ | ❌ |
| 糾錯機制 | ❌ **錯誤被吞掉**(回傳空結果) | 應回報 error 而非靜默失敗 |
**改善方案**: 不要吞錯誤,讓 error 往上傳
### STORY
| 面向 | 現狀 | 問題 |
|------|------|------|
| 中斷接續 | ✅ executor `.tmp` | ✅ |
| 補充機制 | ❌ | ❌ |
| 糾錯機制 | ❌ | ❌ |
---
## 優先級
### P0 — 立即修復
1. **ASRX 改用 executor.run()**
- 檔案:`src/core/processor/asrx.rs`
- 獲得 `.tmp` 保護、SIGKILL process group、`.partial` 保留
- 移除 hardcode timeout
### P1 — 糾錯機制
2. **File-existence check 加入 JSON 驗證**
- 檔案:`src/worker/job_worker.rs`
-`output_path.exists()` 之後,對 `.json``serde_json::from_str::<Value>`
- 若 parse 失敗 → 不 skip當作檔案不存在繼續跑
- 若 parse 成功但內容空(無 segments/frames→ 當不完整
### P2 — 補充機制
3. **ASR resume-from 補充**
- 檔案:`src/core/processor/asr.rs` + `scripts/asr_processor.py`
- Rust 端發現 `.partial` 存在,讀取最後 segment 的 end_time
- 傳入 `--resume-from {time}` 給 Python script
- Python script 跳過 `--resume-from` 之前的音訊
4. **YOLO/Face/Pose resume-frame 補充**
- 檔案:各 processor.rs + 對應 Python script
- 掃描 `.partial` 中的最後 frame_number
- 傳入 `--resume-frame {frame}` 給 Python script
### P3 — 其他
5. **VisualChunk 不吞錯誤**
6. **Executor SIGTERM → SIGKILL 兩段式關閉**

View File

@@ -4,6 +4,8 @@ pub mod api;
pub mod ui;
pub mod verification;
pub mod watcher;
pub mod worker;

3
src/verification/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod verifier;
pub use verifier::{verify_output, VerificationResult, VerifierError};

View File

@@ -0,0 +1,148 @@
use crate::core::config::OUTPUT_DIR;
use crate::core::db::ProcessorType;
use anyhow::Result;
use std::path::PathBuf;
use tracing::info;
#[derive(Debug)]
pub struct VerificationResult {
pub passed: bool,
pub processor: String,
pub file_uuid: String,
pub details: Vec<String>,
}
impl VerificationResult {
pub fn ok(processor: &str, file_uuid: &str) -> Self {
Self {
passed: true,
processor: processor.to_string(),
file_uuid: file_uuid.to_string(),
details: vec!["verification passed".to_string()],
}
}
pub fn fail(processor: &str, file_uuid: &str, reason: &str) -> Self {
Self {
passed: false,
processor: processor.to_string(),
file_uuid: file_uuid.to_string(),
details: vec![reason.to_string()],
}
}
}
#[derive(Debug)]
pub struct VerifierError {
pub reason: String,
}
pub fn verify_output(processor: &ProcessorType, file_uuid: &str) -> VerificationResult {
let proc_name = processor.as_str();
let output_path =
PathBuf::from(OUTPUT_DIR.as_str()).join(format!("{}.{}.json", file_uuid, proc_name));
if !output_path.exists() {
return VerificationResult::fail(proc_name, file_uuid, "output file not found");
}
let json_str = match std::fs::read_to_string(&output_path) {
Ok(s) => s,
Err(e) => return VerificationResult::fail(proc_name, file_uuid, &format!("unreadable: {}", e)),
};
let value: serde_json::Value = match serde_json::from_str(&json_str) {
Ok(v) => v,
Err(e) => return VerificationResult::fail(proc_name, file_uuid, &format!("invalid JSON: {}", e)),
};
match processor {
ProcessorType::Asr | ProcessorType::Asrx => {
let segs = value.get("segments").and_then(|v| v.as_array());
match segs {
Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 segments"),
Some(s) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'segments' field"),
}
}
ProcessorType::Cut => {
let scenes = value.get("scenes").and_then(|v| v.as_array());
match scenes {
Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 scenes"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'scenes' field"),
}
}
ProcessorType::Yolo => {
let frames = value.get("frames").and_then(|v| v.as_object());
match frames {
Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames' field"),
}
}
ProcessorType::Face => {
let faces = value.get("faces").or_else(|| value.get("frames")).and_then(|v| v.as_array());
match faces {
Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 faces"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'faces'/'frames'"),
}
}
ProcessorType::Ocr => {
let frames = value.get("frames").and_then(|v| v.as_array());
match frames {
Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames'"),
}
}
ProcessorType::Pose => {
let frames = value.get("frames").and_then(|v| v.as_array());
match frames {
Some(f) if f.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 frames"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::fail(proc_name, file_uuid, "missing 'frames'"),
}
}
ProcessorType::Scene => {
let scenes = value.get("scenes").and_then(|v| v.as_array());
match scenes {
Some(s) if s.is_empty() => VerificationResult::fail(proc_name, file_uuid, "0 scenes"),
Some(_) => VerificationResult::ok(proc_name, file_uuid),
None => VerificationResult::ok(proc_name, file_uuid),
}
}
ProcessorType::VisualChunk => VerificationResult::ok(proc_name, file_uuid),
ProcessorType::Story => VerificationResult::ok(proc_name, file_uuid),
}
}
/// 清理通過驗收的 processor 暫存檔,只保留最終 .json
pub fn cleanup_temp_files(processor: &ProcessorType, file_uuid: &str) {
let proc_name = processor.as_str();
let prefix = format!("{}.{}.", file_uuid, proc_name);
let canonical = format!("{}.{}.json", file_uuid, proc_name);
if let Ok(dir) = std::fs::read_dir(OUTPUT_DIR.as_str()) {
let mut removed = 0u32;
for entry in dir.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy().to_string();
if !name.starts_with(&prefix) {
continue;
}
if name == canonical {
continue;
}
if let Err(e) = std::fs::remove_file(entry.path()) {
tracing::warn!("Failed to cleanup {}: {}", name, e);
} else {
removed += 1;
}
}
if removed > 0 {
info!("Cleaned up {} temp files for {}.{}", removed, file_uuid, proc_name);
}
}
}

View File

@@ -1,6 +1,7 @@
use anyhow::{Context, Result};
use libc;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
@@ -229,39 +230,89 @@ impl ProcessorPool {
match result {
Ok(output) => {
info!(
"Processor {} completed for job {} ({} chunks, {} frames)",
processor_name, job.uuid, output.chunks_produced, output.frames_processed
// 驗收 agent 檢查產出內容
let verification = crate::verification::verifier::verify_output(
&processor_type,
&job.uuid,
);
if let Err(e) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Completed,
None,
Some(&output.data),
output.chunks_produced,
output.frames_processed,
)
.await
{
error!("Failed to update processor result to completed: {}", e);
}
if let Err(e) = redis
.update_worker_processor_status(
if verification.passed {
info!(
"Processor {} completed and verified for job {} ({} chunks, {} frames)",
processor_name, job.uuid, output.chunks_produced, output.frames_processed
);
// 清理暫存備份
crate::verification::verifier::cleanup_temp_files(
&processor_type,
&job.uuid,
&processor_name,
"completed",
None,
output.frames_processed,
output.chunks_produced,
output.total_frames,
output.retry_count,
output.pid,
)
.await
{
error!("Failed to update Redis processor status: {}", e);
);
if let Err(e) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Completed,
None,
Some(&output.data),
output.chunks_produced,
output.frames_processed,
)
.await
{
error!("Failed to update processor result to completed: {}", e);
}
if let Err(e) = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"completed",
None,
output.frames_processed,
output.chunks_produced,
output.total_frames,
output.retry_count,
output.pid,
)
.await
{
error!("Failed to update Redis processor status: {}", e);
}
} else {
error!(
"Processor {} output failed verification for job {}: {:?}",
processor_name, job.uuid, verification.details
);
if let Err(db_err) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Failed,
Some(&format!("verification failed: {:?}", verification.details)),
None,
0,
0,
)
.await
{
error!("Failed to update processor result to failed: {}", db_err);
}
if let Err(redis_err) = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"failed",
Some(&format!("verification failed: {:?}", verification.details)),
0,
0,
0,
0,
0,
)
.await
{
error!("Failed to update Redis processor status: {}", redis_err);
}
}
}
Err(e) => {