feat: health consistency agent — 4 data integrity checks, GET /health/consistency
This commit is contained in:
153
src/core/health_agent.rs
Normal file
153
src/core/health_agent.rs
Normal file
@@ -0,0 +1,153 @@
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::core::db::{schema, PostgresDb};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ConsistencyFile {
|
||||
pub file_name: String,
|
||||
pub file_uuid: String,
|
||||
pub status: String,
|
||||
pub detail: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ConsistencyCheck {
|
||||
pub check: String,
|
||||
pub severity: String,
|
||||
pub count: usize,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
pub files: Vec<ConsistencyFile>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ConsistencyReport {
|
||||
pub status: String,
|
||||
pub checked_at: String,
|
||||
pub checks: Vec<ConsistencyCheck>,
|
||||
}
|
||||
|
||||
pub async fn run_consistency_checks(db: &PostgresDb) -> ConsistencyReport {
|
||||
let checked_at = chrono::Utc::now().to_rfc3339();
|
||||
let mut checks = Vec::new();
|
||||
let mut any_issue = false;
|
||||
|
||||
// Check 1: stale_processing — status=processing but job_id is null
|
||||
let c1 = check_stale_processing(db).await;
|
||||
if c1.count > 0 { any_issue = true; }
|
||||
checks.push(c1);
|
||||
|
||||
// Check 2: orphaned_processing — status=processing but no active monitor_job
|
||||
let c2 = check_orphaned_processing(db).await;
|
||||
if c2.count > 0 { any_issue = true; }
|
||||
checks.push(c2);
|
||||
|
||||
// Check 3: unregistered_with_uuid — DB rows left behind by migration
|
||||
let c3 = check_unregistered_with_uuid(db).await;
|
||||
if c3.count > 0 { any_issue = true; }
|
||||
checks.push(c3);
|
||||
|
||||
// Check 4: processing_job_done — status=processing but job already completed
|
||||
let c4 = check_processing_job_done(db).await;
|
||||
if c4.count > 0 { any_issue = true; }
|
||||
checks.push(c4);
|
||||
|
||||
ConsistencyReport {
|
||||
status: if any_issue { "degraded".to_string() } else { "ok".to_string() },
|
||||
checked_at,
|
||||
checks,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_stale_processing(db: &PostgresDb) -> ConsistencyCheck {
|
||||
let vt = schema::table_name("videos");
|
||||
let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!(
|
||||
"SELECT file_uuid, file_name, status FROM {} WHERE status = 'processing' AND job_id IS NULL",
|
||||
vt
|
||||
))
|
||||
.fetch_all(db.pool())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let files: Vec<ConsistencyFile> = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile {
|
||||
file_uuid, file_name, status, detail: "job_id is null".to_string(),
|
||||
}).collect();
|
||||
|
||||
ConsistencyCheck {
|
||||
check: "stale_processing".to_string(),
|
||||
severity: "warn".to_string(),
|
||||
count: files.len(),
|
||||
files,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_orphaned_processing(db: &PostgresDb) -> ConsistencyCheck {
|
||||
let vt = schema::table_name("videos");
|
||||
let mj = schema::table_name("monitor_jobs");
|
||||
let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!(
|
||||
"SELECT v.file_uuid, v.file_name, v.status \
|
||||
FROM {} v LEFT JOIN {} m ON v.file_uuid = m.uuid AND m.status IN ('pending','running') \
|
||||
WHERE v.status = 'processing' AND m.id IS NULL",
|
||||
vt, mj
|
||||
))
|
||||
.fetch_all(db.pool())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let files: Vec<ConsistencyFile> = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile {
|
||||
file_uuid, file_name, status, detail: "no active monitor_job".to_string(),
|
||||
}).collect();
|
||||
|
||||
ConsistencyCheck {
|
||||
check: "orphaned_processing".to_string(),
|
||||
severity: "warn".to_string(),
|
||||
count: files.len(),
|
||||
files,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_unregistered_with_uuid(db: &PostgresDb) -> ConsistencyCheck {
|
||||
let vt = schema::table_name("videos");
|
||||
let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!(
|
||||
"SELECT file_uuid, file_name, status FROM {} WHERE status = 'unregistered'",
|
||||
vt
|
||||
))
|
||||
.fetch_all(db.pool())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let files: Vec<ConsistencyFile> = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile {
|
||||
file_uuid, file_name, status, detail: "migration residue".to_string(),
|
||||
}).collect();
|
||||
|
||||
ConsistencyCheck {
|
||||
check: "unregistered_with_uuid".to_string(),
|
||||
severity: "info".to_string(),
|
||||
count: files.len(),
|
||||
files,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_processing_job_done(db: &PostgresDb) -> ConsistencyCheck {
|
||||
let vt = schema::table_name("videos");
|
||||
let mj = schema::table_name("monitor_jobs");
|
||||
let rows: Vec<(String, String, String)> = sqlx::query_as::<_, (String, String, String)>(&format!(
|
||||
"SELECT v.file_uuid, v.file_name, v.status \
|
||||
FROM {} v JOIN {} m ON v.file_uuid = m.uuid \
|
||||
WHERE v.status = 'processing' AND m.status = 'completed'",
|
||||
vt, mj
|
||||
))
|
||||
.fetch_all(db.pool())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let files: Vec<ConsistencyFile> = rows.into_iter().map(|(file_uuid, file_name, status): (String, String, String)| ConsistencyFile {
|
||||
file_uuid, file_name, status, detail: "monitor_job already completed".to_string(),
|
||||
}).collect();
|
||||
|
||||
ConsistencyCheck {
|
||||
check: "processing_job_done".to_string(),
|
||||
severity: "warn".to_string(),
|
||||
count: files.len(),
|
||||
files,
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ pub mod config;
|
||||
pub mod db;
|
||||
pub mod embedding;
|
||||
pub mod frame_cache;
|
||||
pub mod health_agent;
|
||||
pub mod identity;
|
||||
pub mod ingestion;
|
||||
pub mod llm;
|
||||
|
||||
Reference in New Issue
Block a user