From f8bcc0356c03d282c7858cf25cc902aa6296f459 Mon Sep 17 00:00:00 2001 From: M5Max128 Date: Sat, 23 May 2026 21:14:28 +0800 Subject: [PATCH] feat: frame/time pipeline split + output validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add PipelineType enum + pipeline() to ProcessorType - Split ProcessorPool into frame_slots (max 2) and time_slots (max 1) - Add can_start_for() for pipeline-aware scheduling - Add validate_output_file() — checks JSON validity before marking complete - Add 3 unit tests for validate_output_file() - Create DESIGN/FRAME_TIME_PIPELINE_V1.0.md (492 lines) --- docs_v1.0/DESIGN/FRAME_TIME_PIPELINE_V1.0.md | 492 +++++++++++++++++++ src/core/db/mod.rs | 2 +- src/core/db/postgres_db.rs | 25 + src/worker/job_worker.rs | 193 +++++--- src/worker/processor.rs | 87 +++- 5 files changed, 729 insertions(+), 70 deletions(-) create mode 100644 docs_v1.0/DESIGN/FRAME_TIME_PIPELINE_V1.0.md diff --git a/docs_v1.0/DESIGN/FRAME_TIME_PIPELINE_V1.0.md b/docs_v1.0/DESIGN/FRAME_TIME_PIPELINE_V1.0.md new file mode 100644 index 0000000..c658410 --- /dev/null +++ b/docs_v1.0/DESIGN/FRAME_TIME_PIPELINE_V1.0.md @@ -0,0 +1,492 @@ +--- +title: "Frame / Time 雙產線分流協作設計 v1.0" +version: "1.0" +date: "2026-05-23" +author: "M5" +status: "draft" +scope: "architecture, worker, storage" +--- + +# Frame / Time 雙產線分流協作設計 v1.0 + +| Scope | Status | Applies to | +|---------|--------|------------| +| architecture / worker / storage | draft | momentry_core worker pipeline | + +--- + +## 1. 緣起與問題 + +### 1.1 現狀問題 + +worker 將所有 processor 混在一起平行執行,導致: + +| 問題 | 說明 | +|------|------| +| OOM | `max_concurrent=6` 時 6 個 Python 行程同時載入模型 → 記憶體不足被 kill | +| 資源競爭 | 多個 processor 各自開 ffmpeg decode 同一部影片 → 6 倍 decode | +| 重試粒度粗 | 一個 processor 失敗 → 整部片全部重來 | +| 進度不精確 | 0% → 100%,中間無細粒度進度 | + +### 1.2 手動 vs Worker 差異 + +| 面向 | 手動執行 | Worker 自動化 | +|------|---------|--------------| +| 執行方式 | 循序,一次一個 processor | 平行,最多 max_concurrent 個 | +| 產出檢查 | 人工確認 JSON 內容正確 | `output_path.exists()` 僅檢查檔案存在 | +| 資源 | 單一模型在記憶體 | 多個模型競爭記憶體 | + +### 1.3 核心結論 + +兩個根本問題必須解決: + +1. **產線分流** — Frame-base 與 Time-base processor 不應混合排程 +2. **Frame-level resource management** — 透過 MarkbaseFMS 統一 frame 存取 + +--- + +## 2. 雙產線架構 + +### 2.1 Pipeline Overview + +```mermaid +graph TD + Input[Input Video] --> Probe + Probe -->|frame info| FMS[MarkbaseFMS] + Probe -->|audio track| TimePipe[Time Pipeline] + + FMS -->|frame batches| FramePipe[Frame Pipeline] + FMS -->|cache / align / convert| FMS + + subgraph FramePipe [Frame Pipeline] + CUT[CUT - scene detection] + YOLO[YOLO - object detection] + Face[Face - face detection] + OCR[OCR - text detection] + Pose[Pose - pose estimation] + end + + subgraph TimePipe [Time Pipeline] + ASR[ASR - speech recognition] + ASRX[ASRX - speaker diarization] + end + + CUT --> YOLO + CUT --> Face + CUT --> OCR + CUT --> Pose + + ASR --> ASRX + + YOLO & Face & OCR & Pose --> Merge[Merge Processor Results] + ASR & ASRX --> Merge + + Face --> Lip[Lip Sync] + ASR --> Lip + ASRX --> Lip + + Merge -->|all essential done| PostProcess + Lip --> PostProcess + + subgraph PostProcess [Post-processing] + R1[Rule 1 Chunking] + R3[Rule 3 Chunking] + TK[TKG Build] + W1[5W1H Agent] + ID[Identity Agent] + end +``` + +### 2.2 各產線定義 + +#### Frame Pipeline(frame-based) + +| Processor | 輸入 | 輸出 | 瓶頸資源 | 產線 | +|-----------|------|------|---------|------| +| CUT | frame (降解析) | scene.json | CPU | Frame | +| YOLO | frame batch | yolo.json | GPU | Frame | +| Face | frame batch | face.json | ANE/GPU | Frame | +| OCR | frame batch | ocr.json | CPU | Frame | +| Pose | frame batch | pose.json | GPU | Frame | + +#### Time Pipeline(time-based) + +| Processor | 輸入 | 輸出 | 瓶頸資源 | 產線 | +|-----------|------|------|---------|------| +| ASR | audio stream | asr.json | GPU/CPU | Time | +| ASRX | audio stream + ASR result | asrx.json | CPU | Time | + +#### 合流點 + +| 項目 | 需要 | 產出 | +|------|------|------| +| Lip Sync | Face + ASR + ASRX | lip.json (who speaks when) | +| Rule 1 Chunking | ASR + ASRX | sentence chunks | +| Rule 3 Chunking | CUT + ASR | scene chunks | +| TKG Build | 所有 processor | tkg_nodes / tkg_edges | +| 5W1H Agent | CUT + ASR | story summary | +| Identity Agent | Face + ASRX | identity bindings | + +--- + +## 3. MarkbaseFMS 底層設計 + +### 3.1 三層架構 + +``` + Application + YOLO / Face / OCR / Pose / CUT + 讀取 frame buffer → 直接做 inference + │ + │ frame-aligned access + ▼ +┌───────────────────────────────────┐ +│ FMS Filesystem Layer │ +│ Layout: frame data 連續存放 │ +│ 不跨 frame split │ +│ 格式: opaque raw buffer │ +│ metadata: 獨立區域 │ +│ read-ahead: 預取下一個 Block │ +│ alignment: 每 frame page-aligned │ +└──────────────────┬────────────────┘ + │ page-aligned (4096) +┌──────────────────▼────────────────┐ +│ FMS Cache Layer │ +│ unit: FrameBlock (64 frames) │ +│ alignment: page boundary/frame │ +│ eviction: LRU on Block │ +│ pin: 使用中的 frame 不 evict │ +│ prefetch: 預拉下一個 Block │ +│ Direct I/O bypass OS cache │ +└──────────────────┬────────────────┘ + │ block-aligned (4K / 64K) +┌──────────────────▼────────────────┐ +│ Block Device / Storage │ +│ sector alignment 4K │ +│ FrameBlock = N 個連續 sectors │ +│ 無跨 sector split │ +│ atomic write per block │ +│ O_DIRECT 直接 IO │ +└───────────────────────────────────┘ +``` + +### 3.2 對齊原則 + +#### 各層對齊要求 + +| 層級 | 對齊單位 | 原因 | +|------|---------|------| +| Block Device | 4K sector | 現代 SSD 原生 sector,避免 RMW | +| Cache | 4096 page | `mmap` + `madvise` 大頁面,減少 TLB miss | +| Filesystem | frame block (64 frames) | 連續 layout,預測性 read-ahead | +| Frame Buffer | 16 bytes stride | NEON SIMD,MPS/ANE texture 要求 | +| Block Index | power of 2 | index 用 bit shift + mask,無需除法 | + +#### Frame Buffer Layout + +``` +每個 frame 的 raw buffer: + + ┌──────────────┐ + │ Y Plane │ ← 16-byte aligned stride, page-aligned offset + │ (width×h) │ + ├──────────────┤ + │ UV Plane │ ← 16-byte aligned stride + │ (w/2×h/2×2) │ (NV12 interleaved) + └──────────────┘ + + frame buffer offset: page-aligned (4096) + row stride: align(width * pixel_size, 16) + frame size: align(total_bytes, page_size) +``` + +### 3.3 Block 排列方式 + +``` +每個 Block 包含連續 64 frames(configurable): + + Block index ≤— bit shift (frame_num / 64) + Frame offset = base + (frame_num & 63) * frame_size ← bit mask + +Block Dispatching Strategy: + + Worker 要求 "batch 0, format=RGB, width=640" + FMS: + 1. Check Block Cache (RAM): + Block 0 frames 0-63 是否已 decode? + ✅ hit → 直接回傳 + ❌ miss → decode 64 frames → 存入 cache → 回傳 + 2. Format Conversion (on-the-fly): + 原始 NV12 → per request: + YOLO → RGB (float32 normalized) + Face → RGB (uint8) + OCR → Gray (uint8) + Pose → RGB (uint8) + CUT → RGB (降解析, uint8) +``` + +### 3.4 儲存 Layout + +``` +Disk Layout (per file_uuid): + +┌──────────────────────────────────────────────┐ +│ Metadata Region │ +│ - file_uuid (32 bytes) │ +│ - total_frames (u32) │ +│ - width, height (u32 × 2) │ +│ - fps (f64) │ +│ - pixel_format (u8 : 0=NV12) │ +│ - block_capacity (u32, default 64) │ +│ - block_count (u32) │ +│ - frame_size (u32, bytes per raw frame)│ +│ - block_offsets [u64 × block_count] │ +│ Padding to 4096 │ +├──────────────────────────────────────────────┤ +│ Data Region │ +│ Block 0: frames [0, 63] │ +│ frame_0: [frame_size bytes] ← page align│ +│ frame_1: [frame_size bytes] │ +│ ... │ +│ frame_63: [frame_size bytes] │ +│ Block 1: frames [64, 127] │ +│ ... │ +│ Block N: frames [N*64, ...] │ +└──────────────────────────────────────────────┘ +``` + +### 3.5 記憶體管理 + +``` +┌────────────────────────────────────────────┐ +│ FMS Cache │ +├──────────┬──────────┬──────────┬───────────┤ +│ Block 0 │ Block 1 │ Block 2 │ ... │ ← mmap'd +│ (64 fr) │ (64 fr) │ (64 fr) │ │ or anonymous +├──────────┴──────────┴──────────┴───────────┤ +│ LRU eviction policy │ +│ max_memory = configurable │ +│ (default: 256 frames ~1.5GB @1080p NV12) │ +│ pin_count: 正在被 processor 存取的 frame │ +│ pinned frame 不參與 LRU eviction │ +└─────────────────────────────────────────────┘ +``` + +--- + +## 4. Worker 調度器修改 + +### 4.1 Processor 產線標記 + +```rust +enum PipelineType { + Frame, // frame-based + Time, // time-based + Cross, // needs both frame + time +} + +impl ProcessorType { + fn pipeline(&self) -> PipelineType { + match self { + Self::Cut + | Self::Yolo + | Self::Face + | Self::Ocr + | Self::Pose => PipelineType::Frame, + + Self::Asr | Self::Asrx => PipelineType::Time, + + Self::Story + | Self::Tkg + | Self::Identity + | Self::FiveW1h + | Self::Caption => PipelineType::Cross, + } + } +} +``` + +### 4.2 資源配額 + +| 產線 | Max Concurrent | 策略 | +|------|---------------|------| +| Frame | 2 | 最多同時 2 個 frame processor | +| Time | 1 | 一次只跑 1 個 audio processor | +| Cross | 1 | Frame + Time 都完成後才允許 | + +Frame pipeline 內部建議順序: + +``` +CUT (先確定場景邊界) + → YOLO / Face 可同時 (GPU-bound) + → OCR / Pose 可同時 (CPU/GPU mixed) +``` + +### 4.3 產出驗證加強 + +```rust +// 目前 (job_worker.rs:346): +if output_path.exists() { + mark_completed(); +} + +// 改為: +if output_path.exists() { + match validate_processor_output(&output_path, processor_type) { + Ok(true) => mark_completed(), + Ok(false) => retry_or_fail(), + Err(e) => mark_failed(e), + } +} + +fn validate_processor_output(path: &Path, pt: ProcessorType) -> Result { + let content = std::fs::read_to_string(path)?; + let json: serde_json::Value = serde_json::from_str(&content)?; + // 至少要有基本欄位 + match pt { + ProcessorType::Asr => json.get("segments").is_some(), + ProcessorType::Yolo => json.get("frames").is_some(), + ProcessorType::Face => json.get("frames").is_some(), + // ... + }; + Ok(true) // or false +} +``` + +### 4.4 啟動順序 + +``` +1. Probe → 決定 frame 數、audio 格式 +2. CUT (Frame Pipeline 第一階段—決定場景邊界) +3. Frame Pipeline 平行: YOLO / Face / OCR / Pose + Time Pipeline 平行: ASR +4. ASRX (依賴 ASR 結果) +5. Lip Sync (等待 Face + ASR + ASRX) +6. 所有 processor 完成 → 合流: + - Rule 1 / Rule 3 Chunking + - Face Trace / TKG + - 5W1H / Identity Agent +``` + +--- + +## 5. Processor 串接 FMS + +### 5.1 FMS API + +```rust +// Frame access API (async) +impl FmsClient { + /// 取得單一 frame buffer + async fn get_frame( + &self, + file_uuid: &str, + frame_num: u32, + format: PixelFormat, + width: u32, + height: u32, + ) -> Result; + + /// 取得一個 batch frames (block-aligned) + async fn get_block( + &self, + file_uuid: &str, + block_idx: u32, + format: PixelFormat, + width: u32, + height: u32, + ) -> Result; + + /// 串流 frames (lazy batch iteration) + fn stream_frames( + &self, + file_uuid: &str, + range: Range, + format: PixelFormat, + width: u32, + height: u32, + ) -> FrameStream; +} +``` + +### 5.2 YOLO 為例:processor 修改 + +``` +目前: + processor::process_yolo(video_path, output_path, uuid) + → 自己開 ffmpeg decode + → 逐 frame 處理 + → 寫入 yolo.json + +改為 FMS-based: + processor::process_yolo_with_fms(fms, uuid, output_path) + → let results = Vec::new() + → for batch in fms.stream_frames(uuid, 0..total, RGB, 640, 640): + → let detections = yolo_model.infer(batch) + → results.push(detections) + → write partial → yolo.partial.{batch_idx}.json + → merge_partials() → yolo.json +``` + +### 5.3 Partial Result Merge + +``` +Frame Pipeline 產出多個 partial JSON: + + yolo.0000.json (frames 0-63) + yolo.0001.json (frames 64-127) + ... + yolo.0063.json (frames 4032-4095) + +Worker 合併為單一 yolo.json: + +```rust +async fn merge_partials( + uuid: &str, + processor: &str, + partial_dir: &Path, + output_path: &Path, +) -> Result<()> { + let partials = read_sorted_partials(uuid, processor, partial_dir); + let merged = merge_detections(partials); + write_json(output_path, merged) +} +``` +``` + +--- + +## 6. 實作優先序 + +| 優先 | 項目 | 說明 | +|------|------|------| +| P0 | Worker 產線分流 | ProcessorType::pipeline() + frame_slots / time_slots 分開計算 | +| P0 | 產出驗證加強 | `output_path.exists()` + JSON validity + schema check | +| P1 | FMS FrameBlock 資料結構 | 含 16-byte stride / 4096 page alignment | +| P1 | mmap-based frame cache | page-aligned frame buffers, LRU eviction | +| P2 | FMS API 實作 | get_frame / get_block / stream_frames | +| P2 | YOLO processor 串接 FMS | 改為 stream_frames 方式 | +| P2 | 其他 processor 串接 FMS | Face / OCR / Pose / CUT | +| P3 | FMS Direct I/O + sector alignment | O_DIRECT bypass OS page cache | +| P3 | Prefetch / readahead | 預測下一個 block 並提前載入 | + +--- + +## 7. 注意事項 + +| # | 項目 | +|---|------| +| 1 | raw buffer 格式依 processor 需求轉換,FMS 負責 NV12 → RGB / Gray | +| 2 | Time pipeline 的 ASR/ASRX **不經過 FMS**,直接處理 audio stream | +| 3 | macOS 的 `mmap` 支援 page-aligned,但 `O_DIRECT` 需確認 compat | +| 4 | FrameBlock size (64) 可配置,但需維持 power-of-2 | +| 5 | FMS 只管理 frame lifecycle,不處理 processor-specific 邏輯 | +| 6 | 多 processor 共享 frame 時,FMS 保證只 decode 一次 | + +--- + +## 版本歷史 + +| Version | Date | Author | Changes | +|---------|------|--------|---------| +| 1.0 | 2026-05-23 | M5 | 初版 | diff --git a/src/core/db/mod.rs b/src/core/db/mod.rs index 1acadf3..019b547 100644 --- a/src/core/db/mod.rs +++ b/src/core/db/mod.rs @@ -43,7 +43,7 @@ pub use mongodb_db::MongoDb; pub use postgres_db::{ Bm25Result, CandidateRecord, CreateApiKeyConfig, FileIdentityRecord, FileRecord, HybridSearchResult, IdentityChunkRecord, IdentityDetailRecord, IdentityFaceRecord, - IdentityFileRecord, MonitorJob, MonitorJobStats, MonitorJobStatus, PostgresDb, + IdentityFileRecord, MonitorJob, MonitorJobStats, MonitorJobStatus, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorResult, ProcessorType, ResourceRecord, VideoRecord, VideoStatus, }; pub use qdrant_db::{QdrantDb, VectorPayload}; diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 7567ebb..8de4f36 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -559,6 +559,31 @@ impl ProcessorType { ProcessorType::FiveW1H, ] } + + /// Pipeline type for scheduling: Frame-based, Time-based, or Cross (needs both). + pub fn pipeline(&self) -> PipelineType { + match self { + Self::Cut + | Self::Yolo + | Self::Face + | Self::Ocr + | Self::Pose + | Self::VisualChunk + | Self::Scene => PipelineType::Frame, + + Self::Asr | Self::Asrx => PipelineType::Time, + + Self::Story | Self::FiveW1H => PipelineType::Cross, + } + } +} + +/// Pipeline classification for worker scheduling. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PipelineType { + Frame, + Time, + Cross, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 035f4a8..4dcb8cb 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -12,8 +12,8 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest}; use crate::core::config::OUTPUT_DIR; use crate::core::db::qdrant_db::QdrantDb; use crate::core::db::{ - schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, - VideoStatus, + schema, MonitorJobStatus, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType, + RedisClient, VectorPayload, VideoStatus, }; use crate::core::embedding::Embedder; use crate::core::processor::heuristic_scene::generate_scene_meta; @@ -338,62 +338,81 @@ impl JobWorker { .await?; // Check if output file already exists on disk (source of truth) + // and validate that it's a parseable JSON with expected structure. let output_path = PathBuf::from(OUTPUT_DIR.as_str()).join(format!( "{}.{}.json", job.uuid, processor_type.as_str() )); if output_path.exists() { - info!( - "Processor {} output file exists, marking completed and skipping", - processor_type.as_str() - ); - self.db - .update_processor_progress( - &job.uuid, - processor_type.as_str(), - total_frames, - total_frames, - "completed", - ) - .await?; - let total = total_frames as i32; - self.redis - .update_worker_processor_status( - &job.uuid, - processor_type.as_str(), - "completed", - None, - total, - total, - 0, - 0, - 0, - ) - .await?; - started_count += 1; - // 覆寫 result_map 讓相依性檢查能正確判斷 - result_map.insert( - *processor_type, - crate::core::db::ProcessorResult { - id: 0, - job_id: job.id, - processor_type: *processor_type, - status: ProcessorJobStatus::Completed, - started_at: None, - completed_at: None, - duration_secs: None, - chunks_produced: 0, - frames_processed: total_frames as i32, - output_size_bytes: 0, - error_message: None, - output_data: None, - retry_count: 0, - created_at: String::new(), - updated_at: String::new(), - }, - ); - continue; + match validate_output_file(&output_path, *processor_type) { + Ok(true) => { + info!( + "Processor {} output file exists and valid, marking completed", + processor_type.as_str() + ); + self.db + .update_processor_progress( + &job.uuid, + processor_type.as_str(), + total_frames, + total_frames, + "completed", + ) + .await?; + let total = total_frames as i32; + self.redis + .update_worker_processor_status( + &job.uuid, + processor_type.as_str(), + "completed", + None, + total, + total, + 0, + 0, + 0, + ) + .await?; + started_count += 1; + result_map.insert( + *processor_type, + crate::core::db::ProcessorResult { + id: 0, + job_id: job.id, + processor_type: *processor_type, + status: ProcessorJobStatus::Completed, + started_at: None, + completed_at: None, + duration_secs: None, + chunks_produced: 0, + frames_processed: total_frames as i32, + output_size_bytes: 0, + error_message: None, + output_data: None, + retry_count: 0, + created_at: String::new(), + updated_at: String::new(), + }, + ); + continue; + } + Ok(false) => { + warn!( + "Processor {} output file exists but content invalid, will reprocess", + processor_type.as_str() + ); + // fall through → reprocess + } + Err(e) => { + warn!( + "Processor {} output validation error: {}, will reprocess", + processor_type.as_str(), + e + ); + // fall through → reprocess + } + } } // Check if processor already in terminal state @@ -484,10 +503,15 @@ impl JobWorker { } } - // Check capacity before starting processor - if !self.processor_pool.can_start().await { + // Check pipeline capacity before starting processor + if !self.processor_pool.can_start_for(processor_type.pipeline()).await { info!( - "Max concurrent processors reached, skipping remaining processors for job {}", + "Max {} processors reached, skipping remaining processors for job {}", + match processor_type.pipeline() { + PipelineType::Frame => "frame", + PipelineType::Time => "time", + PipelineType::Cross => "cross", + }, job.uuid ); // 為所有未啟動的 processors 創建 Skipped 記錄 @@ -1180,6 +1204,35 @@ impl JobWorker { } } +/// 驗證 processor 輸出檔案的完整性。 +/// 回傳 Ok(true) 表示有效,Ok(false) 表示檔案存在但內容異常需重跑,Err 表示檢查失敗。 +fn validate_output_file(path: &std::path::Path, processor_type: ProcessorType) -> Result { + let content = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return Ok(false), + }; + if content.trim().is_empty() { + return Ok(false); + } + let json: serde_json::Value = match serde_json::from_str(&content) { + Ok(v) => v, + Err(_) => return Ok(false), + }; + // 依 processor type 檢查必要欄位 + let valid = match processor_type { + ProcessorType::Asr => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), + ProcessorType::Asrx => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), + ProcessorType::Yolo => json.get("frames").and_then(|f| f.as_object()).is_some(), + ProcessorType::Face => json.get("frames").and_then(|f| f.as_object()).is_some(), + ProcessorType::Ocr => json.get("frames").and_then(|f| f.as_object()).is_some(), + ProcessorType::Pose => json.get("frames").and_then(|f| f.as_object()).is_some(), + ProcessorType::Cut => json.get("segments").or_else(|| json.get("scenes")).and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), + // VisualChunk / Scene / Story / FiveW1H: 只檢查是 valid JSON 即可 + _ => true, + }; + Ok(valid) +} + #[cfg(test)] mod tests { use super::*; @@ -1190,4 +1243,34 @@ mod tests { assert!(config.enabled); assert!(config.max_concurrent >= 1); } + + fn test_validate_path(name: &str) -> std::path::PathBuf { + let dir = std::env::temp_dir().join(format!("test_validate_{}", name)); + let _ = std::fs::create_dir_all(&dir); + dir.join("output.json") + } + + #[test] + fn test_validate_output_empty() { + let path = test_validate_path("empty"); + std::fs::write(&path, "").unwrap(); + assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); + let _ = std::fs::remove_file(&path); + } + + #[test] + fn test_validate_output_invalid_json() { + let path = test_validate_path("invalid"); + std::fs::write(&path, "not json").unwrap(); + assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); + let _ = std::fs::remove_file(&path); + } + + #[test] + fn test_validate_output_yolo_ok() { + let path = test_validate_path("yolo_ok"); + std::fs::write(&path, r#"{"frames":{"1":{"detections":[]}}}"#).unwrap(); + assert!(validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); + let _ = std::fs::remove_file(&path); + } } diff --git a/src/worker/processor.rs b/src/worker/processor.rs index 392c648..4c95aad 100644 --- a/src/worker/processor.rs +++ b/src/worker/processor.rs @@ -7,35 +7,56 @@ use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tracing::{error, info, warn}; + + /// Guard that ensures processor pool cleanup runs even if the task panics. struct ProcessorCleanupGuard { job_id: i32, running: Arc>>, running_count: Arc>, + frame_count: Arc>, + time_count: Arc>, + pipeline: PipelineType, } impl Drop for ProcessorCleanupGuard { fn drop(&mut self) { use tokio::sync::TryLockError; - // 嘗試同步清理;若 lock 被佔用則跳過(避免 deadlock) if let Ok(mut guard) = self.running.try_write() { guard.remove(&self.job_id); } else { - warn!("[ProcessorCleanupGuard] running lock contended, skipping cleanup"); + warn!("[ProcessorCleanupGuard] running lock contended"); } if let Ok(mut guard) = self.running_count.try_write() { - if *guard > 0 { - *guard -= 1; + if *guard > 0 { *guard -= 1; } + } + match self.pipeline { + PipelineType::Frame => { + if let Ok(mut guard) = self.frame_count.try_write() { + if *guard > 0 { *guard -= 1; } + } } - } else { - warn!("[ProcessorCleanupGuard] running_count lock contended, skipping cleanup"); + PipelineType::Time => { + if let Ok(mut guard) = self.time_count.try_write() { + if *guard > 0 { *guard -= 1; } + } + } + PipelineType::Cross => {} // cross pipeline not tracked in slot counts } } } +#[derive(Debug)] +struct ProcessorHandle { + #[allow(dead_code)] + processor_type: ProcessorType, + cancel_tx: mpsc::Sender<()>, + child_pid: Arc>>, +} + use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR}; use crate::core::db::{ - MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient, + MonitorJob, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient, }; use crate::core::processor; use crate::core::processor::asr::AsrResult; @@ -67,19 +88,19 @@ pub struct ProcessorTask { pub frame_dir: Option, } +/// Frame pipeline max concurrent processors (hard limit). +const FRAME_SLOT_MAX: usize = 2; +/// Time pipeline max concurrent processors (audio is heavy, run 1 at a time). +const TIME_SLOT_MAX: usize = 1; + pub struct ProcessorPool { db: Arc, redis: Arc, config_max: usize, running: Arc>>, running_count: Arc>, -} - -struct ProcessorHandle { - #[allow(dead_code)] - processor_type: ProcessorType, - cancel_tx: mpsc::Sender<()>, - child_pid: Arc>>, + running_frame_count: Arc>, + running_time_count: Arc>, } impl ProcessorPool { @@ -90,6 +111,8 @@ impl ProcessorPool { config_max: max_concurrent, running: Arc::new(RwLock::new(HashMap::new())), running_count: Arc::new(RwLock::new(0)), + running_frame_count: Arc::new(RwLock::new(0)), + running_time_count: Arc::new(RwLock::new(0)), } } @@ -105,10 +128,27 @@ impl ProcessorPool { count < max } + /// 檢查特定產線是否可啟動新的 processor。 + /// Frame pipeline 最多 FRAME_SLOT_MAX 個,Time pipeline 最多 TIME_SLOT_MAX 個。 + pub async fn can_start_for(&self, pipeline: PipelineType) -> bool { + let count = *self.running_count.read().await; + let max = self.current_max().await; + if count >= max { + return false; + } + match pipeline { + PipelineType::Frame => *self.running_frame_count.read().await < FRAME_SLOT_MAX, + PipelineType::Time => *self.running_time_count.read().await < TIME_SLOT_MAX, + PipelineType::Cross => false, // cross pipeline = wait until frame+time done + } + } + /// 清理 stale running state:若系統中實際運行的 processor 比記錄少,修正 count pub async fn sweep_stale(&self) { let handle_count = self.running.read().await.len(); let count = *self.running_count.read().await; + let frame_count = *self.running_frame_count.read().await; + let time_count = *self.running_time_count.read().await; if handle_count != count { warn!( "[ProcessorPool] Stale count detected: handles={}, count={}, fixing", @@ -117,6 +157,13 @@ impl ProcessorPool { let mut c = self.running_count.write().await; *c = handle_count; } + // 若 frame 或 time slot 超出 handle_count,降回合理值 + if frame_count + time_count > handle_count { + let mut fc = self.running_frame_count.write().await; + let mut tc = self.running_time_count.write().await; + *fc = (*fc).min(handle_count); + *tc = (*tc).min(handle_count.saturating_sub(*fc)); + } if handle_count == 0 && count == 0 { if let Err(e) = self @@ -162,6 +209,7 @@ impl ProcessorPool { let job_id = task.job.id; let processor_type = task.processor_type; + let pipeline = task.processor_type.pipeline(); let current_limit = self.current_max().await; { let mut count = self.running_count.write().await; @@ -173,9 +221,17 @@ impl ProcessorPool { } *count += 1; } + // 遞增產線專屬 slot + match pipeline { + PipelineType::Frame => *self.running_frame_count.write().await += 1, + PipelineType::Time => *self.running_time_count.write().await += 1, + PipelineType::Cross => {} // cross pipeline uses global slot + } let running = self.running.clone(); let running_count = self.running_count.clone(); + let running_frame_count = self.running_frame_count.clone(); + let running_time_count = self.running_time_count.clone(); let child_pid: Arc>> = Arc::new(RwLock::new(None)); running.write().await.insert( job_id, @@ -205,6 +261,9 @@ impl ProcessorPool { job_id, running: running.clone(), running_count: running_count.clone(), + frame_count: running_frame_count.clone(), + time_count: running_time_count.clone(), + pipeline, }; info!("Starting processor {} for job {}", processor_name, job.uuid);