feat: ingestion status endpoint + pipeline doc with 入库 steps

This commit is contained in:
Accusys
2026-05-17 21:36:55 +08:00
parent 3164a65554
commit d6c8930f84
19 changed files with 4191 additions and 72 deletions

View File

@@ -9,7 +9,7 @@ use once_cell::sync::{Lazy, OnceCell};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use sha2::{Digest, Sha256};
use sqlx::Row;
use sqlx::{PgPool, Row};
use std::time::Instant;
use tower_http::cors::{Any, CorsLayer};
@@ -3676,7 +3676,8 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
.route("/api/v1/auth/login", post(login))
.route("/api/v1/auth/logout", post(logout))
.route("/api/v1/stats/sftpgo", get(get_sftpgo_status))
.route("/api/v1/stats/inference", get(get_inference_health))
.route("/api/v1/stats/ingestion-status/:file_uuid", get(get_ingestion_status))
.route("/api/v1/search/visual", post(search_visual_chunks))
.route(
"/api/v1/search/visual/class",
@@ -3758,86 +3759,73 @@ async fn get_sftpgo_status(
}
// NOTE(review): this span comes from a diff view whose +/- markers were lost, so two
// struct definitions are interleaved. The `step!` macro later in this hunk builds
// `IngestionStep` from `name`, `status` and `detail` only, which suggests that `engine`,
// `model`, `latency_ms` and `error` belong to `InferenceEngineStatus` (apparently the
// outgoing struct) — confirm against the repository before relying on this.
#[derive(Debug, Serialize)]
struct InferenceEngineStatus {
engine: String,
model: String,
// One entry in the `steps` list returned by the ingestion-status handler.
struct IngestionStep {
// Step identifier; the handler uses literals such as "rule1_sentence" and "tkg".
name: String,
// The handler's `step!` macro sets this to "done" or "pending"; the inference-health
// code in this hunk sets a field of the same name to "ok" or "error".
status: String,
latency_ms: Option<u64>,
error: Option<String>,
// Optional free-text summary (row counts in the handler); `None` for the step that
// only checks whether a file exists.
detail: Option<String>,
}
// NOTE(review): two response structs are interleaved here because the diff markers
// were lost. `ollama` / `llama_server` are the fields of `InferenceHealthResponse`;
// `file_uuid` / `steps` are the fields of `IngestionStatusResponse` (see the two
// `Ok(Json(...))` expressions at the end of this hunk).
#[derive(Debug, Serialize)]
struct InferenceHealthResponse {
ollama: InferenceEngineStatus,
llama_server: InferenceEngineStatus,
// JSON body of the ingestion-status endpoint: the requested file id echoed back,
// plus one `IngestionStep` per pipeline stage.
struct IngestionStatusResponse {
file_uuid: String,
steps: Vec<IngestionStep>,
}
// NOTE(review): from here to the end of the hunk two handlers are interleaved because
// the diff's +/- markers were lost. Lines that use `client`, `ollama_*`, `llama_*` or
// build `InferenceEngineStatus` belong to `get_inference_health`; lines that use `pool`,
// `count_sql!`, `step!` or `IngestionStep` belong to `get_ingestion_status`.
// `get_inference_health` probes two local HTTP endpoints with a 5 s client timeout and
// reports status, latency and error text for each.
async fn get_inference_health() -> Result<Json<InferenceHealthResponse>, StatusCode> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap();
// Reports which ingestion stages have produced data for one file.
//
// `file_uuid` is taken from the URL path. The handler counts rows in several tables,
// checks for a `<file_uuid>.scene_meta.json` file under `OUTPUT_DIR`, and returns one
// `IngestionStep` per stage. Every visible path ends in `Ok(...)`; the `StatusCode`
// error branch is never produced by the lines shown here.
async fn get_ingestion_status(
State(state): State<AppState>,
Path(file_uuid): Path<String>,
) -> Result<Json<IngestionStatusResponse>, StatusCode> {
let pool = state.db.pool();
// Table names are resolved through `schema::table_name` rather than hard-coded.
let chunk = schema::table_name("chunk");
let fd = schema::table_name("face_detections");
let ollama_start = std::time::Instant::now();
let ollama_status = match client.get("http://localhost:11434/api/tags").send().await {
Ok(resp) if resp.status().is_success() => {
let latency = ollama_start.elapsed().as_millis() as u64;
InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "ok".to_string(),
latency_ms: Some(latency),
error: None,
// The "scene_metadata" step is decided by file existence only, not by a database row.
// NOTE(review): `file_uuid` is unvalidated request input used in a filesystem path, so
// a value containing `../` could probe for files outside `OUTPUT_DIR` — consider
// validating its shape first. `Path::exists()` is also a synchronous filesystem call
// made from an async handler.
let scene_meta_path = format!("{}/{}.scene_meta.json",
crate::core::config::OUTPUT_DIR.as_str(),
file_uuid);
let scene_meta_ok = std::path::Path::new(&scene_meta_path).exists();
// Runs a scalar `i64` query against `pool`. Any error (connection failure, bad SQL,
// missing table) is turned into 0 by `unwrap_or(0)`, so a database failure looks the
// same as "no rows" and shows up as "pending" in the response.
macro_rules! count_sql {
($sql:expr) => {
sqlx::query_scalar::<_, i64>($sql)
.fetch_one(pool)
.await
.unwrap_or(0)
};
}
// NOTE(review): SQL injection risk. `file_uuid` comes straight from the URL path and is
// spliced into the SQL text inside single quotes by `format!`; a value containing `'`
// can change the statement. Prefer a `$1` placeholder with `.bind(&file_uuid)` — TODO
// confirm the column type (text vs uuid) so the bound parameter type matches.
// The nine counts are awaited one after another, i.e. nine sequential round trips.
let sentence_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence'"));
let sentence_embedded = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence' AND embedding IS NOT NULL"));
let scene_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut'"));
let trace_count = count_sql!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL"));
let trace_chunks = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'trace'"));
let identities = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL"));
let tkg_nodes = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_nodes")));
let tkg_edges = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_edges")));
let scene_5w1h = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != ''"));
// Builds one `IngestionStep`: `$done` selects "done" or "pending", `$detail` is passed
// through unchanged as the optional description.
macro_rules! step {
($name:expr, $done:expr, $detail:expr) => {
IngestionStep {
name: $name.into(),
status: if $done { "done" } else { "pending" }.into(),
detail: $detail,
}
}
Ok(resp) => InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "error".to_string(),
latency_ms: Some(ollama_start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
},
Err(e) => InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
};
};
}
let llama_start = std::time::Instant::now();
let llama_status = match client.get("http://localhost:8081/v1/models").send().await {
Ok(resp) if resp.status().is_success() => {
let latency = llama_start.elapsed().as_millis() as u64;
InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "ok".to_string(),
latency_ms: Some(latency),
error: None,
}
}
Ok(resp) => InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "error".to_string(),
latency_ms: Some(llama_start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
},
Err(e) => InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
};
// A step is "done" as soon as at least one matching row exists (`> 0`); this does not
// show that the stage finished for every item. For example, "auto_vectorize" is done
// once a single sentence chunk has an embedding.
let steps = vec![
step!("rule1_sentence", sentence_count > 0, Some(format!("{sentence_count} sentence chunks"))),
step!("auto_vectorize", sentence_embedded > 0, Some(format!("{sentence_embedded} embedded"))),
step!("rule3_scene", scene_count > 0, Some(format!("{scene_count} scene chunks"))),
step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces"))),
step!("trace_chunks", trace_chunks > 0, Some(format!("{trace_chunks} trace chunks"))),
step!("tkg", tkg_nodes > 0 || tkg_edges > 0, Some(format!("{tkg_nodes} nodes, {tkg_edges} edges"))),
step!("identity_match", identities > 0, Some(format!("{identities} identities matched"))),
step!("scene_metadata", scene_meta_ok, None),
step!("5w1h", scene_5w1h > 0, Some(format!("{scene_5w1h} scenes with 5W1H"))),
];
// Two return expressions follow because of the interleaving: the first belongs to
// `get_inference_health`, the second to `get_ingestion_status`.
Ok(Json(InferenceHealthResponse {
ollama: ollama_status,
llama_server: llama_status,
}))
Ok(Json(IngestionStatusResponse { file_uuid, steps }))
}
#[derive(Debug, Deserialize)]