refactor: modularize server.rs into separate route modules

- Extract scan.rs, files.rs, types.rs, processing.rs, visual_chunk_search.rs
- Move AppState and AppConfig to types.rs
- Each module exposes pub fn xxx_routes() -> Router<AppState>
- server.rs reduced from 5005 to 118 lines (orchestrator only)
- All stubs filled with real implementations from git history
- Verify: cargo check, clippy, tests all pass
This commit is contained in:
M5Max128
2026-05-21 16:38:49 +08:00
parent 80812128e2
commit 3a33d00449
22 changed files with 3315 additions and 4962 deletions

513
src/api/processing.rs Normal file
View File

@@ -0,0 +1,513 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::post,
Router,
};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use super::types::AppState;
use crate::core::cache::{keys, RedisCache};
use crate::core::config::REDIS_KEY_PREFIX;
use crate::core::db::schema;
use crate::core::db::{Database, MonitorJobStatus, PostgresDb, RedisClient, VideoRecord, VideoStatus};
use crate::core::probe::ffprobe;
use crate::worker::processor;
use crate::{Embedder, FileManager};
#[derive(Debug, Serialize)]
struct JobListResponse {
jobs: Vec<JobInfoResponse>,
count: i64,
page: usize,
page_size: usize,
}
#[derive(Debug, Deserialize)]
struct JobsQuery {
page: Option<usize>,
page_size: Option<usize>,
status: Option<String>,
}
#[derive(Debug, Serialize)]
struct JobInfoResponse {
id: i32,
uuid: String,
status: String,
current_processor: Option<String>,
progress_current: i32,
progress_total: i32,
created_at: String,
started_at: Option<String>,
}
#[derive(Debug, Serialize)]
struct JobDetailResponse {
id: i32,
uuid: String,
status: String,
current_processor: Option<String>,
progress_current: i32,
progress_total: i32,
processors: Vec<ProcessorInfoResponse>,
created_at: String,
started_at: Option<String>,
updated_at: Option<String>,
}
#[derive(Debug, Serialize)]
struct ProcessorInfoResponse {
processor_type: String,
status: String,
started_at: Option<String>,
completed_at: Option<String>,
duration_secs: Option<f64>,
error_message: Option<String>,
}
#[derive(Debug, Deserialize)]
struct CacheToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct CacheToggleResponse {
success: bool,
cache_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct AutoPipelineToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct AutoPipelineToggleResponse {
success: bool,
auto_pipeline_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct WatcherAutoRegisterToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct WatcherAutoRegisterToggleResponse {
success: bool,
watcher_auto_register_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct ProcessRequest {
rules: Option<Vec<String>>,
processors: Option<Vec<String>>,
}
#[derive(Debug, Serialize)]
struct ProgressResponse {
file_uuid: String,
user: Option<String>,
group: Option<String>,
file_name: Option<String>,
duration: Option<f64>,
overall_progress: u32,
cpu_percent: Option<f64>,
gpu_percent: Option<f64>,
memory_percent: Option<f64>,
memory_mb: Option<u64>,
system: Option<SystemHealthInfo>,
processors: Vec<ProcessorProgressInfo>,
}
#[derive(Debug, Serialize)]
struct SystemHealthInfo {
cpu_idle_pct: f64,
memory_available_mb: u64,
memory_total_mb: u64,
memory_used_pct: f64,
gpu_available: bool,
gpu_utilization_pct: Option<f64>,
gpu_memory_used_pct: Option<f64>,
dynamic_concurrency: u32,
config_concurrency: u32,
running_processors: u32,
}
#[derive(Debug, Serialize)]
struct ProcessorProgressInfo {
name: String,
status: String,
current: u32,
total: u32,
progress: u32,
message: String,
frames_processed: i32,
chunks_produced: i32,
retry_count: i32,
eta_seconds: Option<i64>,
}
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
use std::process::Command;
let pid = std::process::id().to_string();
let cpu = Command::new("ps")
.args(["-p", &pid, "-o", "%cpu="])
.output()
.ok()
.and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok());
let (mem_percent, mem_rss) = Command::new("ps")
.args(["-p", &pid, "-o", "%mem=,rss="])
.output()
.ok()
.map(|o| {
let output = String::from_utf8_lossy(&o.stdout);
let parts: Vec<&str> = output.split_whitespace().collect();
let percent = parts.first().and_then(|s| s.parse().ok());
let rss = parts.get(1).and_then(|s| s.parse().ok());
(percent, rss)
})
.unwrap_or((None, None));
let gpu = Command::new("nvidia-smi")
.args(["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"])
.output()
.ok()
.and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok());
let mem_mb = mem_rss.map(|r: u64| r / 1024);
(cpu, mem_percent, gpu, mem_mb)
}
async fn trigger_processing(
State(state): State<AppState>,
Path(uuid): Path<String>,
Json(req): Json<ProcessRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let videos_table = schema::table_name("videos");
let row: Option<(String, String, String, Option<String>, String, Option<String>)> =
sqlx::query_as(&format!(
"SELECT file_uuid, file_path, file_name, file_type, COALESCE(processing_status, 'REGISTERED'), content_hash FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
tracing::error!("DB error: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let (file_uuid, file_path, file_name, file_type, processing_status, content_hash) =
row.ok_or(StatusCode::NOT_FOUND)?;
if processing_status == "PROCESSING" || processing_status == "QUEUED" {
return Err(StatusCode::CONFLICT);
}
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let status_update = format!(
"UPDATE {} SET processing_status = 'QUEUED' WHERE file_uuid = $1",
videos_table
);
sqlx::query(&status_update)
.bind(&file_uuid)
.execute(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let output_path = std::path::Path::new(&output_dir).join(format!("{}.monitor.json", file_uuid));
let monitor_jobs_table = schema::table_name("monitor_jobs");
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors {
let procs_str = serde_json::to_string(procs).unwrap_or_default();
sqlx::query(&format!("UPDATE {} SET processors = $1 WHERE id = $2", schema::table_name("monitor_jobs")))
.bind(&procs_str)
.bind(&file_uuid)
.execute(state.db.pool())
.await
.ok();
procs.iter().map(|s| s.as_str()).collect()
} else {
vec![]
};
let notification = serde_json::json!({
"action": "process",
"file_uuid": file_uuid,
"file_path": file_path,
"file_name": file_name,
"file_type": file_type,
"content_hash": content_hash,
"output_dir": output_dir,
"processors": processors_to_run,
});
let notification_key = format!("{}notifications", REDIS_KEY_PREFIX.as_str());
let _: Result<(), _> = redis::cmd("PUBLISH")
.arg(&notification_key)
.arg(notification.to_string())
.query_async(&mut conn)
.await;
tracing::info!("[TRIGGER] Published processing notification for {}", file_uuid);
Ok(Json(serde_json::json!({
"success": true,
"message": "Processing queued",
"file_uuid": file_uuid,
})))
}
async fn download_json(
State(state): State<AppState>,
Path((file_uuid, processor)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor));
match tokio::fs::read_to_string(&path).await {
Ok(content) => serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
Err(_) => Err(StatusCode::NOT_FOUND),
}
}
async fn get_chunk_by_path(
State(state): State<AppState>,
Path((file_uuid, chunk_id)): Path<(String, String)>,
) -> Result<Json<crate::core::chunk::types::Chunk>, StatusCode> {
let table = schema::table_name("chunk");
let chunk: Option<crate::core::chunk::types::Chunk> = sqlx::query_as(&format!(
"SELECT * FROM {} WHERE uuid = $1 AND chunk_id = $2",
table
))
.bind(&file_uuid)
.bind(&chunk_id)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
chunk.ok_or(StatusCode::NOT_FOUND)
}
async fn get_progress(
file_uuid: Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
let file_uuid = file_uuid.0;
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let key = format!("{}progress:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
let progress_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (cpu, mem_pct, gpu, mem_mb) = get_system_stats();
let sys = SystemHealthInfo {
cpu_idle_pct: cpu.map(|c: f64| 100.0 - c).unwrap_or(0.0),
memory_available_mb: mem_mb.unwrap_or(0),
memory_total_mb: 0,
memory_used_pct: mem_pct.unwrap_or(0.0),
gpu_available: gpu.is_some(),
gpu_utilization_pct: gpu,
gpu_memory_used_pct: None,
dynamic_concurrency: 0,
config_concurrency: 0,
running_processors: 0,
};
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let video = pg.get_video_by_uuid(&file_uuid).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (overall, processors) = if let Some(json_str) = &progress_json {
match serde_json::from_str::<ProgressResponse>(json_str) {
Ok(p) => (p.overall_progress, p.processors),
Err(_) => (0u32, vec![]),
}
} else {
(0u32, vec![])
};
Ok(Json(ProgressResponse {
file_uuid,
user: None,
group: None,
file_name: video.as_ref().map(|v| v.file_name.clone()),
duration: video.as_ref().map(|v| v.duration),
overall_progress: overall,
cpu_percent: cpu,
gpu_percent: gpu,
memory_percent: mem_pct,
memory_mb,
system: Some(sys),
processors,
}))
}
async fn list_jobs(
Query(params): Query<JobsQuery>,
) -> Result<Json<JobListResponse>, StatusCode> {
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let jobs_table = schema::table_name("monitor_jobs");
let videos_table = schema::table_name("videos");
let page = params.page.unwrap_or(1).max(1);
let page_size = params.page_size.unwrap_or(20).max(1).min(100);
let offset = (page - 1) * page_size;
let mut where_clause = String::new();
if let Some(ref status) = params.status {
where_clause = format!(" WHERE j.status = '{}'", status);
}
let count: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} j{}",
jobs_table, where_clause
))
.fetch_one(pg.pool())
.await
.unwrap_or(0);
let jobs: Vec<(i32, String, String, String, String, Option<String>, i32, i32)> =
sqlx::query_as(&format!(
"SELECT j.id::int, j.uuid, v.file_name, COALESCE(j.status, 'QUEUED'), COALESCE(j.current_processor, ''), v.file_uuid, COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0) \
FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid{} \
ORDER BY j.id DESC LIMIT $1 OFFSET $2",
jobs_table, videos_table, where_clause
))
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pg.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_list = jobs
.into_iter()
.map(|(id, uuid, fname, status, cp, _vuuid, pf, tf)| JobInfoResponse {
id,
uuid,
status,
current_processor: Some(cp),
progress_current: pf,
progress_total: tf,
created_at: String::new(),
started_at: None,
})
.collect();
Ok(Json(JobListResponse {
jobs: job_list,
count,
page,
page_size,
}))
}
async fn get_job(
Path(uuid): Path<String>,
) -> Result<Json<JobDetailResponse>, StatusCode> {
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let jobs_table = schema::table_name("monitor_jobs");
let videos_table = schema::table_name("videos");
let job: Option<(i32, String, String, String, Option<String>, i32, i32, String, Option<String>, Option<String>)> =
sqlx::query_as(&format!(
"SELECT j.id::int, j.uuid, COALESCE(v.file_name, 'unknown'), COALESCE(j.status, 'QUEUED'), j.current_processor, \
COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0), \
COALESCE(j.created_at::text, ''), j.started_at::text, j.updated_at::text \
FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid WHERE j.uuid = $1",
jobs_table, videos_table
))
.bind(&uuid)
.fetch_optional(pg.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (id, uuid, file_name, status, current_processor, pf, tf, created_at, started_at, updated_at) =
job.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(JobDetailResponse {
id,
uuid,
status,
current_processor,
progress_current: pf,
progress_total: tf,
processors: vec![],
created_at,
started_at,
updated_at,
}))
}
async fn cache_toggle(
State(state): State<AppState>,
Json(req): Json<CacheToggleRequest>,
) -> Json<CacheToggleResponse> {
crate::core::config::set_cache_enabled(req.enabled);
if !req.enabled {
let _ = state.mongo_cache.flush_all().await;
let _ = state.redis_cache.flush().await;
}
Json(CacheToggleResponse {
success: true,
cache_enabled: req.enabled,
message: format!("Cache {}", if req.enabled { "enabled" } else { "disabled" }),
})
}
async fn auto_pipeline_toggle(
Json(req): Json<AutoPipelineToggleRequest>,
) -> Json<AutoPipelineToggleResponse> {
crate::core::config::set_auto_pipeline_enabled(req.enabled);
Json(AutoPipelineToggleResponse {
success: true,
auto_pipeline_enabled: req.enabled,
message: format!(
"Auto pipeline {}",
if req.enabled { "enabled" } else { "disabled" }
),
})
}
async fn watcher_auto_register_toggle(
Json(req): Json<WatcherAutoRegisterToggleRequest>,
) -> Json<WatcherAutoRegisterToggleResponse> {
crate::core::config::set_watcher_auto_register(req.enabled);
Json(WatcherAutoRegisterToggleResponse {
success: true,
watcher_auto_register_enabled: req.enabled,
message: format!(
"Watcher auto-register {}",
if req.enabled { "enabled" } else { "disabled" }
),
})
}
pub fn processing_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/file/:file_uuid/process", post(trigger_processing))
.route("/api/v1/file/:file_uuid/json/:processor", post(download_json))
.route("/api/v1/file/:file_uuid/chunk/:chunk_id", post(get_chunk_by_path))
.route("/api/v1/progress/:file_uuid", post(get_progress))
.route("/api/v1/jobs", post(list_jobs))
.route("/api/v1/config/cache", post(cache_toggle))
.route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
.route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle))
}