docs: update for new architecture (Probe API, Job Worker)

- VIDEO_REGISTRATION.md: Add Probe API reference and comparison table
- JOB_WORKER_IMPLEMENTATION_PLAN.md: Update status to implemented
- MOMENTRY_CORE_MONITORING.md: Add Job Worker monitoring section
  - monitor_jobs and processor_results table docs
  - Worker status check commands
  - Redis job monitoring commands
This commit is contained in:
accusys
2026-03-25 03:35:36 +08:00
parent f27e51a905
commit 44cf1ee4b6
3 changed files with 999 additions and 2 deletions

View File

@@ -0,0 +1,682 @@
# Job Worker 實作計畫
| 項目 | 內容 |
|------|------|
| 建立者 | Warren / OpenCode |
| 建立時間 | 2026-03-24 |
| 文件版本 | V1.1 |
| 狀態 | ✅ 已實作 |
---
## 版本歷史
| 版本 | 日期 | 目的 | 操作人 |
|------|------|------|--------|
| V1.0 | 2026-03-24 | 建立實作計畫 | OpenCode |
| V1.1 | 2026-03-25 | 實作完成,更新狀態 | OpenCode |
---
## 實作狀態
### ✅ 已完成
| 元件 | 檔案 | 狀態 |
|------|------|------|
| MonitorJob 結構 | `src/core/db/postgres_db.rs` | ✅ |
| ProcessorResult 結構 | `src/core/db/postgres_db.rs` | ✅ |
| Worker 配置 | `src/worker/config.rs` | ✅ |
| Job Worker | `src/worker/job_worker.rs` | ✅ |
| Processor Pool | `src/worker/processor.rs` | ✅ |
| Worker 模組 | `src/worker/mod.rs` | ✅ |
| PostgreSQL 表格 | `monitor_jobs`, `processor_results` | ✅ |
| 類型修復 | `i32`, `NaiveDateTime` | ✅ |
### 待整合
| 項目 | 說明 |
|------|------|
| Worker 服務啟動 | 需要加入 launchd plist |
| 監控整合 | 需要加入 MOMENTRY_CORE_MONITORING.md |
| 備份涵蓋 | 需要確認備份包含新表格 |
---
## 1. 設計決策
### 1.1 確認的設計決策
| 項目 | 決策 | 理由 |
|------|------|------|
| 觸發方式 | 輪詢Job Worker | 暫無可靠的 API 觸發機制 |
| 並行處理 | 最多 2 個 | 可根據 CPU/GPU 能力調整 |
| 失敗處理 | 獨立模組,部分完成可接續 | 任何模組失敗都產出狀態記錄 |
| Worker 啟動 | 獨立進程 | 隔離、易管理 |
| 並行上限調整 | 環境變數 + 預設值 | 靈活、可調整 |
| 狀態同步 | PostgreSQL + Redis | 可靠 + 即時 |
### 1.2 環境變數
| 變數 | 預設值 | 說明 |
|------|--------|------|
| `MOMENTRY_MAX_CONCURRENT` | 2 | 最大並行 processor 數 |
| `MOMENTRY_POLL_INTERVAL` | 5 | 輪詢間隔(秒) |
| `MOMENTRY_WORKER_ENABLED` | true | 是否啟用 worker |
---
## 2. 系統架構
### 2.1 完整流程圖
```
┌─────────────────────────────────────────────────────────────────────────┐
│ 檔案註冊觸發處理流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. SFTPGo 上傳 │
│ │ │
│ ▼ │
│ 2. Hook 呼叫 Register API │
│ │ │
│ ▼ │
│ 3. Register API │
│ ├─► ffprobe 提取 metadata │
│ ├─► 寫入 videos 表 │
│ └─► 建立 monitor_jobs 記錄 (status=pending) │
│ │ │
│ ▼ │
│ 4. Job Worker (獨立進程,輪詢機制) │
│ ├─► 輪詢 pending jobs │
│ ├─► 檢查 videos 表 fs_json 決定需要處理什麼 │
│ ├─► 並行執行 processors (最多 2 個) │
│ └─► 更新 videos, monitor_jobs, processor_results 表 │
│ │ │
│ ▼ │
│ 5. 處理結果 │
│ ├─► 更新 videos 表 (fs_json, psql_chunk, qvector_chunk) │
│ ├─► 更新 monitor_jobs 表 (status, progress) │
│ ├─► 更新 processor_results 表 (每個模組狀態) │
│ └─► Redis Pub/Sub 即時進度 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### 2.2 Job Worker 架構
```
┌─────────────────────────────────────────────────────────────────────┐
│ Job Worker 架構 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PostgreSQL │ ───▶ │ Worker │ ───▶ │ Processor │ │
│ │ Job Queue │ │ Loop │ │ Pool │ │
│ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Video State │ │ Processor 1 │ │
│ │ Check │ │ (ASR/YOLO) │ │
│ └─────────────┘ ├─────────────┤ │
│ │ Processor 2 │ │
│ │ (CUT/OCR) │ │
│ └─────────────┘ │
│ │
│ Redis ──── Pub/Sub ──── 即時進度 │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
---
## 3. 資料庫結構
### 3.1 Migration 檔案
**檔案**: `migrations/003_job_worker.sql`
```sql
-- ================================================================
-- Migration 003: Job Worker System
-- ================================================================
-- 3.1.1 更新 videos 表
ALTER TABLE videos ADD COLUMN IF NOT EXISTS status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE videos ADD COLUMN IF NOT EXISTS user_id BIGINT;
ALTER TABLE videos ADD COLUMN IF NOT EXISTS job_id INTEGER REFERENCES monitor_jobs(id);
COMMENT ON COLUMN videos.status IS 'pending, processing, completed, failed';
COMMENT ON COLUMN videos.user_id IS 'WordPress user ID';
COMMENT ON COLUMN videos.job_id IS 'Associated monitor_jobs ID';
-- 3.1.2 更新 monitor_jobs 表
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS video_id BIGINT REFERENCES videos(id);
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS user_id BIGINT;
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS processors VARCHAR(20)[];
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS completed_processors VARCHAR(20)[];
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS failed_processors VARCHAR(20)[];
COMMENT ON COLUMN monitor_jobs.processors IS 'Processors to run: asr, cut, yolo, ocr, face, pose, asrx';
COMMENT ON COLUMN monitor_jobs.completed_processors IS 'Successfully completed processors';
COMMENT ON COLUMN monitor_jobs.failed_processors IS 'Failed processors';
-- 3.1.3 新增 processor_results 表
CREATE TABLE IF NOT EXISTS processor_results (
id SERIAL PRIMARY KEY,
job_id INTEGER REFERENCES monitor_jobs(id) ON DELETE CASCADE,
video_id BIGINT REFERENCES videos(id) ON DELETE CASCADE,
processor VARCHAR(20) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
output_path TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
progress_total INT DEFAULT 0,
progress_current INT DEFAULT 0,
last_checkpoint JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(job_id, processor)
);
CREATE INDEX IF NOT EXISTS idx_processor_results_job ON processor_results(job_id);
CREATE INDEX IF NOT EXISTS idx_processor_results_video ON processor_results(video_id);
CREATE INDEX IF NOT EXISTS idx_processor_results_status ON processor_results(status);
COMMENT ON TABLE processor_results IS 'Tracks individual processor execution status';
COMMENT ON COLUMN processor_results.status IS 'pending, running, completed, failed, skipped';
-- 3.1.4 更新 videos 表標記欄位用途
COMMENT ON COLUMN videos.fs_video IS 'Video file exists on filesystem';
COMMENT ON COLUMN videos.fs_json IS 'All processor JSON files generated';
COMMENT ON COLUMN videos.fs_chunks IS 'Chunk files generated';
COMMENT ON COLUMN videos.fs_vectors IS 'Vector files generated';
COMMENT ON COLUMN videos.psql_chunk IS 'Chunks stored in PostgreSQL';
COMMENT ON COLUMN videos.pvector_chunk IS 'Vectors stored in PostgreSQL';
COMMENT ON COLUMN videos.qvector_chunk IS 'Vectors stored in Qdrant';
```
### 3.2 表關係圖
```
videos monitor_jobs
┌──────────────────────┐ ┌──────────────────────┐
│ id (PK) │◄────────│ video_id (FK) │
│ uuid │ │ user_id │
│ status │ │ processors[] │
│ fs_video │ │ completed_processors[]│
│ fs_json │ │ failed_processors[] │
│ job_id (FK)─────────┼────────►│ status │
│ user_id │ │ id (PK) │
└──────────────────────┘ └──────────────────────┘
processor_results
┌──────────────────────┐
│ job_id (FK) │
│ video_id (FK) │
│ processor │
│ status │
│ progress_current │
│ last_checkpoint │
│ id (PK) │
└──────────────────────┘
```
---
## 4. 模組並行策略
### 4.1 模組分類
| 模組 | 資源需求 | 獨立性 | 建議並行 |
|------|----------|--------|----------|
| ASR | GPU/CPU | 高 | ✅ 可並行 |
| CUT | CPU | 高 | ✅ 可並行 |
| YOLO | GPU | 中 | ✅ 可並行 |
| OCR | GPU/CPU | 高 | ✅ 可並行 |
| Face | GPU | 中 | ✅ 可並行 |
| Pose | GPU | 中 | ✅ 可並行 |
| ASRX | GPU/CPU | 高 | ✅ 可並行 |
### 4.2 建議並行組合
| 組合 | 模組 1 | 模組 2 | 說明 |
|------|---------|---------|------|
| GPU+CPU | YOLO/Pose/Face | ASR/CUT/OCR | 平衡負載 |
| 雙GPU | YOLO | Pose | 雙 GPU 卡片 |
| 雙CPU | ASR | CUT/OCR | 無 GPU 時 |
### 4.3 Worker 配置
```rust
// src/worker/config.rs
#[derive(Debug, Clone)]
pub struct WorkerConfig {
pub max_concurrent: usize, // 預設 2
pub poll_interval_secs: u64, // 預設 5
pub enabled: bool, // 預設 true
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
max_concurrent: 2,
poll_interval_secs: 5,
enabled: true,
}
}
}
impl WorkerConfig {
pub fn from_env() -> Self {
Self {
max_concurrent: std::env::var("MOMENTRY_MAX_CONCURRENT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(2),
poll_interval_secs: std::env::var("MOMENTRY_POLL_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5),
enabled: std::env::var("MOMENTRY_WORKER_ENABLED")
.ok()
.map(|v| v != "false")
.unwrap_or(true),
}
}
}
```
---
## 5. 失敗處理機制
### 5.1 設計原則
```
每個模組獨立處理:
- 成功 → 產出完整 .jsonstatus=completed
- 失敗 → 產出 .json 包含 error 狀態status=failed
- 部分完成 → 可從 checkpoint 繼續status=running
```
### 5.2 Processor 輸出格式
```json
{
"processor": "asr",
"status": "completed|failed|partial",
"completed_at": "2026-03-24T12:00:00Z",
"result": { ... },
"error": null,
"last_checkpoint": {
"frame": 5000,
"timestamp": 180.5
}
}
```
### 5.3 失敗處理流程
```rust
async fn run_processor(&self, module: &str, video: &Video) -> Result<()> {
let output_path = self.get_output_path(video, module);
match self.execute_processor(module, video, &output_path).await {
Ok(result) => {
// 成功:更新狀態
self.db.update_processor_status(job_id, module, "completed").await?;
self.publish_progress(job_id, module, 100).await?;
}
Err(e) => {
// 失敗:仍然保存部分結果
let partial_result = self.get_partial_result(&output_path);
self.db.update_processor_status(job_id, module, "failed").await?;
self.db.save_error_message(job_id, module, &e.to_string()).await?;
// 記錄錯誤但不中斷其他模組
tracing::warn!("Processor {} failed: {}", module, e);
}
}
Ok(())
}
```
---
## 6. 實作結構
### 6.1 目錄結構
```
src/
├── worker/
│ ├── mod.rs # Worker 模組導出
│ ├── config.rs # Worker 配置
│ ├── worker.rs # Worker 主邏輯
│ ├── processor.rs # Processor 執行器
│ ├── queue.rs # Job 佇列管理
│ └── progress.rs # 進度追蹤
├── api/
│ └── server.rs # 更新 Register API
└── main.rs # 新增 worker 命令
```
### 6.2 核心模組
#### 6.2.1 Worker Config (`src/worker/config.rs`)
```rust
pub struct WorkerConfig {
pub max_concurrent: usize,
pub poll_interval_secs: u64,
pub enabled: bool,
}
impl WorkerConfig {
pub fn from_env() -> Self { ... }
}
```
#### 6.2.2 Worker Loop (`src/worker/worker.rs`)
```rust
pub struct JobWorker {
db: PostgresDb,
redis: RedisCache,
config: WorkerConfig,
semaphore: Arc<Semaphore>,
}
impl JobWorker {
pub async fn run(&self) -> Result<()> {
loop {
if self.config.enabled {
self.process_pending_jobs().await?;
}
tokio::time::sleep(Duration::from_secs(self.config.poll_interval_secs)).await;
}
}
async fn process_pending_jobs(&self) -> Result<()> {
// 1. 檢查並發數
// 2. 取得 pending jobs
// 3. 分配給 worker pool
// 4. 並行執行 processors
}
}
```
#### 6.2.3 Processor Pool (`src/worker/processor.rs`)
```rust
pub struct ProcessorPool {
max_concurrent: usize,
}
impl ProcessorPool {
pub async fn execute(&self, job: &Job, video: &Video) -> Result<ProcessorResult> {
// 根據 videos 表決定需要執行哪些 processor
// 並行執行最多 2 個
// 處理失敗但不中斷其他 processor
}
}
```
---
## 7. API 端點設計
### 7.1 新增端點
| 端點 | 方法 | 說明 |
|------|------|------|
| `/api/v1/jobs` | GET | 列出所有 jobs |
| `/api/v1/jobs/:uuid` | GET | 取得特定 job 詳細 |
| `/api/v1/jobs/:uuid/retry` | POST | 重試失敗的 processor |
| `/api/v1/jobs/:uuid/cancel` | POST | 取消 job |
### 7.2 端點詳情
#### GET /api/v1/jobs
```json
Response:
{
"jobs": [
{
"id": 1,
"uuid": "abc123def456",
"status": "running",
"progress": 60,
"processors": ["asr", "cut", "yolo", "ocr", "face", "pose"],
"completed": ["asr", "cut", "yolo"],
"failed": []
}
]
}
```
#### GET /api/v1/jobs/:uuid
```json
Response:
{
"id": 1,
"uuid": "abc123def456",
"video_id": 10,
"status": "running",
"processors": {
"asr": {"status": "completed", "progress": 100},
"cut": {"status": "completed", "progress": 100},
"yolo": {"status": "running", "progress": 45, "current": 5000, "total": 11000},
"ocr": {"status": "pending"},
"face": {"status": "pending"},
"pose": {"status": "pending"}
},
"created_at": "2026-03-24T12:00:00Z",
"started_at": "2026-03-24T12:01:00Z"
}
```
---
## 8. Redis Key 設計
### 8.1 現有 Key 保持
```bash
momentry:job:{uuid} # Job Hash
momentry:job:{uuid}:processor:{name} # Processor Hash
momentry:progress:{uuid} # Pub/Sub Channel
momentry:jobs:active # Set: 運行中 UUIDs
momentry:jobs:completed # Set: 完成 UUIDs
momentry:jobs:failed # Set: 失敗 UUIDs
```
### 8.2 進度更新時序
```
Processor 執行
├─► 每秒更新 Redis Hash (即時)
├─► 每 10% 或完成時更新 PostgreSQL (持久)
└─► 失敗時立即更新 PostgreSQL (錯誤記錄)
```
---
## 9. 實作順序
### Phase 1: 資料庫遷移
| 任務 | 說明 |
|------|------|
| 1.1 | 建立 `migrations/003_job_worker.sql` |
| 1.2 | 更新 `postgres_db.rs` 對應的 struct |
| 1.3 | 執行 migration 驗證 |
### Phase 2: Worker 框架
| 任務 | 說明 |
|------|------|
| 2.1 | 建立 `src/worker/mod.rs` |
| 2.2 | 建立 `src/worker/config.rs` |
| 2.3 | 建立 `src/worker/worker.rs` |
| 2.4 | 建立 `src/worker/processor.rs` |
### Phase 3: Register API 整合
| 任務 | 說明 |
|------|------|
| 3.1 | 修改 `src/api/server.rs` 的 register 函數 |
| 3.2 | 加入建立 monitor_jobs 的邏輯 |
| 3.3 | 更新 videos 表 status 欄位 |
### Phase 4: Processor 執行
| 任務 | 說明 |
|------|------|
| 4.1 | 實作 processor 並行執行(最多 2 個) |
| 4.2 | 實作失敗處理(保存部分結果) |
| 4.3 | 實作 checkpoint 恢復 |
### Phase 5: 進度追蹤
| 任務 | 說明 |
|------|------|
| 5.1 | Redis Pub/Sub 整合 |
| 5.2 | PostgreSQL 定期同步 |
| 5.3 | API 進度端點更新 |
### Phase 6: API 端點
| 任務 | 說明 |
|------|------|
| 6.1 | GET /api/v1/jobs |
| 6.2 | GET /api/v1/jobs/:uuid |
| 6.3 | POST /api/v1/jobs/:uuid/retry |
| 6.4 | POST /api/v1/jobs/:uuid/cancel |
### Phase 7: CLI 命令
| 任務 | 說明 |
|------|------|
| 7.1 | `cargo run -- worker` 命令 |
| 7.2 | Worker 啟動/停止/狀態顯示 |
| 7.3 | launchd plist 設定 |
### Phase 8: 測試
| 任務 | 說明 |
|------|------|
| 8.1 | 單元測試 |
| 8.2 | 端到端測試 |
| 8.3 | 失敗處理測試 |
| 8.4 | 並行執行測試 |
---
## 10. CLI 命令
### 10.1 Worker 命令
```bash
# 啟動 worker
cargo run -- worker
# 顯示 worker 幫助
cargo run -- worker --help
```
### 10.2 環境變數
```bash
# Worker 配置
export MOMENTRY_MAX_CONCURRENT=2
export MOMENTRY_POLL_INTERVAL=5
export MOMENTRY_WORKER_ENABLED=true
# 現有環境變數
export DATABASE_URL=postgres://accusys@localhost:5432/momentry
export REDIS_URL=redis://:accusys@localhost:6379
```
---
## 11. 預估工時
| Phase | 任務 | 預估工時 |
|-------|------|----------|
| 1 | 資料庫遷移 | 2h |
| 2 | Worker 框架 | 4h |
| 3 | Register API 整合 | 2h |
| 4 | Processor 執行 | 4h |
| 5 | 進度追蹤 | 2h |
| 6 | API 端點 | 3h |
| 7 | CLI 命令 | 2h |
| 8 | 測試 | 4h |
| **總計** | | **23h** |
---
## 12. 參考文件
| 文件 | 用途 |
|------|------|
| `docs/MOMENTRY_CORE_MONITORING.md` | 監控系統規範 |
| `docs/MOMENTRY_CORE_REDIS_KEYS.md` | Redis Key 設計 |
| `docs/PROCESSING_PIPELINE.md` | 處理流程 |
| `docs/CHUNK_DESIGN.md` | 資料庫設計 |
| `docs/API_REFERENCE.md` | API 參考 |
---
## 13. 附錄
### A. 狀態機
```
┌──────────────┐
│ PENDING │
└──────┬───────┘
│ register 後
┌──────────────┐
┌─────▶│ PROCESSING │◀──────┐
│ └──────┬───────┘ │
│ │ │
部分失敗 all completed 全部失敗
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ PARTIAL │ │COMPLETED │ │ FAILED │
└──────────┘ └──────────┘ └──────────┘
```
### B. videos 表 status 欄位
| 值 | 說明 |
|------|------|
| `pending` | 已註冊,等待處理 |
| `processing` | 處理中 |
| `completed` | 所有處理完成 |
| `failed` | 處理失敗 |
### C. processor_results 表 status 欄位
| 值 | 說明 |
|------|------|
| `pending` | 等待執行 |
| `running` | 執行中 |
| `completed` | 執行成功 |
| `failed` | 執行失敗 |
| `skipped` | 跳過(如檔案已存在) |