feat: frame/time pipeline split + output validation

- Add PipelineType enum + pipeline() to ProcessorType
- Split ProcessorPool into frame_slots (max 2) and time_slots (max 1)
- Add can_start_for() for pipeline-aware scheduling
- Add validate_output_file() — checks JSON validity before marking complete
- Add 3 unit tests for validate_output_file()
- Create DESIGN/FRAME_TIME_PIPELINE_V1.0.md (492 lines)
This commit is contained in:
M5Max128
2026-05-23 21:14:28 +08:00
parent dddb5d4cbd
commit f8bcc0356c
5 changed files with 729 additions and 70 deletions

View File

@@ -43,7 +43,7 @@ pub use mongodb_db::MongoDb;
pub use postgres_db::{
Bm25Result, CandidateRecord, CreateApiKeyConfig, FileIdentityRecord, FileRecord,
HybridSearchResult, IdentityChunkRecord, IdentityDetailRecord, IdentityFaceRecord,
IdentityFileRecord, MonitorJob, MonitorJobStats, MonitorJobStatus, PostgresDb,
IdentityFileRecord, MonitorJob, MonitorJobStats, MonitorJobStatus, PipelineType, PostgresDb,
ProcessorJobStatus, ProcessorResult, ProcessorType, ResourceRecord, VideoRecord, VideoStatus,
};
pub use qdrant_db::{QdrantDb, VectorPayload};

View File

@@ -559,6 +559,31 @@ impl ProcessorType {
ProcessorType::FiveW1H,
]
}
/// Pipeline type for scheduling: Frame-based, Time-based, or Cross (needs both).
pub fn pipeline(&self) -> PipelineType {
match self {
Self::Cut
| Self::Yolo
| Self::Face
| Self::Ocr
| Self::Pose
| Self::VisualChunk
| Self::Scene => PipelineType::Frame,
Self::Asr | Self::Asrx => PipelineType::Time,
Self::Story | Self::FiveW1H => PipelineType::Cross,
}
}
}
/// Pipeline classification for worker scheduling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineType {
Frame,
Time,
Cross,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]

View File

