fix: scan status=unregistered not shown as registered; feat: config API for auto-pipeline/watcher-auto-register

This commit is contained in:
Accusys
2026-05-19 00:37:00 +08:00
parent 05e1e807c0
commit a02a83c1c3
9 changed files with 259 additions and 35 deletions

View File

@@ -69,6 +69,8 @@ Every path segment after the resource ID is a **verb** — an action on that res
| 14 | GET | `/api/v1/progress/:file_uuid` | Processing progress |
| 15 | GET | `/api/v1/jobs` | List monitor jobs (filterable by status) |
| 16 | POST | `/api/v1/config/cache` | Toggle Redis cache |
| 17 | POST | `/api/v1/config/auto-pipeline` | Toggle auto-pipeline on register |
| 18 | POST | `/api/v1/config/watcher-auto-register` | Toggle watcher auto-register |
| 17 | POST | `/api/v1/search/visual` | Search visual chunks |
| 18 | POST | `/api/v1/search/visual/class` | Search by object class |
| 19 | POST | `/api/v1/search/visual/density` | Search by spatial density |

View File

@@ -16,7 +16,7 @@ owner: "Warren"
| Environment | URL |
|-------------|-----|
| Production | `http://localhost:3002` or `https://api.momentry.ddns.net` |
| Production | `http://localhost:3002` or `https://m5api.momentry.ddns.net` |
| Development | `http://localhost:3003` |
| Auth | Header `X-API-Key: <key>` (login endpoint unprotected) |
@@ -36,6 +36,8 @@ owner: "Warren"
| 6 | GET | `/api/v1/stats/sftpgo` | SFTPGo status |
| 7 | GET | `/api/v1/stats/inference` | LLM/Embedding health |
| 8 | POST | `/api/v1/config/cache` | Toggle Redis cache |
| 9 | POST | `/api/v1/config/auto-pipeline` | Toggle auto-pipeline on register |
| 10 | POST | `/api/v1/config/watcher-auto-register` | Toggle watcher auto-register |
```bash
curl http://localhost:3002/health

View File

@@ -161,6 +161,8 @@ related_documents:
| `GET` | `/api/v1/stats/sftpgo` | Internal |
| `GET` | `/api/v1/stats/inference` | Internal |
| `POST` | `/api/v1/config/cache` | Admin |
| `POST` | `/api/v1/config/auto-pipeline` | Admin |
| `POST` | `/api/v1/config/watcher-auto-register` | Admin |
---

View File

@@ -158,6 +158,8 @@ related_documents:
| 51 | GET | `/api/v1/stats/sftpgo` | SFTPGo 使用者狀態 | ✅ |
| 52 | GET | `/api/v1/stats/inference` | 推理叢集健康狀態 | ✅ |
| 53 | POST | `/api/v1/config/cache` | 切換快取開關 | ✅ |
| 54 | POST | `/api/v1/config/auto-pipeline` | 註冊後自動處理 | ✅ |
| 55 | POST | `/api/v1/config/watcher-auto-register` | Watcher 自動註冊 | ✅ |
---

View File

@@ -11,7 +11,8 @@
Register a video file for processing. Returns the file's metadata and UUID.
**New in v0.1.2**: Registration now **automatically triggers the processing pipeline** — no need to call `POST /api/v1/file/:file_uuid/process` separately. The system will:
Registration can **automatically trigger the processing pipeline** if the
[auto-pipeline toggle](/api/v1/config/auto-pipeline) is enabled (disabled by default). The system will:
1. Register the file and run ffprobe
2. Auto-run offline TMDb probe (reads local identity files, no API calls)
3. Create a monitor job for the worker

View File

@@ -125,6 +125,59 @@ curl -s -X POST "$API/api/v1/config/cache" \
-d '{"enabled": false}'
```
### `POST /api/v1/config/auto-pipeline`
**Auth**: Required
**Scope**: system-level
Toggle automatic processing pipeline trigger on file registration (disabled by default).
#### Request Parameters
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `enabled` | boolean | Yes | `true` to enable auto-pipeline, `false` to disable |
#### Example
```bash
# Enable auto-pipeline
curl -s -X POST "$API/api/v1/config/auto-pipeline" \
-H "Content-Type: application/json" \
-H "X-API-Key: $KEY" \
-d '{"enabled": true}'
# Disable auto-pipeline
curl -s -X POST "$API/api/v1/config/auto-pipeline" \
-H "Content-Type: application/json" \
-H "X-API-Key: $KEY" \
-d '{"enabled": false}'
```
### `POST /api/v1/config/watcher-auto-register`
**Auth**: Required
**Scope**: system-level
Toggle automatic registration of newly detected files in the watcher (disabled by default).
When enabled, the file watcher automatically pre-processes and registers new files into the database.
#### Request Parameters
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `enabled` | boolean | Yes | `true` to enable watcher auto-register, `false` to disable |
#### Example
```bash
# Enable watcher auto-register
curl -s -X POST "$API/api/v1/config/watcher-auto-register" \
-H "Content-Type: application/json" \
-H "X-API-Key: $KEY" \
-d '{"enabled": true}'
```
### Unmounted Routes
The following routes are defined in source code but are **NOT** currently mounted in the router:

