Files
momentry_core/src/api/server.rs

4572 lines
155 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{delete, get, post},
Router,
};
use once_cell::sync::{Lazy, OnceCell};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use sha2::{Digest, Sha256};
use sqlx::{PgPool, Row};
use std::time::Instant;
use tower_http::cors::{Any, CorsLayer};
use crate::core::cache::{keys, MongoCache, RedisCache};
use crate::core::config::REDIS_KEY_PREFIX;
use crate::core::db::schema;
use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord, VideoStatus};
use crate::worker::resources::SystemResources;
use crate::core::text::tokenizer::tokenize_chinese_text;
use crate::{Embedder, FileManager};
use super::agent_api;
use super::five_w1h_agent_api;
use super::identities;
use super::identity_api;
use super::identity_binding;
use super::middleware::unified_auth;
use super::search::search_routes;
use super::tmdb_api;
use super::trace_agent_api;
use super::universal_search::universal_search_routes;
use super::visual_chunk_search;
use crate::core::chunk::types::Chunk;
/// API key attached to every session created by `login` and returned in its JSON body.
///
/// Read once, on first use, from `MOMENTRY_DEMO_API_KEY`. If that variable is unset (or not
/// valid Unicode), a built-in demo key is used instead.
///
/// NOTE(review): the built-in fallback is a credential committed to source control;
/// deployments should always set `MOMENTRY_DEMO_API_KEY`.
static DEMO_USER_API_KEY: Lazy<String> =
    Lazy::new(|| match std::env::var("MOMENTRY_DEMO_API_KEY") {
        Ok(key) => key,
        Err(_) => String::from("muser_demo_key_32chars_abcdef1234567890"),
    });
/// Returns the lowercase hexadecimal SHA-256 digest of `password`.
///
/// NOTE(review): a single unsalted SHA-256 is not a password hash (no salt, no work factor).
/// `login` verifies credentials through `crate::core::auth::password::verify_password`; if this
/// helper still produces stored credentials anywhere, migrate it to a dedicated KDF.
fn hash_password(password: &str) -> String {
    let digest = Sha256::digest(password.as_bytes());
    format!("{:x}", digest)
}
/// JSON body accepted by the `login` handler.
#[derive(Debug, Deserialize)]
struct LoginRequest {
    /// Account name; looked up in the local users table (and compared with the legacy
    /// `demo` account).
    username: String,
    /// Plain-text password supplied by the client.
    password: String,
}
/// Typed shape of a login response.
///
/// NOTE(review): the `login` handler in this file builds its JSON by hand (with `jwt`,
/// `api_key`, `user.role`, `expires_at`) and does not use this struct; check whether it is
/// still referenced elsewhere.
#[derive(Debug, Serialize)]
struct LoginResponse {
    success: bool,
    message: Option<String>,
    api_key: Option<String>,
    user: Option<UserInfo>,
}
/// Minimal user description embedded in `LoginResponse`.
#[derive(Debug, Serialize)]
struct UserInfo {
    username: String,
}
// ---- Process-wide server metadata ----
// Each cell is written at most once (presumably at server start-up, outside this view) and
// read by the health endpoints; readers fall back to neutral defaults when it is still empty.

/// Instant the server started; basis for `get_uptime_ms`.
static SERVER_START: OnceCell<Instant> = OnceCell::new();
/// Host/IP string reported by the health endpoints.
static SERVER_HOST: OnceCell<String> = OnceCell::new();
/// Port reported by the health endpoints.
static SERVER_PORT: OnceCell<u16> = OnceCell::new();

/// Recorded host, or `"0.0.0.0"` if none has been recorded.
fn get_host() -> String {
    match SERVER_HOST.get() {
        Some(host) => host.clone(),
        None => "0.0.0.0".to_string(),
    }
}

/// Recorded port, or `0` if none has been recorded.
fn get_port() -> u16 {
    *SERVER_PORT.get().unwrap_or(&0)
}

