merge: M5Max128 server.rs split + path updates

This commit is contained in:
Accusys
2026-05-21 21:04:37 +08:00
25 changed files with 3382 additions and 4965 deletions

1
.gitignore vendored
View File

@@ -15,3 +15,4 @@ __pycache__/
node_modules/
*.log
/tmp/
*.log

View File

@@ -551,6 +551,40 @@ shellcheck scripts/*.sh monitor/**/*.sh
**注意**: Hook 只檢查 error 等級的 shellcheck 問題style 警告會顯示但不阻擋提交。
## Gitea Sync
主要 sync 管道為 Gitea`http://192.168.110.200:3000/admin/momentry_core.git`
### 產生 Access Token首次設定
```bash
# admin 帳號密碼為 AccusysTest!
TOKEN=$(curl -s -X POST "http://192.168.110.200:3000/api/v1/users/admin/tokens" \
-u "admin:AccusysTest!" \
-H "Content-Type: application/json" \
-d '{"name":"m5max128_push","scopes":["write:repository"]}' | jq -r '.sha1')
echo $TOKEN
```
### 設定 Remote
```bash
# 用 token 取代密碼
git remote add origin http://admin:TOKEN@192.168.110.200:3000/admin/momentry_core.git
# 同步
git pull origin main
git push origin main
```
### Token 記錄
| 機器 | Token |
|------|-------|
| M5Max128 | `a7cf946148063c2bfa8d59ad629ae541813f0db8` |
**注意**: Token 有 write:repository scope勿外洩。如需新增 token 給其他機器,各自產自己的 token。
## Release Workflow
### Release 前準備

View File

@@ -134,6 +134,7 @@ Aggregated face traces with sorting and filtering.
| `limit` | int | 200 | Max faces (capped 1000) |
| `offset` | int | 0 | Pagination |
| `interpolate` | bool | false | Enable linear interpolation |
| `dimension` | string | — | If `"3d"`, returns `z_rel` depth per detection |
#### Response
@@ -153,13 +154,15 @@ Aggregated face traces with sorting and filtering.
"width": 187,
"height": 187,
"confidence": 0.834,
"interpolated": false
"interpolated": false,
"z_rel": 0.045
}
]
}
```
Interpolated frames: `id=0, confidence=0.0, interpolated=true`.
When `?dimension=3d`, each face includes `z_rel` (0.0 = nearest, 1.0 = farthest), derived from bbox area ratio. Without `dimension=3d`, `z_rel` is omitted.
#### Interpolation Algorithm

View File

@@ -39,6 +39,7 @@ python3.11 scripts/demo_runner.py demo.json --voice en_US
| `markdown` | 用 md_reader Preview 渲染 .md 文件(含 Mermaid | `cmd`(檔案路徑) |
| `note` | 純文字解說 | `note` |
| `separator` | 章節分隔線 | `label` |
| `ask` | 互動問答 — 問問題、等回應、顯示解答 | `question`, `answer` |
## JSON 腳本結構
@@ -66,6 +67,12 @@ python3.11 scripts/demo_runner.py demo.json --voice en_US
"note": "說明文字",
"cmd": "docs_v1.0/API_V1.0.0/API_USAGE_GUIDE_V1.0.0.md",
"focus": "自動聚焦的章節名稱"
},
{
"type": "ask",
"label": "互動問答",
"question": "問題文字(語音會朗讀)",
"answer": "解答文字(語音會朗讀)"
}
]
}
@@ -92,7 +99,7 @@ python3.11 scripts/demo_runner.py demo.json --voice en_US
## 語音指令(--voice-control
啟用麥克風語音控制,可用說的操作展示流程:
啟用 Display Audio 麥克風語音控制,可用說的操作展示流程:
```bash
python3 scripts/demo_runner.py demo.json --voice zh_TW --voice-control
@@ -105,7 +112,25 @@ python3 scripts/demo_runner.py demo.json --voice zh_TW --voice-control
| "重複" | "repeat" / "again" | 重複朗讀當前解說 |
| "跳到第 5 步" | "go to 5" | 跳到指定步驟 |
語音辨識使用 Google Speech Recognition需網路),背景執行不影響主流程。
語音辨識使用 **faster-whisper small**(離線、中英雙語),背景執行不影響主流程。
模型快取:`~/.cache/huggingface/hub/models--Systran--faster-whisper-small/`
## 互動問答ask 步驟)
`ask` 步驟讓展示系統問問題、等待使用者回答、顯示預設解答:
-`--voice-control` 時:自動錄音 4 秒 → faster-whisper 轉文字 → 顯示辨識結果
- 無語音控制時鍵盤輸入Enter 送出)
- 解答由 TTS 朗讀 + 螢幕顯示
```json
{
"type": "ask",
"label": "互動問答",
"question": "您知道 Momentry Core 可以分析哪些類型的資料嗎?",
"answer": "可以分析影片中的人臉、文字、物件、姿勢、聲音等。"
}
```
## 展示節奏
@@ -155,5 +180,5 @@ python3 scripts/demo_runner.py demo.json --voice zh_TW --voice-control
| 檔案 | 說明 |
|------|------|
| `scripts/demo_runner.py` | 執行器主程式 |
| `docs_v1.0/API_V1.0.0/DEMO_SCRIPT_v1.0.0.json` | 21 步驟預設展示腳本 |
| `docs_v1.0/API_V1.0.0/DEMO_SCRIPT_v1.0.0.json` | 23 步驟預設展示腳本(含 ask 互動問答) |
| `~/_md_reader/target/release/md_reader` | Markdown 渲染工具 |

View File

@@ -2,7 +2,7 @@ use axum::{extract::State, http::StatusCode, response::Json, routing::post, Rout
use reqwest::Client;
use serde::{Deserialize, Serialize};
use crate::api::server::AppState;
use crate::api::types::AppState;
pub fn agent_routes() -> Router<AppState> {
Router::new().route("/api/v1/agents/translate", post(translate_text))

139
src/api/auth.rs Normal file
View File

@@ -0,0 +1,139 @@
use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use super::middleware::extract_cookies;
use super::types::AppState;
static DEMO_USER_API_KEY: Lazy<String> = Lazy::new(|| {
std::env::var("MOMENTRY_DEMO_API_KEY")
.unwrap_or_else(|_| "muser_demo_key_32chars_abcdef1234567890".to_string())
});
#[derive(Debug, Deserialize)]
struct LoginRequest {
username: String,
password: String,
}
#[derive(Debug, Serialize)]
struct LoginResponse {
success: bool,
message: Option<String>,
api_key: Option<String>,
user: Option<UserInfo>,
}
#[derive(Debug, Serialize)]
struct UserInfo {
username: String,
}
async fn login(
State(state): State<AppState>,
Json(req): Json<LoginRequest>,
) -> Result<axum::response::Response<axum::body::Body>, (StatusCode, Json<serde_json::Value>)> {
let (user_id, username, role) = 'resolve: {
if let Ok(Some((uid, uname, pw_hash, role_str))) =
state.db.get_user_by_username(&req.username).await
{
if crate::core::auth::password::verify_password(&req.password, &pw_hash) {
break 'resolve (uid, uname, role_str);
}
tracing::debug!(
"[LOGIN] Local password mismatch for {}, trying SFTPGo",
&req.username
);
}
if req.username == "demo" && req.password == "demo" {
let uid = state
.db
.get_user_by_username("demo")
.await
.ok()
.flatten()
.map(|(id, _, _, _)| id)
.unwrap_or(0);
break 'resolve (uid, "demo".to_string(), "user".to_string());
}
return Err((
StatusCode::UNAUTHORIZED,
Json(serde_json::json!({
"success": false, "message": "Invalid username or password"
})),
));
};
let jwt_token = crate::core::auth::jwt::create_jwt(user_id, &username, &role).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"success": false, "message": format!("JWT creation failed: {}", e)
})),
)
})?;
let session_id = uuid::Uuid::new_v4().to_string().replace('-', "");
state
.db
.create_session(&session_id, user_id, &DEMO_USER_API_KEY, 24)
.await
.ok();
if user_id > 0 {
state.db.update_last_login(user_id).await.ok();
}
let body = serde_json::json!({
"success": true,
"jwt": jwt_token,
"api_key": DEMO_USER_API_KEY.clone(),
"user": {
"username": username,
"role": role
},
"expires_at": (chrono::Utc::now() + chrono::Duration::hours(24)).to_rfc3339()
});
let json_body = axum::body::Body::from(serde_json::to_string(&body).unwrap_or_default());
let response = axum::response::Response::builder()
.header("Content-Type", "application/json")
.header(
"Set-Cookie",
format!(
"session_id={}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400",
session_id
),
)
.body(json_body)
.unwrap();
Ok(response)
}
async fn logout(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Json<serde_json::value::Value> {
let cookies = extract_cookies(&headers);
if let Some(sid) = cookies
.iter()
.find(|(k, _)| k == "session_id")
.map(|(_, v)| v.clone())
{
state.db.delete_session(&sid).await.ok();
}
Json(serde_json::json!({
"success": true,
"message": "Logged out"
}))
}
pub fn auth_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/auth/login", post(login))
.route("/api/v1/auth/logout", post(logout))
}