@@ -12,8 +12,8 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::config::OUTPUT_DIR;
use crate::core::db::qdrant_db::QdrantDb;
use crate::core::db::{
schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload,
VideoStatus,
schema, MonitorJobStatus, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType,
RedisClient, VectorPayload, VideoStatus,
};
use crate::core::embedding::Embedder;
use crate::core::processor::heuristic_scene::generate_scene_meta;
@@ -338,62 +338,81 @@ impl JobWorker {
.await?;
// Check if output file already exists on disk (source of truth)
// and validate that it's a parseable JSON with expected structure.
let output_path = PathBuf::from(OUTPUT_DIR.as_str()).join(format!(
"{}.{}.json",
job.uuid,
processor_type.as_str()
));
if output_path.exists() {
info!(
"Processor {} output file exists, marking completed and skipping",
processor_type.as_str()
);
self.db
.update_processor_progress(
&job.uuid,
processor_type.as_str(),
total_frames,
total_frames,
"completed",
)
.await?;
let total = total_frames as i32;
self.redis
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"completed",
None,
total,
total,
0,
0,
0,
)
.await?;
started_count += 1;
// 覆寫 result_map 讓相依性檢查能正確判斷
result_map.insert(
*processor_type,
crate::core::db::ProcessorResult {
id: 0,
job_id: job.id,
processor_type: *processor_type,
status: ProcessorJobStatus::Completed,
started_at: None,
completed_at: None,
duration_secs: None,
chunks_produced: 0,
frames_processed: total_frames as i32,
output_size_bytes: 0,
error_message: None,
output_data: None,
retry_count: 0,
created_at: String::new(),
updated_at: String::new(),
},
);
continue;
match validate_output_file(&output_path, *processor_type) {
Ok(true) => {
info!(
"Processor {} output file exists and valid, marking completed",
processor_type.as_str()
);
self.db
.update_processor_progress(
&job.uuid,
processor_type.as_str(),
total_frames,
total_frames,
"completed",
)
.await?;
let total = total_frames as i32;
self.redis
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"completed",
None,
total,
total,
0,
0,
0,
)
.await?;
started_count += 1;
result_map.insert(
*processor_type,
crate::core::db::ProcessorResult {
id: 0,
job_id: job.id,
processor_type: *processor_type,
status: ProcessorJobStatus::Completed,
started_at: None,
completed_at: None,
duration_secs: None,
chunks_produced: 0,
frames_processed: total_frames as i32,
output_size_bytes: 0,
error_message: None,
output_data: None,
retry_count: 0,
created_at: String::new(),
updated_at: String::new(),
},
);
continue;
}
Ok(false) => {
warn!(
"Processor {} output file exists but content invalid, will reprocess",
processor_type.as_str()
);
// fall through → reprocess
}
Err(e) => {
warn!(
"Processor {} output validation error: {}, will reprocess",
processor_type.as_str(),
e
);
// fall through → reprocess
}
}
}
// Check if processor already in terminal state
@@ -484,10 +503,15 @@ impl JobWorker {
}
}
// Check capacity before starting processor
if !self.processor_pool.can_start().await {
// Check pipeline capacity before starting processor
if !self.processor_pool.can_start_for(processor_type.pipeline()).await {
info!(
"Max concurrent processors reached, skipping remaining processors for job {}",
"Max {} processors reached, skipping remaining processors for job {}",
match processor_type.pipeline() {
PipelineType::Frame => "frame",
PipelineType::Time => "time",
PipelineType::Cross => "cross",
},
job.uuid
);
// 為所有未啟動的 processors 創建 Skipped 記錄
@@ -1180,6 +1204,35 @@ impl JobWorker {
}
}
/// 驗證 processor 輸出檔案的完整性。
/// 回傳 Ok(true) 表示有效Ok(false) 表示檔案存在但內容異常需重跑Err 表示檢查失敗。
fn validate_output_file(path: &std::path::Path, processor_type: ProcessorType) -> Result<bool> {
let content = match std::fs::read_to_string(path) {
Ok(c) => c,
Err(_) => return Ok(false),
};
if content.trim().is_empty() {
return Ok(false);
}
let json: serde_json::Value = match serde_json::from_str(&content) {
Ok(v) => v,
Err(_) => return Ok(false),
};
// 依 processor type 檢查必要欄位
let valid = match processor_type {
ProcessorType::Asr => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()),
ProcessorType::Asrx => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()),
ProcessorType::Yolo => json.get("frames").and_then(|f| f.as_object()).is_some(),
ProcessorType::Face => json.get("frames").and_then(|f| f.as_object()).is_some(),
ProcessorType::Ocr => json.get("frames").and_then(|f| f.as_object()).is_some(),
ProcessorType::Pose => json.get("frames").and_then(|f| f.as_object()).is_some(),
ProcessorType::Cut => json.get("segments").or_else(|| json.get("scenes")).and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()),
// VisualChunk / Scene / Story / FiveW1H: 只檢查是 valid JSON 即可
_ => true,
};
Ok(valid)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1190,4 +1243,34 @@ mod tests {
assert!(config.enabled);
assert!(config.max_concurrent >= 1);
}
fn test_validate_path(name: &str) -> std::path::PathBuf {
let dir = std::env::temp_dir().join(format!("test_validate_{}", name));
let _ = std::fs::create_dir_all(&dir);
dir.join("output.json")
}
#[test]
fn test_validate_output_empty() {
let path = test_validate_path("empty");
std::fs::write(&path, "").unwrap();
assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false));
let _ = std::fs::remove_file(&path);
}
#[test]
fn test_validate_output_invalid_json() {
let path = test_validate_path("invalid");
std::fs::write(&path, "not json").unwrap();
assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false));
let _ = std::fs::remove_file(&path);
}
#[test]
fn test_validate_output_yolo_ok() {
let path = test_validate_path("yolo_ok");
std::fs::write(&path, r#"{"frames":{"1":{"detections":[]}}}"#).unwrap();
assert!(validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false));
let _ = std::fs::remove_file(&path);
}
}

View File