/// Milliseconds since `SERVER_START`, or `0` if the start instant was never recorded.
fn get_uptime_ms() -> u64 {
    SERVER_START
        .get()
        .map_or(0, |started| started.elapsed().as_millis() as u64)
}
/// Body returned by the lightweight `health` handler.
#[derive(Debug, Serialize)]
struct HealthResponse {
    /// Host reported by `get_host` (`"0.0.0.0"` if not recorded).
    ip: String,
    /// Port reported by `get_port` (`0` if not recorded).
    port: u16,
    /// `"ok"` when Postgres, Redis, Qdrant and MongoDB all respond, otherwise `"degraded"`.
    status: String,
    /// Compile-time `BUILD_VERSION`.
    version: String,
    /// Compile-time `BUILD_GIT_HASH`.
    build_git_hash: String,
    /// Compile-time `BUILD_TIMESTAMP`.
    build_timestamp: String,
    /// Milliseconds since the recorded server start (`0` if unknown).
    uptime_ms: u64,
}
/// One page of processing jobs.
#[derive(Debug, Serialize)]
struct JobListResponse {
    /// Jobs on this page.
    jobs: Vec<JobInfoResponse>,
    /// Number of matching jobs (presumably across all pages — confirm against the handler).
    count: i64,
    /// Page index of this response.
    page: usize,
    /// Page size used for this response.
    page_size: usize,
}
/// Query-string parameters for listing jobs; every field is optional.
#[derive(Debug, Deserialize)]
struct JobsQuery {
    page: Option<usize>,
    page_size: Option<usize>,
    /// Optional status filter (presumably matched against the job's status string).
    status: Option<String>,
}
/// Summary row for a single job in a job listing.
#[derive(Debug, Serialize)]
struct JobInfoResponse {
    id: i32,
    uuid: String,
    status: String,
    /// Processor currently running for this job, if any.
    current_processor: Option<String>,
    /// Progress counters; the unit (processors, frames, …) is set by the handler — TODO confirm.
    progress_current: i32,
    progress_total: i32,
    /// Timestamps are pre-formatted strings; the format is chosen by the handler.
    created_at: String,
    started_at: Option<String>,
}
/// Full detail for one job, including per-processor runs.
#[derive(Debug, Serialize)]
struct JobDetailResponse {
    id: i32,
    uuid: String,
    status: String,
    current_processor: Option<String>,
    progress_current: i32,
    progress_total: i32,
    /// One entry per processor run belonging to this job.
    processors: Vec<ProcessorInfoResponse>,
    created_at: String,
    started_at: Option<String>,
    updated_at: Option<String>,
}
/// Status of one processor run within a job.
#[derive(Debug, Serialize)]
struct ProcessorInfoResponse {
    processor_type: String,
    status: String,
    started_at: Option<String>,
    completed_at: Option<String>,
    /// Wall-clock run time in seconds, when known.
    duration_secs: Option<f64>,
    /// Failure message, when the run failed.
    error_message: Option<String>,
}
/// Search strategy requested by the client; deserialized from `"vector"` or `"smart"`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SearchMode {
    Vector,
    Smart,
}
/// JSON body of a search request; everything except `query` is optional.
#[derive(Debug, Deserialize)]
struct SearchRequest {
    /// Free-text query.
    query: String,
    /// Search strategy; the handler chooses a default when omitted.
    mode: Option<SearchMode>,
    /// Target collection name (presumably a vector-store collection — confirm in the handler).
    collection: Option<String>,
    /// Restrict results to a single file UUID.
    uuid: Option<String>,
    /// Result cap and pagination controls.
    limit: Option<usize>,
    page: Option<usize>,
    page_size: Option<usize>,
    /// Relative weights of vector and BM25 scores (presumably only for blended modes).
    vector_weight: Option<f32>,
    bm25_weight: Option<f32>,
}
/// Request to turn the result cache on or off.
#[derive(Debug, Deserialize)]
struct CacheToggleRequest {
    enabled: bool,
}
/// Outcome of a cache toggle, echoing the resulting setting.
#[derive(Debug, Serialize)]
struct CacheToggleResponse {
    success: bool,
    cache_enabled: bool,
    message: String,
}
/// Request to turn automatic pipeline processing on or off.
#[derive(Debug, Deserialize)]
struct AutoPipelineToggleRequest {
    enabled: bool,
}
/// Outcome of an auto-pipeline toggle, echoing the resulting setting.
#[derive(Debug, Serialize)]
struct AutoPipelineToggleResponse {
    success: bool,
    auto_pipeline_enabled: bool,
    message: String,
}
/// Request to turn automatic registration of watched files on or off.
#[derive(Debug, Deserialize)]
struct WatcherAutoRegisterToggleRequest {
    enabled: bool,
}
/// Outcome of a watcher auto-register toggle, echoing the resulting setting.
#[derive(Debug, Serialize)]
struct WatcherAutoRegisterToggleResponse {
    success: bool,
    watcher_auto_register_enabled: bool,
    message: String,
}
// Response types for `lookup_file_by_name`.
/// One registered video whose name matched the lookup.
#[derive(Debug, Deserialize)]
#[derive(Serialize)]
struct FileLookupMatch {
    file_uuid: String,
    file_name: String,
    file_type: Option<String>,
    status: String,
    content_hash: Option<String>,
    /// Not selected by the lookup query; `lookup_file_by_name` always sets `None`.
    file_size: Option<i64>,
    duration: Option<f64>,
    width: Option<i32>,
    height: Option<i32>,
}
/// Result of a file-name lookup.
#[derive(Serialize)]
struct FileLookupResponse {
    /// The (trimmed) name that was looked up.
    file_name: String,
    /// Whether that exact name is already registered.
    exists: bool,
    /// Rows whose name equals the input or starts with its stem.
    matches: Vec<FileLookupMatch>,
    /// Suggested free name: the input itself, or `"<stem> (N)<ext>"`.
    next_name: String,
}
async fn lookup_file_by_name(
State(state): State<AppState>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<FileLookupResponse>, StatusCode> {
let base = params.get("file_name").map(|s| s.trim().to_string()).unwrap_or_default();
if base.is_empty() {
return Ok(Json(FileLookupResponse {
file_name: String::new(),
exists: false,
matches: vec![],
next_name: String::new(),
}));
}
let table = schema::table_name("videos");
let dot_pos = base.rfind('.');
let (stem, ext) = match dot_pos {
Some(p) => (base[..p].to_string(), base[p..].to_string()),
None => (base.clone(), String::new()),
};
let pattern = format!("{}%%", &stem);
let query_sql = format!("SELECT file_uuid, file_name, file_type, status, content_hash, duration, width, height FROM {} WHERE file_name = $1 OR file_name LIKE $2 ORDER BY file_name", table);
let rows = sqlx::query(&query_sql)
.bind(&base)
.bind(&pattern)
.fetch_all(state.db.pool())
.await
.map_err(|e| { tracing::error!("lookup query error: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?;
let exists = rows.iter().any(|r| r.get::<String, _>("file_name") == base);
let matches: Vec<FileLookupMatch> = rows.iter().map(|r| {
FileLookupMatch {
file_uuid: r.get("file_uuid"),
file_name: r.get("file_name"),
file_type: r.get("file_type"),
status: r.get("status"),
content_hash: r.get("content_hash"),
file_size: None,
duration: r.get("duration"),
width: r.get("width"),
height: r.get("height"),
}
}).collect();
let max_n: usize = rows.iter().filter_map(|r| {
let n: String = r.get("file_name");
if n == base { return Some(0usize); }
let rest = n.strip_prefix(&stem).and_then(|r| r.strip_suffix(&ext))?;
let inner = rest.trim().strip_prefix('(').and_then(|r| r.strip_suffix(')'))?;
inner.parse::<usize>().ok()
}).max().unwrap_or(0);
let next_name = if max_n == 0 && !exists {
base.clone()
} else {
format!("{} ({}){}", stem, max_n + 1, ext)
};
Ok(Json(FileLookupResponse {
file_name: base,
exists,
matches,
next_name,
}))
}
/// Request to register a file (or, via `pattern`, several files) for processing.
#[derive(Debug, Deserialize)]
struct RegisterFileRequest {
    /// Path of the file on the server's filesystem.
    file_path: String,
    /// Optional pattern for multi-file registration (handled outside `register_single_file`;
    /// presumably a glob — confirm in the handler).
    pattern: Option<String>,
    /// Requesting user id, if known.
    user_id: Option<i64>,
    /// Pre-computed content hash supplied by the caller, if any.
    content_hash: Option<String>,
}
/// Outcome of registering one file.
#[derive(Debug, Serialize)]
struct RegisterFileResponse {
    success: bool,
    /// Empty when registration failed before a UUID was assigned.
    file_uuid: String,
    file_name: String,
    /// Canonicalized path when canonicalization succeeded, else the path as given.
    file_path: String,
    file_type: Option<String>,
    /// Media metadata; zero when unknown or on failure.
    duration: f64,
    width: u32,
    height: u32,
    fps: f64,
    total_frames: u64,
    registration_time: Option<String>,
    /// True when the file was already registered.
    already_exists: bool,
    /// Human-readable status or error message.
    message: String,
}
/// One search hit (vector/smart search).
#[derive(Debug, Serialize, Deserialize)]
struct SearchResult {
    /// File UUID the chunk belongs to.
    uuid: String,
    /// Chunk identifier; results are de-duplicated on this field.
    chunk_id: String,
    chunk_type: String,
    /// Chunk time span (presumably seconds into the media — confirm).
    start_time: f64,
    end_time: f64,
    text: String,
    /// Relevance score; higher is better.
    score: f32,
}
/// One page of search hits plus the paging parameters that produced it.
#[derive(Debug, Serialize, Deserialize)]
struct SearchResponse {
    results: Vec<SearchResult>,
    query: String,
    total: usize,
    page: usize,
    page_size: usize,
    limit: usize,
}
/// Probe (media metadata) result for a registered file.
#[derive(Debug, Serialize)]
struct ProbeResponse {
    file_uuid: String,
    file_name: String,
    file_size: Option<i64>,
    duration: f64,
    width: u32,
    height: u32,
    fps: f64,
    total_frames: i64,
    /// Presumably whether the probe data came from a cache rather than a fresh probe — confirm.
    cached: bool,
    /// Container-level information from `crate::core::probe`.
    format: crate::core::probe::FormatInfo,
    /// Per-stream information from `crate::core::probe`.
    streams: Vec<crate::core::probe::StreamInfo>,
}
// --- P0 API Structs ---
/// Request to start processing an asset; both selectors are optional.
#[derive(Debug, Deserialize)]
struct ProcessRequest {
    /// Named rules to run (presumably each rule maps to a set of processors — confirm).
    rules: Option<Vec<String>>,
    /// Explicit processor ids to run.
    processors: Option<Vec<String>>,
}
/// Frame-level progress counters.
#[derive(Debug, Serialize)]
struct FrameProgress {
    total_frames: i64,
    processed_frames: i64,
    /// Completion percentage (presumably 0–100 — confirm scale in the handler).
    progress_percent: f64,
}
/// Registration and processing status of one asset.
#[derive(Debug, Serialize)]
struct AssetStatusResponse {
    uuid: String,
    file_name: String,
    registration_time: String,
    processing_status: String,
    /// Active job, if one is running.
    current_job_id: Option<String>,
    /// Present when frame progress is known.
    frame_progress: Option<FrameProgress>,
}
/// Status of one processing job for a given rule.
#[derive(Debug, Serialize)]
struct JobStatusResponse {
    job_id: String,
    file_uuid: String,
    rule: String,
    status: String,
    current_processor_id: Option<String>,
    frame_progress: FrameProgress,
}
/// A rule, the processors it supports, and its currently active jobs.
#[derive(Debug, Serialize)]
struct RuleStatusResponse {
    rule: String,
    supported_processor_ids: Vec<String>,
    active_jobs: Vec<JobStatusResponse>,
}
// --- End P0 API Structs ---
/// Request for hybrid (vector + BM25) search.
#[derive(Debug, Deserialize)]
struct HybridSearchRequest {
    query: String,
    limit: Option<usize>,
    page: Option<usize>,
    page_size: Option<usize>,
    /// Restrict results to one file UUID.
    uuid: Option<String>,
    /// Relative weights used to blend the two scores into `combined_score`.
    vector_weight: Option<f32>,
    bm25_weight: Option<f32>,
}
/// One hybrid-search hit with its component scores.
#[derive(Debug, Serialize, Deserialize)]
struct HybridSearchResult {
    uuid: String,
    /// Chunk identifier; results are de-duplicated on this field.
    chunk_id: String,
    chunk_type: String,
    start_time: f64,
    end_time: f64,
    text: String,
    vector_score: f64,
    bm25_score: f64,
    /// Blended score used for ranking and de-duplication; higher is better.
    combined_score: f64,
}
/// One page of hybrid-search hits plus the paging parameters that produced it.
#[derive(Debug, Serialize, Deserialize)]
struct HybridSearchResponse {
    results: Vec<HybridSearchResult>,
    query: String,
    total: usize,
    page: usize,
    page_size: usize,
    limit: usize,
}
/// Collapse duplicate `chunk_id`s in vector/smart search results, keeping the highest `score`.
///
/// Returns the survivors sorted by descending score, with NaN scores last. Equal scores keep
/// their input order, and the first of several equal-scoring duplicates is the one kept.
fn dedup_search_results(results: Vec<SearchResult>) -> Vec<SearchResult> {
    dedup_best_per_chunk(results, |r| r.chunk_id.as_str(), |r| f64::from(r.score))
}

/// Collapse duplicate `chunk_id`s in hybrid search results, keeping the highest
/// `combined_score`.
///
/// Uses the same ordering and tie rules as `dedup_search_results`.
fn dedup_hybrid_results(results: Vec<HybridSearchResult>) -> Vec<HybridSearchResult> {
    dedup_best_per_chunk(results, |r| r.chunk_id.as_str(), |r| r.combined_score)
}

/// Descending comparison of two scores that is a *total* order.
///
/// NaN ranks after every real number and compares equal to another NaN. This is needed
/// because `partial_cmp(..).unwrap_or(Equal)` is not transitive once NaN is involved, and
/// `slice::sort_by` may panic on a non-total comparator.
fn cmp_score_desc_nan_last(a: f64, b: f64) -> std::cmp::Ordering {
    use std::cmp::Ordering;
    match (a.is_nan(), b.is_nan()) {
        (true, true) => Ordering::Equal,
        (true, false) => Ordering::Greater,
        (false, true) => Ordering::Less,
        // Neither is NaN, so `partial_cmp` always yields `Some`.
        (false, false) => b.partial_cmp(&a).unwrap_or(Ordering::Equal),
    }
}

/// Keep one item per chunk id (the best-scoring one) and return them best-first.
///
/// Items are tracked in first-seen order and the final sort is stable. Unlike iterating a
/// `HashMap`, this gives deterministic output when scores tie.
fn dedup_best_per_chunk<T>(
    results: Vec<T>,
    chunk_id: impl Fn(&T) -> &str,
    score: impl Fn(&T) -> f64,
) -> Vec<T> {
    let mut slot_of: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(results.len());
    let mut kept: Vec<T> = Vec::with_capacity(results.len());
    for r in results {
        let existing = slot_of.get(chunk_id(&r)).copied();
        match existing {
            Some(i) => {
                // Replace only when strictly better, so the first of equal scores wins and a
                // NaN never displaces a real score.
                if cmp_score_desc_nan_last(score(&r), score(&kept[i])) == std::cmp::Ordering::Less
                {
                    kept[i] = r;
                }
            }
            None => {
                slot_of.insert(chunk_id(&r).to_owned(), kept.len());
                kept.push(r);
            }
        }
    }
    kept.sort_by(|a, b| cmp_score_desc_nan_last(score(a), score(b)));
    kept
}
/// Pull a chunk's text out of a content document and tokenize it with `tokenize_chinese_text`.
///
/// The string at `data.text` is preferred, then the top-level `text`. If neither is a JSON
/// string, the empty string is tokenized.
fn extract_text_from_content(content: &serde_json::Value) -> String {
    let nested = content
        .get("data")
        .and_then(|d| d.get("text"))
        .and_then(serde_json::Value::as_str);
    let top_level = || content.get("text").and_then(serde_json::Value::as_str);
    let raw: &str = nested.or_else(top_level).unwrap_or_default();
    tokenize_chinese_text(raw)
}
fn extract_title_from_content(content: &serde_json::Value) -> String {
content
.get("data")
.and_then(|data| data.get("title"))
.and_then(|v| v.as_str())
.or_else(|| content.get("title").and_then(|v| v.as_str()))
.unwrap_or("")
.to_string()
}
/// Query for resolving a file by path or by UUID (presumably one of the two is required —
/// confirm in the handler).
#[derive(Debug, Deserialize)]
struct LookupQuery {
    path: Option<String>,
    uuid: Option<String>,
}
/// Minimal identity of a resolved file.
#[derive(Debug, Serialize)]
struct LookupResponse {
    file_uuid: String,
    file_path: Option<String>,
    file_name: Option<String>,
    duration: Option<f64>,
}
/// Full description of a registered file as returned by the file-listing endpoints.
#[derive(Debug, Serialize, Deserialize)]
struct FileInfoResponse {
    file_uuid: String,
    file_path: String,
    file_name: String,
    file_type: Option<String>,
    duration: f64,
    width: u32,
    height: u32,
    status: String,
    /// Raw processing-status JSON, when stored.
    processing_status: Option<serde_json::Value>,
    /// Raw birth-registration JSON, when stored.
    birth_registration: Option<serde_json::Value>,
    created_at: Option<String>,
    registration_time: Option<String>,
    file_size: Option<i64>,
    /// Probe output serialized as a JSON string, when available.
    probe_json: Option<String>,
    total_frames: u64,
}
/// One page of registered files.
#[derive(Debug, Serialize, Deserialize)]
struct FilesResponse {
    files: Vec<FileInfoResponse>,
    count: i64,
    page: usize,
    page_size: usize,
}
/// Query-string parameters for listing files; every field is optional.
#[derive(Debug, Deserialize)]
struct FilesQuery {
    page: Option<usize>,
    page_size: Option<usize>,
    /// Status filter.
    status: Option<String>,
    /// Free-text filter (presumably on file name — confirm in the handler).
    q: Option<String>,
    /// Restrict to one file UUID.
    uuid: Option<String>,
}
/// Shared state handed to every Axum handler through the `State` extractor.
#[derive(Clone)]
pub struct AppState {
    /// PostgreSQL handle; `pool()` is used for raw `sqlx` queries.
    pub db: std::sync::Arc<crate::core::db::PostgresDb>,
    /// Text embedding model (presumably used by the search handlers — not visible here).
    pub embedder: std::sync::Arc<crate::Embedder>,
    /// Name of the embedding model in use.
    pub embedder_model: String,
    /// MongoDB-backed cache; also probed by the health endpoints.
    pub mongo_cache: crate::core::cache::MongoCache,
    /// Redis-backed cache; `health` records a healthy status in it.
    pub redis_cache: crate::core::cache::RedisCache,
    /// State used by the API middleware (`super::middleware`).
    pub api_state: super::middleware::ApiState,
}
/// Body returned by `health_detailed`: everything in `HealthResponse` plus per-subsystem detail.
#[derive(Debug, Serialize)]
struct DetailedHealthResponse {
    ip: String,
    port: u16,
    /// `"ok"` when Postgres, Redis, Qdrant and MongoDB all respond, otherwise `"degraded"`.
    status: String,
    version: String,
    build_git_hash: String,
    build_timestamp: String,
    uptime_ms: u64,
    services: ServiceHealth,
    resources: ResourceStatus,
    pipeline: PipelineStatus,
    schema: SchemaHealth,
    identities: IdentityHealth,
    integrations: IntegrationHealth,
    config: ConfigHealth,
}
/// Status of external integrations.
#[derive(Debug, Serialize)]
struct IntegrationHealth {
    /// Result of `crate::core::tmdb::status::quick_status()`.
    tmdb: crate::core::tmdb::status::TmdbResourceStatus,
}
/// Consistency between identity files on disk and the `identities` table.
#[derive(Debug, Serialize)]
struct IdentityHealth {
    /// Whether `<OUTPUT_DIR>/identities` is a directory.
    directory_exists: bool,
    /// Number of identity files on disk.
    files_count: usize,
    /// Whether the identity index could be read.
    index_ok: bool,
    /// Row count of the `identities` table (0 if the query fails).
    db_count: i64,
    /// Directory exists and the file count equals the DB row count.
    synced: bool,
}
/// Current values of runtime feature toggles.
#[derive(Debug, Serialize)]
struct ConfigHealth {
    cache_enabled: bool,
    auto_pipeline_enabled: bool,
    watcher_auto_register_enabled: bool,
}
/// Applied vs. required database migrations.
#[derive(Debug, Serialize)]
struct SchemaHealth {
    /// Whether the `schema_migrations` table exists.
    table_exists: bool,
    /// Rows of `schema_migrations`, ordered by id.
    applied: Vec<MigrationInfo>,
    /// Migrations this build requires (baked in at compile time).
    required: Vec<MigrationInfo>,
    /// Every required migration is applied with a matching checksum.
    ok: bool,
}
/// A migration file name and its checksum.
#[derive(Debug, Serialize)]
struct MigrationInfo {
    filename: String,
    checksum: String,
}
/// Inventory of the local processing pipeline, as reported by `health_detailed`.
#[derive(Debug, Serialize)]
struct PipelineStatus {
    /// Whether the configured scripts directory exists and is a directory.
    scripts_ready: bool,
    /// Number of `.py` files directly inside the scripts directory.
    scripts_count: usize,
    /// Presence of each known processor script.
    processors: ProcessorInventory,
    /// Whether the models directory exists and is a directory.
    models_ready: bool,
    /// Number of model entries (`.pt`, `.mlpackage`, `.gguf`, `.bin`, `.onnx`) in the models directory.
    models_count: usize,
    /// Result of verifying the scripts against the `checksums.sha256` manifest.
    scripts_integrity: ScriptIntegrity,
    /// Whether an `ffmpeg` executable was found (via `which ffmpeg`).
    ffmpeg: bool,
    /// Embedding server (port 11436)
    embedding_server: ServiceStatus,
    /// GDINO API (port 8080)
    gdino_api: ServiceStatus,
    /// LLM via llama.cpp (port 8082)
    llm: ServiceStatus,
    /// rsync file sync tool
    rsync: ServiceStatus,
}
/// Outcome of checking scripts against `checksums.sha256`.
#[derive(Debug, Serialize)]
struct ScriptIntegrity {
    /// Manifest entries whose file exists and hashes to the listed digest.
    matched: usize,
    /// Manifest entries considered.
    total: usize,
    /// `matched == total`; false when the manifest itself cannot be read.
    ok: bool,
}
/// Whether each known processor script exists in the scripts directory.
#[derive(Debug, Serialize)]
struct ProcessorInventory {
    asr: bool,
    yolo: bool,
    face: bool,
    pose: bool,
    ocr: bool,
    cut: bool,
    caption: bool,
    /// Backed by `scene_classifier.py`.
    scene: bool,
    story: bool,
    asrx: bool,
    /// Backed by `probe_file.py`.
    probe: bool,
    visual_chunk: bool,
    /// Count of total Python files in scripts dir
    total_py_files: usize,
}
/// Host resource snapshot taken from `SystemResources::check()`.
#[derive(Debug, Serialize)]
struct ResourceStatus {
    cpu_used_percent: f64,
    cpu_idle_percent: f64,
    memory_available_mb: u64,
    memory_total_mb: u64,
    memory_used_percent: f64,
    gpu_available: bool,
    /// `None` when GPU utilization is unavailable.
    gpu_utilization: Option<f64>,
    gpu_memory_used_pct: Option<f64>,
}
/// Probe results for the core backing services.
#[derive(Debug, Serialize)]
struct ServiceHealth {
    postgres: ServiceStatus,
    redis: ServiceStatus,
    qdrant: ServiceStatus,
    mongodb: ServiceStatus,
}
/// Result of probing one service.
#[derive(Debug, Serialize)]
struct ServiceStatus {
    /// `"ok"` or `"error"`.
    status: String,
    /// Time spent on the probe; `None` when it failed before a measurement was taken.
    latency_ms: Option<u64>,
    /// Failure description when `status` is `"error"`.
    error: Option<String>,
}
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
let postgres = check_postgres().await;
let redis = check_redis().await;
let qdrant = check_qdrant().await;
let mongodb = check_mongodb(&state.mongo_cache).await;
let all_ok = postgres.status == "ok"
&& redis.status == "ok"
&& qdrant.status == "ok"
&& mongodb.status == "ok";
let status = if all_ok { "ok" } else { "degraded" };
if all_ok {
let _ = state.redis_cache.set_health(status).await;
}
Json(HealthResponse {
ip: get_host(),
port: get_port(),
status: status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
uptime_ms: get_uptime_ms(),
})
}
async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthResponse> {
let postgres = check_postgres().await;
let redis = check_redis().await;
let qdrant = check_qdrant().await;
let mongodb = check_mongodb(&state.mongo_cache).await;
let overall_status = if postgres.status == "ok"
&& redis.status == "ok"
&& qdrant.status == "ok"
&& mongodb.status == "ok"
{
"ok"
} else {
"degraded"
};
let sys = SystemResources::check();
let scripts_base = crate::core::config::SCRIPTS_DIR.clone();
let scripts_dir = std::path::Path::new(&scripts_base);
let scripts_path = scripts_dir.to_path_buf();
let models_path = std::path::PathBuf::from("/Users/accusys/momentry_core_0.1/models");
let py_files = std::fs::read_dir(&scripts_path)
.map(|d| d.filter_map(|e| e.ok()).filter(|e| e.path().extension().map(|x| x == "py").unwrap_or(false)).count())
.unwrap_or(0);
let total_model_files = std::fs::read_dir(&models_path)
.map(|d| d.filter_map(|e| e.ok()).filter(|e| {
let p = e.path();
let ext = p.extension().and_then(|x| x.to_str()).unwrap_or("");
matches!(ext, "pt" | "mlpackage" | "gguf" | "bin" | "onnx")
}).count())
.unwrap_or(0);
let check_script = |name: &str| -> bool {
let candidate = scripts_path.join(name);
candidate.exists()
};
let check_python_module = |module: &str| -> bool {
std::process::Command::new(
&*crate::core::config::PYTHON_PATH,
)
.arg("-c")
.arg(format!("import {}", module))
.output()
.map(|o| o.status.success())
.unwrap_or(false)
};
// SHA256 checksum verification against checksums.sha256 manifest
let checksums_path = scripts_path.join("checksums.sha256");
let scripts_integrity = match std::fs::read_to_string(&checksums_path) {
Ok(content) => {
let mut matched = 0usize;
let mut total = 0usize;
for line in content.lines() {
let line = line.trim();
if line.is_empty() { continue; }
let parts: Vec<&str> = line.splitn(2, ' ').collect();
if parts.len() < 2 { continue; }
let expected_hash = parts[0];
let file_path = parts[1].trim_start();
total += 1;
let full_path = scripts_path.join(file_path);
if full_path.exists() {
if let Ok(actual) = std::process::Command::new("shasum")
.arg("-a").arg("256")
.arg(&full_path)
.output()
{
let out = String::from_utf8_lossy(&actual.stdout);
let actual_hash = out.split(' ').next().unwrap_or("").to_string();
if actual_hash == expected_hash {
matched += 1;
}
}
}
}
ScriptIntegrity { matched, total, ok: matched == total }
}
Err(_) => ScriptIntegrity { matched: 0, total: 0, ok: false },
};
Json(DetailedHealthResponse {
ip: get_host(),
port: get_port(),
status: overall_status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
uptime_ms: get_uptime_ms(),
services: ServiceHealth {
postgres,
redis,
qdrant,
mongodb,
},
resources: ResourceStatus {
cpu_used_percent: sys.cpu_used_percent,
cpu_idle_percent: sys.cpu_idle_percent,
memory_available_mb: sys.memory_available_mb,
memory_total_mb: sys.memory_total_mb,
memory_used_percent: sys.memory_used_percent,
gpu_available: sys.gpu_available,
gpu_utilization: sys.gpu_utilization,
gpu_memory_used_pct: sys.gpu_memory_used_pct,
},
pipeline: PipelineStatus {
scripts_ready: scripts_path.is_dir(),
scripts_count: py_files,
scripts_integrity,
processors: ProcessorInventory {
asr: check_script("asr_processor.py"),
yolo: check_script("yolo_processor.py"),
face: check_script("face_processor.py"),
pose: check_script("pose_processor.py"),
ocr: check_script("ocr_processor.py"),
cut: check_script("cut_processor.py"),
caption: check_script("caption_processor.py"),
scene: check_script("scene_classifier.py"),
story: check_script("story_processor.py"),
asrx: check_script("asrx_processor.py"),
probe: check_script("probe_file.py"),
visual_chunk: check_script("visual_chunk_processor.py"),
total_py_files: py_files,
},
models_ready: models_path.is_dir(),
models_count: total_model_files,
ffmpeg: std::process::Command::new("which").arg("ffmpeg").output().map(|o| o.status.success()).unwrap_or(false),
embedding_server: check_http("http://127.0.0.1:11436/health").await,
gdino_api: check_http("http://127.0.0.1:8080/health").await,
llm: check_http("http://127.0.0.1:8082/health").await,
rsync: check_rsync().await,
},
schema: check_schema_migrations(state.db.pool()).await,
identities: {
let identities_root = std::path::Path::new(&*crate::core::config::OUTPUT_DIR).join("identities");
let directory_exists = identities_root.is_dir();
let files_count = crate::core::identity::storage::count_identity_files();
let index_ok = crate::core::identity::storage::read_index().is_ok();
let db_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM identities")
.fetch_one(state.db.pool())
.await
.unwrap_or(0);
IdentityHealth {
directory_exists,
files_count,
index_ok,
db_count,
synced: directory_exists && files_count as i64 == db_count,
}
},
integrations: IntegrationHealth {
tmdb: crate::core::tmdb::status::quick_status(),
},
config: ConfigHealth {
cache_enabled: crate::core::config::get_cache_enabled(),
auto_pipeline_enabled: crate::core::config::get_auto_pipeline_enabled(),
watcher_auto_register_enabled: crate::core::config::get_watcher_auto_register(),
},
})
}
/// Run the data-consistency checks from `health_agent` and return the report.
///
/// Any check with a non-zero `count` is also logged as a warning, as `check=count` pairs.
/// The handler itself always succeeds.
async fn health_consistency(
    State(state): State<AppState>,
) -> Result<Json<crate::core::health_agent::ConsistencyReport>, (StatusCode, String)> {
    let report = crate::core::health_agent::run_consistency_checks(&state.db).await;
    let problems: Vec<String> = report
        .checks
        .iter()
        .filter(|c| c.count > 0)
        .map(|c| format!("{}={}", c.check, c.count))
        .collect();
    if !problems.is_empty() {
        tracing::warn!(
            "[HEALTH] Consistency issues found: {}",
            problems.join(", ")
        );
    }
    Ok(Json(report))
}
async fn login(
State(state): State<AppState>,
Json(req): Json<LoginRequest>,
) -> Result<axum::response::Response<axum::body::Body>, (StatusCode, Json<serde_json::Value>)> {
// Try users table first, fall back to legacy demo/demo
let (user_id, username, role) = 'resolve: {
// Step 1: Check local users table
if let Ok(Some((uid, uname, pw_hash, role_str))) = state.db.get_user_by_username(&req.username).await {
if crate::core::auth::password::verify_password(&req.password, &pw_hash) {
break 'resolve (uid, uname, role_str);
}
// Password mismatch — log and continue to SFTPGo
tracing::debug!("[LOGIN] Local password mismatch for {}, trying SFTPGo", &req.username);
}
// Step 3: Legacy demo/demo fallback
if req.username == "demo" && req.password == "demo" {
// Get actual user id from DB if exists
let uid = state.db.get_user_by_username("demo").await.ok()
.flatten().map(|(id, _, _, _)| id).unwrap_or(0);
break 'resolve (uid, "demo".to_string(), "user".to_string());
}
return Err((StatusCode::UNAUTHORIZED, Json(serde_json::json!({
"success": false, "message": "Invalid username or password"
}))));
};
// Create JWT
let jwt_token = crate::core::auth::jwt::create_jwt(user_id, &username, &role)
.map_err(|e| {
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({
"success": false, "message": format!("JWT creation failed: {}", e)
})))
})?;
// Create session
let session_id = uuid::Uuid::new_v4().to_string().replace('-', "");
state.db.create_session(&session_id, user_id, &DEMO_USER_API_KEY, 24).await.ok();
// Update last_login if real user
if user_id > 0 {
state.db.update_last_login(user_id).await.ok();
}
// Build response with session cookie
let body = serde_json::json!({
"success": true,
"jwt": jwt_token,
"api_key": DEMO_USER_API_KEY.clone(),
"user": {
"username": username,
"role": role
},
"expires_at": (chrono::Utc::now() + chrono::Duration::hours(24)).to_rfc3339()
});
let json_body = axum::body::Body::from(serde_json::to_string(&body).unwrap_or_default());
let response = axum::response::Response::builder()
.header("Content-Type", "application/json")
.header("Set-Cookie", format!(
"session_id={}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400", session_id
))
.body(json_body)
.unwrap();
Ok(response)
}
/// End the caller's session.
///
/// If the request carries a `session_id` cookie, the matching session is deleted. This is
/// best-effort: deletion errors are ignored. The response is always a success message.
async fn logout(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Json<serde_json::value::Value> {
    let session_cookie = crate::api::middleware::extract_cookies(&headers)
        .iter()
        .find(|(k, _)| k == "session_id")
        .map(|(_, v)| v.clone());

    if let Some(sid) = session_cookie {
        let _ = state.db.delete_session(&sid).await;
    }

    Json(serde_json::json!({
        "success": true,
        "message": "Logged out"
    }))
}
/// Probe PostgreSQL by opening a connection with `PostgresDb::init` and listing one video.
///
/// `latency_ms` covers connect plus query; it is `None` when connecting fails.
/// NOTE(review): this opens a fresh connection per call instead of reusing `AppState::db`.
async fn check_postgres() -> ServiceStatus {
    let start = Instant::now();
    let db = match PostgresDb::init().await {
        Ok(db) => db,
        Err(e) => {
            return ServiceStatus {
                status: "error".to_string(),
                latency_ms: None,
                error: Some(e.to_string()),
            }
        }
    };

    let outcome = db.list_videos(1, 0).await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    match outcome {
        Ok(_) => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(e.to_string()),
        },
    }
}
/// Probe Redis with a `PING` over a new client connection.
///
/// `latency_ms` is reported only when the `PING` was actually sent. Client-creation and
/// connection failures report `None`.
async fn check_redis() -> ServiceStatus {
    let start = Instant::now();
    let failed = |message: String, latency_ms: Option<u64>| ServiceStatus {
        status: "error".to_string(),
        latency_ms,
        error: Some(message),
    };

    let client = match RedisClient::new() {
        Ok(client) => client,
        Err(e) => return failed(e.to_string(), None),
    };
    let mut conn = match client.get_conn().await {
        Ok(conn) => conn,
        Err(e) => return failed(e.to_string(), None),
    };

    let pong: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    match pong {
        Ok(_) => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Err(e) => failed(e.to_string(), latency_ms),
    }
}
/// Probe Qdrant with `GET {QDRANT_URL}/collections` (5-second timeout), sending the
/// `api-key` header.
///
/// `QDRANT_URL` defaults to `http://localhost:6333`, and `QDRANT_API_KEY` defaults to a
/// built-in key. A non-2xx reply is reported as `"HTTP <status>"`. A transport error reports
/// no latency.
///
/// NOTE(review): the built-in API key is a credential committed to source; prefer requiring
/// `QDRANT_API_KEY` to be set.
async fn check_qdrant() -> ServiceStatus {
    let start = Instant::now();
    let base_url =
        std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string());
    let api_key =
        std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());

    let request = reqwest::Client::new()
        .get(format!("{}/collections", base_url))
        .header("api-key", api_key)
        .timeout(std::time::Duration::from_secs(5));
    let response = match request.send().await {
        Ok(resp) => resp,
        Err(e) => {
            return ServiceStatus {
                status: "error".to_string(),
                latency_ms: None,
                error: Some(e.to_string()),
            }
        }
    };

    let latency_ms = Some(start.elapsed().as_millis() as u64);
    if response.status().is_success() {
        ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        }
    } else {
        ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(format!("HTTP {}", response.status())),
        }
    }
}
/// Probe MongoDB through the cache's `health_check`, always reporting the elapsed time.
async fn check_mongodb(cache: &MongoCache) -> ServiceStatus {
    let start = Instant::now();
    let outcome = cache.health_check().await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    let (status, error) = match outcome {
        Ok(_) => ("ok", None),
        Err(e) => ("error", Some(e.to_string())),
    };
    ServiceStatus {
        status: status.to_string(),
        latency_ms,
        error,
    }
}
/// Migrations this build requires, taken from the compile-time `REQUIRED_MIGRATIONS` variable.
fn parse_required_migrations() -> Vec<MigrationInfo> {
    parse_migration_entries(env!("REQUIRED_MIGRATIONS"))
}