54
src/api/docs.rs Normal file
View File

@@ -0,0 +1,54 @@
use axum::{extract::Path, http::StatusCode, routing::get, Router};
use super::types::AppState;
async fn doc_redirect() -> axum::response::Redirect {
axum::response::Redirect::to("/doc-wasm")
}
async fn wasm_doc_handler() -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)>
{
let path =
std::path::Path::new("/Users/accusys/momentry_core_0.1/docs_v1.0/doc_wasm/index.html");
match tokio::fs::read_to_string(path).await {
Ok(html) => Ok(([("content-type", "text/html; charset=utf-8")], html)),
Err(_) => Err((StatusCode::NOT_FOUND, "Doc not found")),
}
}
async fn wasm_doc_file_handler(
Path(file): Path<String>,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
if file.contains("..") || file.contains("//") {
return Err((StatusCode::NOT_FOUND, "Invalid path"));
}
let base = std::path::Path::new("/Users/accusys/momentry_core_0.1/docs_v1.0/doc_wasm");
let path = base.join(&file);
if !path.exists() || !path.starts_with(base) {
return Err((StatusCode::NOT_FOUND, "File not found"));
}
let data = tokio::fs::read(&path)
.await
.map_err(|_| (StatusCode::NOT_FOUND, "Read error"))?;
let mime = if file.ends_with(".wasm") {
"application/wasm"
} else if file.ends_with(".js") {
"application/javascript"
} else if file.ends_with(".md") {
"text/markdown; charset=utf-8"
} else if file.ends_with(".css") {
"text/css"
} else {
"application/octet-stream"
};
Ok(([("content-type", mime)], data))
}
pub fn doc_routes() -> Router<AppState> {
Router::new()
.route("/doc", get(doc_redirect))
.route("/doc/*file", get(doc_redirect))
.route("/dev-doc", get(doc_redirect))
.route("/doc-wasm", get(wasm_doc_handler))
.route("/doc-wasm/*file", get(wasm_doc_file_handler))
}

1105
src/api/files.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::Row;
use crate::api::server::AppState;
use crate::api::types::AppState;
use crate::core::db::qdrant_db::QdrantDb;
use crate::core::db::schema;
use crate::core::db::{PostgresDb, VectorPayload};

680
src/api/health.rs Normal file
View File

