diff --git a/docs_v1.0/API_WORKSPACE/modules/02_health.md b/docs_v1.0/API_WORKSPACE/modules/02_health.md new file mode 100644 index 0000000..fa77710 --- /dev/null +++ b/docs_v1.0/API_WORKSPACE/modules/02_health.md @@ -0,0 +1,187 @@ + + + + +## Health Check + +### `GET /health` + +**Auth**: Public +**Scope**: system-level + +Returns basic server health status — used by load balancers and monitoring. + +#### Example + +```bash +curl "$API/health" | jq '{status, version}' +``` + +#### Response (200) + +```json +{ + "status": "ok", + "version": "1.0.0", + "build_git_hash": "3a6c1865", + "build_timestamp": "2026-05-16T13:38:15Z", + "uptime_ms": 3015 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `status` | string | `ok` or `degraded` | +| `version` | string | Semver version | +| `build_git_hash` | string | Git commit hash | +| `build_timestamp` | string | Binary build time | +| `uptime_ms` | integer | Milliseconds since server start | + +--- + +### `GET /health/detailed` + +**Auth**: Required +**Scope**: system-level + +Returns full system health including each service status, resource utilization, pipeline readiness, schema migration status, identity file sync status, and external integrations. + +> Requires authentication (JWT, session cookie, or API key). The basic `/health` endpoint remains public for load balancer checks. 
+ +#### Example + +```bash +curl -s "$API/health/detailed" -H "X-API-Key: $KEY" | jq '{status, services, resources: {cpu: .resources.cpu_used_percent, memory: .resources.memory_used_percent}}' +``` + +#### Response (200) + +```json +{ + "status": "ok", + "version": "1.0.0", + "services": { + "postgres": {"status": "ok", "latency_ms": 3}, + "redis": {"status": "ok", "latency_ms": 1}, + "qdrant": {"status": "ok", "latency_ms": 5} + }, + "resources": { + "cpu_used_percent": 12.5, + "memory_available_mb": 32768, + "memory_used_percent": 31.7 + }, + "pipeline": { + "scripts_ready": true, + "scripts_count": 345, + "processors": { + "asr": true, + "yolo": true, + "face": true, + "pose": true, + "ocr": true, + "cut": true, + "scene": true, + "asrx": true, + "visual_chunk": true + }, + "models_ready": true, + "models_count": 42, + "scripts_integrity": {"matched": 332, "total": 345, "ok": false}, + "ffmpeg": true + }, + "schema": { + "table_exists": true, + "applied": [{"filename": "migrate_add_users_table.sql"}], + "required": [], + "ok": true + }, + "identities": { + "directory_exists": true, + "files_count": 3481, + "index_ok": true, + "db_count": 3481, + "synced": true + }, + "integrations": { + "tmdb": { + "api_key_configured": false, + "enabled": false, + "api_reachable": null + } + } +} +``` + +#### Response Fields + +| Field | Type | Description | +|-------|------|-------------| +| `status` | string | `ok` if all essential services are healthy | +| `services` | object | Per-service status (postgres, redis, qdrant) | +| `services.*.status` | string | `ok`, `error`, or `degraded` | +| `services.*.latency_ms` | integer | Response time in milliseconds | +| `resources` | object | CPU and memory usage | +| `pipeline.scripts_ready` | boolean | Scripts directory accessible | +| `pipeline.scripts_count` | integer | Number of Python processor scripts | +| `pipeline.processors` | object | Per-processor availability | +| `pipeline.models_ready` | boolean | Models directory accessible | +| `pipeline.scripts_integrity` 
| object | SHA256 checksum verification results | +| `schema.ok` | boolean | All required migrations applied | +| `identities.synced` | boolean | Identity file count matches DB count | +| `config` | object | Runtime toggle states (cache, auto-pipeline, watcher) | +| `integrations.tmdb` | object | TMDB API key config and reachability | + +### `GET /health/consistency` + +**Auth**: Required +**Scope**: system-level + +Scans the database for data consistency issues. Reports anomalies without modifying any data. + +#### Example + +```bash +curl -s "$API/health/consistency" -H "X-API-Key: $KEY" | jq '.checks[] | {check, severity, count}' +``` + +#### Response (200) + +```json +{ + "status": "degraded", + "checked_at": "2026-05-18T17:30:00Z", + "checks": [ + { + "check": "stale_processing", + "severity": "warn", + "count": 3, + "files": [ + {"file_name": "video.mp4", "file_uuid": "abc123...", "status": "processing", "detail": "job_id is null"} + ] + } + ] +} +``` + +| Check | Description | Severity | +|-------|-------------|---------| +| `stale_processing` | Status=processing but job_id is null | `warn` | +| `orphaned_processing` | Status=processing but no active monitor_job | `warn` | +| `processing_job_done` | Status=processing but job already completed | `warn` | +| `unregistered_with_uuid` | Status=unregistered but row still in DB (migration residue) | `info` | + +#### Health status rules + +| Condition | status | +|-----------|--------| +| All services ok | `ok` | +| Any service error | `degraded` | +| Postgres or Redis error | `degraded` (server still responds) | + +--- + +### Stats Endpoints + +| Method | Endpoint | Auth | Description | +|--------|----------|------|-------------| +| GET | `/api/v1/stats/sftpgo` | No | SFTPGo service status | diff --git a/src/api/server.rs b/src/api/server.rs index d87c18a..33688e2 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -865,6 +865,19 @@ async fn health_detailed(State(state): State) -> Json, +) -> Result, 
(StatusCode, String)> { + /* Handler body: runs the consistency scan on state.db, emits one warning that lists every check with a non-zero count as name=count pairs, then returns the report as JSON. Only Ok is ever returned here; no Err path is visible. */ let report = crate::core::health_agent::run_consistency_checks(&state.db).await; + if report.checks.iter().any(|c| c.count > 0) { + tracing::warn!( + "[HEALTH] Consistency issues found: {}", + report.checks.iter().filter(|c| c.count > 0).map(|c| format!("{}={}", c.check, c.count)).collect::>().join(", ") + ); + } + Ok(Json(report)) +} + async fn login( State(state): State, Json(req): Json, @@ -3761,6 +3774,7 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { .merge(search_routes()) // Smart search drill-down .merge(universal_search_routes()) // Universal / frames / persons search .route("/health/detailed", get(health_detailed)) + .route("/health/consistency", get(health_consistency)) .layer(axum::middleware::from_fn_with_state( state.api_state.clone(), unified_auth, diff --git a/src/core/health_agent.rs b/src/core/health_agent.rs new file mode 100644 index 0000000..e578225 --- /dev/null +++ b/src/core/health_agent.rs @@ -0,0 +1,153 @@ +use serde::Serialize; + +use crate::core::db::{schema, PostgresDb}; + /* One flagged row: file name, file UUID, the row status, and a check-specific detail string. */ +#[derive(Debug, Serialize)] +pub struct ConsistencyFile { + pub file_name: String, + pub file_uuid: String, + pub status: String, + pub detail: String, +} + /* Result of one check: check name, severity label, number of flagged rows, and the rows themselves. The files field is left out of the serialized output when the list is empty (see skip_serializing_if). */ +#[derive(Debug, Serialize)] +pub struct ConsistencyCheck { + pub check: String, + pub severity: String, + pub count: usize, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub files: Vec, +} + /* Full report: overall status string, the time the scan started as an RFC 3339 string, and one entry per check. */ +#[derive(Debug, Serialize)] +pub struct ConsistencyReport { + pub status: String, + pub checked_at: String, + pub checks: Vec, +} + /* Runs the four checks one after another (each call is awaited before the next begins) and records them in that order. Any check with a non-zero count flips any_issue, which decides the overall report status. */ +pub async fn run_consistency_checks(db: &PostgresDb) -> ConsistencyReport { + let checked_at = chrono::Utc::now().to_rfc3339(); + let mut checks = Vec::new(); + let mut any_issue = false; + + // Check 1: stale_processing — status=processing but job_id is null + let c1 = check_stale_processing(db).await; + if c1.count > 0 { any_issue = true; } + checks.push(c1); + + // Check 2: orphaned_processing — status=processing but no active 
monitor_job + let c2 = check_orphaned_processing(db).await; + if c2.count > 0 { any_issue = true; } + checks.push(c2); + + // Check 3: unregistered_with_uuid — DB rows left behind by migration + let c3 = check_unregistered_with_uuid(db).await; + if c3.count > 0 { any_issue = true; } + checks.push(c3); + + // Check 4: processing_job_done — status=processing but job already completed + let c4 = check_processing_job_done(db).await; + if c4.count > 0 { any_issue = true; } + checks.push(c4); + /* Overall status is degraded when any check flagged at least one row, regardless of the severity of that check; otherwise ok. */ + ConsistencyReport { + status: if any_issue { "degraded".to_string() } else { "ok".to_string() }, + checked_at, + checks, + } +} + /* Flags rows of the videos table (name resolved through schema::table_name) whose status is processing while job_id IS NULL. NOTE(review): unwrap_or_default turns a failed query into an empty list, so a database error is reported as count 0 rather than surfaced — confirm this best-effort behaviour is intended. */ +async fn check_stale_processing(db: &PostgresDb) -> ConsistencyCheck { + let vt = schema::table_name("videos"); + let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!( + "SELECT file_uuid, file_name, status FROM {} WHERE status = 'processing' AND job_id IS NULL", + vt + )) + .fetch_all(db.pool()) + .await + .unwrap_or_default(); + + let files: Vec = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile { + file_uuid, file_name, status, detail: "job_id is null".to_string(), + }).collect(); + + ConsistencyCheck { + check: "stale_processing".to_string(), + severity: "warn".to_string(), + count: files.len(), + files, + } +} + /* Flags processing rows that have no monitor_jobs row in status pending or running: the LEFT JOIN keeps every video row and the m.id IS NULL filter keeps only those without a match. Query failures are swallowed by unwrap_or_default in the same way as in check_stale_processing. NOTE(review): a row with a NULL job_id and no active job would presumably be reported by both checks — confirm the overlap is acceptable. */ +async fn check_orphaned_processing(db: &PostgresDb) -> ConsistencyCheck { + let vt = schema::table_name("videos"); + let mj = schema::table_name("monitor_jobs"); + let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!( + "SELECT v.file_uuid, v.file_name, v.status \ + FROM {} v LEFT JOIN {} m ON v.file_uuid = m.uuid AND m.status IN ('pending','running') \ + WHERE v.status = 'processing' AND m.id IS NULL", + vt, mj + )) + .fetch_all(db.pool()) + .await + .unwrap_or_default(); + + let files: Vec = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile { + file_uuid, 
file_name, status, detail: "no active monitor_job".to_string(), + }).collect(); + /* count is simply the number of rows returned by the query above. */ + ConsistencyCheck { + check: "orphaned_processing".to_string(), + severity: "warn".to_string(), + count: files.len(), + files, + } +} + /* Flags every row whose status is unregistered and reports it at info severity. A failed query yields an empty list through unwrap_or_default. NOTE(review): despite the check name, the query does not test file_uuid — presumably every row carries one; verify against the schema. */ +async fn check_unregistered_with_uuid(db: &PostgresDb) -> ConsistencyCheck { + let vt = schema::table_name("videos"); + let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!( + "SELECT file_uuid, file_name, status FROM {} WHERE status = 'unregistered'", + vt + )) + .fetch_all(db.pool()) + .await + .unwrap_or_default(); + + let files: Vec = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile { + file_uuid, file_name, status, detail: "migration residue".to_string(), + }).collect(); + + ConsistencyCheck { + check: "unregistered_with_uuid".to_string(), + severity: "info".to_string(), + count: files.len(), + files, + } +} + /* Flags processing rows that join to a monitor_jobs row with status completed. A failed query yields an empty list through unwrap_or_default. NOTE(review): the inner JOIN yields one result per matching job, so a file with several completed jobs would be listed and counted more than once — confirm whether uuid is unique in monitor_jobs. */ +async fn check_processing_job_done(db: &PostgresDb) -> ConsistencyCheck { + let vt = schema::table_name("videos"); + let mj = schema::table_name("monitor_jobs"); + let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!( + "SELECT v.file_uuid, v.file_name, v.status \ + FROM {} v JOIN {} m ON v.file_uuid = m.uuid \ + WHERE v.status = 'processing' AND m.status = 'completed'", + vt, mj + )) + .fetch_all(db.pool()) + .await + .unwrap_or_default(); + + let files: Vec = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile { + file_uuid, file_name, status, detail: "monitor_job already completed".to_string(), + }).collect(); + + ConsistencyCheck { + check: "processing_job_done".to_string(), + severity: "warn".to_string(), + count: files.len(), + files, + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index eece566..8fd786c 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -6,6 +6,7 @@ pub mod config; pub mod db; pub mod embedding; pub mod frame_cache; +pub mod 
health_agent; pub mod identity; pub mod ingestion; pub mod llm;