/// Parse `filename:checksum[,filename:checksum...]`.
///
/// Only the first `:` in an entry splits it. Both parts are trimmed, and entries missing
/// either part (or left empty after trimming) are skipped.
fn parse_migration_entries(raw: &str) -> Vec<MigrationInfo> {
    if raw.is_empty() {
        return Vec::new();
    }
    let mut entries = Vec::new();
    for entry in raw.split(',') {
        let Some((name, sum)) = entry.split_once(':') else {
            continue;
        };
        let (filename, checksum) = (name.trim(), sum.trim());
        if filename.is_empty() || checksum.is_empty() {
            continue;
        }
        entries.push(MigrationInfo {
            filename: filename.to_string(),
            checksum: checksum.to_string(),
        });
    }
    entries
}
/// Compare the applied migrations in `schema_migrations` with the ones this build requires.
///
/// If the table is missing (or the existence check fails), nothing is reported as applied
/// and `ok` is false. Otherwise `ok` is true when every required migration appears among the
/// applied rows with the same filename and checksum. A failed read of the applied rows is
/// treated as "none applied".
async fn check_schema_migrations(pool: &sqlx::PgPool) -> SchemaHealth {
    let required = parse_required_migrations();

    let table_exists: bool = sqlx::query_scalar(
        "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schema_migrations')",
    )
    .fetch_one(pool)
    .await
    .unwrap_or(false);
    if !table_exists {
        return SchemaHealth {
            table_exists: false,
            applied: Vec::new(),
            required,
            ok: false,
        };
    }

    let rows = sqlx::query_as::<_, (String, String)>(
        "SELECT filename, checksum FROM schema_migrations ORDER BY id",
    )
    .fetch_all(pool)
    .await
    .unwrap_or_default();
    let applied: Vec<MigrationInfo> = rows
        .into_iter()
        .map(|(filename, checksum)| MigrationInfo { filename, checksum })
        .collect();

    let is_applied = |need: &MigrationInfo| {
        applied
            .iter()
            .any(|have| have.filename == need.filename && have.checksum == need.checksum)
    };
    let ok = required.iter().all(is_applied);

    SchemaHealth {
        table_exists: true,
        applied,
        required,
        ok,
    }
}
/// Report whether an `rsync` binary exists at one of the known install locations
/// (`/Users/accusys/bin/rsync` or `/opt/homebrew/bin/rsync`).
///
/// `PATH` is not consulted. `latency_ms` is just the time spent checking.
async fn check_rsync() -> ServiceStatus {
    const CANDIDATES: [&str; 2] = ["/Users/accusys/bin/rsync", "/opt/homebrew/bin/rsync"];
    let start = Instant::now();
    let found = CANDIDATES
        .iter()
        .any(|candidate| std::path::Path::new(candidate).exists());
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    if found {
        ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        }
    } else {
        ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(
                "rsync not found (built from source expected at ~/bin/rsync)".to_string(),
            ),
        }
    }
}
/// Report whether `name` is resolvable on `PATH`, by running `which <name>`.
///
/// Failure to spawn `which` counts as "not found".
async fn check_binary(name: &str) -> ServiceStatus {
    let start = Instant::now();
    let found = std::process::Command::new("which")
        .arg(name)
        .output()
        .map_or(false, |out| out.status.success());
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    if found {
        ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        }
    } else {
        ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(format!("{} not found in PATH", name)),
        }
    }
}
/// Probe an HTTP health endpoint with `GET url`; any 2xx reply counts as `"ok"`.
///
/// The request has a 5-second timeout, the same as `check_qdrant`. `reqwest` applies no
/// timeout by default, so without one a service that accepts the connection but never
/// answers would stall `health_detailed` indefinitely. Non-2xx replies are reported as
/// `"HTTP <status>"`, and transport errors (including timeouts) report the error text.
/// `latency_ms` is always set.
async fn check_http(url: &str) -> ServiceStatus {
    const PROBE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
    let start = Instant::now();
    let outcome = reqwest::Client::new()
        .get(url)
        .timeout(PROBE_TIMEOUT)
        .send()
        .await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    match outcome {
        Ok(resp) if resp.status().is_success() => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Ok(resp) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(format!("HTTP {}", resp.status())),
        },
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(e.to_string()),
        },
    }
}
/// Cache key for a text search: a short fingerprint of `{query, uuid, limit}`.
fn generate_query_hash(query: &str, uuid: Option<&str>, limit: usize) -> String {
    short_json_fingerprint(&serde_json::json!({
        "query": query,
        "uuid": uuid,
        "limit": limit,
    }))
}

/// Cache key for a visual-chunk search: a short fingerprint of `{uuid, criteria}`.
fn generate_visual_search_hash(
    uuid: &str,
    criteria: &visual_chunk_search::VisualChunkSearchCriteria,
) -> String {
    short_json_fingerprint(&serde_json::json!({
        "uuid": uuid,
        "criteria": criteria,
    }))
}

