Files
momentry_core/src/api/processing.rs
M5Max128 932e43518d fix: trigger_processing — remove fake QUEUED state, create monitor_job if missing
- Remove SET processing_status = 'QUEUED' (no queue exists)
- Fix COALESCE type mismatch (jsonb vs text)
- Fix UPDATE: WHERE id = should be WHERE uuid =
- Check monitor_jobs existence, INSERT if missing via create_monitor_job
- Add UNIQUE constraint on monitor_jobs.uuid
- Fix response message: 'Processing queued' → 'Processing triggered'
2026-05-23 23:06:37 +08:00

523 lines
16 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::post,
Router,
};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use super::types::AppState;
use crate::core::cache::{keys, RedisCache};
use crate::core::config::REDIS_KEY_PREFIX;
use crate::core::db::schema;
use crate::core::db::{Database, MonitorJobStatus, PostgresDb, RedisClient, VideoRecord, VideoStatus};
use crate::core::probe::ffprobe;
use crate::worker::processor;
use crate::{Embedder, FileManager};
/// JSON body returned by `list_jobs`: one page of monitor jobs plus paging metadata.
#[derive(Debug, Serialize)]
struct JobListResponse {
/// Jobs on the requested page.
jobs: Vec<JobInfoResponse>,
/// Result of the `COUNT(*)` query over the (optionally status-filtered) jobs table.
count: i64,
/// Effective 1-based page number after defaults are applied.
page: usize,
/// Effective page size after defaults and clamping are applied.
page_size: usize,
}
/// Query-string parameters accepted by `list_jobs`.
#[derive(Debug, Deserialize)]
struct JobsQuery {
/// 1-based page number; `list_jobs` defaults it to 1 and raises values below 1 to 1.
page: Option<usize>,
/// Rows per page; `list_jobs` defaults it to 20 and limits it to the range 1..=100.
page_size: Option<usize>,
/// When present, only jobs whose `status` column equals this value are listed.
status: Option<String>,
}
/// Summary of a single monitor job, as emitted inside `JobListResponse`.
#[derive(Debug, Serialize)]
struct JobInfoResponse {
/// Job row id (`j.id` cast to int).
id: i32,
/// Job uuid (`j.uuid`); the queries in this file join it against the video's `file_uuid`.
uuid: String,
/// Job status; `list_jobs` substitutes `'QUEUED'` when the column is NULL.
status: String,
/// Processor currently running; `list_jobs` always sends `Some`, using an empty string for NULL.
current_processor: Option<String>,
/// Filled from `processed_frames` (0 when NULL).
progress_current: i32,
/// Filled from `total_frames` (0 when NULL).
progress_total: i32,
/// NOTE(review): `list_jobs` currently always sends an empty string here.
created_at: String,
/// NOTE(review): `list_jobs` currently always sends `None` here.
started_at: Option<String>,
}
/// Detailed view of a single monitor job, as returned by `get_job`.
#[derive(Debug, Serialize)]
struct JobDetailResponse {
/// Job row id (`j.id` cast to int).
id: i32,
/// Job uuid (`j.uuid`).
uuid: String,
/// Job status; `get_job` substitutes `'QUEUED'` when the column is NULL.
status: String,
/// Processor currently running, or `None` when the column is NULL.
current_processor: Option<String>,
/// Filled from `processed_frames` (0 when NULL).
progress_current: i32,
/// Filled from `total_frames` (0 when NULL).
progress_total: i32,
/// Per-processor breakdown. NOTE(review): `get_job` currently always sends an empty list.
processors: Vec<ProcessorInfoResponse>,
/// `created_at` rendered as text by the database; empty string when NULL.
created_at: String,
/// `started_at` rendered as text by the database.
started_at: Option<String>,
/// `updated_at` rendered as text by the database.
updated_at: Option<String>,
}
/// Element type of `JobDetailResponse::processors`.
///
/// NOTE(review): nothing in the visible part of this file constructs this type, so the
/// field notes below are inferred from the field names only — confirm against the producer.
#[derive(Debug, Serialize)]
struct ProcessorInfoResponse {
/// Presumably the processor's name/kind.
processor_type: String,
/// Presumably the processor's run status.
status: String,
/// Presumably when the processor started, as text.
started_at: Option<String>,
/// Presumably when the processor finished, as text.
completed_at: Option<String>,
/// Presumably the run time in seconds.
duration_secs: Option<f64>,
/// Presumably the failure reason, if any.
error_message: Option<String>,
}
/// JSON request body for `cache_toggle`.
#[derive(Debug, Deserialize)]
struct CacheToggleRequest {
/// Desired cache state; passed to `crate::core::config::set_cache_enabled`.
enabled: bool,
}
/// JSON response body of `cache_toggle`.
#[derive(Debug, Serialize)]
struct CacheToggleResponse {
/// `cache_toggle` always sets this to `true`.
success: bool,
/// Echo of the requested `enabled` flag.
cache_enabled: bool,
/// Human-readable text: "Cache enabled" or "Cache disabled".
message: String,
}
/// JSON request body for `auto_pipeline_toggle`.
#[derive(Debug, Deserialize)]
struct AutoPipelineToggleRequest {
/// Desired state; passed to `crate::core::config::set_auto_pipeline_enabled`.
enabled: bool,
}
/// JSON response body of `auto_pipeline_toggle`.
#[derive(Debug, Serialize)]
struct AutoPipelineToggleResponse {
/// `auto_pipeline_toggle` always sets this to `true`.
success: bool,
/// Echo of the requested `enabled` flag.
auto_pipeline_enabled: bool,
/// Human-readable text: "Auto pipeline enabled" or "Auto pipeline disabled".
message: String,
}
/// JSON request body for `watcher_auto_register_toggle`.
#[derive(Debug, Deserialize)]
struct WatcherAutoRegisterToggleRequest {
/// Desired state; passed to `crate::core::config::set_watcher_auto_register`.
enabled: bool,
}
/// JSON response body of `watcher_auto_register_toggle`.
#[derive(Debug, Serialize)]
struct WatcherAutoRegisterToggleResponse {
/// `watcher_auto_register_toggle` always sets this to `true`.
success: bool,
/// Echo of the requested `enabled` flag.
watcher_auto_register_enabled: bool,
/// Human-readable text: "Watcher auto-register enabled" or "Watcher auto-register disabled".
message: String,
}
/// JSON request body for `trigger_processing`.
#[derive(Debug, Deserialize)]
struct ProcessRequest {
/// NOTE(review): accepted but not read by any code visible in this file.
rules: Option<Vec<String>>,
/// Processor names to run. When present, `trigger_processing` stores them on the
/// monitor job and forwards them in the Redis notification; when absent, an empty
/// list is forwarded.
processors: Option<Vec<String>>,
}
/// Progress snapshot for one file.
///
/// Used in two directions by `get_progress`: it is deserialized from the JSON blob
/// stored in Redis, and a freshly assembled value is serialized as the HTTP response.
#[derive(Debug, Serialize, Deserialize)]
struct ProgressResponse {
/// File identifier the progress belongs to.
file_uuid: String,
/// `get_progress` always responds with `None`.
user: Option<String>,
/// `get_progress` always responds with `None`.
group: Option<String>,
/// File name taken from the video record, when one exists.
file_name: Option<String>,
/// Duration taken from the video record, when one exists.
duration: Option<f64>,
/// Overall progress from the Redis blob; 0 when the blob is missing or unparsable.
overall_progress: u32,
/// CPU percentage reported by `get_system_stats`.
cpu_percent: Option<f64>,
/// GPU utilisation reported by `get_system_stats`.
gpu_percent: Option<f64>,
/// Memory percentage reported by `get_system_stats`.
memory_percent: Option<f64>,
/// Memory figure in MB reported by `get_system_stats`.
memory_mb: Option<u64>,
/// System health block; `get_progress` always sends `Some`.
system: Option<SystemHealthInfo>,
/// Per-processor progress entries from the Redis blob; empty when unavailable.
processors: Vec<ProcessorProgressInfo>,
}
/// System health block embedded in `ProgressResponse`.
///
/// The notes below describe how `get_progress` fills each field in this file.
#[derive(Debug, Serialize, Deserialize)]
struct SystemHealthInfo {
/// Computed as `100 - cpu` from the `ps` reading of this process; 0.0 when unavailable.
cpu_idle_pct: f64,
/// NOTE(review): filled from this process's RSS in MB, not from free system memory — confirm intent.
memory_available_mb: u64,
/// Always 0 in `get_progress`.
memory_total_mb: u64,
/// `%mem` of this process as reported by `ps`; 0.0 when unavailable.
memory_used_pct: f64,
/// `true` when the `nvidia-smi` reading was obtained and parsed.
gpu_available: bool,
/// GPU utilisation from `nvidia-smi`, when available.
gpu_utilization_pct: Option<f64>,
/// Always `None` in `get_progress`.
gpu_memory_used_pct: Option<f64>,
/// Always 0 in `get_progress`.
dynamic_concurrency: u32,
/// Always 0 in `get_progress`.
config_concurrency: u32,
/// Always 0 in `get_progress`.
running_processors: u32,
}
/// Per-processor progress entry carried in `ProgressResponse::processors`.
///
/// `get_progress` deserializes these from the Redis progress blob and passes them
/// through unchanged. NOTE(review): no code visible in this file interprets the
/// individual fields, so their exact meaning must be confirmed against the writer
/// of the Redis blob.
#[derive(Debug, Serialize, Deserialize)]
struct ProcessorProgressInfo {
name: String,
status: String,
current: u32,
total: u32,
progress: u32,
message: String,
frames_processed: i32,
chunks_produced: i32,
retry_count: i32,
eta_seconds: Option<i64>,
}
/// Samples resource usage of the current process and GPU utilisation by shelling
/// out to `ps` and `nvidia-smi`.
///
/// Returns `(cpu_percent, mem_percent, gpu_percent, mem_mb)`:
/// * `cpu_percent` / `mem_percent` — the `%cpu` / `%mem` columns `ps` reports for this PID;
/// * `gpu_percent` — `utilization.gpu` of the first GPU listed by `nvidia-smi`;
/// * `mem_mb` — the `rss` column of `ps` divided by 1024.
///
/// Each element is `None` when the tool is missing, fails, or prints something that
/// cannot be parsed. This function blocks while the child processes run.
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
    use std::process::Command;

    let pid = std::process::id().to_string();

    // A single `ps` call with one `-o` per column. A combined spec such as
    // `-o "%mem=,rss="` is parsed differently across `ps` personalities (the text
    // after the first `=` may be taken as a header), which can drop the memory
    // columns. `LC_ALL=C` keeps the decimal separator a dot so `parse` succeeds.
    let ps_text = Command::new("ps")
        .env("LC_ALL", "C")
        .args(["-p", &pid, "-o", "%cpu=", "-o", "%mem=", "-o", "rss="])
        .output()
        .ok()
        .map(|out| String::from_utf8_lossy(&out.stdout).into_owned())
        .unwrap_or_default();

    let mut fields = ps_text.split_whitespace();
    let cpu: Option<f64> = fields.next().and_then(|s| s.parse().ok());
    let mem_percent: Option<f64> = fields.next().and_then(|s| s.parse().ok());
    let mem_rss: Option<u64> = fields.next().and_then(|s| s.parse().ok());

    // `nvidia-smi` prints one line per GPU; parse only the first line so that
    // multi-GPU hosts still yield a reading instead of a parse failure.
    let gpu = Command::new("nvidia-smi")
        .args(["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"])
        .output()
        .ok()
        .and_then(|out| {
            String::from_utf8_lossy(&out.stdout)
                .lines()
                .next()
                .and_then(|line| line.trim().parse().ok())
        });

    let mem_mb = mem_rss.map(|rss| rss / 1024);
    (cpu, mem_percent, gpu, mem_mb)
}
async fn trigger_processing(
State(state): State<AppState>,
Path(uuid): Path<String>,
Json(req): Json<ProcessRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let videos_table = schema::table_name("videos");
let row: Option<(String, String, String, Option<String>, String, Option<String>)> =
sqlx::query_as(&format!(
"SELECT file_uuid, file_path, file_name, file_type, COALESCE(processing_status::text, 'REGISTERED'), content_hash FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
tracing::error!("DB error: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let (file_uuid, file_path, file_name, file_type, processing_status, content_hash) =
row.ok_or(StatusCode::NOT_FOUND)?;
if processing_status == "PROCESSING" || processing_status == "QUEUED" {
return Err(StatusCode::CONFLICT);
}
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let output_path = std::path::Path::new(&output_dir).join(format!("{}.monitor.json", file_uuid));
let monitor_jobs_table = schema::table_name("monitor_jobs");
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors {
// 檢查 job 是否存在,不存在則 INSERTstate machine entry
let existing_id: Option<i32> = sqlx::query_scalar(
&format!("SELECT id FROM {monitor_jobs_table} WHERE uuid = $1")
)
.bind(&file_uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if existing_id.is_none() {
state.db.create_monitor_job(&file_uuid, Some(&file_path))
.await
.map_err(|e| {
tracing::error!("[TRIGGER] Failed to create monitor job for {}: {}", file_uuid, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
}
// UPDATE processors + reset 狀態讓 worker 可 pickup
let procs_db: Vec<String> = procs.iter().map(|s| s.to_string()).collect();
sqlx::query(&format!(
"UPDATE {monitor_jobs_table} SET processors = $1::text[], status = 'pending' WHERE uuid = $2"
))
.bind(&procs_db)
.bind(&file_uuid)
.execute(state.db.pool())
.await
.map_err(|e| {
tracing::error!("[TRIGGER] Failed to update monitor job for {}: {}", file_uuid, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
procs.iter().map(|s| s.as_str()).collect()
} else {
vec![]
};
let notification = serde_json::json!({
"action": "process",
"file_uuid": file_uuid,
"file_path": file_path,
"file_name": file_name,
"file_type": file_type,
"content_hash": content_hash,
"output_dir": output_dir,
"processors": processors_to_run,
});
let notification_key = format!("{}notifications", REDIS_KEY_PREFIX.as_str());
let _: Result<(), _> = redis::cmd("PUBLISH")
.arg(&notification_key)
.arg(notification.to_string())
.query_async(&mut conn)
.await;
tracing::info!("[TRIGGER] Published processing notification for {}", file_uuid);
Ok(Json(serde_json::json!({
"success": true,
"message": "Processing triggered",
"file_uuid": file_uuid,
})))
}
/// Returns `true` when `segment` can be embedded in a file name without being able
/// to leave the directory it is joined onto: it must be non-empty and contain no
/// path separator (`/` or `\`) and no NUL byte.
fn is_safe_output_file_segment(segment: &str) -> bool {
    !segment.is_empty() && !segment.chars().any(|c| matches!(c, '/' | '\\' | '\0'))
}

/// Serves the processor output file `<output_dir>/<file_uuid>.<processor>.json` as JSON.
///
/// The output directory comes from `MOMENTRY_OUTPUT_DIR`, with a built-in fallback.
///
/// # Errors
/// * `400 BAD_REQUEST` — `file_uuid` or `processor` contains a path separator or NUL.
///   Both values come straight from the URL, so without this check a crafted segment
///   could make the handler read `.json` files outside the output directory.
/// * `404 NOT_FOUND` — the file could not be read.
/// * `500 INTERNAL_SERVER_ERROR` — the file is not valid JSON.
async fn download_json(
    State(_state): State<AppState>,
    Path((file_uuid, processor)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    if !is_safe_output_file_segment(&file_uuid) || !is_safe_output_file_segment(&processor) {
        return Err(StatusCode::BAD_REQUEST);
    }

    let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
        .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
    let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor));

    let content = tokio::fs::read_to_string(&path)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?;
    let parsed = serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(parsed))
}
async fn get_chunk_by_path(
State(state): State<AppState>,
Path((file_uuid, chunk_id)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let table = schema::table_name("chunk");
let row: Option<serde_json::Value> = sqlx::query_scalar(&format!(
"SELECT row_to_json(t) FROM (SELECT * FROM {} WHERE uuid = $1 AND chunk_id = $2) t",
table
))
.bind(&file_uuid)
.bind(&chunk_id)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
row.map(Json).ok_or(StatusCode::NOT_FOUND)
}
async fn get_progress(
file_uuid: Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
let file_uuid = file_uuid.0;
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let key = format!("{}progress:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
let progress_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (cpu, mem_pct, gpu, mem_mb) = get_system_stats();
let sys = SystemHealthInfo {
cpu_idle_pct: cpu.map(|c: f64| 100.0 - c).unwrap_or(0.0),
memory_available_mb: mem_mb.unwrap_or(0),
memory_total_mb: 0,
memory_used_pct: mem_pct.unwrap_or(0.0),
gpu_available: gpu.is_some(),
gpu_utilization_pct: gpu,
gpu_memory_used_pct: None,
dynamic_concurrency: 0,
config_concurrency: 0,
running_processors: 0,
};
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let video = pg.get_video_by_uuid(&file_uuid).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (overall, processors) = if let Some(json_str) = &progress_json {
match serde_json::from_str::<ProgressResponse>(json_str) {
Ok(p) => (p.overall_progress, p.processors),
Err(_) => (0u32, vec![]),
}
} else {
(0u32, vec![])
};
Ok(Json(ProgressResponse {
file_uuid,
user: None,
group: None,
file_name: video.as_ref().map(|v| v.file_name.clone()),
duration: video.as_ref().map(|v| v.duration),
overall_progress: overall,
cpu_percent: cpu,
gpu_percent: gpu,
memory_percent: mem_pct,
memory_mb: mem_mb,
system: Some(sys),
processors,
}))
}
async fn list_jobs(
Query(params): Query<JobsQuery>,
) -> Result<Json<JobListResponse>, StatusCode> {
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let jobs_table = schema::table_name("monitor_jobs");
let videos_table = schema::table_name("videos");
let page = params.page.unwrap_or(1).max(1);
let page_size = params.page_size.unwrap_or(20).max(1).min(100);
let offset = (page - 1) * page_size;
let mut where_clause = String::new();
if let Some(ref status) = params.status {
where_clause = format!(" WHERE j.status = '{}'", status);
}
let count: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} j{}",
jobs_table, where_clause
))
.fetch_one(pg.pool())
.await
.unwrap_or(0);
let jobs: Vec<(i32, String, String, String, String, Option<String>, i32, i32)> =
sqlx::query_as(&format!(
"SELECT j.id::int, j.uuid, v.file_name, COALESCE(j.status, 'QUEUED'), COALESCE(j.current_processor, ''), v.file_uuid, COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0) \
FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid{} \
ORDER BY j.id DESC LIMIT $1 OFFSET $2",
jobs_table, videos_table, where_clause
))
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pg.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_list = jobs
.into_iter()
.map(|(id, uuid, fname, status, cp, _vuuid, pf, tf)| JobInfoResponse {
id,
uuid,
status,
current_processor: Some(cp),
progress_current: pf,
progress_total: tf,
created_at: String::new(),
started_at: None,
})
.collect();
Ok(Json(JobListResponse {
jobs: job_list,
count,
page,
page_size,
}))
}
/// Returns the detail view of the monitor job whose uuid matches the path parameter.
///
/// Responds with `404 NOT_FOUND` when no such job exists and
/// `500 INTERNAL_SERVER_ERROR` when the database is unavailable or the query fails.
/// The `processors` list of the response is always empty.
async fn get_job(
    Path(uuid): Path<String>,
) -> Result<Json<JobDetailResponse>, StatusCode> {
    let db = match PostgresDb::init().await {
        Ok(db) => db,
        Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
    };
    let jobs_table = schema::table_name("monitor_jobs");
    let videos_table = schema::table_name("videos");

    let sql = format!(
        "SELECT j.id::int, j.uuid, COALESCE(v.file_name, 'unknown'), COALESCE(j.status, 'QUEUED'), j.current_processor, \
         COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0), \
         COALESCE(j.created_at::text, ''), j.started_at::text, j.updated_at::text \
         FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid WHERE j.uuid = $1",
        jobs_table, videos_table
    );

    let fetched: Result<
        Option<(i32, String, String, String, Option<String>, i32, i32, String, Option<String>, Option<String>)>,
        _,
    > = sqlx::query_as(&sql)
        .bind(&uuid)
        .fetch_optional(db.pool())
        .await;

    let row = match fetched {
        Ok(Some(row)) => row,
        Ok(None) => return Err(StatusCode::NOT_FOUND),
        Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
    };

    // The file name (third column) is selected but not part of the response.
    let (
        job_id,
        job_uuid,
        _file_name,
        job_status,
        active_processor,
        frames_done,
        frames_total,
        created,
        started,
        updated,
    ) = row;

    let detail = JobDetailResponse {
        id: job_id,
        uuid: job_uuid,
        status: job_status,
        current_processor: active_processor,
        progress_current: frames_done,
        progress_total: frames_total,
        processors: Vec::new(),
        created_at: created,
        started_at: started,
        updated_at: updated,
    };
    Ok(Json(detail))
}
/// Switches the cache on or off via `crate::core::config::set_cache_enabled` and
/// echoes the new state back to the caller.
///
/// The `AppState` extractor is accepted but not used.
async fn cache_toggle(
    State(_state): State<AppState>,
    Json(req): Json<CacheToggleRequest>,
) -> Json<CacheToggleResponse> {
    let enabled = req.enabled;
    crate::core::config::set_cache_enabled(enabled);

    let state_label = if enabled { "enabled" } else { "disabled" };
    let response = CacheToggleResponse {
        success: true,
        cache_enabled: enabled,
        message: format!("Cache {}", state_label),
    };
    Json(response)
}
/// Switches the auto pipeline on or off via
/// `crate::core::config::set_auto_pipeline_enabled` and echoes the new state.
async fn auto_pipeline_toggle(
    Json(req): Json<AutoPipelineToggleRequest>,
) -> Json<AutoPipelineToggleResponse> {
    let enabled = req.enabled;
    crate::core::config::set_auto_pipeline_enabled(enabled);

    let state_label = if enabled { "enabled" } else { "disabled" };
    let response = AutoPipelineToggleResponse {
        success: true,
        auto_pipeline_enabled: enabled,
        message: format!("Auto pipeline {}", state_label),
    };
    Json(response)
}
/// Switches watcher auto-registration on or off via
/// `crate::core::config::set_watcher_auto_register` and echoes the new state.
async fn watcher_auto_register_toggle(
    Json(req): Json<WatcherAutoRegisterToggleRequest>,
) -> Json<WatcherAutoRegisterToggleResponse> {
    let enabled = req.enabled;
    crate::core::config::set_watcher_auto_register(enabled);

    let state_label = if enabled { "enabled" } else { "disabled" };
    let response = WatcherAutoRegisterToggleResponse {
        success: true,
        watcher_auto_register_enabled: enabled,
        message: format!("Watcher auto-register {}", state_label),
    };
    Json(response)
}
/// Builds the router for the processing API. Every endpoint is registered for POST.
///
/// Routes are registered in three groups: per-file operations, progress/job
/// inspection, and runtime configuration toggles.
pub fn processing_routes() -> Router<AppState> {
    // Per-file operations.
    let file_routes = Router::new()
        .route("/api/v1/file/:file_uuid/process", post(trigger_processing))
        .route("/api/v1/file/:file_uuid/json/:processor", post(download_json))
        .route("/api/v1/file/:file_uuid/chunk/:chunk_id", post(get_chunk_by_path));

    // Progress and job inspection.
    let with_status_routes = file_routes
        .route("/api/v1/progress/:file_uuid", post(get_progress))
        .route("/api/v1/jobs", post(list_jobs));

    // Runtime configuration toggles.
    with_status_routes
        .route("/api/v1/config/cache", post(cache_toggle))
        .route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
        .route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle))
}