feat: schema version tracking, SHA256 integrity, setup scripts, bug fixes
This commit is contained in:
@@ -470,12 +470,38 @@ struct DetailedHealthResponse {
|
||||
services: ServiceHealth,
|
||||
resources: ResourceStatus,
|
||||
pipeline: PipelineStatus,
|
||||
schema: SchemaHealth,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SchemaHealth {
|
||||
table_exists: bool,
|
||||
applied: Vec<MigrationInfo>,
|
||||
required: Vec<MigrationInfo>,
|
||||
ok: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct MigrationInfo {
|
||||
filename: String,
|
||||
checksum: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct PipelineStatus {
|
||||
scripts: bool,
|
||||
models: bool,
|
||||
/// Scripts directory available
|
||||
scripts_ready: bool,
|
||||
/// Number of Python processor scripts found
|
||||
scripts_count: usize,
|
||||
/// Key processor scripts present
|
||||
processors: ProcessorInventory,
|
||||
/// Models directory available
|
||||
models_ready: bool,
|
||||
/// Number of model files found
|
||||
models_count: usize,
|
||||
/// SHA256 checksum integrity of the scripts directory (matched vs. total manifest entries)
|
||||
scripts_integrity: ScriptIntegrity,
|
||||
/// ffmpeg path
|
||||
ffmpeg: bool,
|
||||
/// Embedding server (port 11436)
|
||||
embedding_server: ServiceStatus,
|
||||
@@ -487,6 +513,31 @@ struct PipelineStatus {
|
||||
rsync: ServiceStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ScriptIntegrity {
|
||||
matched: usize,
|
||||
total: usize,
|
||||
ok: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ProcessorInventory {
|
||||
asr: bool,
|
||||
yolo: bool,
|
||||
face: bool,
|
||||
pose: bool,
|
||||
ocr: bool,
|
||||
cut: bool,
|
||||
caption: bool,
|
||||
scene: bool,
|
||||
story: bool,
|
||||
asrx: bool,
|
||||
probe: bool,
|
||||
visual_chunk: bool,
|
||||
/// Count of total Python files in scripts dir
|
||||
total_py_files: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ResourceStatus {
|
||||
cpu_used_percent: f64,
|
||||
@@ -558,12 +609,72 @@ async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthRe
|
||||
|
||||
let sys = SystemResources::check();
|
||||
|
||||
let scripts_dir = std::path::Path::new("/Users/accusys/momentry_core_0.1/scripts");
|
||||
let models_dir = std::path::Path::new("/Users/accusys/momentry_core_0.1/models");
|
||||
let ffmpeg_full = std::path::Path::new("/opt/homebrew/opt/ffmpeg-full/bin/ffmpeg");
|
||||
let has_scripts = scripts_dir.is_dir();
|
||||
let has_models = models_dir.is_dir();
|
||||
let has_ffmpeg = ffmpeg_full.exists();
|
||||
let scripts_base = crate::core::config::SCRIPTS_DIR.clone();
|
||||
let scripts_dir = std::path::Path::new(&scripts_base);
|
||||
let scripts_path = scripts_dir.to_path_buf();
|
||||
let models_path = std::path::PathBuf::from("/Users/accusys/momentry_core_0.1/models");
|
||||
|
||||
let py_files = std::fs::read_dir(&scripts_path)
|
||||
.map(|d| d.filter_map(|e| e.ok()).filter(|e| e.path().extension().map(|x| x == "py").unwrap_or(false)).count())
|
||||
.unwrap_or(0);
|
||||
|
||||
let total_model_files = std::fs::read_dir(&models_path)
|
||||
.map(|d| d.filter_map(|e| e.ok()).filter(|e| {
|
||||
let p = e.path();
|
||||
let ext = p.extension().and_then(|x| x.to_str()).unwrap_or("");
|
||||
matches!(ext, "pt" | "mlpackage" | "gguf" | "bin" | "onnx")
|
||||
}).count())
|
||||
.unwrap_or(0);
|
||||
|
||||
let check_script = |name: &str| -> bool {
|
||||
let candidate = scripts_path.join(name);
|
||||
candidate.exists()
|
||||
};
|
||||
|
||||
let check_python_module = |module: &str| -> bool {
|
||||
std::process::Command::new(
|
||||
&*crate::core::config::PYTHON_PATH,
|
||||
)
|
||||
.arg("-c")
|
||||
.arg(format!("import {}", module))
|
||||
.output()
|
||||
.map(|o| o.status.success())
|
||||
.unwrap_or(false)
|
||||
};
|
||||
|
||||
// SHA256 checksum verification against checksums.sha256 manifest
|
||||
let checksums_path = scripts_path.join("checksums.sha256");
|
||||
let scripts_integrity = match std::fs::read_to_string(&checksums_path) {
|
||||
Ok(content) => {
|
||||
let mut matched = 0usize;
|
||||
let mut total = 0usize;
|
||||
for line in content.lines() {
|
||||
let line = line.trim();
|
||||
if line.is_empty() { continue; }
|
||||
let parts: Vec<&str> = line.splitn(2, ' ').collect();
|
||||
if parts.len() < 2 { continue; }
|
||||
let expected_hash = parts[0];
|
||||
let file_path = parts[1].trim_start();
|
||||
total += 1;
|
||||
let full_path = scripts_path.join(file_path);
|
||||
if full_path.exists() {
|
||||
if let Ok(actual) = std::process::Command::new("shasum")
|
||||
.arg("-a").arg("256")
|
||||
.arg(&full_path)
|
||||
.output()
|
||||
{
|
||||
let out = String::from_utf8_lossy(&actual.stdout);
|
||||
let actual_hash = out.split(' ').next().unwrap_or("").to_string();
|
||||
if actual_hash == expected_hash {
|
||||
matched += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ScriptIntegrity { matched, total, ok: matched == total }
|
||||
}
|
||||
Err(_) => ScriptIntegrity { matched: 0, total: 0, ok: false },
|
||||
};
|
||||
|
||||
Json(DetailedHealthResponse {
|
||||
status: overall_status.to_string(),
|
||||
@@ -588,14 +699,33 @@ async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthRe
|
||||
gpu_memory_used_pct: sys.gpu_memory_used_pct,
|
||||
},
|
||||
pipeline: PipelineStatus {
|
||||
scripts: has_scripts,
|
||||
models: has_models,
|
||||
ffmpeg: has_ffmpeg,
|
||||
scripts_ready: scripts_path.is_dir(),
|
||||
scripts_count: py_files,
|
||||
scripts_integrity,
|
||||
processors: ProcessorInventory {
|
||||
asr: check_script("asr_processor.py"),
|
||||
yolo: check_script("yolo_processor.py"),
|
||||
face: check_script("face_processor.py"),
|
||||
pose: check_script("pose_processor.py"),
|
||||
ocr: check_script("ocr_processor.py"),
|
||||
cut: check_script("cut_processor.py"),
|
||||
caption: check_script("caption_processor.py"),
|
||||
scene: check_script("scene_classifier.py"),
|
||||
story: check_script("story_processor.py"),
|
||||
asrx: check_script("asrx_processor.py"),
|
||||
probe: check_script("probe_file.py"),
|
||||
visual_chunk: check_script("visual_chunk_processor.py"),
|
||||
total_py_files: py_files,
|
||||
},
|
||||
models_ready: models_path.is_dir(),
|
||||
models_count: total_model_files,
|
||||
ffmpeg: std::process::Command::new("which").arg("ffmpeg").output().map(|o| o.status.success()).unwrap_or(false),
|
||||
embedding_server: check_http("http://127.0.0.1:11436/health").await,
|
||||
gdino_api: check_http("http://127.0.0.1:8080/health").await,
|
||||
llm: check_http("http://127.0.0.1:8082/health").await,
|
||||
rsync: check_rsync().await,
|
||||
},
|
||||
schema: check_schema_migrations(state.db.pool()).await,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -729,6 +859,70 @@ async fn check_mongodb(cache: &MongoCache) -> ServiceStatus {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_required_migrations() -> Vec<MigrationInfo> {
|
||||
let raw = env!("REQUIRED_MIGRATIONS");
|
||||
if raw.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
raw.split(',')
|
||||
.filter_map(|entry| {
|
||||
let mut parts = entry.splitn(2, ':');
|
||||
let filename = parts.next()?.trim().to_string();
|
||||
let checksum = parts.next()?.trim().to_string();
|
||||
if filename.is_empty() || checksum.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(MigrationInfo { filename, checksum })
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn check_schema_migrations(pool: &sqlx::PgPool) -> SchemaHealth {
|
||||
let required = parse_required_migrations();
|
||||
|
||||
// Check if table exists
|
||||
let table_exists: bool = sqlx::query_scalar(
|
||||
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schema_migrations')",
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap_or(false);
|
||||
|
||||
if !table_exists {
|
||||
return SchemaHealth {
|
||||
table_exists: false,
|
||||
applied: vec![],
|
||||
required,
|
||||
ok: false,
|
||||
};
|
||||
}
|
||||
|
||||
// Get applied migrations
|
||||
let applied: Vec<MigrationInfo> = sqlx::query_as::<_, (String, String)>(
|
||||
"SELECT filename, checksum FROM schema_migrations ORDER BY id",
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|(filename, checksum)| MigrationInfo { filename, checksum })
|
||||
.collect();
|
||||
|
||||
// Compare: every required migration must be in applied with matching checksum
|
||||
let ok = required.iter().all(|req| {
|
||||
applied
|
||||
.iter()
|
||||
.any(|app| app.filename == req.filename && app.checksum == req.checksum)
|
||||
});
|
||||
|
||||
SchemaHealth {
|
||||
table_exists: true,
|
||||
applied,
|
||||
required,
|
||||
ok,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_rsync() -> ServiceStatus {
|
||||
let start = Instant::now();
|
||||
let paths = [
|
||||
@@ -1318,7 +1512,7 @@ async fn register_file(
|
||||
async fn probe_by_uuid(
|
||||
State(state): State<AppState>,
|
||||
Path(file_uuid): Path<String>,
|
||||
) -> Result<Json<ProbeResponse>, StatusCode> {
|
||||
) -> Result<Json<ProbeResponse>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let table = schema::table_name("videos");
|
||||
let row: Option<(String, String)> = sqlx::query_as(&format!(
|
||||
"SELECT file_name, file_path FROM {} WHERE file_uuid = $1",
|
||||
@@ -1329,12 +1523,18 @@ async fn probe_by_uuid(
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error fetching video: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("DB error: {}", e), "file_uuid": file_uuid})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let (file_name, path) = row.ok_or_else(|| {
|
||||
tracing::warn!("Video not found: {}", file_uuid);
|
||||
StatusCode::NOT_FOUND
|
||||
(
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Video not found", "file_uuid": file_uuid})),
|
||||
)
|
||||
})?;
|
||||
|
||||
// 2. Check for cached probe.json
|
||||
@@ -1349,14 +1549,30 @@ async fn probe_by_uuid(
|
||||
let result: crate::core::probe::ProbeResult =
|
||||
serde_json::from_str(&content).map_err(|e| {
|
||||
tracing::error!("Failed to parse cached probe.json: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to parse cached probe.json: {}", e), "file_uuid": file_uuid})),
|
||||
)
|
||||
})?;
|
||||
(result, true)
|
||||
} else {
|
||||
// Check if file still exists before running ffprobe
|
||||
let file_path = std::path::Path::new(&path);
|
||||
if !file_path.exists() {
|
||||
tracing::error!("File not found at path: {}", path);
|
||||
return Err((
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "File does not exist at registered path", "file_uuid": file_uuid, "file_path": path})),
|
||||
));
|
||||
}
|
||||
|
||||
tracing::info!("Running ffprobe for: {}", path);
|
||||
let result = crate::core::probe::probe_video(&path).map_err(|e| {
|
||||
tracing::error!("ffprobe failed: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("ffprobe failed: {}", e), "file_uuid": file_uuid, "file_path": path})),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Save probe.json to OUTPUT_DIR
|
||||
@@ -1365,13 +1581,19 @@ async fn probe_by_uuid(
|
||||
));
|
||||
let json_str = serde_json::to_string(&result).map_err(|e| {
|
||||
tracing::error!("Failed to serialize probe result: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to serialize probe result: {}", e), "file_uuid": file_uuid})),
|
||||
)
|
||||
})?;
|
||||
file_manager
|
||||
.save_json(&file_uuid, "probe", &json_str)
|
||||
.map_err(|e| {
|
||||
tracing::error!("Failed to save probe.json: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to save probe.json: {}", e), "file_uuid": file_uuid})),
|
||||
)
|
||||
})?;
|
||||
|
||||
(result, false)
|
||||
@@ -2797,6 +3019,39 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
|
||||
let mongo_cache = MongoCache::init().await?;
|
||||
let redis_cache = RedisCache::new()?;
|
||||
let db = PostgresDb::init().await?;
|
||||
|
||||
// ── Schema migration startup check ──
|
||||
let schema_health = check_schema_migrations(db.pool()).await;
|
||||
if schema_health.ok {
|
||||
tracing::info!(
|
||||
"[SCHEMA] All {}/{} required migrations applied ✓",
|
||||
schema_health.required.len(),
|
||||
schema_health.required.len()
|
||||
);
|
||||
} else if !schema_health.table_exists {
|
||||
tracing::warn!(
|
||||
"[SCHEMA] schema_migrations table not found! Run: psql -U accusys -d momentry -f release/migrate_add_schema_version.sql"
|
||||
);
|
||||
} else {
|
||||
let missing: Vec<&str> = schema_health
|
||||
.required
|
||||
.iter()
|
||||
.filter(|req| {
|
||||
!schema_health
|
||||
.applied
|
||||
.iter()
|
||||
.any(|app| app.filename == req.filename && app.checksum == req.checksum)
|
||||
})
|
||||
.map(|m| m.filename.as_str())
|
||||
.collect();
|
||||
tracing::warn!(
|
||||
"[SCHEMA] {}/{} migrations match. Missing/corrupt: {}",
|
||||
schema_health.applied.len(),
|
||||
schema_health.required.len(),
|
||||
missing.join(", ")
|
||||
);
|
||||
}
|
||||
|
||||
let db = std::sync::Arc::new(db);
|
||||
let api_state = super::middleware::ApiState { db: db.clone() };
|
||||
|
||||
|
||||
Reference in New Issue
Block a user