/// First 16 lowercase hex characters (64 bits) of the SHA-256 of `value`'s compact JSON text.
fn short_json_fingerprint(value: &serde_json::Value) -> String {
    let digest = Sha256::digest(value.to_string().as_bytes());
    let mut hex = format!("{:x}", digest);
    hex.truncate(16);
    hex
}
/// SHA-256 of the file at `path`, as the hex string produced by
/// `content_hash::compute_sha256`; used for content de-duplication.
///
/// Returns `None` when the hash cannot be computed (the error is discarded).
fn sha256_file(path: &std::path::Path) -> Option<String> {
    match crate::core::storage::content_hash::compute_sha256(path) {
        Ok(hex) => Some(hex),
        Err(_) => None,
    }
}
/// Resolve name conflict: if file_name collides with existing but content differs,
/// append ` (N)` suffix. Returns the resolved file_name.
async fn resolve_filename(
db: &PostgresDb,
file_name: &str,
content_hash: &str,
) -> String {
let table = schema::table_name("videos");
let base = file_name.to_string();
let dot_pos = base.rfind('.');
let (stem, ext) = match dot_pos {
Some(p) => (base[..p].to_string(), base[p..].to_string()),
None => (base.clone(), String::new()),
};
let mut candidate = base.clone();
let mut attempt = 0usize;
loop {
// Check if candidate name exists with a DIFFERENT hash (same content = OK)
let conflict: Option<String> = sqlx::query_scalar(
&format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table)
)
.bind(&candidate)
.bind(content_hash)
.fetch_optional(db.pool())
.await
.unwrap_or(None);
if conflict.is_none() {
return candidate;
}
attempt += 1;
candidate = format!("{} ({}){}", stem, attempt, ext);
}
}
/// 註冊單一檔案(內部函數,不處理 pattern
async fn register_single_file(
state: &AppState,
file_path: &str,
_user_id: Option<i64>,
provided_hash: Option<String>,
) -> RegisterFileResponse {
tracing::info!("[REGISTER] Starting registration for: {}", file_path);
let path = std::path::Path::new(file_path);
if !path.exists() {
return RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name: String::new(),
file_path: file_path.to_string(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!("File not found: {}", file_path),
};
}
let canonical_path = path
.canonicalize()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| file_path.to_string());
let file_name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let db = match PostgresDb::init().await {
Ok(db) => db,
Err(e) => {
tracing::error!("[REGISTER] DB init failed: {}", e);
return RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name,
file_path: canonical_path,
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!("DB init failed: {}", e),
};
}
};
// Step 1: Try to load pre-computed data from .pre.json
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let birthday = std::fs::metadata(&path)
.ok()
.and_then(|m| m.modified().ok())
.map(|t| {
let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
chrono::DateTime::from_timestamp(secs as i64, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
})
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
let mac_address = crate::core::storage::uuid::get_mac_address();
let pre_file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac_address, &birthday, &canonical_path, &file_name,
);
let pre_path = std::path::Path::new(&output_dir).join(format!("{}.pre.json", pre_file_uuid));
let pre_data: Option<serde_json::Value> = std::fs::read_to_string(&pre_path).ok()
.and_then(|s| serde_json::from_str(&s).ok());
// Extract content_hash from pre.json or compute fresh
let (content_hash, birthday, _pre_file_uuid) = if let Some(ref pre) = pre_data {
let h = pre.get("content_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
let b = pre.get("birthday").and_then(|v| v.as_str()).unwrap_or(&birthday).to_string();
let u = pre.get("file_uuid").and_then(|v| v.as_str()).unwrap_or(&pre_file_uuid).to_string();
(h, b, u)
} else {
let h = provided_hash.filter(|h| !h.is_empty()).unwrap_or_else(|| sha256_file(&path).unwrap_or_default());
(h, birthday, pre_file_uuid)
};
// Recompute UUID with the resolved birthday
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac_address, &birthday, &canonical_path, &file_name,
);
tracing::info!("[REGISTER] UUID inputs: mac={} birthday={} path={} name={} pre_found={} → {}", mac_address, birthday, canonical_path, file_name, pre_data.is_some(), file_uuid);
// Step 2: Hash check — same content = already registered (regardless of name)
let videos_table = schema::table_name("videos");
if !content_hash.is_empty() {
if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>(
&format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table)
)
.bind(&content_hash)
.fetch_optional(db.pool())
.await
{
tracing::info!("[REGISTER] Content hash collision → already registered: {}", existing_uuid);
let existing_info: Option<(String, String, f64, i32, i32, f64, i64, Option<String>)> = sqlx::query_as(
&format!("SELECT file_name, file_path, duration, width, height, fps, total_frames, registration_time::text FROM {} WHERE file_uuid = $1", videos_table)
).bind(&existing_uuid).fetch_optional(db.pool()).await.unwrap_or(None);
if let Some((ename, epath, dur, w, h, f, tf, rt)) = existing_info {
return RegisterFileResponse {
success: true,
file_uuid: existing_uuid,
file_name: ename,
file_path: epath.clone(),
file_type: None,
duration: dur,
width: w as u32,
height: h as u32,
fps: f,
total_frames: tf as u64,
registration_time: rt,
already_exists: true,
message: format!("Content already registered: {}", epath),
};
}
// Fallback: content_hash matched but full info query failed
return RegisterFileResponse {
success: true,
file_uuid: existing_uuid,
file_name: file_name.clone(),
file_path: canonical_path.clone(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: true,
message: "Content already registered (identical file)".to_string(),
};
}
}
// Step 3: Name check — same name but different content → auto-rename
let final_name = resolve_filename(&db, &file_name, &content_hash).await;
// Step 4: Compute UUID using birthday from pre.json or file creation time (never DB registration_time)
let mac_address = crate::core::storage::uuid::get_mac_address();
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac_address,
&birthday,
&canonical_path,
&final_name,
);
// Step 5: Unified probe — use pre.json, otherwise run unified_probe()
let temp_probe_json: serde_json::Value = if let Some(ref pre) = pre_data {
pre.get("probe_json").cloned().unwrap_or_default()
} else {
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
crate::core::probe::unified::unified_probe(&path, &scripts_dir, &python_path).await
};
let probe_json = Some(temp_probe_json.clone());
let has_video = temp_probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")));
let has_audio = temp_probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")));
let final_file_type = if has_video {
Some("video".to_string())
} else if has_audio {
Some("audio".to_string())
} else {
Some(temp_probe_json.get("format").and_then(|f| f.get("file_type")).and_then(|v| v.as_str()).unwrap_or("unknown").to_string())
};
let duration = temp_probe_json.get("format").and_then(|f| {
let src = if has_video { f.get("duration") } else { None };
src.and_then(|v| v.as_str()).and_then(|s| s.parse::<f64>().ok())
}).unwrap_or(0.0);
let mut width = 0u32;
let mut height = 0u32;
let mut fps = 0.0;
let mut total_frames = 0u64;
if let Some(streams) = temp_probe_json.get("streams").and_then(|s| s.as_array()) {
if let Some(s) = streams.iter().find(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) {
width = s.get("width").and_then(|v| v.as_i64()).unwrap_or(0) as u32;
height = s.get("height").and_then(|v| v.as_i64()).unwrap_or(0) as u32;
if let Some(fps_str) = s.get("r_frame_rate").and_then(|v| v.as_str()) {
if let Some((num, den)) = fps_str.split_once('/') {
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
if d > 0.0 { fps = n / d; }
}
}
}
total_frames = s.get("nb_frames").and_then(|v| v.as_str())
.and_then(|s| s.parse().ok()).unwrap_or((duration * fps) as u64);
}
}
let videos_table = schema::table_name("videos");
let status = "registered";
let _ = sqlx::query(&format!(
"INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, content_hash, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status, content_hash = EXCLUDED.content_hash",
videos_table
))
.bind(&file_uuid).bind(&canonical_path).bind(&final_name).bind(&final_file_type)
.bind(duration).bind(width as i32).bind(height as i32).bind(fps)
.bind(&probe_json).bind(status).bind(&content_hash)
.execute(db.pool()).await;
// 若是 video 類型,同步執行 CUT + Scene 分類
let mut cut_done = false;
let mut scene_done = false;
if has_video && total_frames > 0 && fps > 0.0 {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
// CUT: 場景檢測PySceneDetect0.08s 即可完成)
let cut_path = std::path::Path::new(&output_dir).join(format!("{}.cut.json", file_uuid));
if !cut_path.exists() {
let cut_script = std::path::Path::new(&scripts_dir).join("cut_processor.py");
if cut_script.exists() {
let cut_output = std::process::Command::new(&python_path)
.arg(&cut_script)
.arg(&canonical_path)
.arg(&cut_path)
.output();
if let Ok(output) = cut_output {
if output.status.success() {
cut_done = true;
tracing::info!("[REGISTER] CUT completed for {}", file_uuid);
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::error!("[REGISTER] CUT failed for {}: {}", file_uuid, stderr);
}
} else {
tracing::error!("[REGISTER] CUT execution error for {}", file_uuid);
}
}
} else {
cut_done = true;
// 讀取現有 CUT JSON 取得場景數
if let Ok(content) = std::fs::read_to_string(&cut_path) {
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
let scenes = cut_data
.get("scenes")
.and_then(|s| s.as_array())
.map(|a| a.len() as i32)
.unwrap_or(0);
tracing::info!(
"[REGISTER] CUT already exists: {} scenes for {}",
scenes,
file_uuid
);
}
}
}
// Scene: 場景分類MIT Places365取樣間隔 2s
let scene_path =
std::path::Path::new(&output_dir).join(format!("{}.scene.json", file_uuid));
if !scene_path.exists() {
let scene_script = std::path::Path::new(&scripts_dir).join("scene_classifier.py");
if scene_script.exists() {
let scene_output = std::process::Command::new(&python_path)
.arg(&scene_script)
.arg(&canonical_path)
.arg(&scene_path)
.arg("--sample-interval")
.arg("2")
.output();
if let Ok(output) = scene_output {
if output.status.success() {
scene_done = true;
tracing::info!(
"[REGISTER] Scene classification completed for {}",
file_uuid
);
}
}
}
} else {
scene_done = true;
}
}
// 更新 DB: cut_done, scene_done, audio_tracks
let audio_tracks: Vec<serde_json::Value> = temp_probe_json.get("streams").and_then(|s| s.as_array()).map_or(vec![], |streams| {
streams.iter()
.filter(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio"))
.map(|st| {
serde_json::json!({
"index": st.get("index").and_then(|v| v.as_i64()),
"codec": st.get("codec_name").and_then(|v| v.as_str()),
"channels": st.get("channels").and_then(|v| v.as_i64()),
"sample_rate": st.get("sample_rate").and_then(|v| v.as_str()),
"language": st.get("tags").and_then(|t| t.get("language")),
})
})
.collect()
});
let audio_tracks_json = serde_json::to_value(&audio_tracks).ok();
// 計算 cut_count 與 cut_max_duration
let cut_path = std::path::Path::new(
&std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
)
.join(format!("{}.cut.json", file_uuid));
let mut cut_count = 0i32;
let mut cut_max_duration = 0.0f64;
if let Ok(content) = std::fs::read_to_string(&cut_path) {
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(scenes) = cut_data.get("scenes").and_then(|s| s.as_array()) {
cut_count = scenes.len() as i32;
cut_max_duration = scenes
.iter()
.filter_map(|s| {
let start = s.get("start_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
let end = s.get("end_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
if end > start {
Some(end - start)
} else {
None
}
})
.fold(0.0_f64, f64::max);
}
}
}
let videos_table = schema::table_name("videos");
let _ = sqlx::query(
&format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
)
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
.execute(db.pool()).await;
// 寫入 {file_uuid}.probe.json含音軌資訊
if let Some(json_val) = probe_json {
let probe_path = std::path::Path::new(
&std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
)
.join(format!("{}.probe.json", file_uuid));
let json_str = serde_json::to_string_pretty(&json_val).unwrap_or_default();
let _ = std::fs::write(&probe_path, json_str);
}
// Auto-run offline TMDb prefetch + probe for video files (no API calls needed)
if final_file_type.as_deref() == Some("video") {
let auto_file_uuid = file_uuid.clone();
let auto_db = db.clone();
tokio::spawn(async move {
// Step 1: Offline prefetch (reads local identity files)
let identities_dir = std::path::Path::new(&*crate::core::config::OUTPUT_DIR).join("identities");
let index_path = identities_dir.join("_index.json");
let cache_path = format!("{}/{}.tmdb.json", *crate::core::config::OUTPUT_DIR, auto_file_uuid);
let cache_file = std::path::Path::new(&cache_path);
if index_path.exists() && cache_file.exists() {
tracing::info!("[AUTO-TMDB] Offline cache found for {}, running probe", auto_file_uuid);
if let Err(e) = crate::core::tmdb::probe::probe_from_cache(&auto_db, &auto_file_uuid).await {
tracing::warn!("[AUTO-TMDB] Probe failed for {}: {}", auto_file_uuid, e);
} else {
tracing::info!("[AUTO-TMDB] Probe completed for {}", auto_file_uuid);
}
} else {
tracing::info!("[AUTO-TMDB] No offline cache for {}, skipping", auto_file_uuid);
}
});
}
RegisterFileResponse {
success: true,
file_uuid,
file_name,
file_path: canonical_path,
file_type: final_file_type,
duration,
width,
height,
fps,
total_frames,
registration_time: None,
already_exists: false,
message: "File registered successfully".to_string(),
}
}
async fn register_file(
State(state): State<AppState>,
Json(req): Json<RegisterFileRequest>,
) -> Result<Json<RegisterFileResponse>, StatusCode> {
let file_path = req.file_path.clone();
let pattern = req.pattern;
// 如果有 pattern掃描目錄下所有符合的檔案逐一註冊
if let Some(ref pat) = pattern {
let dir = std::path::Path::new(&file_path);
if !dir.is_dir() {
return Ok(Json(RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name: String::new(),
file_path: file_path.clone(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!(
"Pattern requires a directory, but path is not a dir: {}",
file_path
),
}));
}
let re = regex::Regex::new(pat).map_err(|e| {
tracing::error!("[REGISTER] Invalid regex pattern: {}", e);
StatusCode::BAD_REQUEST
})?;
let mut registered = 0u32;
let mut failed = 0u32;
let mut skipped = 0u32;
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let entry_path = entry.path();
if !entry_path.is_file() {
continue;
}
let fname = entry_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
if !re.is_match(fname) {
continue;
}
let result = register_single_file(
&state,
&entry_path.to_string_lossy().to_string(),
req.user_id,
None,
)
.await;
if result.success {
registered += 1;
} else if result.already_exists {
skipped += 1;
} else {
failed += 1;
}
}
}
return Ok(Json(RegisterFileResponse {
success: true,
file_uuid: format!("batch_{}_registered_{}_failed", registered, failed),
file_name: format!(
"{} files registered, {} skipped, {} failed",
registered, skipped, failed
),
file_path: file_path.clone(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!(
"Batch register: {} registered, {} skipped, {} failed",
registered, skipped, failed
),
}));
}
// 單一檔案註冊
let resp = register_single_file(&state, &file_path, req.user_id, req.content_hash).await;
// Auto-trigger pipeline for newly registered video files
if resp.success && !resp.already_exists && resp.file_type.as_deref() == Some("video")
&& crate::core::config::get_auto_pipeline_enabled() {
let auto_uuid = resp.file_uuid.clone();
let auto_state = state.clone();
tokio::spawn(async move {
// Brief delay to let DB settle, then trigger processing
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let video_path: Option<String> = sqlx::query_scalar(
&format!("SELECT file_path FROM {} WHERE file_uuid = $1", schema::table_name("videos"))
)
.bind(&auto_uuid)
.fetch_optional(auto_state.db.pool())
.await
.ok()
.flatten();
if let Some(ref vp) = video_path {
if let Ok(job) = auto_state.db.create_monitor_job(&auto_uuid, Some(vp)).await {
tracing::info!("[AUTO-PIPELINE] Job {} created for {}", job.id, auto_uuid);
// Initialize processing status with all processors
let all_procs: Vec<&str> = vec!["asr","cut","yolo","ocr","face","pose","asrx","visual_chunk","5w1h"];
let total = sqlx::query_scalar::<_, i64>(
&format!("SELECT COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1", schema::table_name("videos"))
)
.bind(&auto_uuid)
.fetch_one(auto_state.db.pool())
.await
.unwrap_or(0);
let _ = auto_state.db.init_processing_status(&auto_uuid, all_procs, total as u64).await;
let _ = sqlx::query(&format!("UPDATE {} SET status = 'processing' WHERE file_uuid = $1", schema::table_name("videos")))
.bind(&auto_uuid)
.execute(auto_state.db.pool())
.await;
tracing::info!("[AUTO-PIPELINE] Pipeline triggered for {}", auto_uuid);
}
}
});
}
return Ok(Json(resp));
}
/// Returns ffprobe metadata for a registered file and refreshes its `videos` row.
///
/// The handler looks up `file_name` and `file_path` by `file_uuid`. If
/// `{OUTPUT_DIR}/{file_uuid}.probe.json` exists, it is reused (`cached: true`). Otherwise
/// ffprobe runs on the registered path and the result is saved as `probe.json`.
/// Width, height, frame rate and nb_frames all come from the *first* video stream, so
/// files with several video streams report consistent values. `duration`, `width`,
/// `height`, `fps` and `total_frames` are written back to the `videos` table. A failed
/// update is logged, not returned.
///
/// # Errors
/// * `404` if the UUID is unknown, or if nothing is cached and the file is gone.
/// * `500` on database, ffprobe, parse or save failures.
///
/// Error bodies are JSON with `error` and `file_uuid`.
async fn probe_by_uuid(
    State(state): State<AppState>,
    Path(file_uuid): Path<String>,
) -> Result<Json<ProbeResponse>, (StatusCode, Json<serde_json::Value>)> {
    /// Parses an ffprobe frame rate. `"num/den"` yields `num/den`, or 0.0 if malformed or
    /// `den <= 0`; an unparsable `den` counts as 1. A plain number is parsed directly,
    /// or yields 0.0 if unparsable.
    fn parse_frame_rate(raw: &str) -> f64 {
        if !raw.contains('/') {
            return raw.parse().unwrap_or(0.0);
        }
        let parts: Vec<&str> = raw.split('/').collect();
        match parts.as_slice() {
            [num, den] => {
                let num: f64 = num.parse().unwrap_or(0.0);
                let den: f64 = den.parse().unwrap_or(1.0);
                if den > 0.0 {
                    num / den
                } else {
                    0.0
                }
            }
            _ => 0.0,
        }
    }

    // 1. Look up the registered file.
    let table = schema::table_name("videos");
    let row: Option<(String, String)> = sqlx::query_as(&format!(
        "SELECT file_name, file_path FROM {} WHERE file_uuid = $1",
        table
    ))
    .bind(&file_uuid)
    .fetch_optional(state.db.pool())
    .await
    .map_err(|e| {
        tracing::error!("DB error fetching video: {}", e);
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"error": format!("DB error: {}", e), "file_uuid": file_uuid})),
        )
    })?;
    let (file_name, path) = row.ok_or_else(|| {
        tracing::warn!("Video not found: {}", file_uuid);
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "Video not found", "file_uuid": file_uuid})),
        )
    })?;
    // 2. Check for a cached probe.json; otherwise run ffprobe and cache the result.
    let probe_path = format!(
        "{}/{}.probe.json",
        crate::core::config::OUTPUT_DIR.as_str(),
        file_uuid
    );
    let (probe_result, cached) = if let Ok(content) = std::fs::read_to_string(&probe_path) {
        tracing::info!("Using cached probe.json: {}", probe_path);
        let result: crate::core::probe::ProbeResult =
            serde_json::from_str(&content).map_err(|e| {
                tracing::error!("Failed to parse cached probe.json: {}", e);
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({"error": format!("Failed to parse cached probe.json: {}", e), "file_uuid": file_uuid})),
                )
            })?;
        (result, true)
    } else {
        // Check that the file still exists before running ffprobe.
        let file_path = std::path::Path::new(&path);
        if !file_path.exists() {
            tracing::error!("File not found at path: {}", path);
            return Err((
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({"error": "File does not exist at registered path", "file_uuid": file_uuid, "file_path": path})),
            ));
        }
        tracing::info!("Running ffprobe for: {}", path);
        let result = crate::core::probe::probe_video(&path).map_err(|e| {
            tracing::error!("ffprobe failed: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error": format!("ffprobe failed: {}", e), "file_uuid": file_uuid, "file_path": path})),
            )
        })?;
        // Save probe.json to OUTPUT_DIR.
        let file_manager = FileManager::new(std::path::PathBuf::from(
            crate::core::config::OUTPUT_DIR.as_str(),
        ));
        let json_str = serde_json::to_string(&result).map_err(|e| {
            tracing::error!("Failed to serialize probe result: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error": format!("Failed to serialize probe result: {}", e), "file_uuid": file_uuid})),
            )
        })?;
        file_manager
            .save_json(&file_uuid, "probe", &json_str)
            .map_err(|e| {
                tracing::error!("Failed to save probe.json: {}", e);
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({"error": format!("Failed to save probe.json: {}", e), "file_uuid": file_uuid})),
                )
            })?;
        (result, false)
    };
    // 3. Extract video info, all from the same (first) video stream.
    let duration = probe_result
        .format
        .duration
        .as_ref()
        .and_then(|s| s.parse::<f64>().ok())
        .unwrap_or(0.0);
    let video_stream = probe_result
        .streams
        .iter()
        .find(|s| s.codec_type.as_deref() == Some("video"));
    let width = video_stream.and_then(|s| s.width).unwrap_or(0);
    let height = video_stream.and_then(|s| s.height).unwrap_or(0);
    let fps = video_stream
        .and_then(|s| s.r_frame_rate.as_ref())
        .map_or(0.0, |r| parse_frame_rate(r));
    // 4. Prefer the container's nb_frames; otherwise estimate from duration × fps.
    let total_frames = video_stream
        .and_then(|s| s.nb_frames.as_ref())
        .and_then(|n| n.parse::<i64>().ok())
        .unwrap_or_else(|| (duration * fps).floor() as i64);
    // Update the videos table with the probe metadata (best effort).
    if let Err(e) = sqlx::query(&format!(
        "UPDATE {} SET duration = $1, width = $2, height = $3, fps = $4, total_frames = $5 WHERE file_uuid = $6",
        table
    ))
    .bind(duration)
    .bind(width as i32)
    .bind(height as i32)
    .bind(fps)
    .bind(total_frames)
    .bind(&file_uuid)
    .execute(state.db.pool())
    .await
    {
        tracing::warn!("Failed to update probe metadata for {}: {}", file_uuid, e);
    }
    let file_size = std::fs::metadata(&path).ok().map(|m| m.len() as i64);
    Ok(Json(ProbeResponse {
        file_uuid,
        file_name,
        file_size,
        duration,
        width,
        height,
        fps,
        total_frames,
        cached,
        format: probe_result.format,
        streams: probe_result.streams,
    }))
}
// --- P0 Core API Handlers ---
/// Queues processing for a registered file.
///
/// The handler creates a monitor job, which workers poll. It records `req.processors`
/// on the job when given; otherwise all processors in `ALL_PROCESSORS` run. It then marks
/// the video `processing` and initialises per-processor status with the stored
/// `total_frames`. The response also lists processor PIDs already recorded in Redis for
/// this file; this is best effort and Redis errors are ignored.
///
/// # Errors
/// `404` if `file_uuid` is unknown; `500` with a message on any database failure.
async fn trigger_processing(
    State(state): State<AppState>,
    Path(file_uuid): Path<String>,
    Json(req): Json<ProcessRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    /// Processors run when the request does not name any.
    const ALL_PROCESSORS: [&str; 9] = [
        "asr",
        "cut",
        "yolo",
        "ocr",
        "face",
        "pose",
        "asrx",
        "visual_chunk",
        "5w1h",
    ];

    let videos_table = schema::table_name("videos");
    // 1. Fetch everything needed from the asset row in a single round trip.
    let asset: Option<(String, i64, String)> = sqlx::query_as(&format!(
        "SELECT file_name, COALESCE(total_frames, 0), file_path FROM {} WHERE file_uuid = $1",
        videos_table
    ))
    .bind(&file_uuid)
    .fetch_optional(state.db.pool())
    .await
    .map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("DB Error: {}", e),
        )
    })?;
    let (file_name, total_frames, video_path) =
        asset.ok_or_else(|| (StatusCode::NOT_FOUND, "File not found".to_string()))?;
    // 2. Create the monitor job (workers poll this table).
    let monitor_job = state
        .db
        .create_monitor_job(&file_uuid, Some(video_path.as_str()))
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to create monitor job: {}", e),
            )
        })?;
    // Restrict the job to the requested processors, if any.
    let processors_to_run: Vec<&str> = match &req.processors {
        Some(procs) => {
            sqlx::query(&format!(
                "UPDATE {} SET processors = $1 WHERE id = $2",
                schema::table_name("monitor_jobs")
            ))
            .bind(procs)
            .bind(monitor_job.id)
            .execute(state.db.pool())
            .await
            .map_err(|e| {
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("Failed to update processors: {}", e),
                )
            })?;
            procs.iter().map(|s| s.as_str()).collect()
        }
        None => ALL_PROCESSORS.to_vec(),
    };
    // 3. Update the asset status.
    sqlx::query(&format!(
        "UPDATE {} SET status = 'processing' WHERE file_uuid = $1",
        videos_table
    ))
    .bind(&file_uuid)
    .execute(state.db.pool())
    .await
    .map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to update asset status: {}", e),
        )
    })?;
    // Initialize processing_status for the selected processors and total_frames.
    state
        .db
        .init_processing_status(&file_uuid, processors_to_run, total_frames as u64)
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to init processing status: {}", e),
            )
        })?;
    tracing::info!("Job {} created for asset {}", monitor_job.id, file_uuid);
    // Collect any processor PIDs already published to Redis (best effort).
    let prefix = REDIS_KEY_PREFIX.as_str();
    let mut processor_pids: Vec<i32> = Vec::new();
    if let Ok(redis_client) = RedisClient::new() {
        if let Ok(mut conn) = redis_client.get_conn().await {
            for name in ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"] {
                let key = format!("{}job:{}:processor:{}", prefix, file_uuid, name);
                let pid: Option<i32> = redis::cmd("HGET")
                    .arg(&key)
                    .arg("pid")
                    .query_async(&mut conn)
                    .await
                    .ok()
                    .flatten();
                if let Some(p) = pid {
                    processor_pids.push(p);
                }
            }
        }
    }
    Ok(Json(serde_json::json!({
        "job_id": monitor_job.id,
        "file_uuid": file_uuid,
        "status": "PENDING",
        "pids": processor_pids,
        "message": format!("Processing triggered for {}", file_name)
    })))
}
async fn download_json(
Path((file_uuid, processor)): Path<(String, String)>,
) -> Result<(StatusCode, [(String, String); 1], Vec<u8>), StatusCode> {
let output_dir = crate::core::config::OUTPUT_DIR.as_str();
let path = std::path::Path::new(output_dir).join(format!("{}.{}.json", file_uuid, processor));
if !path.exists() {
return Err(StatusCode::NOT_FOUND);
}
let data = std::fs::read(&path).map_err(|_| StatusCode::NOT_FOUND)?;
Ok((
StatusCode::OK,
[("content-type".to_string(), "application/json".to_string())],
data,
))
}
async fn get_chunk_by_path(
Path((file_uuid, chunk_id)): Path<(String, String)>,
State(state): State<AppState>,
) -> Result<Json<Chunk>, StatusCode> {
let chunk = state
.db
.get_chunk_by_chunk_id_and_uuid(&chunk_id, &file_uuid)
.await
.map_err(|_| {
tracing::error!("[get_chunk_by_path] DB error: {}:{}", file_uuid, chunk_id);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| {
tracing::warn!("[get_chunk_by_path] Not found: {}:{}", file_uuid, chunk_id);
StatusCode::NOT_FOUND
})?;
Ok(Json(chunk))
}
async fn get_asset_status(
State(state): State<AppState>,
Path(uuid): Path<String>,
) -> Result<Json<AssetStatusResponse>, StatusCode> {
let videos_table = schema::table_name("videos");
let row: Option<(String, String, chrono::DateTime<chrono::Utc>, String, i64)> = sqlx::query_as(
&format!(
"SELECT file_uuid, file_name, created_at AT TIME ZONE 'UTC', COALESCE(processing_status, 'REGISTERED'), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1",
videos_table
)
)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (uuid, file_name, time, status, total) = row.ok_or(StatusCode::NOT_FOUND)?;
let jobs_table = schema::table_name("monitor_jobs");
let job: Option<(String, String, i64, i64)> = sqlx::query_as(
&format!(
"SELECT id::text, COALESCE(status, 'QUEUED'), COALESCE(processed_frames, 0), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1 ORDER BY created_at DESC LIMIT 1",
jobs_table
)
)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
let progress = if let Some((jid, jstatus, pf, tf)) = job {
if tf > 0 && (jstatus == "RUNNING" || jstatus == "QUEUED") {
Some((
jid,
FrameProgress {
total_frames: tf,
processed_frames: pf,
progress_percent: (pf as f64 / tf as f64) * 100.0,
},
))
} else {
None
}
} else {
None
};
Ok(Json(AssetStatusResponse {
uuid,
file_name,
registration_time: time.to_rfc3339(),
processing_status: status,
current_job_id: progress.as_ref().map(|(id, _)| id.clone()),
frame_progress: progress.map(|(_, p)| p),
}))
}
async fn get_job_status(
State(state): State<AppState>,
Path(job_id): Path<String>,
) -> Result<Json<JobStatusResponse>, StatusCode> {
let jobs_table = schema::table_name("monitor_jobs");
let row: Option<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
&format!(
"SELECT j.id::text, j.file_uuid, COALESCE(j.rule, 'unknown'), COALESCE(j.status, 'QUEUED'), j.assigned_processor_id::text, j.processed_frames, j.total_frames FROM {} j WHERE j.id = $1::uuid",
jobs_table
)
)
.bind(&job_id)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
eprintln!("DB Error in get_job_status: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let (id, asset, rule, status, proc_id, pf, tf) = row.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(JobStatusResponse {
job_id: id,
file_uuid: asset,
rule,
status,
current_processor_id: proc_id,
frame_progress: FrameProgress {
total_frames: tf,
processed_frames: pf,
progress_percent: if tf > 0 {
(pf as f64 / tf as f64) * 100.0
} else {
0.0
},
},
}))
}
async fn get_rule_status(
State(state): State<AppState>,
Path(rule): Path<String>,
) -> Result<Json<RuleStatusResponse>, StatusCode> {
let procs: Vec<String> = sqlx::query_scalar(
"SELECT id::text FROM processors WHERE supported_rules @> ARRAY[$1]::TEXT[]",
)
.bind(&rule)
.fetch_all(state.db.pool())
.await
.unwrap_or_default();
let jobs_table = schema::table_name("monitor_jobs");
let jobs: Vec<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
&format!(
"SELECT id::text, file_uuid, COALESCE(rule, 'unknown'), status, assigned_processor_id::text, processed_frames, total_frames FROM {} WHERE rule = $1 AND status IN ('QUEUED','RUNNING')",
jobs_table
)
)
.bind(&rule)
.fetch_all(state.db.pool())
.await
.unwrap_or_default();
let active = jobs
.into_iter()
.map(|(id, asset, r, s, p, pf, tf)| JobStatusResponse {
job_id: id,
file_uuid: asset,
rule: r,
status: s,
current_processor_id: p,
frame_progress: FrameProgress {
total_frames: tf,
processed_frames: pf,
progress_percent: if tf > 0 {
(pf as f64 / tf as f64) * 100.0
} else {
0.0
},
},
})
.collect();
Ok(Json(RuleStatusResponse {
rule,
supported_processor_ids: procs,
active_jobs: active,
}))
}
// --- End P0 Core API Handlers ---
async fn search(
State(state): State<AppState>,
Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
let mode = req.mode.unwrap_or(SearchMode::Smart);
let limit = req.limit.unwrap_or(10);
let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
let cache_key = keys::search(&format!("{:?}", mode));
let ttl = state.mongo_cache.ttl_search();
let response = state
.mongo_cache
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
let pg = PostgresDb::init()
.await
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
let results: Vec<SearchResult> = match mode {
SearchMode::Vector => {
let query_vector = state
.embedder
.embed_query(&req.query)
.await
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
let qdrant = QdrantDb::new();
let search_results = if let Some(ref uuid) = req.uuid {
qdrant.search_in_uuid(&query_vector, uuid, limit).await?
} else {
qdrant.search(&query_vector, limit).await?
};
let mut results = Vec::new();
for r in search_results {
if let Some(chunk) = pg
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
.await
.ok()
.flatten()
{
let text = extract_text_from_content(&chunk.content);
results.push(SearchResult {
uuid: r.uuid,
chunk_id: r.chunk_id,
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time().seconds(),
end_time: chunk.end_time().seconds(),
text,
score: r.score,
});
}
}
results
}
SearchMode::Smart => {
// Vector search + BM25 reranking
let query_vector = state
.embedder
.embed_query(&req.query)
.await
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
let qdrant = QdrantDb::new();
let search_results = if let Some(ref uuid) = req.uuid {
qdrant
.search_in_uuid(&query_vector, uuid, limit * 2)
.await?
} else {
qdrant.search(&query_vector, limit * 2).await?
};
// 取得所有 chunk 並用 BM25 重新排序
let mut results_with_bm25: Vec<(SearchResult, f32)> = Vec::new();
for r in search_results {
if let Some(chunk) = pg
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
.await
.ok()
.flatten()
{
let text = extract_text_from_content(&chunk.content);
let vector_score = r.score;
results_with_bm25.push((
SearchResult {
uuid: r.uuid,
chunk_id: r.chunk_id,
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time().seconds(),
end_time: chunk.end_time().seconds(),
text,
score: vector_score,
},
vector_score,
));
}
}
// 依 vector score 排序後取前 limit 個
results_with_bm25
.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
results_with_bm25
.into_iter()
.take(limit)
.map(|(r, _)| r)
.collect()
}
};
let total = results.len();
let results = dedup_search_results(results);
let page = req.page.unwrap_or(1).max(1);
let page_size = req.page_size.or(req.limit).unwrap_or(total.max(1));
let start = (page - 1) * page_size;
let paged_results: Vec<SearchResult> = results.into_iter().skip(start).take(page_size).collect();
Ok::<SearchResponse, anyhow::Error>(SearchResponse {
results: paged_results,
query: req.query.clone(),
total,
page,
page_size,
limit: req.limit.unwrap_or(10),
})
})
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(response))
}
/// Full-text (BM25) search over chunk text, optionally restricted to one file UUID.
///
/// Asks the database for `limit` (default 10) BM25 hits, collapses duplicates with
/// `dedup_search_results`, then returns page `page` (1-based, default 1) of size
/// `page_size` (falls back to `limit`, then to all hits). `total` counts the
/// deduplicated hits.
///
/// # Errors
/// `500` when the BM25 query fails (the error is logged).
async fn search_bm25(
    State(state): State<AppState>,
    Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let bm25_results = state
        .db
        .search_bm25(&req.query, req.uuid.as_deref(), limit as i64)
        .await
        .map_err(|e| {
            tracing::error!("BM25 search failed: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
    let results: Vec<SearchResult> = bm25_results
        .into_iter()
        .map(|r| SearchResult {
            uuid: r.uuid,
            chunk_id: r.chunk_id,
            chunk_type: r.chunk_type,
            start_time: r.start_time.unwrap_or(0.0),
            end_time: r.end_time.unwrap_or(0.0),
            text: r.text.unwrap_or_default(),
            score: r.bm25_score as f32,
        })
        .collect();
    let results = dedup_search_results(results);
    let total = results.len();
    let page = req.page.unwrap_or(1).max(1);
    let page_size = req.page_size.or(req.limit).unwrap_or(total.max(1));
    // `page` is client-supplied: saturate rather than overflow on absurd values
    // (an oversized offset simply yields an empty page).
    let start = (page - 1).saturating_mul(page_size);
    let paged_results: Vec<SearchResult> =
        results.into_iter().skip(start).take(page_size).collect();
    Ok(Json(SearchResponse {
        results: paged_results,
        query: req.query.clone(),
        total,
        page,
        page_size,
        limit,
    }))
}
/// "Smart" search endpoint — currently a Mongo-cached BM25 full-text search.
///
/// Runs BM25 over chunk text (optionally restricted to `uuid`) asking for `limit`
/// (default 10) hits, collapses duplicates with `dedup_search_results`, and returns page
/// `page` (1-based) of size `page_size` (falls back to `limit`, then to all hits).
///
/// The cached value is the already-paginated response, so the cache key covers the
/// query, UUID, limit **and** the pagination inputs.
///
/// # Errors
/// `500` when the search or the cache layer fails.
async fn search_smart(
    State(state): State<AppState>,
    Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let page = req.page.unwrap_or(1).max(1);
    let requested_page_size = req.page_size.or(req.limit);
    let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
    // Without the pagination inputs in the key, every page would be served from the
    // first page's cache entry.
    let page_size_key =
        requested_page_size.map_or_else(|| "all".to_string(), |s| s.to_string());
    let cache_key = keys::search(&format!(
        "{}smart:p{}:s{}",
        query_hash, page, page_size_key
    ));
    let ttl = state.mongo_cache.ttl_search();
    let response = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
            // Use the shared `state.db` handle (as `search_bm25` does) instead of
            // initialising a fresh PostgresDb on every cache miss.
            let bm25_results = state
                .db
                .search_bm25(&req.query, req.uuid.as_deref(), limit as i64)
                .await
                .map_err(|e| anyhow::anyhow!("BM25 search failed: {}", e))?;
            let results: Vec<SearchResult> = bm25_results
                .into_iter()
                .map(|r| SearchResult {
                    uuid: r.uuid,
                    chunk_id: r.chunk_id,
                    chunk_type: r.chunk_type,
                    start_time: r.start_time.unwrap_or(0.0),
                    end_time: r.end_time.unwrap_or(0.0),
                    text: r.text.unwrap_or_default(),
                    score: r.bm25_score as f32,
                })
                .collect();
            // Deduplicate before counting so `total` matches what can be paged through.
            let results = dedup_search_results(results);
            let total = results.len();
            let page_size = requested_page_size.unwrap_or(total.max(1));
            // `page` is client-supplied: saturate rather than overflow.
            let start = (page - 1).saturating_mul(page_size);
            let paged_results: Vec<SearchResult> =
                results.into_iter().skip(start).take(page_size).collect();
            Ok::<SearchResponse, anyhow::Error>(SearchResponse {
                results: paged_results,
                query: req.query.clone(),
                total,
                page,
                page_size,
                limit,
            })
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(response))
}
/// Hybrid (vector + BM25) search endpoint with Mongo-backed response caching.
///
/// Embeds `query` and runs `PostgresDb::hybrid_search` with `vector_weight` (default 0.7)
/// and `bm25_weight` (default 0.3), asking for `limit` (default 10) hits. Duplicates are
/// collapsed with `dedup_hybrid_results`, then page `page` (1-based) of size `page_size`
/// (falls back to `limit`, then to all hits) is returned.
///
/// The cached value is the already-weighted, already-paginated response, so the cache
/// key covers the query, UUID, limit, both weights and the pagination inputs.
///
/// # Errors
/// `500` when embedding, the database, or the cache layer fails.
async fn hybrid_search(
    State(state): State<AppState>,
    Json(req): Json<HybridSearchRequest>,
) -> Result<Json<HybridSearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let vector_weight = req.vector_weight.unwrap_or(0.7);
    let bm25_weight = req.bm25_weight.unwrap_or(0.3);
    let page = req.page.unwrap_or(1).max(1);
    let requested_page_size = req.page_size.or(req.limit);
    let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
    let page_size_key =
        requested_page_size.map_or_else(|| "all".to_string(), |s| s.to_string());
    let cache_key = keys::hybrid_search(&format!(
        "{}:vw{}:bw{}:p{}:s{}",
        query_hash, vector_weight, bm25_weight, page, page_size_key
    ));
    let ttl = state.mongo_cache.ttl_hybrid_search();
    let response = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_HYBRID_SEARCH, || async {
            let query_vector = state
                .embedder
                .embed_query(&req.query)
                .await
                .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
            let pg = PostgresDb::init()
                .await
                .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
            let results = pg
                .hybrid_search(
                    &req.query,
                    &query_vector,
                    req.uuid.as_deref(),
                    limit,
                    vector_weight,
                    bm25_weight,
                )
                .await?;
            let search_results: Vec<HybridSearchResult> = results
                .into_iter()
                .map(|r| HybridSearchResult {
                    uuid: r.uuid,
                    chunk_id: r.chunk_id,
                    chunk_type: r.chunk_type,
                    start_time: r.start_time.unwrap_or(0.0),
                    end_time: r.end_time.unwrap_or(0.0),
                    text: r.text.unwrap_or_default(),
                    vector_score: r.vector_score,
                    bm25_score: r.bm25_score,
                    combined_score: r.combined_score,
                })
                .collect();
            // Deduplicate before counting so `total` matches what can be paged through.
            let search_results = dedup_hybrid_results(search_results);
            let total = search_results.len();
            let page_size = requested_page_size.unwrap_or(total.max(1));
            // `page` is client-supplied: saturate rather than overflow.
            let start = (page - 1).saturating_mul(page_size);
            let paged: Vec<HybridSearchResult> =
                search_results.into_iter().skip(start).take(page_size).collect();
            Ok::<HybridSearchResponse, anyhow::Error>(HybridSearchResponse {
                results: paged,
                query: req.query.clone(),
                total,
                page,
                page_size,
                limit,
            })
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(response))
}
/// Resolves file identity either from a filesystem path or from a registered UUID.
///
/// * `?path=…` — derives the UUID from the path via `crate::uuid::compute_uuid_from_path`
///   without touching the database; the other response fields are `None`.
/// * `?uuid=…` — loads the video record (through the Mongo video-meta cache) and returns
///   its UUID, path, name and duration.
///
/// `path` takes precedence when both are given. Responds `404` when neither is given or
/// no record exists for the UUID, and `500` when the database or cache lookup fails.
async fn lookup(
    State(state): State<AppState>,
    Query(query): Query<LookupQuery>,
) -> Result<Json<LookupResponse>, StatusCode> {
    if let Some(path) = query.path {
        return Ok(Json(LookupResponse {
            file_uuid: crate::uuid::compute_uuid_from_path(&path),
            file_path: None,
            file_name: None,
            duration: None,
        }));
    }
    let uuid = match query.uuid {
        Some(uuid) => uuid,
        None => return Err(StatusCode::NOT_FOUND),
    };
    let cache_key = keys::video_meta(&uuid);
    let ttl = state.mongo_cache.ttl_video_meta();
    let record = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEO_META, || async {
            let db = PostgresDb::init()
                .await
                .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
            db.get_video_by_uuid(&uuid)
                .await
                .map_err(|e| anyhow::anyhow!("{}", e))
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    match record {
        Some(v) => Ok(Json(LookupResponse {
            file_uuid: v.file_uuid,
            file_path: Some(v.file_path),
            file_name: Some(v.file_name),
            duration: Some(v.duration),
        })),
        None => Err(StatusCode::NOT_FOUND),
    }
}
/// One media file found by `scan_files` under the scan root, with its registration state.
#[derive(Debug, Serialize, Deserialize)]
struct ScannedFileInfo {
/// Final path component (file name with extension).
file_name: String,
/// Path relative to the scan root; falls back to `file_path` if it is not under the root.
relative_path: String,
/// Path as produced by the directory walk; this is the key looked up in the registered-paths map.
file_path: String,
/// File size in bytes (from file metadata).
file_size: u64,
/// Last-modified time as an RFC 3339 string, or empty when unavailable.
modified_time: String,
// Registration info (from the `videos` table)
/// `true` when a `videos` row exists for `file_path` with a status other than `"unregistered"`.
is_registered: bool,
/// `videos.file_uuid` for registered files; `None` otherwise.
file_uuid: Option<String>,
/// `videos.status` for registered files; `"unregistered"` otherwise.
status: Option<String>,
/// `videos.registration_time` rendered as text, for registered files.
registration_time: Option<String>,
/// Id of the most recent `monitor_jobs` row for this file's UUID, if any.
job_id: Option<i32>,
}
/// Response of `scan_files`: one page of scanned files plus summary counters.
#[derive(Debug, Serialize, Deserialize)]
struct ScanFilesResponse {
/// The requested page of files, after sorting and `pattern` filtering.
files: Vec<ScannedFileInfo>,
/// Number of files found by the scan, before `pattern` filtering.
total: usize,
/// Number of files remaining after `pattern` filtering.
filtered_total: usize,
/// 1-based page number that was returned.
page: usize,
/// Page size used (defaults to all filtered files on one page).
page_size: usize,
/// Number of pages for `filtered_total` at `page_size`.
total_pages: usize,
/// Registered files among all scanned files (before `pattern` filtering).
registered_count: usize,
/// Unregistered files among all scanned files (before `pattern` filtering).
unregistered_count: usize,
/// Row count of the chunk table (0 if the query fails).
total_chunks: i64,
/// Chunks with a non-NULL `vector_id` (0 if the query fails).
searchable_chunks: i64,
/// Videos whose status is `'pending'` (0 if the query fails).
pending_videos: i64,
}
/// Recursively collects media files under `dir` into `files`.
///
/// * Hidden directories (name starting with `.`) are skipped together with everything
///   below them; scanning of their siblings continues.
/// * Only regular files whose lowercased extension is in `allowed_extensions` are collected.
/// * `relative_path` is computed against `root` (falling back to the full path);
///   `file_path` is the walked path as-is.
/// * `registered_paths` maps a file path to `(file_uuid, status, registration_time, job_id)`;
///   a hit whose status is not `"unregistered"` marks the file as registered.
///
/// Unreadable directories and entries whose metadata cannot be read are skipped silently.
fn scan_directory_recursive(
    dir: &std::path::Path,
    root: &std::path::Path,
    allowed_extensions: &[&str],
    registered_paths: &std::collections::HashMap<String, (String, String, Option<String>, Option<i32>)>,
    files: &mut Vec<ScannedFileInfo>,
) {
    let entries = match std::fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_dir() {
            let hidden = path
                .file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |n| n.starts_with('.'));
            // `continue`, not `return`: a hidden sub-directory must not abort the scan of
            // the remaining entries in this directory.
            if !hidden {
                scan_directory_recursive(&path, root, allowed_extensions, registered_paths, files);
            }
            continue;
        }
        if !path.is_file() {
            continue;
        }
        let allowed = path
            .extension()
            .and_then(|e| e.to_str())
            .map_or(false, |ext| allowed_extensions.contains(&ext.to_lowercase().as_str()));
        if !allowed {
            continue;
        }
        let meta = match entry.metadata() {
            Ok(meta) => meta,
            Err(_) => continue,
        };
        let abs_path = path.to_string_lossy().to_string();
        let rel_path = path
            .strip_prefix(root)
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_else(|_| abs_path.clone());
        let file_name = path
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        let modified_time = meta
            .modified()
            .ok()
            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
            .map(|d| {
                chrono::DateTime::from_timestamp(d.as_secs() as i64, 0)
                    .map(|dt| dt.to_rfc3339())
                    .unwrap_or_default()
            })
            .unwrap_or_default();
        // A DB row whose status is "unregistered" counts as not registered.
        let (is_registered, file_uuid, status, registration_time, job_id) =
            match registered_paths.get(&abs_path) {
                Some((uuid, status, reg_time, jid)) if status != "unregistered" => (
                    true,
                    Some(uuid.clone()),
                    Some(status.clone()),
                    reg_time.clone(),
                    *jid,
                ),
                _ => (false, None, Some("unregistered".to_string()), None, None),
            };
        files.push(ScannedFileInfo {
            file_name,
            relative_path: rel_path,
            file_path: abs_path,
            file_size: meta.len(),
            modified_time,
            is_registered,
            file_uuid,
            status,
            registration_time,
            job_id,
        });
    }
}
/// Query parameters of `scan_files`.
#[derive(Debug, Deserialize)]
struct ScanFilesQuery {
/// Alias for `page_size`, used only when `page_size` is absent.
limit: Option<usize>,
/// 1-based page number (default 1; 0 is treated as 1).
page: Option<usize>,
/// Files per page (default: all filtered files on one page).
page_size: Option<usize>,
/// Case-insensitive regular expression matched against the file name.
pattern: Option<String>,
/// Sort key: `"name"` (default; registered files first when ascending), `"size"`,
/// `"modified"`/`"time"`, or `"status"`. Unrecognized values behave like `"name"`.
sort_by: Option<String>,
/// `"desc"` sorts descending; any other value (default `"asc"`) sorts ascending.
sort_order: Option<String>,
}
async fn scan_files(
State(state): State<AppState>,
Query(params): Query<ScanFilesQuery>,
) -> Result<Json<ScanFilesResponse>, StatusCode> {
let demo_dir_str = std::env::var("MOMENTRY_SFTP_ROOT")
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo".to_string());
let demo_dir = std::path::Path::new(&demo_dir_str);
let allowed_extensions = vec!["mp4", "mov", "mkv", "avi", "webm", "jpg", "jpeg", "png", "gif", "webp"];
// 1. Get registered files from DB (Map key: absolute file_path)
let table = schema::table_name("videos");
let mj_table = schema::table_name("monitor_jobs");
let registered_db: Vec<(String, String, String, String, Option<String>, Option<i32>)> = sqlx::query_as(&format!(
"SELECT v.file_path, v.file_name, v.file_uuid, v.status, v.registration_time::text, \
latest_job.id as job_id \
FROM {} v \
LEFT JOIN LATERAL ( \
SELECT id FROM {} WHERE uuid = v.file_uuid ORDER BY id DESC LIMIT 1 \
) latest_job ON true \
ORDER BY v.id",
table, mj_table
))
.fetch_all(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let registered_paths: std::collections::HashMap<String, (String, String, Option<String>, Option<i32>)> =
registered_db
.into_iter()
.map(|(path, _name, uuid, status, reg_time, jid)| (path, (uuid, status, reg_time, jid)))
.collect();
// 2. Scan filesystem recursively
let mut result_files = Vec::new();
if demo_dir.exists() {
scan_directory_recursive(
demo_dir,
demo_dir,
&allowed_extensions,
&registered_paths,
&mut result_files,
);
}
// 3. Sort: customizable
let desc = params.sort_order.as_deref().unwrap_or("asc") == "desc";
match params.sort_by.as_deref().unwrap_or("name") {
"size" => {
if desc { result_files.sort_by(|a, b| b.file_size.cmp(&a.file_size)); }
else { result_files.sort_by(|a, b| a.file_size.cmp(&b.file_size)); }
}
"modified" | "time" => {
if desc { result_files.sort_by(|a, b| b.modified_time.cmp(&a.modified_time)); }
else { result_files.sort_by(|a, b| a.modified_time.cmp(&b.modified_time)); }
}
"status" => {
if desc { result_files.sort_by(|a, b| b.status.cmp(&a.status).then(b.file_name.cmp(&a.file_name))); }
else { result_files.sort_by(|a, b| a.status.cmp(&b.status).then(a.file_name.cmp(&b.file_name))); }
}
_ => { // "name" (default): registered first, then by name
if desc { result_files.sort_by(|a, b| a.is_registered.cmp(&b.is_registered).then(b.file_name.cmp(&a.file_name))); }
else { result_files.sort_by(|a, b| b.is_registered.cmp(&a.is_registered).then(a.file_name.cmp(&b.file_name))); }
}
}
let total_all = result_files.len();
let registered_count = result_files.iter().filter(|f| f.is_registered).count();
let unregistered_count = result_files.iter().filter(|f| !f.is_registered).count();
// 4. Apply regex filter on filename
let filtered: Vec<ScannedFileInfo> = if let Some(ref pat) = params.pattern {
let re = match regex::Regex::new(&format!("(?i){}", pat)) {
Ok(r) => r,
Err(_) => return Err(StatusCode::BAD_REQUEST),
};
result_files.into_iter().filter(|f| re.is_match(&f.file_name)).collect()
} else {
result_files
};
let filtered_total = filtered.len();
// 5. Pagination
let page = params.page.unwrap_or(1).max(1);
let page_size = params.page_size.or(params.limit).unwrap_or(filtered_total.max(1));
let total_pages = if page_size > 0 { (filtered_total + page_size - 1) / page_size } else { 1 };
let start = (page - 1) * page_size;
let files: Vec<ScannedFileInfo> = filtered.into_iter().skip(start).take(page_size).collect();
let table_videos = schema::table_name("videos");
let table_chunks = schema::table_name("chunk");
let total_chunks: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {}", table_chunks))
.fetch_one(state.db.pool()).await.unwrap_or(0);
let searchable_chunks: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {} WHERE vector_id IS NOT NULL", table_chunks))
.fetch_one(state.db.pool()).await.unwrap_or(0);
let pending_videos: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {} WHERE status = 'pending'", table_videos))
.fetch_one(state.db.pool()).await.unwrap_or(0);
Ok(Json(ScanFilesResponse {
files,
total: total_all,
filtered_total,
page,
page_size,
total_pages,
registered_count,
unregistered_count,
total_chunks,
searchable_chunks,
pending_videos,
}))
}
/// Response of `get_progress`: per-processor and overall progress for one file.
#[derive(Debug, Serialize)]
struct ProgressResponse {
/// File UUID taken from the request path.
file_uuid: String,
/// `user` field of the Redis job hash, if set and non-empty.
user: Option<String>,
/// `group` field of the Redis job hash, if set and non-empty.
group: Option<String>,
/// File name from the `videos` record, if one was found.
file_name: Option<String>,
/// Duration from the `videos` record, if one was found (units as stored there).
duration: Option<f64>,
/// Percentage (0-100) of the tracked processors counted as completed.
overall_progress: u32,
/// CPU usage of the API server process, as sampled by `get_system_stats`.
cpu_percent: Option<f64>,
/// GPU utilization as sampled by `get_system_stats` (via `nvidia-smi`), if available.
gpu_percent: Option<f64>,
/// Memory usage percentage of the API server process, from `get_system_stats`.
memory_percent: Option<f64>,
/// Resident memory of the API server process as reported by `get_system_stats`.
memory_mb: Option<u64>,
/// Worker-reported health snapshot from Redis; `None` if any required field is missing.
system: Option<SystemHealthInfo>,
/// One entry per tracked processor.
processors: Vec<ProcessorProgressInfo>,
}
/// Worker-reported system health, read by `get_progress` from the `{prefix}health` Redis hash.
///
/// Each field mirrors the hash field of the same name. Units follow the field names and
/// are whatever the worker wrote (not re-validated here).
#[derive(Debug, Serialize)]
struct SystemHealthInfo {
cpu_idle_pct: f64,
memory_available_mb: u64,
memory_total_mb: u64,
memory_used_pct: f64,
/// `true` only when the hash value is exactly `"true"`; absent means `false`.
gpu_available: bool,
/// `None` when the field is absent or unparsable.
gpu_utilization_pct: Option<f64>,
/// `None` when the field is absent or unparsable.
gpu_memory_used_pct: Option<f64>,
/// Concurrency currently chosen by the worker (as reported).
dynamic_concurrency: u32,
/// Concurrency configured for the worker (as reported).
config_concurrency: u32,
/// Number of processors the worker reports as running.
running_processors: u32,
}
/// Progress of one processor (e.g. `asr`, `yolo`) for a file, as reported by `get_progress`.
#[derive(Debug, Serialize)]
struct ProcessorProgressInfo {
/// Processor name.
name: String,
/// Status from Redis (default `"pending"`), overridden by `processor_results` when available.
status: String,
/// Items processed so far, from Redis (0 if unknown).
current: u32,
/// Total items, from Redis (0 if unknown).
total: u32,
/// Percent complete (0-100).
progress: u32,
/// Free-form status message from Redis (empty if unset).
message: String,
/// Frames processed (from Redis, overridden by `processor_results` when available).
frames_processed: i32,
/// Chunks produced (from Redis, overridden by `processor_results` when available).
chunks_produced: i32,
/// Retry count from Redis (0 if unknown).
retry_count: i32,
/// Estimated seconds remaining while running; `None` when it cannot be estimated.
eta_seconds: Option<i64>,
}
/// Reads a processor's processed-frame count from its `.json` output file.
///
/// The file is `{MOMENTRY_OUTPUT_DIR}/{file_uuid}.{processor}.json`; the directory
/// defaults to `/Users/accusys/momentry/output_dev`. The value is the top-level integer
/// field `frame_count`.
///
/// Returns 0 when:
/// * the file is missing or unreadable,
/// * the content is not valid JSON,
/// * `frame_count` is absent or not an integer, or
/// * the value does not fit in an `i32` (instead of being silently truncated).
fn get_json_frame_count(file_uuid: &str, processor: &str) -> i32 {
    let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
        .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
    let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor));
    std::fs::read_to_string(&path)
        .ok()
        .and_then(|content| serde_json::from_str::<serde_json::Value>(&content).ok())
        .and_then(|val| val.get("frame_count").and_then(|v| v.as_i64()))
        .and_then(|n| i32::try_from(n).ok())
        .unwrap_or(0)
}
/// Samples resource usage of the current (API server) process using external tools.
///
/// Returns `(cpu_percent, gpu_percent, memory_percent, memory_mb)`:
/// * `cpu_percent` / `memory_percent` come from `ps -o %cpu=` / `ps -o %mem=` for this PID.
/// * `memory_mb` is `ps -o rss=` converted from KiB (the unit `ps` reports RSS in) to MiB,
///   rounded down.
/// * `gpu_percent` is the first line of `nvidia-smi --query-gpu=utilization.gpu`, i.e. the
///   first GPU's utilization. `nvidia-smi` prints one line per GPU, so the first line is
///   taken explicitly; parsing the whole output would fail on multi-GPU hosts.
///
/// A tool that is missing, fails, or prints something unparsable yields `None` for its
/// value(s). This spawns child processes synchronously, so async callers should run it on
/// a blocking thread.
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
    use std::process::Command;

    /// Runs `program` with `args` and returns its stdout (lossily decoded), if it could be spawned.
    fn run(program: &str, args: &[&str]) -> Option<String> {
        Command::new(program)
            .args(args)
            .output()
            .ok()
            .map(|o| String::from_utf8_lossy(&o.stdout).into_owned())
    }

    let pid = std::process::id().to_string();
    let cpu = run("ps", &["-p", &pid, "-o", "%cpu="])
        .as_deref()
        .and_then(parse_first_line_f64);
    let (mem_percent, mem_mb) = run("ps", &["-p", &pid, "-o", "%mem=,rss="])
        .as_deref()
        .map(parse_ps_mem_rss)
        .unwrap_or((None, None));
    let gpu = run(
        "nvidia-smi",
        &["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"],
    )
    .as_deref()
    .and_then(parse_first_line_f64);
    (cpu, gpu, mem_percent, mem_mb)
}