@@ -0,0 +1,680 @@
use axum::{extract::State, http::StatusCode, response::Json, routing::get, Router};
use once_cell::sync::OnceCell;
use serde::Serialize;
use std::time::Instant;
use super::types::AppState;
use crate::core::cache::MongoCache;
use crate::core::config;
use crate::core::db::{Database, PostgresDb, RedisClient};
use crate::worker::resources::SystemResources;
// Global State
static SERVER_START: OnceCell<Instant> = OnceCell::new();
static SERVER_HOST: OnceCell<String> = OnceCell::new();
static SERVER_PORT: OnceCell<u16> = OnceCell::new();
pub fn init_server_state(host: &str, port: u16) {
let _ = SERVER_START.set(Instant::now());
let resolved_ip = if host == "0.0.0.0" {
if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&"localhost:0") {
addrs
.filter_map(|a| if a.is_ipv4() { Some(a.ip()) } else { None })
.next()
.map(|ip| ip.to_string())
.unwrap_or_else(|| "127.0.0.1".to_string())
} else {
"127.0.0.1".to_string()
}
} else {
host.to_string()
};
let _ = SERVER_HOST.set(resolved_ip);
let _ = SERVER_PORT.set(port);
}
pub fn get_host() -> String {
SERVER_HOST
.get()
.cloned()
.unwrap_or_else(|| "0.0.0.0".to_string())
}
pub fn get_port() -> u16 {
SERVER_PORT.get().copied().unwrap_or(0)
}
pub fn get_uptime_ms() -> u64 {
SERVER_START
.get()
.map(|i| i.elapsed().as_millis() as u64)
.unwrap_or(0)
}
#[derive(Debug, Serialize)]
struct HealthResponse {
ip: String,
port: u16,
status: String,
version: String,
build_git_hash: String,
build_timestamp: String,
uptime_ms: u64,
watcher_running: bool,
worker_running: bool,
auto_pipeline_enabled: bool,
watcher_auto_register_enabled: bool,
system_timezone: String,
}
#[derive(Debug, Serialize)]
struct DetailedHealthResponse {
ip: String,
port: u16,
status: String,
version: String,
build_git_hash: String,
build_timestamp: String,
uptime_ms: u64,
services: ServiceHealth,
resources: ResourceStatus,
pipeline: PipelineStatus,
schema: SchemaHealth,
identities: IdentityHealth,
integrations: IntegrationHealth,
config: ConfigHealth,
}
#[derive(Debug, Serialize)]
struct IntegrationHealth {
tmdb: crate::core::tmdb::status::TmdbResourceStatus,
}
#[derive(Debug, Serialize)]
struct IdentityHealth {
directory_exists: bool,
files_count: usize,
index_ok: bool,
db_count: i64,
synced: bool,
}
#[derive(Debug, Serialize)]
struct ConfigHealth {
cache_enabled: bool,
auto_pipeline_enabled: bool,
watcher_auto_register_enabled: bool,
system_timezone: String,
}
#[derive(Debug, Serialize)]
pub struct SchemaHealth {
pub table_exists: bool,
pub applied: Vec<MigrationInfo>,
pub required: Vec<MigrationInfo>,
pub ok: bool,
}
#[derive(Debug, Serialize)]
pub struct MigrationInfo {
pub filename: String,
pub checksum: String,
}
#[derive(Debug, Serialize)]
struct PipelineStatus {
scripts_ready: bool,
scripts_count: usize,
processors: ProcessorInventory,
models_ready: bool,
models_count: usize,
scripts_integrity: ScriptIntegrity,
ffmpeg: bool,
embedding_server: ServiceStatus,
gdino_api: ServiceStatus,
llm: ServiceStatus,
rsync: ServiceStatus,
watcher_running: bool,
worker_running: bool,
}
#[derive(Debug, Serialize)]
struct ScriptIntegrity {
matched: usize,
total: usize,
ok: bool,
}
#[derive(Debug, Serialize)]
struct ProcessorInventory {
asr: bool,
yolo: bool,
face: bool,
pose: bool,
ocr: bool,
cut: bool,
caption: bool,
scene: bool,
story: bool,
asrx: bool,
probe: bool,
visual_chunk: bool,
total_py_files: usize,
}
#[derive(Debug, Serialize)]
struct ResourceStatus {
cpu_used_percent: f64,
cpu_idle_percent: f64,
memory_available_mb: u64,
memory_total_mb: u64,
memory_used_percent: f64,
gpu_available: bool,
gpu_utilization: Option<f64>,
gpu_memory_used_pct: Option<f64>,
}
#[derive(Debug, Serialize)]
struct ServiceHealth {
postgres: ServiceStatus,
redis: ServiceStatus,
qdrant: ServiceStatus,
mongodb: ServiceStatus,
}
#[derive(Debug, Serialize)]
struct ServiceStatus {
status: String,
latency_ms: Option<u64>,
error: Option<String>,
}
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
let postgres = check_postgres().await;
let redis = check_redis().await;
let qdrant = check_qdrant().await;
let mongodb = check_mongodb(&state.mongo_cache).await;
let all_ok = postgres.status == "ok"
&& redis.status == "ok"
&& qdrant.status == "ok"
&& mongodb.status == "ok";
let status = if all_ok { "ok" } else { "degraded" };
if all_ok {
let _ = state.redis_cache.set_health(status).await;
}
Json(HealthResponse {
ip: get_host(),
port: get_port(),
status: status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
uptime_ms: get_uptime_ms(),
watcher_running: check_process_running("watcher"),
worker_running: check_process_running("worker"),
auto_pipeline_enabled: config::get_auto_pipeline_enabled(),
watcher_auto_register_enabled: config::get_watcher_auto_register(),
system_timezone: config::SYSTEM_TIMEZONE.clone(),
})
}
async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthResponse> {
let postgres = check_postgres().await;
let redis = check_redis().await;
let qdrant = check_qdrant().await;
let mongodb = check_mongodb(&state.mongo_cache).await;
let overall_status = if postgres.status == "ok"
&& redis.status == "ok"
&& qdrant.status == "ok"
&& mongodb.status == "ok"
{
"ok"
} else {
"degraded"
};
let sys = SystemResources::check();
let scripts_base = config::SCRIPTS_DIR.clone();
let scripts_dir = std::path::Path::new(&scripts_base);
let scripts_path = scripts_dir.to_path_buf();
let models_path = std::path::PathBuf::from("/Users/accusys/momentry_core_0.1/models");
let py_files = std::fs::read_dir(&scripts_path)
.map(|d| {
d.filter_map(|e| e.ok())
.filter(|e| e.path().extension().map(|x| x == "py").unwrap_or(false))
.count()
})
.unwrap_or(0);
let total_model_files = std::fs::read_dir(&models_path)
.map(|d| {
d.filter_map(|e| e.ok())
.filter(|e| {
let p = e.path();
let ext = p.extension().and_then(|x| x.to_str()).unwrap_or("");
matches!(ext, "pt" | "mlpackage" | "gguf" | "bin" | "onnx")
})
.count()
})
.unwrap_or(0);
let check_script = |name: &str| -> bool {
let candidate = scripts_path.join(name);
candidate.exists()
};
let check_python_module = |module: &str| -> bool {
std::process::Command::new(&*config::PYTHON_PATH)
.arg("-c")
.arg(format!("import {}", module))
.output()
.map(|o| o.status.success())
.unwrap_or(false)
};
let checksums_path = scripts_path.join("checksums.sha256");
let scripts_integrity = match std::fs::read_to_string(&checksums_path) {
Ok(content) => {
let mut matched = 0usize;
let mut total = 0usize;
for line in content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.splitn(2, ' ').collect();
if parts.len() < 2 {
continue;
}
let expected_hash = parts[0];
let file_path = parts[1].trim_start();
total += 1;
let full_path = scripts_path.join(file_path);
if full_path.exists() {
if let Ok(actual) = std::process::Command::new("shasum")
.arg("-a")
.arg("256")
.arg(&full_path)
.output()
{
let out = String::from_utf8_lossy(&actual.stdout);
let actual_hash = out.split(' ').next().unwrap_or("").to_string();
if actual_hash == expected_hash {
matched += 1;
}
}
}
}
ScriptIntegrity {
matched,
total,
ok: matched == total,
}
}
Err(_) => ScriptIntegrity {
matched: 0,
total: 0,
ok: false,
},
};
Json(DetailedHealthResponse {
ip: get_host(),
port: get_port(),
status: overall_status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
uptime_ms: get_uptime_ms(),
services: ServiceHealth {
postgres,
redis,
qdrant,
mongodb,
},
resources: ResourceStatus {
cpu_used_percent: sys.cpu_used_percent,
cpu_idle_percent: sys.cpu_idle_percent,
memory_available_mb: sys.memory_available_mb,
memory_total_mb: sys.memory_total_mb,
memory_used_percent: sys.memory_used_percent,
gpu_available: sys.gpu_available,
gpu_utilization: sys.gpu_utilization,
gpu_memory_used_pct: sys.gpu_memory_used_pct,
},
pipeline: PipelineStatus {
scripts_ready: scripts_path.is_dir(),
scripts_count: py_files,
scripts_integrity,
processors: ProcessorInventory {
asr: check_script("asr_processor.py"),
yolo: check_script("yolo_processor.py"),
face: check_script("face_processor.py"),
pose: check_script("pose_processor.py"),
ocr: check_script("ocr_processor.py"),
cut: check_script("cut_processor.py"),
caption: check_script("caption_processor.py"),
scene: check_script("scene_classifier.py"),
story: check_script("story_processor.py"),
asrx: check_script("asrx_processor.py"),
probe: check_script("probe_file.py"),
visual_chunk: check_script("visual_chunk_processor.py"),
total_py_files: py_files,
},
models_ready: models_path.is_dir(),
models_count: total_model_files,
ffmpeg: std::process::Command::new("which")
.arg("ffmpeg")
.output()
.map(|o| o.status.success())
.unwrap_or(false),
embedding_server: check_http("http://127.0.0.1:11436/health").await,
gdino_api: check_http("http://127.0.0.1:8080/health").await,
llm: check_http("http://127.0.0.1:8082/health").await,
rsync: check_rsync().await,
watcher_running: check_process_running("watcher"),
worker_running: check_process_running("worker"),
},
schema: check_schema_migrations(state.db.pool()).await,
identities: {
let identities_root = std::path::Path::new(&*config::OUTPUT_DIR).join("identities");
let directory_exists = identities_root.is_dir();
let files_count = crate::core::identity::storage::count_identity_files();
let index_ok = crate::core::identity::storage::read_index().is_ok();
let db_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM identities")
.fetch_one(state.db.pool())
.await
.unwrap_or(0);
IdentityHealth {
directory_exists,
files_count,
index_ok,
db_count,
synced: directory_exists && files_count as i64 == db_count,
}
},
integrations: IntegrationHealth {
tmdb: crate::core::tmdb::status::quick_status(),
},
config: ConfigHealth {
cache_enabled: config::get_cache_enabled(),
auto_pipeline_enabled: config::get_auto_pipeline_enabled(),
watcher_auto_register_enabled: config::get_watcher_auto_register(),
system_timezone: config::SYSTEM_TIMEZONE.clone(),
},
})
}
async fn health_consistency(
State(state): State<AppState>,
) -> Result<Json<crate::core::health_agent::ConsistencyReport>, (StatusCode, String)> {
let report = crate::core::health_agent::run_consistency_checks(&state.db).await;
if report.checks.iter().any(|c| c.count > 0) {
tracing::warn!(
"[HEALTH] Consistency issues found: {}",
report
.checks
.iter()
.filter(|c| c.count > 0)
.map(|c| format!("{}={}", c.check, c.count))
.collect::<Vec<_>>()
.join(", ")
);
}
Ok(Json(report))
}
async fn check_postgres() -> ServiceStatus {
let start = Instant::now();
match PostgresDb::init().await {
Ok(db) => match db.list_videos(1, 0).await {
Ok(_) => ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(e.to_string()),
},
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
}
}
async fn check_redis() -> ServiceStatus {
let start = Instant::now();
match RedisClient::new() {
Ok(redis) => match redis.get_conn().await {
Ok(mut conn) => {
let result: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
match result {
Ok(_) => ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(e.to_string()),
},
}
}
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
}
}
async fn check_qdrant() -> ServiceStatus {
let start = Instant::now();
let base_url =
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string());
let api_key =
std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());
let url = format!("{}/collections", base_url);
let client = reqwest::Client::new();
match client
.get(&url)
.header("api-key", api_key)
.timeout(std::time::Duration::from_secs(5))
.send()
.await
{
Ok(resp) if resp.status().is_success() => ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
},
Ok(resp) => ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
}
}
async fn check_mongodb(cache: &MongoCache) -> ServiceStatus {
let start = Instant::now();
match cache.health_check().await {
Ok(_) => ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
},
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(e.to_string()),
},
}
}
fn parse_required_migrations() -> Vec<MigrationInfo> {
let raw = env!("REQUIRED_MIGRATIONS");
if raw.is_empty() {
return vec![];
}
raw.split(',')
.filter_map(|entry| {
let mut parts = entry.splitn(2, ':');
let filename = parts.next()?.trim().to_string();
let checksum = parts.next()?.trim().to_string();
if filename.is_empty() || checksum.is_empty() {
return None;
}
Some(MigrationInfo { filename, checksum })
})
.collect()
}
pub async fn check_schema_migrations(pool: &sqlx::PgPool) -> SchemaHealth {
let required = parse_required_migrations();
let table_exists: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schema_migrations')",
)
.fetch_one(pool)
.await
.unwrap_or(false);
if !table_exists {
return SchemaHealth {
table_exists: false,
applied: vec![],
required,
ok: false,
};
}
let applied: Vec<MigrationInfo> = sqlx::query_as::<_, (String, String)>(
"SELECT filename, checksum FROM schema_migrations ORDER BY id",
)
.fetch_all(pool)
.await
.unwrap_or_default()
.into_iter()
.map(|(filename, checksum)| MigrationInfo { filename, checksum })
.collect();
let ok = required.iter().all(|req| {
applied
.iter()
.any(|app| app.filename == req.filename && app.checksum == req.checksum)
});
SchemaHealth {
table_exists: true,
applied,
required,
ok,
}
}
async fn check_rsync() -> ServiceStatus {
let start = Instant::now();
let paths = [
std::path::Path::new("/Users/accusys/bin/rsync"),
std::path::Path::new("/opt/homebrew/bin/rsync"),
];
for p in &paths {
if p.exists() {
return ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
};
}
}
ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some("rsync not found (built from source expected at ~/bin/rsync)".to_string()),
}
}
fn check_process_running(name: &str) -> bool {
let patterns: &[&str] = match name {
"watcher" => &[
"target/release/momentry watcher",
"target/debug/momentry_playground watcher",
],
"worker" => &[
"target/release/momentry worker",
"target/debug/momentry_playground worker",
],
_ => return false,
};
for pattern in patterns {
if let Ok(o) = std::process::Command::new("pgrep")
.arg("-f")
.arg(pattern)
.output()
{
if o.status.success() {
return true;
}
}
}
false
}
async fn check_http(url: &str) -> ServiceStatus {
let start = Instant::now();
match reqwest::get(url).await {
Ok(resp) => {
if resp.status().is_success() {
ServiceStatus {
status: "ok".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: None,
}
} else {
ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
}
}
}
Err(e) => ServiceStatus {
status: "error".to_string(),
latency_ms: Some(start.elapsed().as_millis() as u64),
error: Some(e.to_string()),
},
}
}
pub fn health_routes() -> Router<AppState> {
Router::new()
.route("/health", get(health))
.route("/health/detailed", get(health_detailed))
.route("/health/consistency", get(health_consistency))
}