View File

@@ -181,6 +181,30 @@ struct CacheToggleResponse {
message: String,
}
#[derive(Debug, Deserialize)]
struct AutoPipelineToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct AutoPipelineToggleResponse {
success: bool,
auto_pipeline_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct WatcherAutoRegisterToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct WatcherAutoRegisterToggleResponse {
success: bool,
watcher_auto_register_enabled: bool,
message: String,
}
// Missing structs added
#[derive(Debug, Deserialize)]
@@ -1692,7 +1716,8 @@ async fn register_file(
let resp = register_single_file(&state, &file_path, req.user_id, req.content_hash).await;
// Auto-trigger pipeline for newly registered video files
if resp.success && !resp.already_exists && resp.file_type.as_deref() == Some("video") {
if resp.success && !resp.already_exists && resp.file_type.as_deref() == Some("video")
&& crate::core::config::get_auto_pipeline_enabled() {
let auto_uuid = resp.file_uuid.clone();
let auto_state = state.clone();
tokio::spawn(async move {
@@ -2650,33 +2675,35 @@ fn scan_directory_recursive(
.unwrap_or_default();
// Check registration
if let Some((uuid, status, reg_time, jid)) = registered_paths.get(&abs_path)
{
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: true,
file_uuid: Some(uuid.clone()),
status: Some(status.clone()),
registration_time: reg_time.clone(),
job_id: *jid,
});
} else {
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: false,
file_uuid: None,
status: Some("unregistered".to_string()),
registration_time: None,
job_id: None,
});
match registered_paths.get(&abs_path) {
Some((uuid, status, reg_time, jid)) if status != "unregistered" => {
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: true,
file_uuid: Some(uuid.clone()),
status: Some(status.clone()),
registration_time: reg_time.clone(),
job_id: *jid,
});
}
_ => {
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: false,
file_uuid: None,
status: Some("unregistered".to_string()),
registration_time: None,
job_id: None,
});
}
}
}
}
@@ -3332,6 +3359,30 @@ async fn cache_toggle(
Ok(Json(response))
}
async fn auto_pipeline_toggle(
Json(req): Json<AutoPipelineToggleRequest>,
) -> Result<Json<AutoPipelineToggleResponse>, StatusCode> {
tracing::info!("[auto_pipeline_toggle] Setting to: {}", req.enabled);
crate::core::config::set_auto_pipeline_enabled(req.enabled);
Ok(Json(AutoPipelineToggleResponse {
success: true,
auto_pipeline_enabled: req.enabled,
message: format!("Auto-pipeline {}", if req.enabled { "enabled" } else { "disabled" }),
}))
}
async fn watcher_auto_register_toggle(
Json(req): Json<WatcherAutoRegisterToggleRequest>,
) -> Result<Json<WatcherAutoRegisterToggleResponse>, StatusCode> {
tracing::info!("[watcher_auto_register_toggle] Setting to: {}", req.enabled);
crate::core::config::set_watcher_auto_register(req.enabled);
Ok(Json(WatcherAutoRegisterToggleResponse {
success: true,
watcher_auto_register_enabled: req.enabled,
message: format!("Watcher auto-register {}", if req.enabled { "enabled" } else { "disabled" }),
}))
}
#[derive(Debug, Serialize)]
struct UnregisterResponse {
success: bool,
@@ -3682,6 +3733,8 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
.route("/api/v1/progress/:file_uuid", get(get_progress))
.route("/api/v1/jobs", get(list_jobs))
.route("/api/v1/config/cache", post(cache_toggle))
.route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
.route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle))
// .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed)
.merge(identity_binding::identity_binding_routes())
.merge(identities::identity_routes())

View File

