docs: add Processor State Machine V1.0 design

This commit is contained in:
Accusys
2026-05-30 10:03:48 +08:00
parent 3d13d1390e
commit 08167d73b2

View File

@@ -0,0 +1,484 @@
---
title: Processor State Machine V1.0
version: 1.0
date: 2026-05-30
author: M5Max128
status: draft
---
# Processor State Machine V1.0
## Overview
| Attribute | Value |
|-----------|-------|
| Scope | Backend, Worker, Pipeline |
| Status | Draft |
| Applicable To | M5Max128, M5Max48 |
| Dependencies | migrations/034, job_worker.rs, redis_client.rs |
| Related Docs | [Pipeline Module](../API_WORKSPACE/modules/10_pipeline.md), [TKG Query API](TKG_QUERY_API_V1.0.md) |
---
## 1. Design Goals
### 1.1 Problem Statement
The Momentry Core pipeline lacks unified state management for processors:
- **Opaque dependency chains**: Processors depend on each other (ASR → Cut, ASRX → ASR, Story → ASRX + Cut + YOLO + Face), but failures or delays are not explicitly tracked
- **No alert mechanism**: When dependencies are not met or resources are exhausted, there is no systematic way to notify operators or trigger retries
- **Coarse-grained status**: Existing `pending/running/completed/failed` states do not capture intermediate conditions like "waiting for dependencies" or "ready but not scheduled"
### 1.2 Solution
Introduce a **State Machine** with **Alert Mechanism**:
- **8 explicit states** for each processor job: `Idle → Waiting → Ready → Pending → Running → Completed/Failed/Skipped`
- **Dependency checking**: `check_dependencies()` validates prerequisites before execution
- **Alert emission**: Emit alerts to Redis pub/sub and PostgreSQL for monitoring and debugging
### 1.3 Scope
This design **complements** the existing polling mechanism:
| Component | Responsibility |
|-----------|---------------|
| **State Machine** | Fine-grained processor status management (Idle → Running → Completed) |
| **Polling** | Coarse-grained ingestion verification (Rule 1 chunks exist? Vectorize done? TKG nodes exist?) |
**Non-Goals**:
- Does NOT replace polling for post-processing steps (入庫)
- Does NOT auto-retry failed processors (future evolution)
- Does NOT manage distributed state across workers
---
## 2. State Definitions
### 2.1 Eight States
| State | Semantics | Trigger | Next States |
|-------|-----------|---------|--------------|
| `Idle` | Initial state, no work assigned | Job created | `Waiting` |
| `Waiting` | Dependencies not met, awaiting prerequisites | Dependency check fails | `Ready`, `Failed` |
| `Ready` | Dependencies met, awaiting execution | Dependency check passes | `Pending` |
| `Pending` | Queued for execution, waiting for worker | Scheduler accepts | `Running` |
| `Running` | Currently processing | Worker starts | `Completed`, `Failed`, `Skipped` |
| `Completed` | Success, output valid | Output validated | - (terminal) |
| `Failed` | Error occurred, unrecoverable | Exception or timeout | - (terminal) |
| `Skipped` | Conditional skip (optional processor) | Unmet optional conditions | - (terminal) |
### 2.2 State Transition Examples
**Example 1: ASR depends on Cut**
```
ASR: Idle → Waiting (Cut not completed)
Cut: Running → Completed
ASR: Waiting → Ready (Cut completed) → Pending → Running → Completed
```
**Example 2: Story depends on multiple processors**
```
Story: Idle → Waiting (ASRX not completed)
ASRX: Running → Completed
Story: Waiting → Waiting (Cut not completed)
Cut: Running → Completed
Story: Waiting → Waiting (YOLO not completed)
YOLO: Running → Completed
Story: Waiting → Waiting (Face not completed)
Face: Running → Completed
Story: Waiting → Ready (all dependencies met) → Pending → Running → Completed
```
**Example 3: Optional processor skipped**
```
Pose: Idle → Ready → Pending → Running
Pose: Running → Skipped (no pose detected, optional processing)
```
---
## 3. State Transitions
### 3.1 Transition Diagram
```mermaid
stateDiagram-v2
[*] --> Idle: Job created
Idle --> Waiting: Initialize
Waiting --> Ready: Dependencies met
Waiting --> Failed: Timeout
Ready --> Pending: Scheduled
Pending --> Running: Worker pickup
Running --> Completed: Success
Running --> Failed: Error
Running --> Skipped: Conditional skip
Completed --> [*]
Failed --> [*]
Skipped --> [*]
```
### 3.2 Transition Rules
| From State | To State | Condition | Action |
|------------|-----------|-----------|--------|
| `Idle` | `Waiting` | Always (initial transition) | - |
| `Waiting` | `Ready` | `check_dependencies() == Ok` | - |
| `Waiting` | `Failed` | Timeout (default 7200s) | Emit `timeout` alert |
| `Ready` | `Pending` | Resource available | - |
| `Pending` | `Running` | Worker starts | - |
| `Running` | `Completed` | Output valid | - |
| `Running` | `Failed` | Exception or output invalid | Emit `output_invalid` alert |
| `Running` | `Skipped` | Optional processor, conditions not met | - |
### 3.3 Edge Cases
| Scenario | Detection | Resolution |
|----------|-----------|------------|
| **Circular dependencies** | `check_dependencies()` detects cycle | Mark as `Failed`, emit `dependency_not_met` alert |
| **Resource exhaustion** | GPU/CPU unavailable | Stay in `Waiting`, emit `resource_exhausted` alert |
| **Partial output** | Output validation fails | Mark as `Failed`, emit `output_invalid` alert |
| **Transient failure** | Network/API timeout | Stay in `Waiting`, retry after delay |
---
## 4. Alert Mechanism
### 4.1 Alert Types
| Type | Trigger | Severity | Action |
|------|---------|----------|--------|
| `dependency_not_met` | `check_dependencies()` fails | Warning | Retry after delay |
| `resource_exhausted` | GPU/CPU unavailable | Warning | Wait + retry |
| `output_invalid` | Validation fails | Error | Mark `Failed` |
| `timeout` | Exceeds `MOMENTRY_*_TIMEOUT` | Error | Mark `Failed` |
### 4.2 Alert Flow
```mermaid
sequenceDiagram
participant Worker as job_worker.rs
participant Checker as check_dependencies()
participant Redis as Redis Pub/Sub
participant PostgreSQL as processor_alerts table
Worker->>Checker: check_dependencies(processor, file_uuid)
alt Dependencies not met
Checker-->>Worker: ConditionResult::NotMet(reason)
Worker->>Redis: emit_processor_alert(file_uuid, processor, "dependency_not_met", reason)
Redis-->>PostgreSQL: INSERT INTO processor_alerts
Worker->>Worker: update_status(file_uuid, processor, Waiting)
else Resource exhausted
Checker-->>Worker: ConditionResult::ResourceExhausted
Worker->>Redis: emit_processor_alert(file_uuid, processor, "resource_exhausted", "GPU unavailable")
Redis-->>PostgreSQL: INSERT INTO processor_alerts
Worker->>Worker: update_status(file_uuid, processor, Waiting)
else Output invalid
Checker-->>Worker: ConditionResult::OutputInvalid(reason)
Worker->>Redis: emit_processor_alert(file_uuid, processor, "output_invalid", reason)
Redis-->>PostgreSQL: INSERT INTO processor_alerts
Worker->>Worker: update_status(file_uuid, processor, Failed)
else OK
Checker-->>Worker: ConditionResult::Ok
Worker->>Worker: update_status(file_uuid, processor, Running)
end
```
### 4.3 Redis Channel
- **Channel**: `momentry:processor:alerts`
- **Message Format**:
```json
{
"file_uuid": "bd80fec9c42afb0307eb28f22c64c76a",
"processor": "ASR",
"alert_type": "dependency_not_met",
"message": "Cut not completed",
"timestamp": "2026-05-30T10:15:30Z"
}
```
- **Consumers**: None (current implementation logs only, future: monitoring service)
### 4.4 PostgreSQL Table
**Table**: `processor_alerts` (defined in `migrations/034_processor_state_machine.sql`)
```sql
CREATE TABLE IF NOT EXISTS processor_alerts (
id SERIAL PRIMARY KEY,
file_uuid VARCHAR(32),
processor_type VARCHAR(32) NOT NULL,
alert_type VARCHAR(32) NOT NULL,
message TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_alerts_file_uuid ON processor_alerts(file_uuid);
CREATE INDEX idx_alerts_processor_type ON processor_alerts(processor_type);
CREATE INDEX idx_alerts_alert_type ON processor_alerts(alert_type);
CREATE INDEX idx_alerts_created_at ON processor_alerts(created_at);
```
**Retention Policy**: 30 days (TBD, future: implement cleanup job)
---
## 5. Dependency Checking
### 5.1 ConditionResult Enum
Defined in `src/worker/job_worker.rs`:
```rust
pub enum ConditionResult {
Ok, // All dependencies met
NotMet(String), // Missing dependency (reason)
ResourceExhausted, // GPU/CPU unavailable
OutputInvalid(String), // Validation failed (reason)
}
```
### 5.2 check_dependencies() Logic
Defined in `src/worker/job_worker.rs`:
```rust
pub async fn check_dependencies(
processor: ProcessorType,
file_uuid: &str,
db: &PostgresDb,
) -> Result<ConditionResult> {
match processor {
ProcessorType::ASR => {
// Check if Cut is completed
if !db.is_processor_completed(file_uuid, ProcessorType::Cut).await? {
return Ok(ConditionResult::NotMet("Cut not completed".into()));
}
}
ProcessorType::ASRX => {
// Check if ASR is completed
if !db.is_processor_completed(file_uuid, ProcessorType::ASR).await? {
return Ok(ConditionResult::NotMet("ASR not completed".into()));
}
}
ProcessorType::Story => {
// Check if ASRX + Cut + YOLO + Face are completed
let deps = [
ProcessorType::ASRX,
ProcessorType::Cut,
ProcessorType::YOLO,
ProcessorType::Face,
];
for dep in deps {
if !db.is_processor_completed(file_uuid, dep).await? {
return Ok(ConditionResult::NotMet(format!("{:?} not completed", dep)));
}
}
}
ProcessorType::_5W1H => {
// Check if Story is completed
if !db.is_processor_completed(file_uuid, ProcessorType::Story).await? {
return Ok(ConditionResult::NotMet("Story not completed".into()));
}
}
// Other processors have no dependencies
_ => {}
}
Ok(ConditionResult::Ok)
}
```
### 5.3 Integration with job_worker.rs
```rust
// In execute_processor()
let condition = check_dependencies(processor, file_uuid, &db).await?;
match condition {
ConditionResult::Ok => {
// Proceed to Running state
self.update_status(file_uuid, processor, ProcessorJobStatus::Running).await?;
// Execute processor...
}
ConditionResult::NotMet(reason) => {
// Emit alert and mark as Waiting
emit_processor_alert(file_uuid, processor, "dependency_not_met", &reason).await?;
self.update_status(file_uuid, processor, ProcessorJobStatus::Waiting).await?;
}
ConditionResult::ResourceExhausted => {
// Emit alert and mark as Waiting
emit_processor_alert(file_uuid, processor, "resource_exhausted", "GPU unavailable").await?;
self.update_status(file_uuid, processor, ProcessorJobStatus::Waiting).await?;
}
ConditionResult::OutputInvalid(reason) => {
// Emit alert and mark as Failed
emit_processor_alert(file_uuid, processor, "output_invalid", &reason).await?;
self.update_status(file_uuid, processor, ProcessorJobStatus::Failed).await?;
}
}
```
---
## 6. Integration Points
### 6.1 With TKG Builder
- **TKG Builder** is NOT a processor, it's a **post-processing step** (入庫 step 8)
- Triggers after Face Trace is completed
- **State Machine does NOT manage TKG Builder state**
- TKG Builder has its own verification mechanism in polling
### 6.2 With Face Trace
- **Face Trace** is NOT a processor, it's a **post-processing step** (入庫 step 5)
- Triggers after all 10 processors are completed
- **State Machine does NOT manage Face Trace state**
- Face Trace has its own verification mechanism in polling
### 6.3 With 入庫 Flow
| Component | Manages | Scope |
|-----------|---------|-------|
| **State Machine** | Processor states | `Idle → Waiting → Ready → Pending → Running → Completed/Failed/Skipped` |
| **Polling** | Post-processing verification | Rule 1 chunks, Vectorize, TKG nodes, Face Trace, etc. |
**Key Insight**: Two mechanisms are **independent but complementary**:
1. **State Machine**: Granular processor status, handles dependencies
2. **Polling**: Coarse-grained ingestion verification, handles post-processing
### 6.4 Example Flow
```
=== Processor State Machine (per processor) ===
Cut: Idle → Waiting → Ready → Pending → Running → Completed ✓
ASR: Idle → Waiting (Cut not done) → Waiting → Ready → Pending → Running → Completed ✓
YOLO: Idle → Ready → Pending → Running → Completed ✓
Face: Idle → Ready → Pending → Running → Completed ✓
Story: Idle → Waiting (ASRX not done) → Waiting → Ready → Pending → Running → Completed ✓
=== 入庫 Polling (every 3s) ===
[00:00] Check: Rule 1 chunks exist? → No (ASR not done)
[00:03] Check: Rule 1 chunks exist? → Yes ✓
Check: Vectorize done? → Yes ✓
Check: TKG nodes exist? → No (Face Trace not done)
[00:06] Check: TKG nodes exist? → Yes ✓
Check: All 17 steps verified ✓
Mark job as completed
```
---
## 7. Implementation Checklist
### 7.1 Completed ✅
- [x] Migration 034: `processor_alerts` table
- [x] Enum: `ProcessorJobStatus` (8 states) - `postgres_db.rs:585-594`
- [x] Function: `emit_processor_alert()` - `redis_client.rs`
- [x] Function: `check_dependencies()` - `job_worker.rs`
- [x] Enum: `ConditionResult` - `job_worker.rs`
### 7.2 Pending 🔄
- [ ] Tests: State transitions (unit tests)
- [ ] Tests: Alert emission (integration tests)
- [ ] Tests: Dependency checking (unit tests)
- [ ] Monitoring: Alert dashboard (TBD)
- [ ] Retention: `processor_alerts` cleanup job (TBD)
---
## 8. Performance Considerations
### 8.1 Alert Emission
- **Non-blocking**: Redis pub/sub is fire-and-forget
- **Low latency**: < 1ms per alert
- **No retry**: If Redis is down, alert is lost (acceptable for debugging)
### 8.2 Dependency Checking
- **Synchronous DB queries**: `is_processor_completed()` queries PostgreSQL
- **Cacheable**: Results can be cached for 1-3 seconds (TTL based on processor duration)
- **Index usage**: Queries use `idx_processor_jobs_file_uuid_processor_type` index
### 8.3 State Updates
- **Single-row UPDATE**: `UPDATE processor_jobs SET status = $1 WHERE file_uuid = $2 AND processor_type = $3`
- **Index usage**: Uses `idx_processor_jobs_file_uuid_processor_type` index
- **Low contention**: Each processor has its own row
---
## 9. Future Evolution
### 9.1 Phase 1 (Current)
- Alert emission + PostgreSQL logging
- Manual monitoring via `processor_alerts` table
- No auto-retry
### 9.2 Phase 2 (Near-term)
- Alert consumer service (subscribes to Redis channel)
- Auto-retry for `dependency_not_met` and `resource_exhausted` alerts
- Exponential backoff for retries
### 9.3 Phase 3 (Medium-term)
- Event-driven pipeline (replace polling with Redis Streams)
- Real-time status updates via WebSocket
- Distributed state management (Redis-based)
### 9.4 Phase 4 (Long-term)
- DAG-based scheduling (Airflow/Temporal)
- Cross-worker coordination
- Priority-based resource allocation
---
## 10. Glossary
| Term | Definition |
|------|-----------|
| **State Machine** | Finite state automaton managing processor lifecycle (8 states) |
| **Alert** | Asynchronous notification of state machine events (4 types) |
| **Dependency** | Prerequisite processor that must complete before execution |
| **Polling** | Periodic verification of post-processing steps (every 3s) |
| **入庫** | Post-processing steps after 10 processors complete (17 steps) |
| **file_uuid** | Unique identifier for a video file (32-char hex string) |
| **Processor** | One of 10 processing stages (Cut, ASR, ASRX, YOLO, OCR, Face, Pose, VisualChunk, Story, 5W1H) |
| **Post-processing** | Steps that run after processors (Rule 1, Vectorize, TKG, Face Trace, etc.) |
---
## 11. References
- [Pipeline Module](../API_WORKSPACE/modules/10_pipeline.md) - Pipeline overview and 入庫 steps
- [TKG Query API V1.0](TKG_QUERY_API_V1.0.md) - TKG integration details
- [Processor Refactoring Assessment](Processor_Refactoring_Assessment.md) - Processor refactoring plans
- `migrations/034_processor_state_machine.sql` - Database schema
- `src/core/db/postgres_db.rs` - ProcessorJobStatus enum
- `src/core/db/redis_client.rs` - emit_processor_alert() function
- `src/worker/job_worker.rs` - ConditionResult enum and check_dependencies()
---
## Version History
| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0 | 2026-05-30 | M5Max128 | Initial design document |