View File

@@ -29,7 +29,7 @@ pub struct CreateIdentityResponse {
pub quality_avg: Option<f64>,
}
pub fn identity_routes() -> Router<crate::api::server::AppState> {
pub fn identity_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/identities", get(list_identities))
.route("/api/v1/identity", post(create_identity))
@@ -39,7 +39,7 @@ pub fn identity_routes() -> Router<crate::api::server::AppState> {
/// Register a Global Identity from face.json with multi-angle reference vectors.
/// Calls select_face_reference_vectors_v2.py for automatic reference selection.
async fn create_identity(
State(_state): State<crate::api::server::AppState>,
State(_state): State<crate::api::types::AppState>,
Json(req): Json<CreateIdentityRequest>,
) -> Result<Json<CreateIdentityResponse>, (StatusCode, String)> {
let schema = req.schema.unwrap_or("dev".to_string());
@@ -147,7 +147,7 @@ async fn create_identity(
/// List all global identities
async fn list_identities(
State(_state): State<crate::api::server::AppState>,
State(_state): State<crate::api::types::AppState>,
Query(query): Query<ListIdentitiesQuery>,
) -> Result<Json<IdentityListResponse>, (StatusCode, String)> {
let db = match PostgresDb::init().await {

View File

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use sqlx::Row;
use std::path::PathBuf;
use crate::api::server::AppState;
use crate::api::types::AppState;
use crate::core::db::schema;
use crate::core::db::PostgresDb;

View File

@@ -10,7 +10,7 @@ use sqlx::Row;
use crate::core::db::ResourceRecord;
pub fn identity_routes() -> Router<crate::api::server::AppState> {
pub fn identity_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/files", get(list_files))
.route("/api/v1/file/:file_uuid", get(get_file_detail))
@@ -65,7 +65,7 @@ pub struct FilesQuery {
}
async fn list_files(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Query(params): Query<FilesQuery>,
) -> Result<Json<FilesResponse>, (StatusCode, String)> {
let page = params.page.unwrap_or(1);
@@ -158,7 +158,7 @@ pub struct FileDetailResponse {
}
async fn get_file_detail(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
) -> Result<Json<FileDetailResponse>, (StatusCode, String)> {
let file = state
@@ -212,7 +212,7 @@ pub struct FileIdentityItem {
}
async fn get_file_identities(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Query(params): Query<FilesQuery>,
) -> Result<Json<FileIdentitiesResponse>, (StatusCode, String)> {
@@ -300,7 +300,7 @@ fn strip_uuid(u: &uuid::Uuid) -> String {
}
async fn get_identity_detail(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
) -> Result<Json<IdentityDetailResponse>, (StatusCode, String)> {
let uuid_clean = identity_uuid.replace('-', "");
@@ -338,7 +338,7 @@ async fn get_identity_detail(
}
async fn get_identity_status(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
) -> Result<Json<IdentityStatusResponse>, (StatusCode, String)> {
let uuid_clean = identity_uuid.replace('-', "");
@@ -386,7 +386,7 @@ pub struct IdentityFilesResponse {
}
async fn delete_identity(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
) -> Result<StatusCode, StatusCode> {
let table = crate::core::db::schema::table_name("face_detections");
@@ -438,7 +438,7 @@ pub struct IdentityFileItem {
}
async fn get_identity_files(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<FilesQuery>,
) -> Result<Json<IdentityFilesResponse>, (StatusCode, String)> {
@@ -524,7 +524,7 @@ pub struct BBox {
}
async fn get_identity_faces(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<FilesQuery>,
) -> Result<Json<IdentityFacesResponse>, (StatusCode, String)> {
@@ -608,7 +608,7 @@ pub struct IdentityChunkItem {
}
async fn get_identity_chunks(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<FilesQuery>,
) -> Result<Json<IdentityChunksResponse>, (StatusCode, String)> {
@@ -680,7 +680,7 @@ pub struct ResourceItem {
}
async fn register_resource(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Json(req): Json<RegisterResourceRequest>,
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
let resource = ResourceRecord {
@@ -715,7 +715,7 @@ pub struct HeartbeatRequest {
}
async fn heartbeat_resource(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Json(req): Json<HeartbeatRequest>,
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
let status = req.status.unwrap_or("online".to_string());
@@ -733,7 +733,7 @@ async fn heartbeat_resource(
}
async fn list_resources(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
let records = state
.db
@@ -771,7 +771,7 @@ struct IdentityUploadResponse {
}
async fn upload_identity(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Json(payload): Json<crate::core::identity::storage::IdentityFile>,
) -> Result<Json<IdentityUploadResponse>, (StatusCode, Json<serde_json::Value>)> {
let parsed = uuid::Uuid::parse_str(&payload.identity_uuid)
@@ -838,7 +838,7 @@ struct ProfileImageResponse {
}
async fn upload_profile_image(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
mut multipart: Multipart,
) -> Result<Json<ProfileImageResponse>, (StatusCode, Json<serde_json::Value>)> {
@@ -951,7 +951,7 @@ async fn get_profile_image(
}
async fn get_identity_json(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
) -> Result<(StatusCode, [(String, String); 1], Vec<u8>), StatusCode> {
let clean = identity_uuid.replace('-', "");
@@ -1040,7 +1040,7 @@ struct IdentityTextResponse {
/// Path A: Search chunk text → associated identities
async fn search_identity_text(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Query(params): Query<IdentityTextQuery>,
) -> Result<Json<IdentityTextResponse>, StatusCode> {
use crate::core::db::schema;
@@ -1148,7 +1148,7 @@ struct IdentitySearchResponse {
/// Path B: Search identity name → associated chunk text
async fn search_identities_by_text(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Query(params): Query<IdentitySearchQuery>,
) -> Result<Json<IdentitySearchResponse>, StatusCode> {
use crate::core::db::schema;

View File

@@ -389,7 +389,7 @@ pub struct TracesQuery {
}
pub async fn get_identity_traces(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<TracesQuery>,
) -> Result<Json<IdentityTracesResponse>, (StatusCode, String)> {
@@ -485,7 +485,7 @@ pub async fn get_identity_traces(
}))
}
pub fn identity_binding_routes() -> Router<crate::api::server::AppState> {
pub fn identity_binding_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/identity/:identity_uuid/bind", post(bind_identity))
.route(

View File

@@ -47,7 +47,7 @@ fn ffmpeg_cmd() -> std::process::Command {
cmd
}
pub fn bbox_routes() -> Router<crate::api::server::AppState> {
pub fn bbox_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route(
"/api/v1/file/:file_uuid/video/bbox",
@@ -168,7 +168,7 @@ fn resolve_frame_range(
}
async fn bbox_overlay_video(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Query(p): Query<BboxParams>,
) -> Result<impl IntoResponse, StatusCode> {
@@ -297,7 +297,7 @@ fn parse_range(range: &str, file_size: u64) -> (u64, u64) {
}
async fn trace_video(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path((file_uuid, trace_id)): Path<(String, i32)>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<impl IntoResponse, StatusCode> {
@@ -554,7 +554,7 @@ async fn trace_video(
}
async fn stream_video(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Query(params): Query<std::collections::HashMap<String, String>>,
request: axum::http::Request<Body>,
@@ -698,7 +698,7 @@ struct ThumbQuery {
}
async fn face_thumbnail(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Query(q): Query<ThumbQuery>,
) -> Result<impl IntoResponse, StatusCode> {
@@ -761,7 +761,7 @@ struct ClipQuery {
}
async fn video_clip(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Query(q): Query<ClipQuery>,
) -> Result<impl IntoResponse, StatusCode> {

View File

@@ -1,16 +1,23 @@
pub mod agent_api;
pub mod auth;
pub mod docs;
pub mod files;
pub mod five_w1h_agent_api;
pub mod health;
pub mod identities;
pub mod identity_agent_api;
pub mod identity_api;
pub mod identity_binding;
pub mod media_api;
pub mod middleware;
pub mod scan;
pub mod search;
pub mod server;
pub mod tmdb_api;
pub mod trace_agent_api;
pub mod types;
pub mod universal_search;
pub mod visual_chunk_search;
pub mod visual_search;
pub use server::start_server;

513
src/api/processing.rs Normal file
View File

@@ -0,0 +1,513 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::post,
Router,
};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use super::types::AppState;
use crate::core::cache::{keys, RedisCache};
use crate::core::config::REDIS_KEY_PREFIX;
use crate::core::db::schema;
use crate::core::db::{Database, MonitorJobStatus, PostgresDb, RedisClient, VideoRecord, VideoStatus};
use crate::core::probe::ffprobe;
use crate::worker::processor;
use crate::{Embedder, FileManager};
#[derive(Debug, Serialize)]
struct JobListResponse {
jobs: Vec<JobInfoResponse>,
count: i64,
page: usize,
page_size: usize,
}
#[derive(Debug, Deserialize)]
struct JobsQuery {
page: Option<usize>,
page_size: Option<usize>,
status: Option<String>,
}
#[derive(Debug, Serialize)]
struct JobInfoResponse {
id: i32,
uuid: String,
status: String,
current_processor: Option<String>,
progress_current: i32,
progress_total: i32,
created_at: String,
started_at: Option<String>,
}
#[derive(Debug, Serialize)]
struct JobDetailResponse {
id: i32,
uuid: String,
status: String,
current_processor: Option<String>,
progress_current: i32,
progress_total: i32,
processors: Vec<ProcessorInfoResponse>,
created_at: String,
started_at: Option<String>,
updated_at: Option<String>,
}
#[derive(Debug, Serialize)]
struct ProcessorInfoResponse {
processor_type: String,
status: String,
started_at: Option<String>,
completed_at: Option<String>,
duration_secs: Option<f64>,
error_message: Option<String>,
}
#[derive(Debug, Deserialize)]
struct CacheToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct CacheToggleResponse {
success: bool,
cache_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct AutoPipelineToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct AutoPipelineToggleResponse {
success: bool,
auto_pipeline_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct WatcherAutoRegisterToggleRequest {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct WatcherAutoRegisterToggleResponse {
success: bool,
watcher_auto_register_enabled: bool,
message: String,
}
#[derive(Debug, Deserialize)]
struct ProcessRequest {
rules: Option<Vec<String>>,
processors: Option<Vec<String>>,
}
#[derive(Debug, Serialize)]
struct ProgressResponse {
file_uuid: String,
user: Option<String>,
group: Option<String>,
file_name: Option<String>,
duration: Option<f64>,
overall_progress: u32,
cpu_percent: Option<f64>,
gpu_percent: Option<f64>,
memory_percent: Option<f64>,
memory_mb: Option<u64>,
system: Option<SystemHealthInfo>,
processors: Vec<ProcessorProgressInfo>,
}
#[derive(Debug, Serialize)]
struct SystemHealthInfo {
cpu_idle_pct: f64,
memory_available_mb: u64,
memory_total_mb: u64,
memory_used_pct: f64,
gpu_available: bool,
gpu_utilization_pct: Option<f64>,
gpu_memory_used_pct: Option<f64>,
dynamic_concurrency: u32,
config_concurrency: u32,
running_processors: u32,
}
#[derive(Debug, Serialize)]
struct ProcessorProgressInfo {
name: String,
status: String,
current: u32,
total: u32,
progress: u32,
message: String,
frames_processed: i32,
chunks_produced: i32,
retry_count: i32,
eta_seconds: Option<i64>,
}
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
use std::process::Command;
let pid = std::process::id().to_string();
let cpu = Command::new("ps")
.args(["-p", &pid, "-o", "%cpu="])
.output()
.ok()
.and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok());
let (mem_percent, mem_rss) = Command::new("ps")
.args(["-p", &pid, "-o", "%mem=,rss="])
.output()
.ok()
.map(|o| {
let output = String::from_utf8_lossy(&o.stdout);
let parts: Vec<&str> = output.split_whitespace().collect();
let percent = parts.first().and_then(|s| s.parse().ok());
let rss = parts.get(1).and_then(|s| s.parse().ok());
(percent, rss)
})
.unwrap_or((None, None));
let gpu = Command::new("nvidia-smi")
.args(["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"])
.output()
.ok()
.and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok());
let mem_mb = mem_rss.map(|r: u64| r / 1024);
(cpu, mem_percent, gpu, mem_mb)
}
async fn trigger_processing(
State(state): State<AppState>,
Path(uuid): Path<String>,
Json(req): Json<ProcessRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let videos_table = schema::table_name("videos");
let row: Option<(String, String, String, Option<String>, String, Option<String>)> =
sqlx::query_as(&format!(
"SELECT file_uuid, file_path, file_name, file_type, COALESCE(processing_status, 'REGISTERED'), content_hash FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
tracing::error!("DB error: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let (file_uuid, file_path, file_name, file_type, processing_status, content_hash) =
row.ok_or(StatusCode::NOT_FOUND)?;
if processing_status == "PROCESSING" || processing_status == "QUEUED" {
return Err(StatusCode::CONFLICT);
}
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let status_update = format!(
"UPDATE {} SET processing_status = 'QUEUED' WHERE file_uuid = $1",
videos_table
);
sqlx::query(&status_update)
.bind(&file_uuid)
.execute(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let output_path = std::path::Path::new(&output_dir).join(format!("{}.monitor.json", file_uuid));
let monitor_jobs_table = schema::table_name("monitor_jobs");
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors {
let procs_str = serde_json::to_string(procs).unwrap_or_default();
sqlx::query(&format!("UPDATE {} SET processors = $1 WHERE id = $2", schema::table_name("monitor_jobs")))
.bind(&procs_str)
.bind(&file_uuid)
.execute(state.db.pool())
.await
.ok();
procs.iter().map(|s| s.as_str()).collect()
} else {
vec![]
};
let notification = serde_json::json!({
"action": "process",
"file_uuid": file_uuid,
"file_path": file_path,
"file_name": file_name,
"file_type": file_type,
"content_hash": content_hash,
"output_dir": output_dir,
"processors": processors_to_run,
});
let notification_key = format!("{}notifications", REDIS_KEY_PREFIX.as_str());
let _: Result<(), _> = redis::cmd("PUBLISH")
.arg(&notification_key)
.arg(notification.to_string())
.query_async(&mut conn)
.await;
tracing::info!("[TRIGGER] Published processing notification for {}", file_uuid);
Ok(Json(serde_json::json!({
"success": true,
"message": "Processing queued",
"file_uuid": file_uuid,
})))
}
async fn download_json(
State(state): State<AppState>,
Path((file_uuid, processor)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor));
match tokio::fs::read_to_string(&path).await {
Ok(content) => serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
Err(_) => Err(StatusCode::NOT_FOUND),
}
}
async fn get_chunk_by_path(
State(state): State<AppState>,
Path((file_uuid, chunk_id)): Path<(String, String)>,
) -> Result<Json<crate::core::chunk::types::Chunk>, StatusCode> {
let table = schema::table_name("chunk");
let chunk: Option<crate::core::chunk::types::Chunk> = sqlx::query_as(&format!(
"SELECT * FROM {} WHERE uuid = $1 AND chunk_id = $2",
table
))
.bind(&file_uuid)
.bind(&chunk_id)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
chunk.ok_or(StatusCode::NOT_FOUND)
}
async fn get_progress(
file_uuid: Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
let file_uuid = file_uuid.0;
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis.get_conn().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let key = format!("{}progress:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
let progress_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (cpu, mem_pct, gpu, mem_mb) = get_system_stats();
let sys = SystemHealthInfo {
cpu_idle_pct: cpu.map(|c: f64| 100.0 - c).unwrap_or(0.0),
memory_available_mb: mem_mb.unwrap_or(0),
memory_total_mb: 0,
memory_used_pct: mem_pct.unwrap_or(0.0),
gpu_available: gpu.is_some(),
gpu_utilization_pct: gpu,
gpu_memory_used_pct: None,
dynamic_concurrency: 0,
config_concurrency: 0,
running_processors: 0,
};
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let video = pg.get_video_by_uuid(&file_uuid).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (overall, processors) = if let Some(json_str) = &progress_json {
match serde_json::from_str::<ProgressResponse>(json_str) {
Ok(p) => (p.overall_progress, p.processors),
Err(_) => (0u32, vec![]),
}
} else {
(0u32, vec![])
};
Ok(Json(ProgressResponse {
file_uuid,
user: None,
group: None,
file_name: video.as_ref().map(|v| v.file_name.clone()),
duration: video.as_ref().map(|v| v.duration),
overall_progress: overall,
cpu_percent: cpu,
gpu_percent: gpu,
memory_percent: mem_pct,
memory_mb,
system: Some(sys),
processors,
}))
}
async fn list_jobs(
Query(params): Query<JobsQuery>,
) -> Result<Json<JobListResponse>, StatusCode> {
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let jobs_table = schema::table_name("monitor_jobs");
let videos_table = schema::table_name("videos");
let page = params.page.unwrap_or(1).max(1);
let page_size = params.page_size.unwrap_or(20).max(1).min(100);
let offset = (page - 1) * page_size;
let mut where_clause = String::new();
if let Some(ref status) = params.status {
where_clause = format!(" WHERE j.status = '{}'", status);
}
let count: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} j{}",
jobs_table, where_clause
))
.fetch_one(pg.pool())
.await
.unwrap_or(0);
let jobs: Vec<(i32, String, String, String, String, Option<String>, i32, i32)> =
sqlx::query_as(&format!(
"SELECT j.id::int, j.uuid, v.file_name, COALESCE(j.status, 'QUEUED'), COALESCE(j.current_processor, ''), v.file_uuid, COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0) \
FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid{} \
ORDER BY j.id DESC LIMIT $1 OFFSET $2",
jobs_table, videos_table, where_clause
))
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pg.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let job_list = jobs
.into_iter()
.map(|(id, uuid, fname, status, cp, _vuuid, pf, tf)| JobInfoResponse {
id,
uuid,
status,
current_processor: Some(cp),
progress_current: pf,
progress_total: tf,
created_at: String::new(),
started_at: None,
})
.collect();
Ok(Json(JobListResponse {
jobs: job_list,
count,
page,
page_size,
}))
}
async fn get_job(
Path(uuid): Path<String>,
) -> Result<Json<JobDetailResponse>, StatusCode> {
let pg = PostgresDb::init().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let jobs_table = schema::table_name("monitor_jobs");
let videos_table = schema::table_name("videos");
let job: Option<(i32, String, String, String, Option<String>, i32, i32, String, Option<String>, Option<String>)> =
sqlx::query_as(&format!(
"SELECT j.id::int, j.uuid, COALESCE(v.file_name, 'unknown'), COALESCE(j.status, 'QUEUED'), j.current_processor, \
COALESCE(j.processed_frames, 0), COALESCE(j.total_frames, 0), \
COALESCE(j.created_at::text, ''), j.started_at::text, j.updated_at::text \
FROM {} j LEFT JOIN {} v ON v.file_uuid = j.uuid WHERE j.uuid = $1",
jobs_table, videos_table
))
.bind(&uuid)
.fetch_optional(pg.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (id, uuid, file_name, status, current_processor, pf, tf, created_at, started_at, updated_at) =
job.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(JobDetailResponse {
id,
uuid,
status,
current_processor,
progress_current: pf,
progress_total: tf,
processors: vec![],
created_at,
started_at,
updated_at,
}))
}
async fn cache_toggle(
State(state): State<AppState>,
Json(req): Json<CacheToggleRequest>,
) -> Json<CacheToggleResponse> {
crate::core::config::set_cache_enabled(req.enabled);
if !req.enabled {
let _ = state.mongo_cache.flush_all().await;
let _ = state.redis_cache.flush().await;
}
Json(CacheToggleResponse {
success: true,
cache_enabled: req.enabled,
message: format!("Cache {}", if req.enabled { "enabled" } else { "disabled" }),
})
}
async fn auto_pipeline_toggle(
Json(req): Json<AutoPipelineToggleRequest>,
) -> Json<AutoPipelineToggleResponse> {
crate::core::config::set_auto_pipeline_enabled(req.enabled);
Json(AutoPipelineToggleResponse {
success: true,
auto_pipeline_enabled: req.enabled,
message: format!(
"Auto pipeline {}",
if req.enabled { "enabled" } else { "disabled" }
),
})
}
async fn watcher_auto_register_toggle(
Json(req): Json<WatcherAutoRegisterToggleRequest>,
) -> Json<WatcherAutoRegisterToggleResponse> {
crate::core::config::set_watcher_auto_register(req.enabled);
Json(WatcherAutoRegisterToggleResponse {
success: true,
watcher_auto_register_enabled: req.enabled,
message: format!(
"Watcher auto-register {}",
if req.enabled { "enabled" } else { "disabled" }
),
})
}
pub fn processing_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/file/:file_uuid/process", post(trigger_processing))
.route("/api/v1/file/:file_uuid/json/:processor", post(download_json))
.route("/api/v1/file/:file_uuid/chunk/:chunk_id", post(get_chunk_by_path))
.route("/api/v1/progress/:file_uuid", post(get_progress))
.route("/api/v1/jobs", post(list_jobs))
.route("/api/v1/config/cache", post(cache_toggle))
.route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
.route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle))
}

517
src/api/scan.rs Normal file
View File

@@ -0,0 +1,517 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::get,
Router,
};
use serde::{Deserialize, Serialize};
use super::types::AppState;
use crate::core::db::schema;
#[derive(Debug, Serialize, Deserialize)]
struct ScannedFileInfo {
file_name: String,
relative_path: String,
file_path: String,
file_size: u64,
modified_time: String,
is_registered: bool,
file_uuid: Option<String>,
status: Option<String>,
registration_time: Option<String>,
job_id: Option<i32>,
}
#[derive(Debug, Serialize, Deserialize)]
struct ScanFilesResponse {
files: Vec<ScannedFileInfo>,
total: usize,
filtered_total: usize,
page: usize,
page_size: usize,
total_pages: usize,
registered_count: usize,
unregistered_count: usize,
total_chunks: i64,
searchable_chunks: i64,
pending_videos: i64,
}
#[derive(Debug, Deserialize)]
struct ScanFilesQuery {
limit: Option<usize>,
page: Option<usize>,
page_size: Option<usize>,
pattern: Option<String>,
sort_by: Option<String>,
sort_order: Option<String>,
}
fn scan_directory_recursive(
dir: &std::path::Path,
root: &std::path::Path,
allowed_extensions: &[&str],
registered_paths: &std::collections::HashMap<
String,
(String, String, Option<String>, Option<i32>),
>,
files: &mut Vec<ScannedFileInfo>,
) {
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.starts_with('.') {
return;
}
}
scan_directory_recursive(&path, root, allowed_extensions, registered_paths, files);
} else if path.is_file() {
if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
if allowed_extensions.contains(&ext.to_lowercase().as_str()) {
if let Ok(meta) = entry.metadata() {
let abs_path = path.to_string_lossy().to_string();
let rel_path = path
.strip_prefix(root)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| abs_path.clone());
let file_name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let modified_time = meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| {
chrono::DateTime::from_timestamp(d.as_secs() as i64, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_default()
})
.unwrap_or_default();
match registered_paths.get(&abs_path) {
Some((uuid, status, reg_time, jid)) if status != "unregistered" => {
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: true,
file_uuid: Some(uuid.clone()),
status: Some(status.clone()),
registration_time: reg_time.clone(),
job_id: *jid,
});
}
_ => {
files.push(ScannedFileInfo {
file_name,
relative_path: rel_path,
file_path: abs_path,
file_size: meta.len(),
modified_time,
is_registered: false,
file_uuid: None,
status: Some("unregistered".to_string()),
registration_time: None,
job_id: None,
});
}
}
}
}
}
}
}
}
}
async fn scan_files(
State(state): State<AppState>,
Query(params): Query<ScanFilesQuery>,
) -> Result<Json<ScanFilesResponse>, StatusCode> {
let demo_dir_str = std::env::var("MOMENTRY_SFTP_ROOT")
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo".to_string());
let demo_dir = std::path::Path::new(&demo_dir_str);
let allowed_extensions = vec![
"mp4", "mov", "mkv", "avi", "webm", "jpg", "jpeg", "png", "gif", "webp",
];
let table = schema::table_name("videos");
let mj_table = schema::table_name("monitor_jobs");
let registered_db: Vec<(String, String, String, String, Option<String>, Option<i32>)> =
sqlx::query_as(&format!(
"SELECT v.file_path, v.file_name, v.file_uuid, v.status, v.registration_time::text, \
latest_job.id as job_id \
FROM {} v \
LEFT JOIN LATERAL ( \
SELECT id FROM {} WHERE uuid = v.file_uuid ORDER BY id DESC LIMIT 1 \
) latest_job ON true \
ORDER BY v.id",
table, mj_table
))
.fetch_all(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let registered_paths: std::collections::HashMap<
String,
(String, String, Option<String>, Option<i32>),
> = registered_db
.into_iter()
.map(|(path, _name, uuid, status, reg_time, jid)| (path, (uuid, status, reg_time, jid)))
.collect();
let mut result_files = Vec::new();
if demo_dir.exists() {
scan_directory_recursive(
demo_dir,
demo_dir,
&allowed_extensions,
&registered_paths,
&mut result_files,
);
}
let desc = params.sort_order.as_deref().unwrap_or("asc") == "desc";
match params.sort_by.as_deref().unwrap_or("name") {
"size" => {
if desc {
result_files.sort_by(|a, b| b.file_size.cmp(&a.file_size));
} else {
result_files.sort_by(|a, b| a.file_size.cmp(&b.file_size));
}
}
"modified" | "time" => {
if desc {
result_files.sort_by(|a, b| b.modified_time.cmp(&a.modified_time));
} else {
result_files.sort_by(|a, b| a.modified_time.cmp(&b.modified_time));
}
}
"status" => {
if desc {
result_files
.sort_by(|a, b| b.status.cmp(&a.status).then(b.file_name.cmp(&a.file_name)));
} else {
result_files
.sort_by(|a, b| a.status.cmp(&b.status).then(a.file_name.cmp(&b.file_name)));
}
}
_ => {
if desc {
result_files.sort_by(|a, b| {
a.is_registered
.cmp(&b.is_registered)
.then(b.file_name.cmp(&a.file_name))
});
} else {
result_files.sort_by(|a, b| {
b.is_registered
.cmp(&a.is_registered)
.then(a.file_name.cmp(&b.file_name))
});
}
}
}
let total_all = result_files.len();
let registered_count = result_files.iter().filter(|f| f.is_registered).count();
let unregistered_count = result_files.iter().filter(|f| !f.is_registered).count();
let filtered: Vec<ScannedFileInfo> = if let Some(ref pat) = params.pattern {
let re = match regex::Regex::new(&format!("(?i){}", pat)) {
Ok(r) => r,
Err(_) => return Err(StatusCode::BAD_REQUEST),
};
result_files
.into_iter()
.filter(|f| re.is_match(&f.file_name))
.collect()
} else {
result_files
};
let filtered_total = filtered.len();
let page = params.page.unwrap_or(1).max(1);
let page_size = params
.page_size
.or(params.limit)
.unwrap_or(filtered_total.max(1));
let total_pages = if page_size > 0 {
(filtered_total + page_size - 1) / page_size
} else {
1
};
let start = (page - 1) * page_size;
let files: Vec<ScannedFileInfo> = filtered.into_iter().skip(start).take(page_size).collect();
let table_videos = schema::table_name("videos");
let table_chunks = schema::table_name("chunk");
let total_chunks: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {}", table_chunks))
.fetch_one(state.db.pool())
.await
.unwrap_or(0);
let searchable_chunks: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} WHERE vector_id IS NOT NULL",
table_chunks
))
.fetch_one(state.db.pool())
.await
.unwrap_or(0);
let pending_videos: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} WHERE status = 'pending'",
table_videos
))
.fetch_one(state.db.pool())
.await
.unwrap_or(0);
Ok(Json(ScanFilesResponse {
files,
total: total_all,
filtered_total,
page,
page_size,
total_pages,
registered_count,
unregistered_count,
total_chunks,
searchable_chunks,
pending_videos,
}))
}
#[derive(Debug, Serialize)]
struct SftpgoStatusResponse {
username: String,
home_dir: String,
files_count: i64,
registered_videos: Vec<RegisteredVideo>,
last_login: Option<String>,
}
#[derive(Debug, Serialize)]
struct RegisteredVideo {
uuid: String,
file_name: String,
status: String,
}
async fn get_sftpgo_status(
State(state): State<AppState>,
) -> Result<Json<SftpgoStatusResponse>, StatusCode> {
let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo";
let files_count: i64 = std::fs::read_dir(demo_dir)
.map(|entries| entries.count() as i64)
.unwrap_or(0);
let table_videos = schema::table_name("videos");
let registered_videos: Vec<(String, String, String)> = sqlx::query_as(&format!(
"SELECT file_uuid, file_name, status FROM {} WHERE file_path LIKE '%demo%' ORDER BY id",
table_videos
))
.fetch_all(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let registered_videos = registered_videos
.into_iter()
.map(|(uuid, file_name, status)| RegisteredVideo {
uuid,
file_name,
status,
})
.collect();
Ok(Json(SftpgoStatusResponse {
username: "demo".to_string(),
home_dir: demo_dir.to_string(),
files_count,
registered_videos,
last_login: None,
}))
}
#[derive(Debug, Serialize)]
struct IngestionStep {
name: String,
status: String,
detail: Option<String>,
}
#[derive(Debug, Serialize)]
struct IdentityRef {
uuid: String,
name: String,
}
#[derive(Debug, Serialize)]
struct IngestionStatusResponse {
file_uuid: String,
steps: Vec<IngestionStep>,
related_identities: Vec<IdentityRef>,
strangers: i64,
}
async fn get_ingestion_status(
State(state): State<AppState>,
Path(file_uuid): Path<String>,
) -> Result<Json<IngestionStatusResponse>, StatusCode> {
let pool = state.db.pool();
let chunk = schema::table_name("chunk");
let fd = schema::table_name("face_detections");
let identities = schema::table_name("identities");
let scene_meta_path = format!(
"{}/{}.scene_meta.json",
crate::core::config::OUTPUT_DIR.as_str(),
file_uuid
);
let scene_meta_ok = std::path::Path::new(&scene_meta_path).exists();
macro_rules! count_sql {
($sql:expr) => {
sqlx::query_scalar::<_, i64>($sql)
.fetch_one(pool)
.await
.unwrap_or(0)
};
}
let sentence_count = count_sql!(&format!(
"SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence'"
));
let sentence_embedded = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence' AND embedding IS NOT NULL"));
let scene_count = count_sql!(&format!(
"SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut'"
));
let face_total = count_sql!(&format!(
"SELECT COUNT(*) FROM {fd} WHERE file_uuid = '{file_uuid}'"
));
let trace_count = count_sql!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL"));
let trace_chunks = count_sql!(&format!(
"SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'trace'"
));
let identity_count = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL"));
let tkg_nodes = count_sql!(&format!(
"SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'",
schema::table_name("tkg_nodes")
));
let tkg_edges = count_sql!(&format!(
"SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'",
schema::table_name("tkg_edges")
));
let scene_5w1h = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != ''"));
let related_identities: Vec<IdentityRef> =
match sqlx::query_as::<_, (String, String)>(&format!(
"SELECT DISTINCT i.uuid::text, i.name FROM {identities} i \
JOIN {fd} fd ON fd.identity_id = i.id \
WHERE fd.file_uuid = '{file_uuid}' AND fd.identity_id IS NOT NULL \
ORDER BY i.name"
))
.fetch_all(pool)
.await
{
Ok(rows) => rows
.into_iter()
.map(|(uuid, name)| IdentityRef {
uuid: uuid.replace('-', ""),
name,
})
.collect(),
Err(e) => {
tracing::error!("related_identities query failed: {}", e);
vec![]
}
};
let strangers = count_sql!(&format!(
"SELECT COUNT(DISTINCT trace_id) FROM {fd} \
WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL AND identity_id IS NULL"
));
macro_rules! step {
($name:expr, $done:expr, $detail:expr) => {
IngestionStep {
name: $name.into(),
status: if $done { "done" } else { "pending" }.into(),
detail: $detail,
}
};
}
let steps = vec![
step!(
"rule1_sentence",
sentence_count > 0,
Some(format!("{sentence_count} sentence chunks"))
),
step!(
"auto_vectorize",
sentence_embedded > 0,
Some(format!("{sentence_embedded} embedded"))
),
step!(
"rule3_scene",
scene_count > 0,
Some(format!("{scene_count} scene chunks"))
),
step!(
"face_trace",
trace_count > 0,
Some(format!("{trace_count} traces / {face_total} detections"))
),
step!(
"trace_chunks",
trace_chunks > 0,
Some(format!("{trace_chunks} trace chunks"))
),
step!(
"tkg",
tkg_nodes > 0 || tkg_edges > 0,
Some(format!("{tkg_nodes} nodes, {tkg_edges} edges"))
),
step!(
"identity_match",
identity_count > 0,
Some(format!("{identity_count} identities matched"))
),
step!("scene_metadata", scene_meta_ok, None),
step!(
"5w1h",
scene_5w1h > 0,
Some(format!("{scene_5w1h} scenes with 5W1H"))
),
];
Ok(Json(IngestionStatusResponse {
file_uuid,
steps,
related_identities,
strangers,
}))
}
pub fn scan_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/files/scan", get(scan_files))
.route("/api/v1/stats/sftpgo", get(get_sftpgo_status))
.route(
"/api/v1/stats/ingestion-status/:file_uuid",
get(get_ingestion_status),
)
}

View File

@@ -51,7 +51,7 @@ pub struct SmartSearchResponse {
// --- API Handler ---
pub async fn smart_search(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Json(req): Json<SmartSearchRequest>,
) -> Result<Json<SmartSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
let db = &state.db;
@@ -126,6 +126,6 @@ pub async fn smart_search(
// --- Router Setup ---
pub fn search_routes() -> Router<crate::api::server::AppState> {
pub fn search_routes() -> Router<crate::api::types::AppState> {
Router::new().route("/api/v1/search/smart", post(smart_search))
}

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,7 @@ use axum::{
};
use serde::{Deserialize, Serialize};
use crate::api::server::AppState;
use crate::api::types::AppState;
use crate::core::config;
use crate::core::db::{PostgresDb, QdrantDb};
use crate::core::tmdb;

View File

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use crate::core::db::PostgresDb;
pub fn trace_agent_routes() -> Router<crate::api::server::AppState> {
pub fn trace_agent_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/file/:file_uuid/traces", post(list_traces_sorted))
.route(
@@ -55,7 +55,7 @@ struct TracesResponse {
}
async fn list_traces_sorted(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Json(req): Json<TracesRequest>,
) -> Result<Json<TracesResponse>, (StatusCode, String)> {
@@ -202,7 +202,7 @@ fn lerp_i32(a: Option<i32>, b: Option<i32>, t: f64) -> Option<i32> {
}
async fn list_trace_faces(
State(state): State<crate::api::server::AppState>,
State(state): State<crate::api::types::AppState>,
Path((file_uuid, trace_id)): Path<(String, i32)>,
Query(q): Query<TraceFacesQuery>,
) -> Result<Json<TraceFacesResponse>, (StatusCode, String)> {

9
src/api/types.rs Normal file
View File

@@ -0,0 +1,9 @@
#[derive(Clone)]
pub struct AppState {
pub db: std::sync::Arc<crate::core::db::PostgresDb>,
pub embedder: std::sync::Arc<crate::Embedder>,
pub embedder_model: String,
pub mongo_cache: crate::core::cache::MongoCache,
pub redis_cache: crate::core::cache::RedisCache,
pub api_state: super::middleware::ApiState,
}

View File

@@ -98,7 +98,7 @@ pub enum SearchResult {
},
}
pub fn universal_search_routes() -> Router<crate::api::server::AppState> {
pub fn universal_search_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/search/universal", post(universal_search))
.route("/api/v1/search/frames", post(search_frames))
@@ -106,7 +106,7 @@ pub fn universal_search_routes() -> Router<crate::api::server::AppState> {
/// Unified search across all data types
pub async fn universal_search(
State(_state): State<crate::api::server::AppState>,
State(_state): State<crate::api::types::AppState>,
Json(req): Json<UniversalSearchRequest>,
) -> Result<Json<UniversalSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
let start_time = std::time::Instant::now();
@@ -212,7 +212,7 @@ pub async fn universal_search(
/// Search frames by YOLO objects, OCR text, or face IDs
pub async fn search_frames(
State(_state): State<crate::api::server::AppState>,
State(_state): State<crate::api::types::AppState>,
Json(req): Json<FrameSearchRequest>,
) -> Result<Json<FrameSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
let db = PostgresDb::init().await.map_err(|e| {
@@ -238,7 +238,7 @@ pub async fn search_frames(
/// Search persons by name or speaker_id
pub async fn search_persons(
State(_state): State<crate::api::server::AppState>,
State(_state): State<crate::api::types::AppState>,
Query(query): Query<PersonSearchQuery>,
) -> Result<Json<PersonSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
let db = PostgresDb::init().await.map_err(|e| {

217
src/api/visual_search.rs Normal file
View File

@@ -0,0 +1,217 @@
use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use super::types::AppState;
use super::visual_chunk_search;
use crate::core::cache::keys;
use crate::core::chunk::types::Chunk;
use crate::core::db::{Database, PostgresDb};
fn generate_visual_search_hash(
uuid: &str,
criteria: &visual_chunk_search::VisualChunkSearchCriteria,
) -> String {
let data = serde_json::json!({
"uuid": uuid,
"criteria": criteria,
});
let mut hasher = Sha256::new();
hasher.update(data.to_string().as_bytes());
format!("{:x}", hasher.finalize())[..16].to_string()
}
#[derive(Debug, Deserialize)]
struct VisualChunkSearchRequest {
file_uuid: String,
criteria: visual_chunk_search::VisualChunkSearchCriteria,
}
#[derive(Debug, Serialize)]
struct VisualChunkSearchResponse {
chunks: Vec<Chunk>,
total: usize,
}
async fn search_visual_chunks(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let criteria_hash = generate_visual_search_hash(&req.file_uuid, &req.criteria);
let cache_key = keys::visual_search(&req.file_uuid, &criteria_hash);
let ttl = state.mongo_cache.ttl_visual_search();
let chunks = state
.mongo_cache
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_VISUAL_SEARCH, || async {
let db = PostgresDb::init()
.await
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
visual_chunk_search::search_visual_chunks(&db, &req.file_uuid, &req.criteria)
.await
.map_err(|e| anyhow::anyhow!("Visual search failed: {}", e))
})
.await
.map_err(|e| {
tracing::error!("Visual chunk search failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByClassRequest {
uuid: String,
object_class: String,
min_count: Option<u32>,
max_count: Option<u32>,
}
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByDensityRequest {
uuid: String,
min_density: f32,
max_density: Option<f32>,
}
#[derive(Debug, Deserialize)]
struct VisualChunkStatsRequest {
uuid: String,
}
async fn search_visual_chunks_by_class(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByClassRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks = visual_chunk_search::search_visual_chunks_by_class(
&db,
&req.uuid,
&req.object_class,
req.min_count,
req.max_count,
)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by class failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
async fn search_visual_chunks_by_density(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByDensityRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks = visual_chunk_search::search_visual_chunks_by_density(
&db,
&req.uuid,
req.min_density,
req.max_density,
)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by density failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
#[derive(Debug, Serialize)]
struct VisualChunkStatsResponse {
uuid: String,
stats: std::collections::HashMap<String, serde_json::Value>,
}
async fn get_visual_chunk_stats(
State(state): State<AppState>,
Json(req): Json<VisualChunkStatsRequest>,
) -> Result<Json<VisualChunkStatsResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let stats = visual_chunk_search::get_visual_chunk_statistics(&db, &req.uuid)
.await
.map_err(|e| {
tracing::error!("Get visual chunk stats failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkStatsResponse {
uuid: req.uuid,
stats,
}))
}
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByCombinationRequest {
uuid: String,
combination: Vec<(String, u32)>,
}
async fn search_visual_chunks_by_combination(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByCombinationRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let combination: Vec<(&str, u32)> = req
.combination
.iter()
.map(|(c, n)| (c.as_str(), *n))
.collect();
let chunks =
visual_chunk_search::search_visual_chunks_by_combination(&db, &req.uuid, &combination)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by combination failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
pub fn visual_search_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/search/visual", post(search_visual_chunks))
.route(
"/api/v1/search/visual/class",
post(search_visual_chunks_by_class),
)
.route(
"/api/v1/search/visual/density",
post(search_visual_chunks_by_density),
)
.route("/api/v1/search/visual/stats", post(get_visual_chunk_stats))
.route(
"/api/v1/search/visual/combination",
post(search_visual_chunks_by_combination),
)
}