From 08167d73b26412cd00c6684422d3b09dc897e489 Mon Sep 17 00:00:00 2001
From: Accusys
Date: Sat, 30 May 2026 10:03:48 +0800
Subject: [PATCH] docs: add Processor State Machine V1.0 design

---
 .../DESIGN/Processor_State_Machine_V1.0.md    | 484 ++++++++++++++++++
 1 file changed, 484 insertions(+)
 create mode 100644 docs_v1.0/DESIGN/Processor_State_Machine_V1.0.md

diff --git a/docs_v1.0/DESIGN/Processor_State_Machine_V1.0.md b/docs_v1.0/DESIGN/Processor_State_Machine_V1.0.md
new file mode 100644
index 0000000..3e7514d
--- /dev/null
+++ b/docs_v1.0/DESIGN/Processor_State_Machine_V1.0.md
@@ -0,0 +1,484 @@
+---
+title: Processor State Machine V1.0
+version: 1.0
+date: 2026-05-30
+author: M5Max128
+status: draft
+---
+
+# Processor State Machine V1.0
+
+## Overview
+
+| Attribute | Value |
+|-----------|-------|
+| Scope | Backend, Worker, Pipeline |
+| Status | Draft |
+| Applicable To | M5Max128, M5Max48 |
+| Dependencies | migrations/034, job_worker.rs, redis_client.rs |
+| Related Docs | [Pipeline Module](../API_WORKSPACE/modules/10_pipeline.md), [TKG Query API](TKG_QUERY_API_V1.0.md) |
+
+---
+
+## 1. Design Goals
+
+### 1.1 Problem Statement
+
+The Momentry Core pipeline lacks unified state management for processors:
+
+- **Opaque dependency chains**: Processors depend on each other (here `A → B` means A depends on B: ASR → Cut, ASRX → ASR, Story → ASRX + Cut + YOLO + Face), but upstream failures or delays are not explicitly tracked
+- **No alert mechanism**: When dependencies are not met or resources are exhausted, there is no systematic way to notify operators or trigger retries
+- **Coarse-grained status**: The existing `pending/running/completed/failed` states do not capture intermediate conditions such as "waiting for dependencies" or "ready but not scheduled"
+
+### 1.2 Solution
+
+Introduce a **State Machine** with an **Alert Mechanism**:
+
+- **8 explicit states** for each processor job: `Idle → Waiting → Ready → Pending → Running → Completed/Failed/Skipped`
+- **Dependency checking**: `check_dependencies()` validates prerequisites before execution
+- **Alert emission**: Alerts are emitted to Redis pub/sub and PostgreSQL for monitoring and debugging
+
+### 1.3 Scope
+
+This design **complements** the existing polling mechanism:
+
+| Component | Responsibility |
+|-----------|---------------|
+| **State Machine** | Fine-grained processor status management (Idle → Running → Completed) |
+| **Polling** | Coarse-grained ingestion verification (Rule 1 chunks exist? Vectorize done? TKG nodes exist?) |
+
+**Non-Goals**:
+
+- Does NOT replace polling for post-processing steps (ingestion, 入庫)
+- Does NOT auto-retry failed processors (future evolution)
+- Does NOT manage distributed state across workers
+
+---
+
+## 2. State Definitions
+
+### 2.1 Eight States
+
+| State | Semantics | Trigger | Next States |
+|-------|-----------|---------|--------------|
+| `Idle` | Initial state, no work assigned | Job created | `Waiting` |
+| `Waiting` | Dependencies not met, awaiting prerequisites | Dependency check fails | `Ready`, `Failed` |
+| `Ready` | Dependencies met, awaiting execution | Dependency check passes | `Pending` |
+| `Pending` | Queued for execution, waiting for worker | Scheduler accepts | `Running` |
+| `Running` | Currently processing | Worker starts | `Completed`, `Failed`, `Skipped` |
+| `Completed` | Success, output valid | Output validated | - (terminal) |
+| `Failed` | Error occurred, unrecoverable | Exception or timeout | - (terminal) |
+| `Skipped` | Conditional skip (optional processor) | Unmet optional conditions | - (terminal) |
+
+### 2.2 State Transition Examples
+
+**Example 1: ASR depends on Cut**
+
+```
+ASR: Idle → Waiting (Cut not completed)
+Cut: Running → Completed
+ASR: Waiting → Ready (Cut completed) → Pending → Running → Completed
+```
+
+**Example 2: Story depends on multiple processors**
+
+```
+Story: Idle → Waiting (ASRX not completed)
+Cut: Running → Completed
+Story: Waiting → Waiting (ASRX not completed)
+ASRX: Running → Completed
+Story: Waiting → Waiting (YOLO not completed)
+YOLO: Running → Completed
+Story: Waiting → Waiting (Face not completed)
+Face: Running → Completed
+Story: Waiting → Ready (all dependencies met) → Pending → Running → Completed
+```
+
+**Example 3: Optional processor skipped**
+
+```
+Pose: Idle → Waiting → Ready → Pending → Running
+Pose: Running → Skipped (no pose detected, optional processing)
+```
+
+---
+
+## 3. State Transitions
+
+### 3.1 Transition Diagram
+
+```mermaid
+stateDiagram-v2
+    [*] --> Idle: Job created
+
+    Idle --> Waiting: Initialize
+
+    Waiting --> Ready: Dependencies met
+    Waiting --> Failed: Timeout
+
+    Ready --> Pending: Scheduled
+
+    Pending --> Running: Worker pickup
+
+    Running --> Completed: Success
+    Running --> Failed: Error
+    Running --> Skipped: Conditional skip
+
+    Completed --> [*]
+    Failed --> [*]
+    Skipped --> [*]
+```
+
+### 3.2 Transition Rules
+
+| From State | To State | Condition | Action |
+|------------|-----------|-----------|--------|
+| `Idle` | `Waiting` | Always (initial transition) | - |
+| `Waiting` | `Ready` | `check_dependencies() == Ok` | - |
+| `Waiting` | `Failed` | Timeout (default 7200s) | Emit `timeout` alert |
+| `Ready` | `Pending` | Resource available | - |
+| `Pending` | `Running` | Worker starts | - |
+| `Running` | `Completed` | Output valid | - |
+| `Running` | `Failed` | Exception or output invalid | Emit `output_invalid` alert |
+| `Running` | `Skipped` | Optional processor, conditions not met | - |
+
+### 3.3 Edge Cases
+
+| Scenario | Detection | Resolution |
+|----------|-----------|------------|
+| **Circular dependencies** | `check_dependencies()` detects cycle | Mark as `Failed`, emit `dependency_not_met` alert |
+| **Resource exhaustion** | GPU/CPU unavailable | Stay in `Waiting`, emit `resource_exhausted` alert |
+| **Partial output** | Output validation fails | Mark as `Failed`, emit `output_invalid` alert |
+| **Transient failure** | Network/API timeout | Stay in `Waiting`, retry after delay (automatic retry is deferred to Phase 2, see 9.2) |
+
+---
+
+## 4. Alert Mechanism
+
+### 4.1 Alert Types
+
+| Type | Trigger | Severity | Action |
+|------|---------|----------|--------|
+| `dependency_not_met` | `check_dependencies()` fails | Warning | Stay in `Waiting`; retry after delay (automatic retry is deferred to Phase 2, see 9.2) |
+| `resource_exhausted` | GPU/CPU unavailable | Warning | Stay in `Waiting`; wait + retry (automatic retry is deferred to Phase 2, see 9.2) |
+| `output_invalid` | Validation fails | Error | Mark `Failed` |
+| `timeout` | Exceeds `MOMENTRY_*_TIMEOUT` | Error | Mark `Failed` |
+
+### 4.2 Alert Flow
+
+```mermaid
+sequenceDiagram
+    participant Worker as job_worker.rs
+    participant Checker as check_dependencies()
+    participant Redis as Redis Pub/Sub
+    participant PostgreSQL as processor_alerts table
+
+    Worker->>Checker: check_dependencies(processor, file_uuid)
+    alt Dependencies not met
+        Checker-->>Worker: ConditionResult::NotMet(reason)
+        Worker->>Redis: emit_processor_alert(file_uuid, processor, "dependency_not_met", reason)
+        Worker->>PostgreSQL: INSERT INTO processor_alerts
+        Worker->>Worker: update_status(file_uuid, processor, Waiting)
+    else Resource exhausted
+        Checker-->>Worker: ConditionResult::ResourceExhausted
+        Worker->>Redis: emit_processor_alert(file_uuid, processor, "resource_exhausted", "GPU unavailable")
+        Worker->>PostgreSQL: INSERT INTO processor_alerts
+        Worker->>Worker: update_status(file_uuid, processor, Waiting)
+    else Output invalid
+        Checker-->>Worker: ConditionResult::OutputInvalid(reason)
+        Worker->>Redis: emit_processor_alert(file_uuid, processor, "output_invalid", reason)
+        Worker->>PostgreSQL: INSERT INTO processor_alerts
+        Worker->>Worker: update_status(file_uuid, processor, Failed)
+    else OK
+        Checker-->>Worker: ConditionResult::Ok
+        Worker->>Worker: update_status(file_uuid, processor, Running)
+    end
+```
+
+### 4.3 Redis Channel
+
+- **Channel**: `momentry:processor:alerts`
+- **Message Format**:
+  ```json
+  {
+    "file_uuid": "bd80fec9c42afb0307eb28f22c64c76a",
+    "processor": "ASR",
+    "alert_type": "dependency_not_met",
+    "message": "Cut not completed",
+    "timestamp": "2026-05-30T10:15:30Z"
+  }
+  ```
+- **Consumers**: None (the current implementation only logs alerts; a monitoring service is a future addition)
+
+### 4.4 PostgreSQL Table
+
+**Table**: `processor_alerts` (defined in `migrations/034_processor_state_machine.sql`)
+
+```sql
+CREATE TABLE IF NOT EXISTS processor_alerts (
+    id SERIAL PRIMARY KEY,
+    file_uuid VARCHAR(32),
+    processor_type VARCHAR(32) NOT NULL,
+    alert_type VARCHAR(32) NOT NULL,
+    message TEXT,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_alerts_file_uuid ON processor_alerts(file_uuid);
+CREATE INDEX idx_alerts_processor_type ON processor_alerts(processor_type);
+CREATE INDEX idx_alerts_alert_type ON processor_alerts(alert_type);
+CREATE INDEX idx_alerts_created_at ON processor_alerts(created_at);
+```
+
+**Retention Policy**: 30 days (TBD; a cleanup job is not yet implemented)
+
+---
+
+## 5. Dependency Checking
+
+### 5.1 ConditionResult Enum
+
+Defined in `src/worker/job_worker.rs`:
+
+```rust
+pub enum ConditionResult {
+    Ok,                    // All dependencies met
+    NotMet(String),        // Missing dependency (reason)
+    ResourceExhausted,     // GPU/CPU unavailable
+    OutputInvalid(String), // Validation failed (reason)
+}
+```
+
+### 5.2 check_dependencies() Logic
+
+Defined in `src/worker/job_worker.rs`:
+
+```rust
+pub async fn check_dependencies(
+    processor: ProcessorType,
+    file_uuid: &str,
+    db: &PostgresDb,
+) -> Result<ConditionResult> { // `Result` is assumed to be a one-parameter alias; the error type is not shown here
+    match processor {
+        ProcessorType::ASR => {
+            // Check if Cut is completed
+            if !db.is_processor_completed(file_uuid, ProcessorType::Cut).await? {
+                return Ok(ConditionResult::NotMet("Cut not completed".into()));
+            }
+        }
+        ProcessorType::ASRX => {
+            // Check if ASR is completed
+            if !db.is_processor_completed(file_uuid, ProcessorType::ASR).await? {
+                return Ok(ConditionResult::NotMet("ASR not completed".into()));
+            }
+        }
+        ProcessorType::Story => {
+            // Check if ASRX + Cut + YOLO + Face are completed
+            let deps = [
+                ProcessorType::ASRX,
+                ProcessorType::Cut,
+                ProcessorType::YOLO,
+                ProcessorType::Face,
+            ];
+            for dep in deps {
+                if !db.is_processor_completed(file_uuid, dep).await? {
+                    return Ok(ConditionResult::NotMet(format!("{:?} not completed", dep)));
+                }
+            }
+        }
+        ProcessorType::_5W1H => {
+            // Check if Story is completed
+            if !db.is_processor_completed(file_uuid, ProcessorType::Story).await? {
+                return Ok(ConditionResult::NotMet("Story not completed".into()));
+            }
+        }
+        // Other processors have no dependencies
+        _ => {}
+    }
+    Ok(ConditionResult::Ok)
+}
+```
+
+### 5.3 Integration with job_worker.rs
+
+```rust
+// In execute_processor()
+let condition = check_dependencies(processor, file_uuid, &db).await?;
+match condition {
+    ConditionResult::Ok => {
+        // Proceed to Running state
+        self.update_status(file_uuid, processor, ProcessorJobStatus::Running).await?;
+        // Execute processor...
+    }
+    ConditionResult::NotMet(reason) => {
+        // Emit alert and mark as Waiting
+        emit_processor_alert(file_uuid, processor, "dependency_not_met", &reason).await?;
+        self.update_status(file_uuid, processor, ProcessorJobStatus::Waiting).await?;
+    }
+    ConditionResult::ResourceExhausted => {
+        // Emit alert and mark as Waiting
+        emit_processor_alert(file_uuid, processor, "resource_exhausted", "GPU unavailable").await?;
+        self.update_status(file_uuid, processor, ProcessorJobStatus::Waiting).await?;
+    }
+    ConditionResult::OutputInvalid(reason) => {
+        // Emit alert and mark as Failed
+        emit_processor_alert(file_uuid, processor, "output_invalid", &reason).await?;
+        self.update_status(file_uuid, processor, ProcessorJobStatus::Failed).await?;
+    }
+}
+```
+
+---
+
+## 6. Integration Points
+
+### 6.1 With TKG Builder
+
+- **TKG Builder** is NOT a processor; it is a **post-processing step** (ingestion step 8)
+- Triggers after Face Trace is completed
+- **State Machine does NOT manage TKG Builder state**
+- TKG Builder has its own verification mechanism in polling
+
+### 6.2 With Face Trace
+
+- **Face Trace** is NOT a processor; it is a **post-processing step** (ingestion step 5)
+- Triggers after all 10 processors are completed
+- **State Machine does NOT manage Face Trace state**
+- Face Trace has its own verification mechanism in polling
+
+### 6.3 With Ingestion (入庫) Flow
+
+| Component | Manages | Scope |
+|-----------|---------|-------|
+| **State Machine** | Processor states | `Idle → Waiting → Ready → Pending → Running → Completed/Failed/Skipped` |
+| **Polling** | Post-processing verification | Rule 1 chunks, Vectorize, TKG nodes, Face Trace, etc. |
+
+**Key Insight**: The two mechanisms are **independent but complementary**:
+
+1. **State Machine**: Granular processor status, handles dependencies
+2. **Polling**: Coarse-grained ingestion verification, handles post-processing
+
+### 6.4 Example Flow
+
+```
+=== Processor State Machine (per processor) ===
+Cut: Idle → Waiting → Ready → Pending → Running → Completed ✓
+ASR: Idle → Waiting (Cut not done) → Waiting → Ready → Pending → Running → Completed ✓
+YOLO: Idle → Waiting → Ready → Pending → Running → Completed ✓
+Face: Idle → Waiting → Ready → Pending → Running → Completed ✓
+Story: Idle → Waiting (ASRX not done) → Waiting → Ready → Pending → Running → Completed ✓
+
+=== Ingestion Polling (every 3s) ===
+[00:00] Check: Rule 1 chunks exist? → No (ASR not done)
+[00:03] Check: Rule 1 chunks exist? → Yes ✓
+        Check: Vectorize done? → Yes ✓
+        Check: TKG nodes exist? → No (Face Trace not done)
+[00:06] Check: TKG nodes exist? → Yes ✓
+        Check: All 17 steps verified ✓
+        Mark job as completed
+```
+
+---
+
+## 7. Implementation Checklist
+
+### 7.1 Completed ✅
+
+- [x] Migration 034: `processor_alerts` table
+- [x] Enum: `ProcessorJobStatus` (8 states) - `postgres_db.rs:585-594`
+- [x] Function: `emit_processor_alert()` - `redis_client.rs`
+- [x] Function: `check_dependencies()` - `job_worker.rs`
+- [x] Enum: `ConditionResult` - `job_worker.rs`
+
+### 7.2 Pending 🔄
+
+- [ ] Tests: State transitions (unit tests)
+- [ ] Tests: Alert emission (integration tests)
+- [ ] Tests: Dependency checking (unit tests)
+- [ ] Monitoring: Alert dashboard (TBD)
+- [ ] Retention: `processor_alerts` cleanup job (TBD)
+
+---
+
+## 8. Performance Considerations
+
+### 8.1 Alert Emission
+
+- **Non-blocking**: Redis pub/sub is fire-and-forget
+- **Low latency**: < 1ms per alert
+- **No retry**: If Redis is down, the alert is lost (acceptable for debugging)
+
+### 8.2 Dependency Checking
+
+- **Per-check DB queries**: `is_processor_completed()` awaits one PostgreSQL query per dependency before the processor can proceed
+- **Cacheable**: Results can be cached for 1-3 seconds (TTL based on processor duration)
+- **Index usage**: Queries use the `idx_processor_jobs_file_uuid_processor_type` index
+
+### 8.3 State Updates
+
+- **Single-row UPDATE**: `UPDATE processor_jobs SET status = $1 WHERE file_uuid = $2 AND processor_type = $3`
+- **Index usage**: Uses the `idx_processor_jobs_file_uuid_processor_type` index
+- **Low contention**: Each processor has its own row
+
+---
+
+## 9. Future Evolution
+
+### 9.1 Phase 1 (Current)
+
+- Alert emission + PostgreSQL logging
+- Manual monitoring via `processor_alerts` table
+- No auto-retry
+
+### 9.2 Phase 2 (Near-term)
+
+- Alert consumer service (subscribes to Redis channel)
+- Auto-retry for `dependency_not_met` and `resource_exhausted` alerts
+- Exponential backoff for retries
+
+### 9.3 Phase 3 (Medium-term)
+
+- Event-driven pipeline (replace polling with Redis Streams)
+- Real-time status updates via WebSocket
+- Distributed state management (Redis-based)
+
+### 9.4 Phase 4 (Long-term)
+
+- DAG-based scheduling (Airflow/Temporal)
+- Cross-worker coordination
+- Priority-based resource allocation
+
+---
+
+## 10. Glossary
+
+| Term | Definition |
+|------|-----------|
+| **State Machine** | Finite state automaton managing processor lifecycle (8 states) |
+| **Alert** | Asynchronous notification of state machine events (4 types) |
+| **Dependency** | Prerequisite processor that must complete before execution |
+| **Polling** | Periodic verification of post-processing steps (every 3s) |
+| **Ingestion (入庫)** | Post-processing steps that run after the 10 processors complete (17 steps) |
+| **file_uuid** | Unique identifier for a video file (32-char hex string) |
+| **Processor** | One of 10 processing stages (Cut, ASR, ASRX, YOLO, OCR, Face, Pose, VisualChunk, Story, 5W1H) |
+| **Post-processing** | Steps that run after processors (Rule 1, Vectorize, TKG, Face Trace, etc.) |
+
+---
+
+## 11. References
+
+- [Pipeline Module](../API_WORKSPACE/modules/10_pipeline.md) - Pipeline overview and ingestion (入庫) steps
+- [TKG Query API V1.0](TKG_QUERY_API_V1.0.md) - TKG integration details
+- [Processor Refactoring Assessment](Processor_Refactoring_Assessment.md) - Processor refactoring plans
+- `migrations/034_processor_state_machine.sql` - Database schema
+- `src/core/db/postgres_db.rs` - ProcessorJobStatus enum
+- `src/core/db/redis_client.rs` - emit_processor_alert() function
+- `src/worker/job_worker.rs` - ConditionResult enum and check_dependencies()
+
+---
+
+## Version History
+
+| Version | Date | Author | Changes |
+|---------|------|--------|---------|
+| 1.0 | 2026-05-30 | M5Max128 | Initial design document |
\ No newline at end of file
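
Reviewer note: the transition table in Section 3.2 of the design document can be encoded directly as a pure function, which may make the pending "State transitions (unit tests)" item in Section 7.2 easier to start. The following is a minimal, standalone sketch and not the project's code. The enum is re-declared here only so the example compiles on its own (the real `ProcessorJobStatus` lives in `postgres_db.rs` and may differ), and `is_terminal`, `can_transition_to`, and `first_illegal_step` are hypothetical helper names.

```rust
/// Standalone re-declaration of the eight states from Section 2.1.
/// Assumption: variant names match the state names used in the document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessorJobStatus {
    Idle,
    Waiting,
    Ready,
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}

impl ProcessorJobStatus {
    /// Terminal states have no outgoing transitions (Section 2.1).
    pub fn is_terminal(self) -> bool {
        matches!(self, Self::Completed | Self::Failed | Self::Skipped)
    }

    /// Returns true only if `self -> next` is a row of the Section 3.2 table.
    /// Remaining in the same state (e.g. a repeated dependency check while
    /// `Waiting`) is not a transition and therefore returns false.
    pub fn can_transition_to(self, next: Self) -> bool {
        use ProcessorJobStatus::*;
        matches!(
            (self, next),
            (Idle, Waiting)
                | (Waiting, Ready)
                | (Waiting, Failed)
                | (Ready, Pending)
                | (Pending, Running)
                | (Running, Completed)
                | (Running, Failed)
                | (Running, Skipped)
        )
    }
}

/// Hypothetical helper: index of the first step in `path` that the
/// transition table does not allow, or `None` if every step is legal.
pub fn first_illegal_step(path: &[ProcessorJobStatus]) -> Option<usize> {
    path.windows(2)
        .position(|pair| !pair[0].can_transition_to(pair[1]))
}
```

A guard like this could be called before `update_status()` so that a path such as `Idle → Ready` is rejected rather than silently written. Whether illegal transitions should return an error or only emit an alert is a design choice the document leaves open.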