@@ -19,6 +19,32 @@ pub fn set_cache_enabled(enabled: bool) {
tracing::info!("Cache enabled set to: {}", enabled);
}
// Switch 1: watcher detects new file → auto-register
pub static RUNTIME_WATCHER_AUTO_REGISTER: Lazy<RwLock<bool>> =
Lazy::new(|| RwLock::new(false));
pub fn get_watcher_auto_register() -> bool {
*RUNTIME_WATCHER_AUTO_REGISTER.read().unwrap()
}
pub fn set_watcher_auto_register(enabled: bool) {
*RUNTIME_WATCHER_AUTO_REGISTER.write().unwrap() = enabled;
tracing::info!("Watcher auto-register set to: {}", enabled);
}
// Switch 2: register → auto-trigger processing pipeline
pub static RUNTIME_AUTO_PIPELINE_ENABLED: Lazy<RwLock<bool>> =
Lazy::new(|| RwLock::new(false));
pub fn get_auto_pipeline_enabled() -> bool {
*RUNTIME_AUTO_PIPELINE_ENABLED.read().unwrap()
}
pub fn set_auto_pipeline_enabled(enabled: bool) {
*RUNTIME_AUTO_PIPELINE_ENABLED.write().unwrap() = enabled;
tracing::info!("Auto-pipeline enabled set to: {}", enabled);
}
pub static DATABASE_URL: Lazy<String> = Lazy::new(|| {
env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string())

View File

@@ -1,8 +1,11 @@
use anyhow::Result;
use std::path::Path;
use anyhow::Result;
use tokio::time;
use tracing::{info, warn};
use crate::core::db::{PostgresDb, VideoRecord, VideoStatus};
pub struct WatcherConfig {
pub directories: Vec<String>,
pub poll_interval_ms: u64,
@@ -20,7 +23,7 @@ impl Default for WatcherConfig {
}
/// Starts the file watcher in the background.
/// Detects new files and logs them. Does NOT auto-register or auto-pre-process.
/// Detects new files and logs them.
pub async fn run_watcher() -> Result<()> {
let config = WatcherConfig::default();
let dirs = config.directories.clone();
@@ -35,7 +38,6 @@ pub async fn run_watcher() -> Result<()> {
tokio::spawn(async move {
let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms));
// Track known files across cycles
let mut known = std::collections::HashSet::new();
loop {
interval.tick().await;
@@ -70,16 +72,97 @@ async fn report_new_files(directories: &[String], known: &mut std::collections::
current_files.insert(fname.clone());
if !known.contains(&fname) {
info!("[WATCHER] New file detected: {} in {}", fname, dir);
if crate::core::config::get_watcher_auto_register() {
let fpath = file_path.to_string_lossy().to_string();
tokio::spawn(async move {
auto_register_file(&fpath).await;
});
}
known.insert(fname);
}
}
// Remove files that no longer exist (deleted between cycles)
known.retain(|k| current_files.contains(k));
}
}
async fn auto_register_file(file_path: &str) {
let file_uuid = match pre_process_file(file_path).await {
Some(u) => u,
None => return,
};
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid));
let pre_content = match std::fs::read_to_string(&pre_path) {
Ok(c) => c,
Err(e) => {
warn!("[WATCHER] Failed to read pre.json: {}", e);
return;
}
};
let pre: serde_json::Value = match serde_json::from_str(&pre_content) {
Ok(v) => v,
Err(e) => {
warn!("[WATCHER] Failed to parse pre.json: {}", e);
return;
}
};
let file_name = pre.get("file_name").and_then(|v| v.as_str()).unwrap_or("unknown").to_string();
let probe = pre.get("probe_json").cloned().unwrap_or_default();
let file_type = pre.get("file_type").and_then(|v| v.as_str()).unwrap_or("unknown").to_string();
let canonical_path = pre.get("file_path").and_then(|v| v.as_str()).unwrap_or(file_path).to_string();
let duration = probe.get("format").and_then(|f| f.get("duration")).and_then(|v| v.as_f64()).unwrap_or(0.0);
let width = probe.get("format").and_then(|f| f.get("width")).and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let height = probe.get("format").and_then(|f| f.get("height")).and_then(|v| v.as_u64()).unwrap_or(0) as u32;
let fps_val = probe.get("format").and_then(|f| f.get("fps")).and_then(|v| v.as_f64()).unwrap_or(0.0);
let record = VideoRecord {
id: 0,
file_uuid,
file_path: canonical_path,
file_name,
file_type: Some(file_type),
duration,
width,
height,
fps: fps_val,
probe_json: Some(probe),
storage: Default::default(),
status: VideoStatus::Registered,
processing_status: None,
birth_registration: None,
user_id: None,
job_id: None,
created_at: String::new(),
registration_time: None,
total_frames: 0,
parent_uuid: None,
cut_done: false,
cut_count: 0,
cut_max_duration: 0.0,
scene_done: false,
audio_tracks: None,
};
let database_url = crate::core::config::DATABASE_URL.as_str();
let db = match PostgresDb::new(database_url).await {
Ok(d) => d,
Err(e) => {
warn!("[WATCHER] Failed to connect DB for auto-register: {}", e);
return;
}
};
match db.register_video(&record).await {
Ok(id) => info!("[WATCHER] Auto-registered {} (id={})", record.file_uuid, id),
Err(e) => warn!("[WATCHER] Auto-register failed for {}: {}", record.file_uuid, e),
}
}
/// Pre-process a single file: compute SHA256 + probe + UUID → .pre.json
/// This is called explicitly from register API, NOT from the watcher.
pub async fn pre_process_file(file_path: &str) -> Option<String> {
let path = std::path::Path::new(file_path);
if !path.is_file() {