diff --git a/scripts/redis_publisher.py b/scripts/redis_publisher.py index 30d759d..20734a9 100644 --- a/scripts/redis_publisher.py +++ b/scripts/redis_publisher.py @@ -86,7 +86,9 @@ class RedisPublisher: try: client: redis.Redis = self._client - client.publish(self.channel, json.dumps(asdict(msg))) + json_data = json.dumps(asdict(msg)) + client.publish(self.channel, json_data) + client.hset(self.channel, msg.processor, json_data) return True except Exception as e: import sys diff --git a/src/api/identity_api.rs b/src/api/identity_api.rs index d62a25b..fb5df94 100644 --- a/src/api/identity_api.rs +++ b/src/api/identity_api.rs @@ -155,6 +155,9 @@ pub struct FileDetailResponse { pub file_uuid: String, pub file_name: String, pub file_path: String, + pub status: String, + pub duration: f64, + pub fps: f64, pub metadata: Option, pub created_at: Option>, } @@ -175,6 +178,9 @@ async fn get_file_detail( file_uuid: f.file_uuid, file_name: f.file_name, file_path: f.file_path, + status: f.status.as_str().to_string(), + duration: f.duration, + fps: f.fps, metadata: f.probe_json, created_at: chrono::DateTime::parse_from_rfc3339(&f.created_at) .ok() @@ -745,19 +751,19 @@ async fn list_resources( .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; -let data: Vec = records - .into_iter() - .map(|r| ResourceItem { - resource_id: r.resource_id, - resource_type: r.resource_type, - category: r.category, - capabilities: r.capabilities, - config: r.config, - metadata: r.metadata, - status: r.status, - last_heartbeat: r.last_heartbeat, - }) - .collect(); + let data: Vec = records + .into_iter() + .map(|r| ResourceItem { + resource_id: r.resource_id, + resource_type: r.resource_type, + category: r.category, + capabilities: r.capabilities, + config: r.config, + metadata: r.metadata, + status: r.status, + last_heartbeat: r.last_heartbeat, + }) + .collect(); Ok(Json(ResourceResponse { success: true, diff --git a/src/api/processing.rs b/src/api/processing.rs index 9221e1b..3c60a42 100644 --- a/src/api/processing.rs +++ b/src/api/processing.rs @@ -6,13 +6,16 @@ use axum::{ Router, }; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::time::Instant; use super::types::AppState; use crate::core::cache::{keys, RedisCache}; use crate::core::config::REDIS_KEY_PREFIX; use crate::core::db::schema; -use crate::core::db::{Database, MonitorJobStatus, PostgresDb, RedisClient, VideoRecord, VideoStatus}; +use crate::core::db::{ + Database, MonitorJobStatus, PostgresDb, ProgressMessage, RedisClient, VideoRecord, VideoStatus, +}; use crate::core::probe::ffprobe; use crate::worker::processor; use crate::{Embedder, FileManager}; @@ -179,7 +182,10 @@ fn get_system_stats() -> (Option, Option, Option, Option) { .unwrap_or((None, None)); let gpu = Command::new("nvidia-smi") - .args(["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"]) + .args([ + "--query-gpu=utilization.gpu", + "--format=csv,noheader,nounits", + ]) .output() .ok() .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok()); @@ -222,23 +228,32 @@ async fn trigger_processing( let monitor_jobs_table = schema::table_name("monitor_jobs"); let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let mut conn = redis + .get_conn() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors { // 檢查 job 是否存在,不存在則 INSERT(state machine entry) - let existing_id: Option = sqlx::query_scalar( - &format!("SELECT id FROM {monitor_jobs_table} WHERE uuid = $1") - ) + let existing_id: Option = sqlx::query_scalar(&format!( + "SELECT id FROM {monitor_jobs_table} WHERE uuid = $1" + )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if existing_id.is_none() { - state.db.create_monitor_job(&file_uuid, Some(&file_path)) + state + .db + .create_monitor_job(&file_uuid, Some(&file_path)) .await .map_err(|e| { - tracing::error!("[TRIGGER] Failed to create monitor job for {}: {}", file_uuid, e); + tracing::error!( + "[TRIGGER] Failed to create monitor job for {}: {}", + file_uuid, + e + ); StatusCode::INTERNAL_SERVER_ERROR })?; } @@ -280,7 +295,10 @@ async fn trigger_processing( .query_async(&mut conn) .await; - tracing::info!("[TRIGGER] Published processing notification for {}", file_uuid); + tracing::info!( + "[TRIGGER] Published processing notification for {}", + file_uuid + ); Ok(Json(serde_json::json!({ "success": true, @@ -296,8 +314,12 @@ async fn download_json( let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor)); - let content = tokio::fs::read_to_string(&path).await.map_err(|_| StatusCode::NOT_FOUND)?; - Ok(Json(serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?)) + let content = tokio::fs::read_to_string(&path) + .await + .map_err(|_| StatusCode::NOT_FOUND)?; + Ok(Json( + serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, + )) } async fn get_chunk_by_path( @@ -318,14 +340,16 @@ async fn get_chunk_by_path( row.map(Json).ok_or(StatusCode::NOT_FOUND) } -async fn get_progress( - file_uuid: Path, -) -> Result, StatusCode> { +async fn get_progress(file_uuid: Path) -> Result, StatusCode> { let file_uuid = file_uuid.0; let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let mut conn = redis + .get_conn() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let key = format!("{}progress:{}", REDIS_KEY_PREFIX.as_str(), file_uuid); - let progress_json: Option = redis::cmd("GET") + + let processor_map: HashMap = redis::cmd("HGETALL") .arg(&key) .query_async(&mut conn) .await @@ -345,16 +369,45 @@ async fn get_progress( running_processors: 0, }; - let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let video = pg.get_video_by_uuid(&file_uuid).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let video = pg + .get_video_by_uuid(&file_uuid) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let (overall, processors) = if let Some(json_str) = &progress_json { - match serde_json::from_str::(json_str) { - Ok(p) => (p.overall_progress, p.processors), - Err(_) => (0u32, vec![]), - } + let processors: Vec = processor_map + .into_iter() + .filter_map(|(name, json_str)| { + let msg: ProgressMessage = serde_json::from_str(&json_str).ok()?; + let current = msg.data.current.unwrap_or(0) as u32; + let total = msg.data.total.unwrap_or(0) as u32; + let progress = if total > 0 { + ((current as f64 / total as f64) * 100.0) as u32 + } else { + 0 + }; + Some(ProcessorProgressInfo { + name, + status: msg.msg_type, + current, + total, + progress, + message: msg.data.message.unwrap_or_default(), + frames_processed: 0, + chunks_produced: 0, + retry_count: 0, + eta_seconds: None, + }) + }) + .collect(); + + let overall = if processors.is_empty() { + 0 } else { - (0u32, vec![]) + let sum: u64 = processors.iter().map(|p| p.progress as u64).sum(); + (sum / processors.len() as u64) as u32 }; Ok(Json(ProgressResponse { @@ -373,10 +426,10 @@ async fn get_progress( })) } -async fn list_jobs( - Query(params): Query, -) -> Result, StatusCode> { - let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; +async fn list_jobs(Query(params): Query) -> Result, StatusCode> { + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let jobs_table = schema::table_name("monitor_jobs"); let videos_table = schema::table_name("videos"); @@ -412,16 +465,18 @@ async fn list_jobs( let job_list = jobs .into_iter() - .map(|(id, uuid, fname, status, cp, _vuuid, pf, tf)| JobInfoResponse { - id, - uuid, - status, - current_processor: Some(cp), - progress_current: pf, - progress_total: tf, - created_at: String::new(), - started_at: None, - }) + .map( + |(id, uuid, fname, status, cp, _vuuid, pf, tf)| JobInfoResponse { + id, + uuid, + status, + current_processor: Some(cp), + progress_current: pf, + progress_total: tf, + created_at: String::new(), + started_at: None, + }, + ) .collect(); Ok(Json(JobListResponse { @@ -432,10 +487,10 @@ async fn list_jobs( })) } -async fn get_job( - Path(uuid): Path, -) -> Result, StatusCode> { - let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; +async fn get_job(Path(uuid): Path) -> Result, StatusCode> { + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let jobs_table = schema::table_name("monitor_jobs"); let videos_table = schema::table_name("videos"); @@ -452,8 +507,18 @@ async fn get_job( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let (id, uuid, file_name, status, current_processor, pf, tf, created_at, started_at, updated_at) = - job.ok_or(StatusCode::NOT_FOUND)?; + let ( + id, + uuid, + file_name, + status, + current_processor, + pf, + tf, + created_at, + started_at, + updated_at, + ) = job.ok_or(StatusCode::NOT_FOUND)?; Ok(Json(JobDetailResponse { id, @@ -512,11 +577,20 @@ async fn watcher_auto_register_toggle( pub fn processing_routes() -> Router { Router::new() .route("/api/v1/file/:file_uuid/process", post(trigger_processing)) - .route("/api/v1/file/:file_uuid/json/:processor", post(download_json)) - .route("/api/v1/file/:file_uuid/chunk/:chunk_id", post(get_chunk_by_path)) + .route( + "/api/v1/file/:file_uuid/json/:processor", + post(download_json), + ) + .route( + "/api/v1/file/:file_uuid/chunk/:chunk_id", + post(get_chunk_by_path), + ) .route("/api/v1/progress/:file_uuid", post(get_progress)) .route("/api/v1/jobs", post(list_jobs)) .route("/api/v1/config/cache", post(cache_toggle)) .route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle)) - .route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle)) + .route( + "/api/v1/config/watcher-auto-register", + post(watcher_auto_register_toggle), + ) }