diff --git a/docs_v1.0/API_V1.0.0/API_DICTIONARY_V1.0.0.md b/docs_v1.0/API_V1.0.0/API_DICTIONARY_V1.0.0.md index 000d0bf..59d84b3 100644 --- a/docs_v1.0/API_V1.0.0/API_DICTIONARY_V1.0.0.md +++ b/docs_v1.0/API_V1.0.0/API_DICTIONARY_V1.0.0.md @@ -69,6 +69,8 @@ Every path segment after the resource ID is a **verb** — an action on that res | 14 | GET | `/api/v1/progress/:file_uuid` | Processing progress | | 15 | GET | `/api/v1/jobs` | List monitor jobs (filterable by status) | | 16 | POST | `/api/v1/config/cache` | Toggle Redis cache | +| 17 | POST | `/api/v1/config/auto-pipeline` | Toggle auto-pipeline on register | +| 18 | POST | `/api/v1/config/watcher-auto-register` | Toggle watcher auto-register | | 17 | POST | `/api/v1/search/visual` | Search visual chunks | | 18 | POST | `/api/v1/search/visual/class` | Search by object class | | 19 | POST | `/api/v1/search/visual/density` | Search by spatial density | diff --git a/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md b/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md index 11d6d61..3a3d9de 100644 --- a/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md +++ b/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md @@ -16,7 +16,7 @@ owner: "Warren" | Environment | URL | |-------------|-----| -| Production | `http://localhost:3002` or `https://api.momentry.ddns.net` | +| Production | `http://localhost:3002` or `https://m5api.momentry.ddns.net` | | Development | `http://localhost:3003` | | Auth | Header `X-API-Key: ` (login endpoint unprotected) | @@ -36,6 +36,8 @@ owner: "Warren" | 6 | GET | `/api/v1/stats/sftpgo` | SFTPGo status | | 7 | GET | `/api/v1/stats/inference` | LLM/Embedding health | | 8 | POST | `/api/v1/config/cache` | Toggle Redis cache | +| 9 | POST | `/api/v1/config/auto-pipeline` | Toggle auto-pipeline on register | +| 10 | POST | `/api/v1/config/watcher-auto-register` | Toggle watcher auto-register | ```bash curl http://localhost:3002/health diff --git a/docs_v1.0/API_V1.0.0/INTERNAL/API_DICTIONARY_V1.0.0.md b/docs_v1.0/API_V1.0.0/INTERNAL/API_DICTIONARY_V1.0.0.md index 9e41deb..af2d892 100644 --- a/docs_v1.0/API_V1.0.0/INTERNAL/API_DICTIONARY_V1.0.0.md +++ b/docs_v1.0/API_V1.0.0/INTERNAL/API_DICTIONARY_V1.0.0.md @@ -161,6 +161,8 @@ related_documents: | `GET` | `/api/v1/stats/sftpgo` | Internal | | `GET` | `/api/v1/stats/inference` | Internal | | `POST` | `/api/v1/config/cache` | Admin | +| `POST` | `/api/v1/config/auto-pipeline` | Admin | +| `POST` | `/api/v1/config/watcher-auto-register` | Admin | --- diff --git a/docs_v1.0/API_V1.0.0/INTERNAL/DEV_API_REFERENCE_V1.0.0.md b/docs_v1.0/API_V1.0.0/INTERNAL/DEV_API_REFERENCE_V1.0.0.md index 57e6bf2..f0abdff 100644 --- a/docs_v1.0/API_V1.0.0/INTERNAL/DEV_API_REFERENCE_V1.0.0.md +++ b/docs_v1.0/API_V1.0.0/INTERNAL/DEV_API_REFERENCE_V1.0.0.md @@ -158,6 +158,8 @@ related_documents: | 51 | GET | `/api/v1/stats/sftpgo` | SFTPGo 使用者狀態 | ✅ | | 52 | GET | `/api/v1/stats/inference` | 推理叢集健康狀態 | ✅ | | 53 | POST | `/api/v1/config/cache` | 切換快取開關 | ✅ | +| 54 | POST | `/api/v1/config/auto-pipeline` | 註冊後自動處理 | ✅ | +| 55 | POST | `/api/v1/config/watcher-auto-register` | Watcher 自動註冊 | ✅ | --- diff --git a/docs_v1.0/API_WORKSPACE/modules/03_register.md b/docs_v1.0/API_WORKSPACE/modules/03_register.md index 9bae26f..b1643cf 100644 --- a/docs_v1.0/API_WORKSPACE/modules/03_register.md +++ b/docs_v1.0/API_WORKSPACE/modules/03_register.md @@ -11,7 +11,8 @@ Register a video file for processing. Returns the file's metadata and UUID. -**New in v0.1.2**: Registration now **automatically triggers the processing pipeline** — no need to call `POST /api/v1/file/:file_uuid/process` separately. The system will: +Registration can **automatically trigger the processing pipeline** if the +[auto-pipeline toggle](/api/v1/config/auto-pipeline) is enabled (disabled by default). The system will: 1. Register the file and run ffprobe 2. Auto-run offline TMDb probe (reads local identity files, no API calls) 3. Create a monitor job for the worker diff --git a/docs_v1.0/API_WORKSPACE/modules/10_pipeline.md b/docs_v1.0/API_WORKSPACE/modules/10_pipeline.md index a14cf11..04c0894 100644 --- a/docs_v1.0/API_WORKSPACE/modules/10_pipeline.md +++ b/docs_v1.0/API_WORKSPACE/modules/10_pipeline.md @@ -125,6 +125,59 @@ curl -s -X POST "$API/api/v1/config/cache" \ -d '{"enabled": false}' ``` +### `POST /api/v1/config/auto-pipeline` + +**Auth**: Required +**Scope**: system-level + +Toggle automatic processing pipeline trigger on file registration (disabled by default). + +#### Request Parameters + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | `true` to enable auto-pipeline, `false` to disable | + +#### Example + +```bash +# Enable auto-pipeline +curl -s -X POST "$API/api/v1/config/auto-pipeline" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $KEY" \ + -d '{"enabled": true}' + +# Disable auto-pipeline +curl -s -X POST "$API/api/v1/config/auto-pipeline" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $KEY" \ + -d '{"enabled": false}' +``` + +### `POST /api/v1/config/watcher-auto-register` + +**Auth**: Required +**Scope**: system-level + +Toggle automatic registration of newly detected files in the watcher (disabled by default). +When enabled, the file watcher automatically pre-processes and registers new files into the database. + +#### Request Parameters + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | `true` to enable watcher auto-register, `false` to disable | + +#### Example + +```bash +# Enable watcher auto-register +curl -s -X POST "$API/api/v1/config/watcher-auto-register" \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $KEY" \ + -d '{"enabled": true}' +``` + ### Unmounted Routes The following routes are defined in source code but are **NOT** currently mounted in the router: diff --git a/src/api/server.rs b/src/api/server.rs index d1b0783..2fa2ffd 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -181,6 +181,30 @@ struct CacheToggleResponse { message: String, } +#[derive(Debug, Deserialize)] +struct AutoPipelineToggleRequest { + enabled: bool, +} + +#[derive(Debug, Serialize)] +struct AutoPipelineToggleResponse { + success: bool, + auto_pipeline_enabled: bool, + message: String, +} + +#[derive(Debug, Deserialize)] +struct WatcherAutoRegisterToggleRequest { + enabled: bool, +} + +#[derive(Debug, Serialize)] +struct WatcherAutoRegisterToggleResponse { + success: bool, + watcher_auto_register_enabled: bool, + message: String, +} + // Missing structs added #[derive(Debug, Deserialize)] @@ -1692,7 +1716,8 @@ async fn register_file( let resp = register_single_file(&state, &file_path, req.user_id, req.content_hash).await; // Auto-trigger pipeline for newly registered video files - if resp.success && !resp.already_exists && resp.file_type.as_deref() == Some("video") { + if resp.success && !resp.already_exists && resp.file_type.as_deref() == Some("video") + && crate::core::config::get_auto_pipeline_enabled() { let auto_uuid = resp.file_uuid.clone(); let auto_state = state.clone(); tokio::spawn(async move { @@ -2650,33 +2675,35 @@ fn scan_directory_recursive( .unwrap_or_default(); // Check registration - if let Some((uuid, status, reg_time, jid)) = registered_paths.get(&abs_path) - { - files.push(ScannedFileInfo { - file_name, - relative_path: rel_path, - file_path: abs_path, - file_size: meta.len(), - modified_time, - is_registered: true, - file_uuid: Some(uuid.clone()), - status: Some(status.clone()), - registration_time: reg_time.clone(), - job_id: *jid, - }); - } else { - files.push(ScannedFileInfo { - file_name, - relative_path: rel_path, - file_path: abs_path, - file_size: meta.len(), - modified_time, - is_registered: false, - file_uuid: None, - status: Some("unregistered".to_string()), - registration_time: None, - job_id: None, - }); + match registered_paths.get(&abs_path) { + Some((uuid, status, reg_time, jid)) if status != "unregistered" => { + files.push(ScannedFileInfo { + file_name, + relative_path: rel_path, + file_path: abs_path, + file_size: meta.len(), + modified_time, + is_registered: true, + file_uuid: Some(uuid.clone()), + status: Some(status.clone()), + registration_time: reg_time.clone(), + job_id: *jid, + }); + } + _ => { + files.push(ScannedFileInfo { + file_name, + relative_path: rel_path, + file_path: abs_path, + file_size: meta.len(), + modified_time, + is_registered: false, + file_uuid: None, + status: Some("unregistered".to_string()), + registration_time: None, + job_id: None, + }); + } } } } @@ -3332,6 +3359,30 @@ async fn cache_toggle( Ok(Json(response)) } +async fn auto_pipeline_toggle( + Json(req): Json, +) -> Result, StatusCode> { + tracing::info!("[auto_pipeline_toggle] Setting to: {}", req.enabled); + crate::core::config::set_auto_pipeline_enabled(req.enabled); + Ok(Json(AutoPipelineToggleResponse { + success: true, + auto_pipeline_enabled: req.enabled, + message: format!("Auto-pipeline {}", if req.enabled { "enabled" } else { "disabled" }), + })) +} + +async fn watcher_auto_register_toggle( + Json(req): Json, +) -> Result, StatusCode> { + tracing::info!("[watcher_auto_register_toggle] Setting to: {}", req.enabled); + crate::core::config::set_watcher_auto_register(req.enabled); + Ok(Json(WatcherAutoRegisterToggleResponse { + success: true, + watcher_auto_register_enabled: req.enabled, + message: format!("Watcher auto-register {}", if req.enabled { "enabled" } else { "disabled" }), + })) +} + #[derive(Debug, Serialize)] struct UnregisterResponse { success: bool, @@ -3682,6 +3733,8 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { .route("/api/v1/progress/:file_uuid", get(get_progress)) .route("/api/v1/jobs", get(list_jobs)) .route("/api/v1/config/cache", post(cache_toggle)) + .route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle)) + .route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle)) // .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed) .merge(identity_binding::identity_binding_routes()) .merge(identities::identity_routes()) diff --git a/src/core/config.rs b/src/core/config.rs index 80622af..683081f 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -19,6 +19,32 @@ pub fn set_cache_enabled(enabled: bool) { tracing::info!("Cache enabled set to: {}", enabled); } +// Switch 1: watcher detects new file → auto-register +pub static RUNTIME_WATCHER_AUTO_REGISTER: Lazy> = + Lazy::new(|| RwLock::new(false)); + +pub fn get_watcher_auto_register() -> bool { + *RUNTIME_WATCHER_AUTO_REGISTER.read().unwrap() +} + +pub fn set_watcher_auto_register(enabled: bool) { + *RUNTIME_WATCHER_AUTO_REGISTER.write().unwrap() = enabled; + tracing::info!("Watcher auto-register set to: {}", enabled); +} + +// Switch 2: register → auto-trigger processing pipeline +pub static RUNTIME_AUTO_PIPELINE_ENABLED: Lazy> = + Lazy::new(|| RwLock::new(false)); + +pub fn get_auto_pipeline_enabled() -> bool { + *RUNTIME_AUTO_PIPELINE_ENABLED.read().unwrap() +} + +pub fn set_auto_pipeline_enabled(enabled: bool) { + *RUNTIME_AUTO_PIPELINE_ENABLED.write().unwrap() = enabled; + tracing::info!("Auto-pipeline enabled set to: {}", enabled); +} + pub static DATABASE_URL: Lazy = Lazy::new(|| { env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string()) diff --git a/src/watcher/watcher.rs b/src/watcher/watcher.rs index aa2391a..b507838 100644 --- a/src/watcher/watcher.rs +++ b/src/watcher/watcher.rs @@ -1,8 +1,11 @@ -use anyhow::Result; use std::path::Path; + +use anyhow::Result; use tokio::time; use tracing::{info, warn}; +use crate::core::db::{PostgresDb, VideoRecord, VideoStatus}; + pub struct WatcherConfig { pub directories: Vec, pub poll_interval_ms: u64, @@ -20,7 +23,7 @@ impl Default for WatcherConfig { } /// Starts the file watcher in the background. -/// Detects new files and logs them. Does NOT auto-register or auto-pre-process. +/// Detects new files and logs them. pub async fn run_watcher() -> Result<()> { let config = WatcherConfig::default(); let dirs = config.directories.clone(); @@ -35,7 +38,6 @@ pub async fn run_watcher() -> Result<()> { tokio::spawn(async move { let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms)); - // Track known files across cycles let mut known = std::collections::HashSet::new(); loop { interval.tick().await; @@ -70,16 +72,97 @@ async fn report_new_files(directories: &[String], known: &mut std::collections:: current_files.insert(fname.clone()); if !known.contains(&fname) { info!("[WATCHER] New file detected: {} in {}", fname, dir); + if crate::core::config::get_watcher_auto_register() { + let fpath = file_path.to_string_lossy().to_string(); + tokio::spawn(async move { + auto_register_file(&fpath).await; + }); + } known.insert(fname); } } - // Remove files that no longer exist (deleted between cycles) known.retain(|k| current_files.contains(k)); } } +async fn auto_register_file(file_path: &str) { + let file_uuid = match pre_process_file(file_path).await { + Some(u) => u, + None => return, + }; + + let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); + let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid)); + let pre_content = match std::fs::read_to_string(&pre_path) { + Ok(c) => c, + Err(e) => { + warn!("[WATCHER] Failed to read pre.json: {}", e); + return; + } + }; + let pre: serde_json::Value = match serde_json::from_str(&pre_content) { + Ok(v) => v, + Err(e) => { + warn!("[WATCHER] Failed to parse pre.json: {}", e); + return; + } + }; + + let file_name = pre.get("file_name").and_then(|v| v.as_str()).unwrap_or("unknown").to_string(); + let probe = pre.get("probe_json").cloned().unwrap_or_default(); + let file_type = pre.get("file_type").and_then(|v| v.as_str()).unwrap_or("unknown").to_string(); + let canonical_path = pre.get("file_path").and_then(|v| v.as_str()).unwrap_or(file_path).to_string(); + + let duration = probe.get("format").and_then(|f| f.get("duration")).and_then(|v| v.as_f64()).unwrap_or(0.0); + let width = probe.get("format").and_then(|f| f.get("width")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; + let height = probe.get("format").and_then(|f| f.get("height")).and_then(|v| v.as_u64()).unwrap_or(0) as u32; + let fps_val = probe.get("format").and_then(|f| f.get("fps")).and_then(|v| v.as_f64()).unwrap_or(0.0); + + let record = VideoRecord { + id: 0, + file_uuid, + file_path: canonical_path, + file_name, + file_type: Some(file_type), + duration, + width, + height, + fps: fps_val, + probe_json: Some(probe), + storage: Default::default(), + status: VideoStatus::Registered, + processing_status: None, + birth_registration: None, + user_id: None, + job_id: None, + created_at: String::new(), + registration_time: None, + total_frames: 0, + parent_uuid: None, + cut_done: false, + cut_count: 0, + cut_max_duration: 0.0, + scene_done: false, + audio_tracks: None, + }; + + let database_url = crate::core::config::DATABASE_URL.as_str(); + let db = match PostgresDb::new(database_url).await { + Ok(d) => d, + Err(e) => { + warn!("[WATCHER] Failed to connect DB for auto-register: {}", e); + return; + } + }; + + match db.register_video(&record).await { + Ok(id) => info!("[WATCHER] Auto-registered {} (id={})", record.file_uuid, id), + Err(e) => warn!("[WATCHER] Auto-register failed for {}: {}", record.file_uuid, e), + } +} + /// Pre-process a single file: compute SHA256 + probe + UUID → .pre.json -/// This is called explicitly from register API, NOT from the watcher. pub async fn pre_process_file(file_path: &str) -> Option { let path = std::path::Path::new(file_path); if !path.is_file() {