diff --git a/src/api/processing.rs b/src/api/processing.rs index ae88750..9221e1b 100644 --- a/src/api/processing.rs +++ b/src/api/processing.rs @@ -197,7 +197,7 @@ async fn trigger_processing( let videos_table = schema::table_name("videos"); let row: Option<(String, String, String, Option, String, Option)> = sqlx::query_as(&format!( - "SELECT file_uuid, file_path, file_name, file_type, COALESCE(processing_status, 'REGISTERED'), content_hash FROM {} WHERE file_uuid = $1", + "SELECT file_uuid, file_path, file_name, file_type, COALESCE(processing_status::text, 'REGISTERED'), content_hash FROM {} WHERE file_uuid = $1", videos_table )) .bind(&uuid) @@ -218,16 +218,6 @@ async fn trigger_processing( let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); - let status_update = format!( - "UPDATE {} SET processing_status = 'QUEUED' WHERE file_uuid = $1", - videos_table - ); - sqlx::query(&status_update) - .bind(&file_uuid) - .execute(state.db.pool()) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let output_path = std::path::Path::new(&output_dir).join(format!("{}.monitor.json", file_uuid)); let monitor_jobs_table = schema::table_name("monitor_jobs"); @@ -235,13 +225,38 @@ async fn trigger_processing( let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors { - let procs_str = serde_json::to_string(procs).unwrap_or_default(); - sqlx::query(&format!("UPDATE {} SET processors = $1 WHERE id = $2", schema::table_name("monitor_jobs"))) - .bind(&procs_str) - .bind(&file_uuid) - .execute(state.db.pool()) - .await - .ok(); + // 檢查 job 是否存在,不存在則 INSERT(state machine entry) + let existing_id: Option = sqlx::query_scalar( + &format!("SELECT id FROM {monitor_jobs_table} WHERE uuid = $1") + ) + .bind(&file_uuid) + .fetch_optional(state.db.pool()) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + if existing_id.is_none() { + state.db.create_monitor_job(&file_uuid, Some(&file_path)) + .await + .map_err(|e| { + tracing::error!("[TRIGGER] Failed to create monitor job for {}: {}", file_uuid, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + } + + // UPDATE processors + reset 狀態讓 worker 可 pickup + let procs_db: Vec = procs.iter().map(|s| s.to_string()).collect(); + sqlx::query(&format!( + "UPDATE {monitor_jobs_table} SET processors = $1::text[], status = 'pending' WHERE uuid = $2" + )) + .bind(&procs_db) + .bind(&file_uuid) + .execute(state.db.pool()) + .await + .map_err(|e| { + tracing::error!("[TRIGGER] Failed to update monitor job for {}: {}", file_uuid, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + procs.iter().map(|s| s.as_str()).collect() } else { vec![] @@ -269,7 +284,7 @@ async fn trigger_processing( Ok(Json(serde_json::json!({ "success": true, - "message": "Processing queued", + "message": "Processing triggered", "file_uuid": file_uuid, }))) }