/// Parses `ps -o %mem=,rss=` output into `(%mem, RSS in MiB)`.
///
/// `ps` reports RSS in KiB; the value is divided by 1024 (rounding down).
fn parse_ps_mem_rss(output: &str) -> (Option<f64>, Option<u64>) {
    let mut fields = output.split_whitespace();
    let percent = fields.next().and_then(|s| s.parse::<f64>().ok());
    let rss_mb = fields
        .next()
        .and_then(|s| s.parse::<u64>().ok())
        .map(|kib| kib / 1024);
    (percent, rss_mb)
}

/// Parses the first line of `output` as an `f64`, ignoring surrounding whitespace.
fn parse_first_line_f64(output: &str) -> Option<f64> {
    output.lines().next()?.trim().parse().ok()
}
async fn get_progress(
axum::extract::Path(file_uuid): axum::extract::Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let pg = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut conn = redis
.get_conn()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose", "visual_chunk", "story"];
let mut processors = Vec::new();
let mut completed_count = 0u32;
for name in processor_names {
let prefix = REDIS_KEY_PREFIX.as_str();
let key = format!("{}job:{}:processor:{}", prefix, file_uuid, name);
let status: String = redis::cmd("HGET")
.arg(&key)
.arg("status")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "pending".to_string());
let current: u32 = redis::cmd("HGET")
.arg(&key)
.arg("current")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let total: u32 = redis::cmd("HGET")
.arg(&key)
.arg("total")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let message: String = redis::cmd("HGET")
.arg(&key)
.arg("message")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "".to_string());
let progress = if total > 0 {
((current as f64 / total as f64) * 100.0) as u32
} else if status == "complete" {
100
} else {
0
};
// 從 Redis 讀取 frames_processed / chunks_produced / retry_count
let frames_processed: i32 = redis::cmd("HGET")
.arg(&key)
.arg("frames_processed")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let chunks_produced: i32 = redis::cmd("HGET")
.arg(&key)
.arg("chunks_produced")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let retry_count: i32 = redis::cmd("HGET")
.arg(&key)
.arg("retry_count")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| "0".to_string())
.parse()
.unwrap_or(0);
let eta_seconds = if status == "running" && current > 0 && total > 0 && current < total {
let started_str: String = redis::cmd("HGET")
.arg(&key)
.arg("started_at")
.query_async(&mut conn)
.await
.unwrap_or_else(|_| String::new());
if !started_str.is_empty() {
if let Ok(started_at) = chrono::DateTime::parse_from_rfc3339(&started_str) {
let elapsed = chrono::Utc::now().signed_duration_since(started_at).num_seconds().max(1);
let estimated_total = (elapsed as f64 * total as f64 / current as f64) as i64;
Some((estimated_total - elapsed).max(0))
} else { None }
} else { None }
} else { None };
if status == "complete" {
completed_count += 1;
}
processors.push(ProcessorProgressInfo {
name: name.to_string(),
status,
current,
total,
progress,
message,
frames_processed,
chunks_produced,
retry_count,
eta_seconds,
});
}
// Supplement with actual processor_results from DB (overrides stale Redis data)
let pr_table = schema::table_name("processor_results");
let vt = schema::table_name("videos");
let total_frames: i64 = sqlx::query_scalar(&format!("SELECT COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1", vt))
.bind(&file_uuid).fetch_one(pg.pool()).await.unwrap_or(0);
if let Ok(rows) = sqlx::query_as::<_, (String, String, i32, i32)>(
&format!(
"SELECT pr.status, pr.processor_type, COALESCE(pr.frames_processed, 0), COALESCE(pr.chunks_produced, 0) \
FROM {} pr JOIN {} mj ON pr.job_id = mj.id \
WHERE mj.uuid = $1 ORDER BY pr.id",
pr_table, schema::table_name("monitor_jobs")
)
)
.bind(&file_uuid)
.fetch_all(pg.pool())
.await
{
completed_count = 0;
for (db_status, ptype, frames, chunks) in &rows {
for p in &mut processors {
if p.name == ptype.to_lowercase() {
p.status = db_status.clone();
p.frames_processed = *frames;
p.chunks_produced = *chunks;
if *db_status == "completed" && p.current == 0 {
p.progress = 100;
}
}
}
}
completed_count = processors.iter().filter(|p| p.status == "completed").count() as u32;
}
let overall_progress = (completed_count as f64 / processor_names.len() as f64 * 100.0) as u32;
let job_key = format!("{}job:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
let user: Option<String> = redis::cmd("HGET")
.arg(&job_key)
.arg("user")
.query_async(&mut conn)
.await
.ok()
.filter(|s: &String| !s.is_empty());
let group: Option<String> = redis::cmd("HGET")
.arg(&job_key)
.arg("group")
.query_async(&mut conn)
.await
.ok()
.filter(|s: &String| !s.is_empty());
let (file_name, duration) = pg
.get_video_by_uuid(&file_uuid)
.await
.ok()
.flatten()
.map(|v| (Some(v.file_name), Some(v.duration)))
.unwrap_or((None, None));
let (cpu_percent, gpu_percent, memory_percent, memory_mb) = get_system_stats();
// 從 Redis 讀取系統健康資訊(由 Worker 寫入)
let health_key = format!("{}health", REDIS_KEY_PREFIX.as_str());
let cpu_idle: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("cpu_idle_pct")
.query_async(&mut conn)
.await
.ok();
let mem_avail: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("memory_available_mb")
.query_async(&mut conn)
.await
.ok();
let mem_total: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("memory_total_mb")
.query_async(&mut conn)
.await
.ok();
let mem_used: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("memory_used_pct")
.query_async(&mut conn)
.await
.ok();
let gpu_avail: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("gpu_available")
.query_async(&mut conn)
.await
.ok();
let gpu_util: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("gpu_utilization_pct")
.query_async(&mut conn)
.await
.ok();
let gpu_mem: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("gpu_memory_used_pct")
.query_async(&mut conn)
.await
.ok();
let dyn_conc: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("dynamic_concurrency")
.query_async(&mut conn)
.await
.ok();
let cfg_conc: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("config_concurrency")
.query_async(&mut conn)
.await
.ok();
let run_proc: Option<String> = redis::cmd("HGET")
.arg(&health_key)
.arg("running_processors")
.query_async(&mut conn)
.await
.ok();
let system = cpu_idle.and_then(|idle| {
Some(SystemHealthInfo {
cpu_idle_pct: idle.parse().ok()?,
memory_available_mb: mem_avail?.parse().ok()?,
memory_total_mb: mem_total?.parse().ok()?,
memory_used_pct: mem_used?.parse().ok()?,
gpu_available: gpu_avail.map(|v| v == "true").unwrap_or(false),
gpu_utilization_pct: gpu_util.and_then(|v| v.parse().ok()),
gpu_memory_used_pct: gpu_mem.and_then(|v| v.parse().ok()),
dynamic_concurrency: dyn_conc?.parse().ok()?,
config_concurrency: cfg_conc?.parse().ok()?,
running_processors: run_proc?.parse().ok()?,
})
});
Ok(Json(ProgressResponse {
file_uuid,
user,
group,
file_name,
duration,
overall_progress,
cpu_percent,
gpu_percent,
memory_percent,
memory_mb,
system,
processors,
}))
}
/// Lists monitor jobs filtered by status, newest first, with page-based pagination.
///
/// Query parameters:
/// * `status` — comma-separated status values (default `"pending,running"`); blank
///   entries are ignored.
/// * `page` — 1-based page number (default 1; `0` is treated as 1).
/// * `page_size` — rows per page (default 20).
///
/// The status values are bound as a PostgreSQL `text[]` parameter instead of being
/// spliced into the SQL string, so the client-controlled filter cannot inject SQL.
///
/// # Errors
/// `500` if the database cannot be reached or either query fails.
async fn list_jobs(Query(params): Query<JobsQuery>) -> Result<Json<JobListResponse>, StatusCode> {
    use crate::core::db::MonitorJobStatus;

    // Page 0 would underflow `page - 1`; treat it as the first page.
    let page = params.page.unwrap_or(1).max(1);
    let page_size = params.page_size.unwrap_or(20);
    let status_filter = params
        .status
        .unwrap_or_else(|| "pending,running".to_string());
    let statuses: Vec<String> = status_filter
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect();
    let limit = (page_size as i64).max(0);
    let offset = ((page - 1) as i64).saturating_mul(limit);

    let pg = PostgresDb::init()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let table = crate::core::db::schema::table_name("monitor_jobs");
    // `status::text` keeps the comparison valid even if the column is not plain text
    // (e.g. an enum type).
    let query = format!(
        "SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
                error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
                processors, completed_processors, failed_processors, video_id
         FROM {}
         WHERE status::text = ANY($1)
         ORDER BY created_at DESC
         LIMIT $2 OFFSET $3",
        table
    );
    let count_query = format!(
        "SELECT COUNT(*) FROM {} WHERE status::text = ANY($1)",
        table
    );
    let total_count: i64 = sqlx::query_scalar(&count_query)
        .bind(&statuses[..])
        .fetch_one(pg.pool())
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let rows = sqlx::query(&query)
        .bind(&statuses[..])
        .bind(limit)
        .bind(offset)
        .fetch_all(pg.pool())
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let job_infos: Vec<JobInfoResponse> = rows
        .into_iter()
        .map(|r| {
            let status_str: String = r.try_get("status").unwrap_or_default();
            let status =
                MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending);
            JobInfoResponse {
                id: r.try_get("id").unwrap_or(0),
                uuid: r.try_get("uuid").unwrap_or_default(),
                status: status.as_str().to_string(),
                current_processor: r.try_get("current_processor").ok(),
                progress_current: r.try_get("progress_current").unwrap_or(0),
                progress_total: r.try_get("progress_total").unwrap_or(0),
                created_at: r.try_get::<String, _>("created_at").unwrap_or_default(),
                started_at: r.try_get::<String, _>("started_at").ok(),
            }
        })
        .collect();
    Ok(Json(JobListResponse {
        jobs: job_infos,
        count: total_count,
        page,
        page_size,
    }))
}
/// Returns one monitor job (looked up by its file UUID) with its per-processor results.
///
/// Responds `404` when no job exists for `uuid` and `500` on any database error. Each
/// stage is traced under the `[get_job]` prefix.
async fn get_job(
    axum::extract::Path(uuid): axum::extract::Path<String>,
) -> Result<Json<JobDetailResponse>, StatusCode> {
    tracing::info!("[get_job] START - uuid: {}", uuid);
    let pg = match PostgresDb::init().await {
        Ok(pg) => pg,
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to init PostgresDb: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] PostgresDb initialized");
    let job = match pg.get_monitor_job_by_uuid(&uuid).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            tracing::warn!("[get_job] Job not found: {}", uuid);
            return Err(StatusCode::NOT_FOUND);
        }
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to get monitor job: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] Found job: id={}, uuid={}", job.id, job.uuid);
    let processor_rows = match pg.get_processor_results_by_job(job.id).await {
        Ok(rows) => rows,
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to get processor results: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] Got {} processor results", processor_rows.len());
    let mut processors: Vec<ProcessorInfoResponse> = Vec::with_capacity(processor_rows.len());
    for row in processor_rows {
        processors.push(ProcessorInfoResponse {
            processor_type: row.processor_type.as_str().to_string(),
            status: row.status.as_str().to_string(),
            started_at: row.started_at,
            completed_at: row.completed_at,
            duration_secs: row.duration_secs,
            error_message: row.error_message,
        });
    }
    tracing::info!("[get_job] Mapped {} processors", processors.len());
    let detail = JobDetailResponse {
        id: job.id,
        uuid: job.uuid,
        status: job.status.as_str().to_string(),
        current_processor: job.current_processor,
        progress_current: job.progress_current,
        progress_total: job.progress_total,
        processors,
        created_at: job.created_at.to_string(),
        started_at: job.started_at.map(|t| t.to_string()),
        updated_at: job.updated_at.map(|t| t.to_string()),
    };
    tracing::info!("[get_job] SUCCESS - returning response");
    Ok(Json(detail))
}
/// Enables or disables the response cache via `crate::core::config::set_cache_enabled`
/// and echoes the new state back.
async fn cache_toggle(
    State(_state): State<AppState>,
    Json(req): Json<CacheToggleRequest>,
) -> Result<Json<CacheToggleResponse>, StatusCode> {
    let enabled = req.enabled;
    tracing::info!("[cache_toggle] Setting cache enabled to: {}", enabled);
    crate::core::config::set_cache_enabled(enabled);
    let message = String::from(if enabled { "Cache enabled" } else { "Cache disabled" });
    tracing::info!("[cache_toggle] SUCCESS");
    Ok(Json(CacheToggleResponse {
        success: true,
        cache_enabled: enabled,
        message,
    }))
}
/// Turns the auto-pipeline on or off via `crate::core::config::set_auto_pipeline_enabled`
/// and echoes the new state back.
async fn auto_pipeline_toggle(
    Json(req): Json<AutoPipelineToggleRequest>,
) -> Result<Json<AutoPipelineToggleResponse>, StatusCode> {
    let enabled = req.enabled;
    tracing::info!("[auto_pipeline_toggle] Setting to: {}", enabled);
    crate::core::config::set_auto_pipeline_enabled(enabled);
    let state_word = if enabled { "enabled" } else { "disabled" };
    let response = AutoPipelineToggleResponse {
        success: true,
        auto_pipeline_enabled: enabled,
        message: format!("Auto-pipeline {}", state_word),
    };
    Ok(Json(response))
}
/// Turns the file watcher's auto-registration on or off via
/// `crate::core::config::set_watcher_auto_register` and echoes the new state back.
async fn watcher_auto_register_toggle(
    Json(req): Json<WatcherAutoRegisterToggleRequest>,
) -> Result<Json<WatcherAutoRegisterToggleResponse>, StatusCode> {
    let enabled = req.enabled;
    tracing::info!("[watcher_auto_register_toggle] Setting to: {}", enabled);
    crate::core::config::set_watcher_auto_register(enabled);
    let state_word = if enabled { "enabled" } else { "disabled" };
    let response = WatcherAutoRegisterToggleResponse {
        success: true,
        watcher_auto_register_enabled: enabled,
        message: format!("Watcher auto-register {}", state_word),
    };
    Ok(Json(response))
}
/// Response body of `unregister`.
#[derive(Debug, Serialize)]
struct UnregisterResponse {
/// `false` for request-validation failures (e.g. not a directory, no UUID given).
success: bool,
/// The unregistered UUID (single mode), `"batch_{n}"` (pattern mode), or empty on failure.
file_uuid: String,
/// Human-readable outcome.
message: String,
}
/// Request body of `unregister`.
///
/// Pattern mode is used when both `file_path` and `pattern` are present; otherwise
/// `file_uuid` selects a single video.
#[derive(Debug, Deserialize)]
struct UnregisterRequest {
/// UUID of the single video to unregister (single mode).
file_uuid: Option<String>,
/// Directory whose entries are matched against `pattern` (pattern mode only).
file_path: Option<String>,
/// Regular expression matched against the file names in `file_path` (pattern mode only).
pattern: Option<String>,
/// If true (default), delete processor output JSON ({uuid}.*.json) from disk
delete_output_files: Option<bool>,
}
fn delete_output_files(uuid: &str) {
let output_dir = std::path::PathBuf::from(&*crate::core::config::OUTPUT_DIR);
if let Ok(entries) = std::fs::read_dir(&output_dir) {
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with(uuid) && name.ends_with(".json") {
std::fs::remove_file(entry.path()).ok();
}
}
}
}
async fn unregister(
State(state): State<AppState>,
Json(req): Json<UnregisterRequest>,
) -> Result<Json<UnregisterResponse>, StatusCode> {
let db = &state.db;
let clean_files = req.delete_output_files.unwrap_or(true);
// Pattern mode: unregister all matching files in a directory
if let (Some(ref dir_path), Some(ref pat)) = (&req.file_path, &req.pattern) {
let dir = std::path::Path::new(dir_path);
if !dir.is_dir() {
return Ok(Json(UnregisterResponse {
success: false,
file_uuid: String::new(),
message: format!("Not a directory: {}", dir_path),
}));
}
let re = regex::Regex::new(pat).map_err(|_| StatusCode::BAD_REQUEST)?;
let mut count = 0u32;
let table = crate::core::db::schema::table_name("videos");
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let fname = entry.file_name().to_string_lossy().to_string();
if !re.is_match(&fname) {
continue;
}
// Find file_uuid by file_name and delete
let rows: Vec<(String,)> = sqlx::query_as(&format!(
"SELECT file_uuid FROM {} WHERE file_name = $1",
table
))
.bind(&fname)
.fetch_all(db.pool())
.await
.unwrap_or_default();
for (uuid,) in rows {
let _ = db.delete_video(&uuid).await;
if clean_files {
delete_output_files(&uuid);
}
count += 1;
}
}
}
let _ = state.mongo_cache.invalidate_videos_list().await;
return Ok(Json(UnregisterResponse {
success: true,
file_uuid: format!("batch_{}", count),
message: format!("Unregistered {} files matching pattern", count),
}));
}
// Single UUID mode
let uuid = req.file_uuid.as_deref().unwrap_or("");
if uuid.is_empty() {
return Ok(Json(UnregisterResponse {
success: false,
file_uuid: String::new(),
message: "Either file_uuid or file_path+pattern is required".to_string(),
}));
}
tracing::info!("[unregister] Unregistering file: {}", uuid);
match db.delete_video(uuid).await {
Ok(_) => {
let _ = state.mongo_cache.invalidate_videos_list().await;
if clean_files {
delete_output_files(uuid);
}
Ok(Json(UnregisterResponse {
success: true,
file_uuid: uuid.to_string(),
message: if clean_files {
"File unregistered (DB + output files deleted)".to_string()
} else {
"File unregistered (DB deleted, output files kept)".to_string()
},
}))
}
Err(e) => {
tracing::error!("[unregister] Failed: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Redirects the client to the WASM documentation viewer at `/doc-wasm`.
///
/// No authentication is checked here.
async fn doc_redirect() -> axum::response::Redirect {
    axum::response::Redirect::to("/doc-wasm")
}
/// Serves the WASM documentation viewer's `index.html` from the fixed docs directory.
///
/// Responds `404 "Doc not found"` when the file cannot be read.
async fn wasm_doc_handler() -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    const INDEX_HTML: &str = "/Users/accusys/momentry_core_0.1/docs_v1.0/doc_wasm/index.html";
    let html = tokio::fs::read_to_string(INDEX_HTML)
        .await
        .map_err(|_| (StatusCode::NOT_FOUND, "Doc not found"))?;
    Ok(([("content-type", "text/html; charset=utf-8")], html))
}
/// Serves a static asset of the WASM documentation viewer.
///
/// `file` is resolved under the fixed `doc_wasm` directory. Requests get `404` when:
/// * the path contains `..` or `//`,
/// * the resolved path lies outside the base directory (e.g. an absolute path, which
///   `Path::join` would otherwise adopt wholesale), or
/// * the file is missing or unreadable.
///
/// The `content-type` is derived from the file suffix by `wasm_doc_mime`.
async fn wasm_doc_file_handler(
    Path(file): Path<String>,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    if file.contains("..") || file.contains("//") {
        return Err((StatusCode::NOT_FOUND, "Invalid path"));
    }
    let base = std::path::Path::new("/Users/accusys/momentry_core_0.1/docs_v1.0/doc_wasm");
    let path = base.join(&file);
    if !path.exists() || !path.starts_with(base) {
        return Err((StatusCode::NOT_FOUND, "File not found"));
    }
    let data = tokio::fs::read(&path).await.map_err(|_| (StatusCode::NOT_FOUND, "Read error"))?;
    Ok(([("content-type", wasm_doc_mime(&file))], data))
}

/// Maps a documentation asset name to its `content-type` by suffix.
///
/// Unknown suffixes get `application/octet-stream`.
fn wasm_doc_mime(file: &str) -> &'static str {
    const MIME_BY_SUFFIX: &[(&str, &str)] = &[
        (".wasm", "application/wasm"),
        (".js", "application/javascript"),
        (".mjs", "application/javascript"),
        (".md", "text/markdown; charset=utf-8"),
        (".css", "text/css"),
        (".html", "text/html; charset=utf-8"),
        (".json", "application/json"),
        (".svg", "image/svg+xml"),
        (".png", "image/png"),
        (".jpg", "image/jpeg"),
        (".jpeg", "image/jpeg"),
        (".ico", "image/x-icon"),
    ];
    MIME_BY_SUFFIX
        .iter()
        .find(|(suffix, _)| file.ends_with(*suffix))
        .map(|&(_, mime)| mime)
        .unwrap_or("application/octet-stream")
}
/// Serves the end-user documentation index, or its login page when the request is not
/// authorized (see `serve_doc`).
async fn doc_handler(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    let user_docs: Option<&str> = None;
    serve_doc(&state, &headers, user_docs).await
}
/// Serve the developer documentation index, or the login page when the request has no
/// valid session cookie. Thin wrapper over `serve_doc` in `"dev"` mode.
async fn dev_doc_handler(
    State(app_state): State<AppState>,
    headers: axum::http::HeaderMap,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    let mode = Some("dev");
    serve_doc(&app_state, &headers, mode).await
}
// Not registered in `start_server`'s router (`/doc/*file` goes to `doc_redirect`), so the
// unused-code lint is silenced while this handler is kept around.
#[allow(unused)]
/// Serve a single user-documentation `.html` page behind the session-cookie check.
async fn doc_file_handler(
    State(app_state): State<AppState>,
    headers: axum::http::HeaderMap,
    Path(page): Path<String>,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    let page_name: &str = page.as_str();
    serve_doc_file(&app_state, &headers, None, page_name).await
}
/// Render a documentation index page.
///
/// `mode == Some("dev")` selects `docs_v1.0/doc_developer`; anything else selects
/// `docs_v1.0/doc`. Authenticated requests (per `check_doc_auth`) get that directory's
/// `index.html`; others get its `login.html`. Unreadable files fall back to small inline
/// HTML pages, so this never returns `Err`.
async fn serve_doc(
    state: &AppState,
    headers: &axum::http::HeaderMap,
    mode: Option<&str>,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    let is_logged_in = check_doc_auth(state, headers).await;
    let docs_root = std::path::Path::new("/Users/accusys/momentry_core_0.1").join("docs_v1.0");
    let doc_dir = if mode == Some("dev") {
        docs_root.join("doc_developer")
    } else {
        docs_root.join("doc")
    };
    let (page, fallback) = if is_logged_in {
        ("index.html", "<html><body><h1>Docs not found</h1></body></html>")
    } else {
        (
            "login.html",
            "<html><body><h1>Login</h1><p>Please login at /api/v1/auth/login</p></body></html>",
        )
    };
    let body = match tokio::fs::read_to_string(doc_dir.join(page)).await {
        Ok(html) => html,
        Err(_) => fallback.to_string(),
    };
    Ok(([("content-type", "text/html; charset=utf-8")], body))
}
/// Render a single documentation page `file` from the user or developer docs directory.
///
/// Unauthenticated requests get `login.html` regardless of `file`. Authenticated requests
/// may only fetch a bare `*.html` name (no `/`, no `..`); anything else, or an unreadable
/// page, yields an inline "not found" page. Always `Ok`, always UTF-8 HTML.
async fn serve_doc_file(
    state: &AppState,
    headers: &axum::http::HeaderMap,
    mode: Option<&str>,
    file: &str,
) -> Result<impl axum::response::IntoResponse, (StatusCode, &'static str)> {
    const HTML: &str = "text/html; charset=utf-8";
    let is_logged_in = check_doc_auth(state, headers).await;
    let docs_root = std::path::Path::new("/Users/accusys/momentry_core_0.1").join("docs_v1.0");
    let doc_dir = if mode == Some("dev") {
        docs_root.join("doc_developer")
    } else {
        docs_root.join("doc")
    };
    if !is_logged_in {
        let login_page = match tokio::fs::read_to_string(doc_dir.join("login.html")).await {
            Ok(html) => html,
            Err(_) => "<html><body><h1>Login</h1></body></html>".to_string(),
        };
        return Ok(([("content-type", HTML)], login_page));
    }
    // Only a bare `.html` file name is acceptable: no separators, no parent references.
    let is_safe_name = !file.contains('/') && !file.contains("..") && file.ends_with(".html");
    let body = if is_safe_name {
        match tokio::fs::read_to_string(doc_dir.join(file)).await {
            Ok(html) => html,
            Err(_) => "<html><body><h1>Page not found</h1></body></html>".to_string(),
        }
    } else {
        "<html><body><h1>Not found</h1></body></html>".to_string()
    };
    Ok(([("content-type", HTML)], body))
}
async fn check_doc_auth(state: &AppState, headers: &axum::http::HeaderMap) -> bool {
use crate::api::middleware::extract_cookies;
let cookies = extract_cookies(headers);
let sid = cookies.iter().find(|(k, _)| k == "session_id").map(|(_, v)| v.clone());
if let Some(ref session_id) = sid {
let table = crate::core::db::schema::table_name("sessions");
sqlx::query_scalar::<_, i32>(
&format!("SELECT 1 FROM {} WHERE session_id = $1 AND expires_at > NOW()", table)
)
.bind(session_id)
.fetch_optional(state.db.pool())
.await
.map(|r| r.is_some())
.unwrap_or(false)
} else { false }
}
pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
let _ = SERVER_START.set(Instant::now());
// Resolve actual IP address for health identification
let resolved_ip = if host == "0.0.0.0" {
// Try to find a non-loopback IP
if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&"localhost:0") {
if let Some(addr) = addrs.filter_map(|a| match a {
std::net::SocketAddr::V4(v4) if !v4.ip().is_loopback() => Some(v4.ip().to_string()),
_ => None,
}).next() {
addr
} else {
// Fallback: try getting IP from UDP socket
std::net::UdpSocket::bind("0.0.0.0:0")
.and_then(|s| s.connect("8.8.8.8:53").map(|_| s))
.and_then(|s| s.local_addr())
.map(|a| a.ip().to_string())
.unwrap_or_else(|_| "0.0.0.0".to_string())
}
} else {
host.to_string()
}
} else {
host.to_string()
};
let _ = SERVER_HOST.set(resolved_ip);
let _ = SERVER_PORT.set(port);
let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string()));
let mongo_cache = MongoCache::init().await?;
let redis_cache = RedisCache::new()?;
let db = PostgresDb::init().await?;
// ── Schema migration startup check ──
let schema_health = check_schema_migrations(db.pool()).await;
if schema_health.ok {
tracing::info!(
"[SCHEMA] All {}/{} required migrations applied ✓",
schema_health.required.len(),
schema_health.required.len()
);
} else if !schema_health.table_exists {
tracing::warn!(
"[SCHEMA] schema_migrations table not found! Run: psql -U accusys -d momentry -f release/migrate_add_schema_version.sql"
);
} else {
let missing: Vec<&str> = schema_health
.required
.iter()
.filter(|req| {
!schema_health
.applied
.iter()
.any(|app| app.filename == req.filename && app.checksum == req.checksum)
})
.map(|m| m.filename.as_str())
.collect();
tracing::warn!(
"[SCHEMA] {}/{} migrations match. Missing/corrupt: {}",
schema_health.applied.len(),
schema_health.required.len(),
missing.join(", ")
);
}
let db = std::sync::Arc::new(db);
let api_state = super::middleware::ApiState { db: db.clone() };
let state = AppState {
db,
embedder,
embedder_model: "nomic-embed-text-v2-moe:latest".to_string(),
mongo_cache,
redis_cache,
api_state,
};
let protected_routes = Router::new()
.route("/api/v1/files/register", post(register_file))
.route("/api/v1/files/lookup", get(lookup_file_by_name))
.route("/api/v1/unregister", post(unregister))
.route("/api/v1/files/scan", get(scan_files))
.route("/api/v1/file/:file_uuid/probe", get(probe_by_uuid))
.route("/api/v1/file/:file_uuid/json/:processor", get(download_json))
.route("/api/v1/file/:file_uuid/process", post(trigger_processing))
.route("/api/v1/file/:file_uuid/chunk/:chunk_id", get(get_chunk_by_path))
.route("/api/v1/progress/:file_uuid", get(get_progress))
.route("/api/v1/jobs", get(list_jobs))
.route("/api/v1/config/cache", post(cache_toggle))
.route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
.route("/api/v1/config/watcher-auto-register", post(watcher_auto_register_toggle))
// .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed)
.merge(identity_binding::identity_binding_routes())
.merge(identities::identity_routes())
.merge(tmdb_api::tmdb_routes())
.merge(identity_api::identity_routes()) // Phase 3 Routes
.merge(agent_api::agent_routes()) // Phase 6 Routes
.merge(super::identity_agent_api::identity_agent_routes()) // Phase 5 Routes
.merge(five_w1h_agent_api::five_w1h_agent_routes()) // Phase 3 Routes (5W1H Agent)
.merge(super::media_api::bbox_routes()) // Media: video/bbox/thumbnail
.merge(super::trace_agent_api::trace_agent_routes()) // Trace listing
.merge(search_routes()) // Smart search drill-down
.merge(universal_search_routes()) // Universal / frames / persons search
.route("/health/detailed", get(health_detailed))
.route("/health/consistency", get(health_consistency))
.layer(axum::middleware::from_fn_with_state(
state.api_state.clone(),
unified_auth,
))
.with_state(state.clone());
let cors = CorsLayer::new()
.allow_origin(tower_http::cors::AllowOrigin::any())
.allow_methods(Any)
.allow_headers(Any);
let app = Router::new()
.route("/health", get(health))
.route("/doc", get(doc_redirect))
.route("/doc/*file", get(doc_redirect))
.route("/dev-doc", get(doc_redirect))
.route("/doc-wasm", get(wasm_doc_handler))
.route("/doc-wasm/*file", get(wasm_doc_file_handler))
.route("/api/v1/auth/login", post(login))
.route("/api/v1/auth/logout", post(logout))
.route("/api/v1/stats/sftpgo", get(get_sftpgo_status))
.route("/api/v1/stats/ingestion-status/:file_uuid", get(get_ingestion_status))
.route("/api/v1/search/visual", post(search_visual_chunks))
.route(
"/api/v1/search/visual/class",
post(search_visual_chunks_by_class),
)
.route(
"/api/v1/search/visual/density",
post(search_visual_chunks_by_density),
)
.route("/api/v1/search/visual/stats", post(get_visual_chunk_stats))
.route(
"/api/v1/search/visual/combination",
post(search_visual_chunks_by_combination),
)
.merge(protected_routes)
.layer(cors)
.with_state(state);
let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse().unwrap();
tracing::info!("Starting API server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
/// Response body of `GET /api/v1/stats/sftpgo`, built by `get_sftpgo_status`.
#[derive(Debug, Serialize)]
struct SftpgoStatusResponse {
    /// SFTPGo account name; `get_sftpgo_status` always reports `"demo"`.
    username: String,
    /// Home directory of that account on the local filesystem.
    home_dir: String,
    /// Number of directory entries in `home_dir` (0 when it cannot be read).
    files_count: i64,
    /// `videos` rows whose `file_path` contains `demo`.
    registered_videos: Vec<RegisteredVideo>,
    /// Last login time; `get_sftpgo_status` does not populate it (always `None`).
    last_login: Option<String>,
}
/// One `videos` row listed in `SftpgoStatusResponse::registered_videos`.
#[derive(Debug, Serialize)]
struct RegisteredVideo {
    /// The video's `file_uuid` column.
    uuid: String,
    /// The video's `file_name` column.
    file_name: String,
    /// The video's `status` column, as text.
    status: String,
}
async fn get_sftpgo_status(
State(state): State<AppState>,
) -> Result<Json<SftpgoStatusResponse>, StatusCode> {
let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo";
let files_count: i64 = std::fs::read_dir(demo_dir)
.map(|entries| entries.count() as i64)
.unwrap_or(0);
let table_videos = schema::table_name("videos");
let registered_videos: Vec<(String, String, String)> = sqlx::query_as(&format!(
"SELECT file_uuid, file_name, status FROM {} WHERE file_path LIKE '%demo%' ORDER BY id",
table_videos
))
.fetch_all(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let registered_videos = registered_videos
.into_iter()
.map(|(uuid, file_name, status)| RegisteredVideo {
uuid,
file_name,
status,
})
.collect();
Ok(Json(SftpgoStatusResponse {
username: "demo".to_string(),
home_dir: demo_dir.to_string(),
files_count,
registered_videos,
last_login: None,
}))
}
/// One pipeline stage reported by `get_ingestion_status`.
#[derive(Debug, Serialize)]
struct IngestionStep {
    /// Stage identifier, e.g. `"rule1_sentence"` or `"5w1h"`.
    name: String,
    /// `"done"` or `"pending"`.
    status: String,
    /// Human-readable counts backing the status, when the stage has any.
    detail: Option<String>,
}
/// An identity matched to at least one face detection of a file.
#[derive(Debug, Serialize)]
struct IdentityRef {
    /// Identity UUID; `get_ingestion_status` strips the dashes.
    uuid: String,
    /// Identity display name.
    name: String,
}
/// Response body of `GET /api/v1/stats/ingestion-status/:file_uuid`.
#[derive(Debug, Serialize)]
struct IngestionStatusResponse {
    /// The file the status refers to (echoed from the URL).
    file_uuid: String,
    /// Pipeline stages in processing order.
    steps: Vec<IngestionStep>,
    /// Identities matched to faces in this file, ordered by name.
    related_identities: Vec<IdentityRef>,
    /// Number of distinct face traces in this file with no identity assigned.
    strangers: i64,
}
async fn get_ingestion_status(
State(state): State<AppState>,
Path(file_uuid): Path<String>,
) -> Result<Json<IngestionStatusResponse>, StatusCode> {
let pool = state.db.pool();
let chunk = schema::table_name("chunk");
let fd = schema::table_name("face_detections");
let identities = schema::table_name("identities");
let scene_meta_path = format!("{}/{}.scene_meta.json",
crate::core::config::OUTPUT_DIR.as_str(),
file_uuid);
let scene_meta_ok = std::path::Path::new(&scene_meta_path).exists();
macro_rules! count_sql {
($sql:expr) => {
sqlx::query_scalar::<_, i64>($sql)
.fetch_one(pool)
.await
.unwrap_or(0)
};
}
let sentence_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence'"));
let sentence_embedded = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence' AND embedding IS NOT NULL"));
let scene_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut'"));
let face_total = count_sql!(&format!("SELECT COUNT(*) FROM {fd} WHERE file_uuid = '{file_uuid}'"));
let trace_count = count_sql!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL"));
let trace_chunks = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'trace'"));
let identity_count = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL"));
let tkg_nodes = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_nodes")));
let tkg_edges = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_edges")));
let scene_5w1h = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != ''"));
let related_identities: Vec<IdentityRef> = match sqlx::query_as::<_, (String, String)>(&format!(
"SELECT DISTINCT i.uuid::text, i.name FROM {identities} i \
JOIN {fd} fd ON fd.identity_id = i.id \
WHERE fd.file_uuid = '{file_uuid}' AND fd.identity_id IS NOT NULL \
ORDER BY i.name"
)).fetch_all(pool).await {
Ok(rows) => rows.into_iter().map(|(uuid, name)| {
IdentityRef { uuid: uuid.replace('-', ""), name }
}).collect(),
Err(e) => {
tracing::error!("related_identities query failed: {}", e);
vec![]
}
};
let strangers = count_sql!(&format!(
"SELECT COUNT(DISTINCT trace_id) FROM {fd} \
WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL AND identity_id IS NULL"
));
macro_rules! step {
($name:expr, $done:expr, $detail:expr) => {
IngestionStep {
name: $name.into(),
status: if $done { "done" } else { "pending" }.into(),
detail: $detail,
}
};
}
let steps = vec![
step!("rule1_sentence", sentence_count > 0, Some(format!("{sentence_count} sentence chunks"))),
step!("auto_vectorize", sentence_embedded > 0, Some(format!("{sentence_embedded} embedded"))),
step!("rule3_scene", scene_count > 0, Some(format!("{scene_count} scene chunks"))),
step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces / {face_total} detections"))),
step!("trace_chunks", trace_chunks > 0, Some(format!("{trace_chunks} trace chunks"))),
step!("tkg", tkg_nodes > 0 || tkg_edges > 0, Some(format!("{tkg_nodes} nodes, {tkg_edges} edges"))),
step!("identity_match", identity_count > 0, Some(format!("{identity_count} identities matched"))),
step!("scene_metadata", scene_meta_ok, None),
step!("5w1h", scene_5w1h > 0, Some(format!("{scene_5w1h} scenes with 5W1H"))),
];
Ok(Json(IngestionStatusResponse { file_uuid, steps, related_identities, strangers }))
}
/// Query-string parameters accepted by `video_details`.
#[derive(Debug, Deserialize)]
struct VideoDetailsQuery {
    /// Chunk to describe; `video_details` answers `400 Bad Request` without it.
    chunk_id: Option<String>,
    /// Parent chunk selector; not read by `video_details` in this file.
    parent_id: Option<String>,
}
/// Response body of `video_details`: the video UUID plus the detail object's fields,
/// flattened into the same JSON object.
#[derive(Debug, Serialize)]
struct VideoDetailsResponse {
    /// UUID of the video the detail belongs to.
    uuid: String,
    // `flatten`: the detail struct's fields appear at the top level, not under `details`.
    #[serde(flatten)]
    details: VideoDetailsResult,
}
/// The two kinds of detail `video_details` can describe.
///
/// `untagged`: serialized as the inner struct alone, with no variant name. Only `Chunk` is
/// constructed by `video_details` in this file.
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum VideoDetailsResult {
    Chunk(ChunkDetailResponse),
    Parent(ParentChunkResponse),
}
/// A chunk's extent in frames.
#[derive(Debug, Serialize)]
struct FrameRange {
    /// First frame of the chunk.
    start_frame: i64,
    /// End frame of the chunk (inclusivity not established here — TODO confirm).
    end_frame: i64,
    /// `end_frame - start_frame`.
    duration_frames: i64,
    /// Frames per second used to convert frames to seconds.
    fps: f64,
}
/// A chunk's extent in seconds, computed as `frame / fps`.
#[derive(Debug, Serialize)]
struct ReferenceTime {
    /// Start time in seconds.
    start: f64,
    /// End time in seconds.
    end: f64,
}
/// Detail view of one chunk, as produced by `video_details`.
#[derive(Debug, Serialize)]
struct ChunkDetailResponse {
    /// The chunk's identifier.
    chunk_id: String,
    /// The chunk type rendered as text (e.g. `sentence`, `cut`, `trace`).
    chunk_type: String,
    /// Frame span and fps.
    frame_range: FrameRange,
    /// The same span in seconds.
    reference_time: ReferenceTime,
    /// Plain-text content, when the chunk has any.
    text_content: Option<String>,
    /// Structured chunk content (the `content` JSON column).
    content: Option<serde_json::Value>,
    /// Raw `parent_chunk_id` column value.
    parent_id: Option<String>,
    /// Chunk summary, falling back to the parent chunk's summary.
    summary_text: Option<String>,
    /// Chunk metadata, possibly merged with the parent chunk's metadata.
    metadata: Option<serde_json::Value>,
    /// The `visual_stats` JSON column.
    visual_stats: Option<serde_json::Value>,
    /// Speaker IDs attached to the chunk; `None` when there are none.
    speaker_ids: Option<Vec<String>>,
    /// Face IDs rendered as `face_<id>`; `None` when there are none.
    person_ids: Option<Vec<String>>,
}
/// Detail view of a parent chunk.
///
/// NOTE(review): nothing in this part of the file constructs this variant (`video_details`
/// only returns `VideoDetailsResult::Chunk`) — confirm whether it is still needed.
#[derive(Debug, Serialize)]
struct ParentChunkResponse {
    /// Row id in the `parent_chunks` table.
    parent_id: i32,
    /// Parent chunk metadata.
    metadata: Option<serde_json::Value>,
    /// Parent chunk summary.
    summary_text: Option<String>,
    /// Frame span and fps.
    frame_range: FrameRange,
    /// The same span in seconds.
    reference_time: ReferenceTime,
}
/// Request body for `POST /api/v1/search/visual`: search one file's visual chunks by
/// arbitrary criteria.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchRequest {
    /// File whose visual chunks are searched; also part of the result cache key.
    file_uuid: String,
    /// Filter criteria forwarded to `visual_chunk_search::search_visual_chunks`.
    criteria: visual_chunk_search::VisualChunkSearchCriteria,
}
/// Response body shared by the visual chunk search endpoints.
#[derive(Debug, Serialize)]
struct VisualChunkSearchResponse {
    /// Matching chunks.
    chunks: Vec<Chunk>,
    /// `chunks.len()`, for client convenience.
    total: usize,
}
/// Search a file's visual chunks with arbitrary criteria, caching results in Mongo.
///
/// The cache key combines `file_uuid` with a hash of the criteria. On a miss, the search
/// runs against the application's shared Postgres handle.
///
/// # Errors
/// `500 Internal Server Error` if the search or the cache layer fails (the cause is logged).
async fn search_visual_chunks(
    State(state): State<AppState>,
    Json(req): Json<VisualChunkSearchRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
    let criteria_hash = generate_visual_search_hash(&req.file_uuid, &req.criteria);
    let cache_key = keys::visual_search(&req.file_uuid, &criteria_hash);
    let ttl = state.mongo_cache.ttl_visual_search();
    // Reuse the shared handle. `PostgresDb::init()` here would build a fresh database handle
    // (and presumably a new connection pool) on every cache miss.
    let db = &*state.db;
    let chunks = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VISUAL_SEARCH, || async {
            visual_chunk_search::search_visual_chunks(db, &req.file_uuid, &req.criteria)
                .await
                .map_err(|e| anyhow::anyhow!("Visual search failed: {}", e))
        })
        .await
        .map_err(|e| {
            tracing::error!("Visual chunk search failed: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
    Ok(Json(VisualChunkSearchResponse {
        total: chunks.len(),
        chunks,
    }))
}
/// Request body for `POST /api/v1/search/visual/class`: visual chunks containing a given
/// object class, optionally bounded by how many instances appear.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByClassRequest {
    /// File to search (forwarded as the file identifier to the search function).
    uuid: String,
    /// Object class label to look for.
    object_class: String,
    /// Optional lower bound on the instance count (inclusivity — TODO confirm).
    min_count: Option<u32>,
    /// Optional upper bound on the instance count (inclusivity — TODO confirm).
    max_count: Option<u32>,
}
/// Request body for `POST /api/v1/search/visual/density`: visual chunks whose object
/// density falls within a range.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByDensityRequest {
    /// File to search (forwarded as the file identifier to the search function).
    uuid: String,
    /// Required lower bound on density.
    min_density: f32,
    /// Optional upper bound on density.
    max_density: Option<f32>,
}
/// Request body for `POST /api/v1/search/visual/stats`: aggregate visual-chunk statistics
/// for one file.
#[derive(Debug, Deserialize)]
struct VisualChunkStatsRequest {
    /// File whose statistics are requested; echoed back in the response.
    uuid: String,
}
/// Find a file's visual chunks that contain `object_class`, optionally bounded by
/// `min_count` / `max_count`.
///
/// Uses the application's shared Postgres handle; it previously built a new `PostgresDb`
/// on every request.
///
/// # Errors
/// `500 Internal Server Error` if the search fails (the cause is logged).
async fn search_visual_chunks_by_class(
    State(state): State<AppState>,
    Json(req): Json<VisualChunkSearchByClassRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
    let chunks = visual_chunk_search::search_visual_chunks_by_class(
        &*state.db,
        &req.uuid,
        &req.object_class,
        req.min_count,
        req.max_count,
    )
    .await
    .map_err(|e| {
        tracing::error!("Visual chunk search by class failed: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    Ok(Json(VisualChunkSearchResponse {
        total: chunks.len(),
        chunks,
    }))
}
/// Find a file's visual chunks whose density is at least `min_density` and, when given, at
/// most `max_density`.
///
/// Uses the application's shared Postgres handle; it previously built a new `PostgresDb`
/// on every request.
///
/// # Errors
/// `500 Internal Server Error` if the search fails (the cause is logged).
async fn search_visual_chunks_by_density(
    State(state): State<AppState>,
    Json(req): Json<VisualChunkSearchByDensityRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
    let chunks = visual_chunk_search::search_visual_chunks_by_density(
        &*state.db,
        &req.uuid,
        req.min_density,
        req.max_density,
    )
    .await
    .map_err(|e| {
        tracing::error!("Visual chunk search by density failed: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    Ok(Json(VisualChunkSearchResponse {
        total: chunks.len(),
        chunks,
    }))
}
/// Response body of `POST /api/v1/search/visual/stats`.
#[derive(Debug, Serialize)]
struct VisualChunkStatsResponse {
    /// The file the statistics describe (echoed from the request).
    uuid: String,
    /// Statistic name → value, as returned by
    /// `visual_chunk_search::get_visual_chunk_statistics`.
    stats: std::collections::HashMap<String, serde_json::Value>,
}
/// Return aggregate visual-chunk statistics for one file.
///
/// Uses the application's shared Postgres handle; it previously built a new `PostgresDb`
/// on every request.
///
/// # Errors
/// `500 Internal Server Error` if the statistics query fails (the cause is logged).
async fn get_visual_chunk_stats(
    State(state): State<AppState>,
    Json(req): Json<VisualChunkStatsRequest>,
) -> Result<Json<VisualChunkStatsResponse>, StatusCode> {
    let stats = visual_chunk_search::get_visual_chunk_statistics(&*state.db, &req.uuid)
        .await
        .map_err(|e| {
            tracing::error!("Get visual chunk stats failed: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
    Ok(Json(VisualChunkStatsResponse {
        uuid: req.uuid,
        stats,
    }))
}
/// Request body for `POST /api/v1/search/visual/combination`: visual chunks that contain
/// every listed object class in at least the given quantity.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByCombinationRequest {
    /// File to search (forwarded as the file identifier to the search function).
    uuid: String,
    /// Required objects as `[object_class, min_count]` pairs (JSON two-element arrays).
    combination: Vec<(String, u32)>, // (object_class, min_count)
}
/// Find a file's visual chunks that satisfy every `(object_class, min_count)` requirement.
///
/// Uses the application's shared Postgres handle; it previously built a new `PostgresDb`
/// on every request.
///
/// # Errors
/// `500 Internal Server Error` if the search fails (the cause is logged).
async fn search_visual_chunks_by_combination(
    State(state): State<AppState>,
    Json(req): Json<VisualChunkSearchByCombinationRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
    // The search API takes borrowed class names.
    let combination: Vec<(&str, u32)> = req
        .combination
        .iter()
        .map(|(c, n)| (c.as_str(), *n))
        .collect();
    let chunks =
        visual_chunk_search::search_visual_chunks_by_combination(&*state.db, &req.uuid, &combination)
            .await
            .map_err(|e| {
                tracing::error!("Visual chunk search by combination failed: {}", e);
                StatusCode::INTERNAL_SERVER_ERROR
            })?;
    Ok(Json(VisualChunkSearchResponse {
        total: chunks.len(),
        chunks,
    }))
}
async fn video_details(
Path(uuid): Path<String>,
Query(query): Query<VideoDetailsQuery>,
State(state): State<AppState>,
) -> Result<Json<VideoDetailsResponse>, StatusCode> {
let table = schema::table_name("chunk");
if let Some(chunk_id) = query.chunk_id {
let row: Option<(
i32, String, String, String, f64, i64, i64,
Option<String>, serde_json::Value, Option<serde_json::Value>,
Option<String>, i32, Option<String>, Option<serde_json::Value>, Option<String>,
)> = sqlx::query_as(&format!(
"SELECT file_id, uuid, chunk_id, chunk_type::text, fps, start_frame, end_frame,
text_content, content, metadata, vector_id, frame_count,
parent_chunk_id, visual_stats, summary_text
FROM {} WHERE chunk_id = $1 AND uuid = $2",
table
))
.bind(&chunk_id)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let speaker_row: Option<(String, String)> = sqlx::query_as(&format!(
"SELECT COALESCE(speaker_ids, '{{}}'::text[]), COALESCE(face_ids, '{{}}'::integer[])::text[] FROM {} WHERE chunk_id = $1 AND uuid = $2",
table
))
.bind(&chunk_id)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
let row = row.ok_or(StatusCode::NOT_FOUND)?;
let fps = if row.4 > 0.0 { row.4 } else { 24.0 };
let start_frame = row.5;
let end_frame = row.6;
let duration_frames = end_frame - start_frame;
let start_time = start_frame as f64 / fps;
let end_time = end_frame as f64 / fps;
let row_metadata = row.9.clone();
let mut summary_text = row.14.clone();
let mut metadata = None;
if let Some(ref pid_str) = row.12 {
if !pid_str.is_empty() {
if let Ok(pid) = pid_str.parse::<i32>() {
let parent_table = schema::table_name("parent_chunks");
let parent: Option<(Option<String>, Option<serde_json::Value>)> =
sqlx::query_as(&format!(
"SELECT summary_text, metadata FROM {} WHERE id = $1",
parent_table
))
.bind(pid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if let Some((s, m)) = parent {
if summary_text.is_none() {
summary_text = s;
}
let mut merged: serde_json::Value = serde_json::json!({});
if let Some(ref cm) = row_metadata {
if let Some(obj) = cm.as_object() {
for (k, v) in obj {
merged[k] = v.clone();
}
}
}
if let Some(pm) = &m {
if let Some(obj) = pm.as_object() {
for (k, v) in obj {
merged[k] = v.clone();
}
}
}
metadata = Some(merged);
}
}
}
} else if let Some(ref cm) = row_metadata {
metadata = Some(cm.clone());
}
let parse_pg_array = |s: &str| -> Vec<String> {
if s.is_empty() || s == "{}" {
return vec![];
}
s.trim_start_matches('{')
.trim_end_matches('}')
.split(',')
.map(|s| s.trim_matches('"').to_string())
.collect()
};
let (speaker_str, face_str) = speaker_row.unwrap_or(("{}".to_string(), "{}".to_string()));
let speaker_vec: Vec<String> = parse_pg_array(&speaker_str);
let speaker_ids: Option<Vec<String>> = if speaker_vec.is_empty() {
None
} else {
Some(speaker_vec)
};
let face_vec: Vec<String> = parse_pg_array(&face_str);
let person_ids: Option<Vec<String>> = if face_vec.is_empty() {
None
} else {
Some(face_vec.iter().map(|id| format!("face_{}", id)).collect())
};
return Ok(Json(VideoDetailsResponse {
uuid: row.1.clone(),
details: VideoDetailsResult::Chunk(ChunkDetailResponse {
chunk_id: row.2.clone(),
chunk_type: row.3.clone(),
frame_range: FrameRange {
start_frame,
end_frame,
duration_frames,
fps,
},
reference_time: ReferenceTime {
start: start_time,
end: end_time,
},
text_content: row.7.clone(),
content: Some(row.8.clone()),
parent_id: row.12.clone(),
summary_text,
metadata,
visual_stats: row.13.clone(),
speaker_ids,
person_ids,
}),
}));
}
Err(StatusCode::BAD_REQUEST)
}
/// Result of deleting a video and its dependent rows.
///
/// NOTE(review): not constructed in this part of the file (`delete_video` returns
/// `UnregisterFileResponse`) — confirm whether it is still used.
#[derive(Debug, Serialize)]
struct DeleteVideoResponse {
    success: bool,
    /// Human-readable outcome.
    message: String,
    /// Rows removed from `face_detections`.
    deleted_face_detections: u64,
    /// Rows removed from `processor_results`.
    deleted_processor_results: u64,
}
/// Response body of `delete_video`.
#[derive(Debug, Serialize)]
struct UnregisterFileResponse {
    /// Always `true` when returned (failures are reported via HTTP status).
    success: bool,
    /// Human-readable outcome.
    message: String,
    /// The unregistered file's UUID.
    file_uuid: String,
    /// Rows removed from `face_detections`.
    deleted_face_detections: u64,
    /// Rows removed from `processor_results`.
    deleted_processor_results: u64,
    /// Rows removed from `chunk` plus `parent_chunks`, combined.
    deleted_chunks: u64,
}
/// Classify a media file as `"video"`, `"audio"`, `"image"` or `"unknown"`.
///
/// Signals are tried in order, and the first one that decides wins:
/// 1. the ffprobe result, when provided: any video stream → video; else any audio stream
///    → audio; else any stream at all → image;
/// 2. `file --mime-type -b <path>`, using the MIME top-level type (video/audio/image);
/// 3. the file extension, compared case-insensitively.
///
/// NOTE(review): ffprobe usually reports still images (PNG/JPEG) as a single `video`
/// stream, so a probed image is likely classified as `"video"` by step 1. Confirm whether
/// callers pass a probe result for images.
fn detect_file_type(
    path: &std::path::Path,
    probe_result: Option<&crate::core::probe::ffprobe::ProbeResult>,
) -> String {
    let by_probe = probe_result.and_then(|probe| {
        let has_stream = |kind: &str| {
            probe
                .streams
                .iter()
                .any(|stream| stream.codec_type.as_deref() == Some(kind))
        };
        if has_stream("video") {
            Some("video")
        } else if has_stream("audio") {
            Some("audio")
        } else if probe.streams.is_empty() {
            None
        } else {
            Some("image")
        }
    });

    // Only spawned when the probe did not decide.
    let by_mime = || -> Option<&'static str> {
        let output = std::process::Command::new("file")
            .args(["--mime-type", "-b"])
            .arg(path)
            .output()
            .ok()?;
        let mime = String::from_utf8_lossy(&output.stdout)
            .trim()
            .to_lowercase();
        match mime.split_once('/').map(|(top, _)| top) {
            Some("video") => Some("video"),
            Some("audio") => Some("audio"),
            Some("image") => Some("image"),
            _ => None,
        }
    };

    let by_extension = || {
        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .map(|s| s.to_lowercase());
        match extension.as_deref() {
            Some("mp4" | "mov" | "mkv" | "avi" | "webm" | "flv" | "wmv") => "video",
            Some("mp3" | "wav" | "flac" | "aac" | "ogg" | "m4a") => "audio",
            Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "svg") => "image",
            _ => "unknown",
        }
    };

    by_probe
        .or_else(by_mime)
        .unwrap_or_else(by_extension)
        .to_string()
}
async fn delete_video(
Path(uuid): Path<String>,
State(state): State<AppState>,
) -> Result<Json<UnregisterFileResponse>, StatusCode> {
tracing::info!("[UNREGISTER] Unregistering file: {}", uuid);
let videos_table = schema::table_name("videos");
let face_table = schema::table_name("face_detections");
let processor_table = schema::table_name("processor_results");
let chunks_table = schema::table_name("chunk");
let parent_chunks_table = schema::table_name("parent_chunks");
// Check if video exists first
let exists: Option<String> = sqlx::query_scalar(&format!(
"SELECT file_uuid FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
tracing::error!("[UNREGISTER] DB check failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Delete associated data regardless of video existence (cleanup orphans)
let deleted_faces: i64 =
sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", face_table))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!("[UNREGISTER] Deleted {} face_detections", deleted_faces);
let deleted_processors: i64 = sqlx::query(&format!(
"DELETE FROM {} WHERE file_uuid = $1",
processor_table
))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!(
"[UNREGISTER] Deleted {} processor_results",
deleted_processors
);
let deleted_parent_chunks: i64 = sqlx::query(&format!(
"DELETE FROM {} WHERE uuid = $1",
parent_chunks_table
))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks_table))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!("[UNREGISTER] Deleted {} chunks", deleted_chunks);
if exists.is_none() {
tracing::warn!(
"[UNREGISTER] Video not found: {}. (Maybe already deleted or UUID mismatch)",
uuid
);
return Err(StatusCode::NOT_FOUND);
}
Ok(Json(UnregisterFileResponse {
success: true,
message: format!(
"File {} unregistered successfully. Record deleted from database.",
uuid
),
file_uuid: uuid,
deleted_face_detections: deleted_faces as u64,
deleted_processor_results: deleted_processors as u64,
deleted_chunks: (deleted_chunks + deleted_parent_chunks) as u64,
}))
}