@@ -7,35 +7,56 @@ use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
/// Guard that ensures processor pool cleanup runs even if the task panics.
struct ProcessorCleanupGuard {
job_id: i32,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
frame_count: Arc<RwLock<usize>>,
time_count: Arc<RwLock<usize>>,
pipeline: PipelineType,
}
impl Drop for ProcessorCleanupGuard {
fn drop(&mut self) {
use tokio::sync::TryLockError;
// 嘗試同步清理;若 lock 被佔用則跳過(避免 deadlock
if let Ok(mut guard) = self.running.try_write() {
guard.remove(&self.job_id);
} else {
warn!("[ProcessorCleanupGuard] running lock contended, skipping cleanup");
warn!("[ProcessorCleanupGuard] running lock contended");
}
if let Ok(mut guard) = self.running_count.try_write() {
if *guard > 0 {
*guard -= 1;
if *guard > 0 { *guard -= 1; }
}
match self.pipeline {
PipelineType::Frame => {
if let Ok(mut guard) = self.frame_count.try_write() {
if *guard > 0 { *guard -= 1; }
}
}
} else {
warn!("[ProcessorCleanupGuard] running_count lock contended, skipping cleanup");
PipelineType::Time => {
if let Ok(mut guard) = self.time_count.try_write() {
if *guard > 0 { *guard -= 1; }
}
}
PipelineType::Cross => {} // cross pipeline not tracked in slot counts
}
}
}
#[derive(Debug)]
struct ProcessorHandle {
#[allow(dead_code)]
processor_type: ProcessorType,
cancel_tx: mpsc::Sender<()>,
child_pid: Arc<RwLock<Option<i32>>>,
}
use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR};
use crate::core::db::{
MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient,
MonitorJob, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient,
};
use crate::core::processor;
use crate::core::processor::asr::AsrResult;
@@ -67,19 +88,19 @@ pub struct ProcessorTask {
pub frame_dir: Option<String>,
}
/// Frame pipeline max concurrent processors (hard limit).
const FRAME_SLOT_MAX: usize = 2;
/// Time pipeline max concurrent processors (audio is heavy, run 1 at a time).
const TIME_SLOT_MAX: usize = 1;
pub struct ProcessorPool {
db: Arc<PostgresDb>,
redis: Arc<RedisClient>,
config_max: usize,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
}
struct ProcessorHandle {
#[allow(dead_code)]
processor_type: ProcessorType,
cancel_tx: mpsc::Sender<()>,
child_pid: Arc<RwLock<Option<i32>>>,
running_frame_count: Arc<RwLock<usize>>,
running_time_count: Arc<RwLock<usize>>,
}
impl ProcessorPool {
@@ -90,6 +111,8 @@ impl ProcessorPool {
config_max: max_concurrent,
running: Arc::new(RwLock::new(HashMap::new())),
running_count: Arc::new(RwLock::new(0)),
running_frame_count: Arc::new(RwLock::new(0)),
running_time_count: Arc::new(RwLock::new(0)),
}
}
@@ -105,10 +128,27 @@ impl ProcessorPool {
count < max
}
/// 檢查特定產線是否可啟動新的 processor。
/// Frame pipeline 最多 FRAME_SLOT_MAX 個Time pipeline 最多 TIME_SLOT_MAX 個。
pub async fn can_start_for(&self, pipeline: PipelineType) -> bool {
let count = *self.running_count.read().await;
let max = self.current_max().await;
if count >= max {
return false;
}
match pipeline {
PipelineType::Frame => *self.running_frame_count.read().await < FRAME_SLOT_MAX,
PipelineType::Time => *self.running_time_count.read().await < TIME_SLOT_MAX,
PipelineType::Cross => false, // cross pipeline = wait until frame+time done
}
}
/// 清理 stale running state若系統中實際運行的 processor 比記錄少,修正 count
pub async fn sweep_stale(&self) {
let handle_count = self.running.read().await.len();
let count = *self.running_count.read().await;
let frame_count = *self.running_frame_count.read().await;
let time_count = *self.running_time_count.read().await;
if handle_count != count {
warn!(
"[ProcessorPool] Stale count detected: handles={}, count={}, fixing",
@@ -117,6 +157,13 @@ impl ProcessorPool {
let mut c = self.running_count.write().await;
*c = handle_count;
}
// 若 frame 或 time slot 超出 handle_count降回合理值
if frame_count + time_count > handle_count {
let mut fc = self.running_frame_count.write().await;
let mut tc = self.running_time_count.write().await;
*fc = (*fc).min(handle_count);
*tc = (*tc).min(handle_count.saturating_sub(*fc));
}
if handle_count == 0 && count == 0 {
if let Err(e) = self
@@ -162,6 +209,7 @@ impl ProcessorPool {
let job_id = task.job.id;
let processor_type = task.processor_type;
let pipeline = task.processor_type.pipeline();
let current_limit = self.current_max().await;
{
let mut count = self.running_count.write().await;
@@ -173,9 +221,17 @@ impl ProcessorPool {
}
*count += 1;
}
// 遞增產線專屬 slot
match pipeline {
PipelineType::Frame => *self.running_frame_count.write().await += 1,
PipelineType::Time => *self.running_time_count.write().await += 1,
PipelineType::Cross => {} // cross pipeline uses global slot
}
let running = self.running.clone();
let running_count = self.running_count.clone();
let running_frame_count = self.running_frame_count.clone();
let running_time_count = self.running_time_count.clone();
let child_pid: Arc<RwLock<Option<i32>>> = Arc::new(RwLock::new(None));
running.write().await.insert(
job_id,
@@ -205,6 +261,9 @@ impl ProcessorPool {
job_id,
running: running.clone(),
running_count: running_count.clone(),
frame_count: running_frame_count.clone(),
time_count: running_time_count.clone(),
pipeline,
};
info!("Starting processor {} for job {}", processor_name, job.uuid);