Files
momentry_core/src/api/server.rs

3630 lines
115 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{delete, get, post},
Router,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use sha2::{Digest, Sha256};
use sqlx::Row;
use std::time::Instant;
use tower_http::cors::{Any, CorsLayer};
use crate::core::cache::{keys, MongoCache, RedisCache};
use crate::core::config::REDIS_KEY_PREFIX;
use crate::core::db::schema;
use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord, VideoStatus};
use crate::worker::resources::SystemResources;
use crate::core::text::tokenizer::tokenize_chinese_text;
use crate::{Embedder, FileManager};
use super::agent_api;
use super::five_w1h_agent_api;
use super::identities;
use super::identity_api;
use super::identity_binding;
use super::middleware::api_key_validation;
use super::search::search_routes;
use super::trace_agent_api;
use super::universal_search::universal_search_routes;
use super::visual_chunk_search;
use crate::core::chunk::types::Chunk;
/// API key returned by `login` when the built-in demo credentials are accepted.
// NOTE(review): this credential is committed in source; consider loading it from
// configuration or the environment instead.
static DEMO_USER_API_KEY: &str = "muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69";
/// Returns the lowercase hexadecimal SHA-256 digest of `password`.
///
/// NOTE(review): the digest is computed over the raw password bytes with no salt
/// and no key-stretching; confirm that is acceptable wherever the result is stored.
fn hash_password(password: &str) -> String {
    let digest = Sha256::digest(password.as_bytes());
    format!("{:x}", digest)
}
/// JSON body accepted by the `login` handler.
#[derive(Debug, Deserialize)]
struct LoginRequest {
/// Account name supplied by the client.
username: String,
/// Plain-text password supplied by the client.
password: String,
}
/// JSON body returned by the `login` handler.
#[derive(Debug, Serialize)]
struct LoginResponse {
/// `true` when the credentials were accepted.
success: bool,
/// Human-readable outcome of the login attempt.
message: Option<String>,
/// API key for subsequent requests; `login` sets it only on success.
api_key: Option<String>,
/// Details of the authenticated user; `login` sets it only on success.
user: Option<UserInfo>,
}
/// User details embedded in `LoginResponse`.
#[derive(Debug, Serialize)]
struct UserInfo {
/// Account name of the authenticated user.
username: String,
}
// Global State
/// Instant from which `get_uptime_ms` measures elapsed time.
/// It is not initialised in this part of the file — presumably set once during
/// server startup (TODO confirm).
static SERVER_START: OnceCell<Instant> = OnceCell::new();
/// Milliseconds elapsed since `SERVER_START` was set, or `0` while it is still unset.
fn get_uptime_ms() -> u64 {
    match SERVER_START.get() {
        Some(started) => started.elapsed().as_millis() as u64,
        None => 0,
    }
}
/// JSON body returned by the `health` handler.
#[derive(Debug, Serialize)]
struct HealthResponse {
/// `"ok"` when every backing service check passed, otherwise `"degraded"`.
status: String,
/// Value of the compile-time `BUILD_VERSION` environment variable.
version: String,
/// Value of the compile-time `BUILD_GIT_HASH` environment variable.
build_git_hash: String,
/// Value of the compile-time `BUILD_TIMESTAMP` environment variable.
build_timestamp: String,
/// Result of `get_uptime_ms()` at the time of the request.
uptime_ms: u64,
}
/// Paginated list of jobs, serialized as a JSON response.
/// The handler that builds it is outside this part of the file.
#[derive(Debug, Serialize)]
struct JobListResponse {
/// Jobs on the current page.
jobs: Vec<JobInfoResponse>,
/// Job count — presumably the total across all pages (TODO confirm).
count: i64,
/// Page number this response corresponds to.
page: usize,
/// Page size used for this response.
page_size: usize,
}
/// Optional query-string parameters for a job listing request.
/// Defaults for missing values are applied by the consuming handler, which is
/// outside this part of the file.
#[derive(Debug, Deserialize)]
struct JobsQuery {
/// Requested page number.
page: Option<usize>,
/// Requested number of jobs per page.
page_size: Option<usize>,
/// Optional status filter.
status: Option<String>,
}
/// Summary of one job, as listed in `JobListResponse::jobs`.
#[derive(Debug, Serialize)]
struct JobInfoResponse {
/// Numeric job identifier.
id: i32,
/// UUID associated with the job — presumably the file UUID (TODO confirm).
uuid: String,
/// Current job status string.
status: String,
/// Processor currently working on the job, if any.
current_processor: Option<String>,
/// Progress counter: units completed so far.
progress_current: i32,
/// Progress counter: total units expected.
progress_total: i32,
/// Creation timestamp, already formatted as a string.
created_at: String,
/// Start timestamp, if the job has started.
started_at: Option<String>,
}
/// Detailed view of one job: the `JobInfoResponse` fields plus per-processor
/// results and an update timestamp.
#[derive(Debug, Serialize)]
struct JobDetailResponse {
/// Numeric job identifier.
id: i32,
/// UUID associated with the job — presumably the file UUID (TODO confirm).
uuid: String,
/// Current job status string.
status: String,
/// Processor currently working on the job, if any.
current_processor: Option<String>,
/// Progress counter: units completed so far.
progress_current: i32,
/// Progress counter: total units expected.
progress_total: i32,
/// One entry per processor attached to the job.
processors: Vec<ProcessorInfoResponse>,
/// Creation timestamp, already formatted as a string.
created_at: String,
/// Start timestamp, if the job has started.
started_at: Option<String>,
/// Timestamp of the most recent update, if any.
updated_at: Option<String>,
}
/// Outcome of a single processor run, listed in `JobDetailResponse::processors`.
#[derive(Debug, Serialize)]
struct ProcessorInfoResponse {
/// Identifier of the processor kind.
processor_type: String,
/// Status string for this processor run.
status: String,
/// When the processor started, if it has.
started_at: Option<String>,
/// When the processor finished, if it has.
completed_at: Option<String>,
/// Run time in seconds, when known.
duration_secs: Option<f64>,
/// Error text, when the processor reported one.
error_message: Option<String>,
}
/// Search strategy selector carried by `SearchRequest::mode`.
///
/// Because of `rename_all = "snake_case"`, the variants deserialize from the
/// strings `"vector"` and `"smart"`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SearchMode {
Vector,
Smart,
}
/// Request body for a search call. The consuming handler is outside this part of
/// the file, so defaults for the optional fields are not visible here.
#[derive(Debug, Deserialize)]
struct SearchRequest {
/// Free-text query.
query: String,
/// Optional search strategy.
mode: Option<SearchMode>,
/// Optional collection name — presumably a vector-store collection (TODO confirm).
collection: Option<String>,
/// Optional UUID restricting the search — presumably to one file (TODO confirm).
uuid: Option<String>,
/// Optional cap on the number of results.
limit: Option<usize>,
/// Optional weight for the vector-similarity score.
vector_weight: Option<f32>,
/// Optional weight for the BM25 score.
bm25_weight: Option<f32>,
}
/// Request body for switching a cache on or off.
/// The handler that consumes it is outside this part of the file.
#[derive(Debug, Deserialize)]
struct CacheToggleRequest {
/// Desired cache state.
enabled: bool,
}
/// Response body for a cache toggle request.
#[derive(Debug, Serialize)]
struct CacheToggleResponse {
/// Whether the toggle request was carried out.
success: bool,
/// Cache state reported back to the client.
cache_enabled: bool,
/// Human-readable description of the outcome.
message: String,
}
// Missing structs added
/// One row of the videos table reported by `lookup_file_by_name`.
#[derive(Debug, Deserialize, Serialize)]
struct FileLookupMatch {
    /// UUID of the stored file.
    file_uuid: String,
    /// Stored file name.
    file_name: String,
    /// Stored file type, if recorded.
    file_type: Option<String>,
    /// Stored status string.
    status: String,
    /// Stored content hash, if recorded.
    content_hash: Option<String>,
    /// File size; `lookup_file_by_name` always leaves this as `None`.
    file_size: Option<i64>,
    /// Stored duration, if recorded.
    duration: Option<f64>,
    /// Stored frame width, if recorded.
    width: Option<i32>,
    /// Stored frame height, if recorded.
    height: Option<i32>,
}
/// JSON body returned by `lookup_file_by_name`.
#[derive(Serialize)]
struct FileLookupResponse {
/// The trimmed `file_name` query parameter that was looked up.
file_name: String,
/// `true` when a row with exactly this file name exists.
exists: bool,
/// Rows whose name equals the requested name or starts with its stem.
matches: Vec<FileLookupMatch>,
/// Suggested name that avoids the names found: the requested name itself, or
/// `"<stem> (N)<ext>"`.
next_name: String,
}
async fn lookup_file_by_name(
State(state): State<AppState>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<FileLookupResponse>, StatusCode> {
let base = params.get("file_name").map(|s| s.trim().to_string()).unwrap_or_default();
if base.is_empty() {
return Ok(Json(FileLookupResponse {
file_name: String::new(),
exists: false,
matches: vec![],
next_name: String::new(),
}));
}
let table = schema::table_name("videos");
let dot_pos = base.rfind('.');
let (stem, ext) = match dot_pos {
Some(p) => (base[..p].to_string(), base[p..].to_string()),
None => (base.clone(), String::new()),
};
let pattern = format!("{}%%", &stem);
let query_sql = format!("SELECT file_uuid, file_name, file_type, status, content_hash, duration, width, height FROM {} WHERE file_name = $1 OR file_name LIKE $2 ORDER BY file_name", table);
let rows = sqlx::query(&query_sql)
.bind(&base)
.bind(&pattern)
.fetch_all(state.db.pool())
.await
.map_err(|e| { tracing::error!("lookup query error: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?;
let exists = rows.iter().any(|r| r.get::<String, _>("file_name") == base);
let matches: Vec<FileLookupMatch> = rows.iter().map(|r| {
FileLookupMatch {
file_uuid: r.get("file_uuid"),
file_name: r.get("file_name"),
file_type: r.get("file_type"),
status: r.get("status"),
content_hash: r.get("content_hash"),
file_size: None,
duration: r.get("duration"),
width: r.get("width"),
height: r.get("height"),
}
}).collect();
let max_n: usize = rows.iter().filter_map(|r| {
let n: String = r.get("file_name");
if n == base { return Some(0usize); }
let rest = n.strip_prefix(&stem).and_then(|r| r.strip_suffix(&ext))?;
let inner = rest.trim().strip_prefix('(').and_then(|r| r.strip_suffix(')'))?;
inner.parse::<usize>().ok()
}).max().unwrap_or(0);
let next_name = if max_n == 0 && !exists {
base.clone()
} else {
format!("{} ({}){}", stem, max_n + 1, ext)
};
Ok(Json(FileLookupResponse {
file_name: base,
exists,
matches,
next_name,
}))
}
/// JSON body accepted by the `register_file` handler.
#[derive(Debug, Deserialize)]
struct RegisterFileRequest {
/// Path of the file to register, or of a directory when `pattern` is set.
file_path: String,
/// Optional regular expression; when present, `register_file` registers every
/// regular file in the `file_path` directory whose name matches it.
pattern: Option<String>,
/// Optional user id; passed through to `register_single_file`, which does not use it.
user_id: Option<i64>,
}
/// JSON body returned by the `register_file` handler.
///
/// In batch (pattern) mode `register_file` reuses this shape as a summary:
/// `file_uuid` and `file_name` then carry counter strings rather than real values.
#[derive(Debug, Serialize)]
struct RegisterFileResponse {
/// `true` when registration succeeded or the content was already registered.
success: bool,
/// UUID of the registered file (empty on early failures).
file_uuid: String,
/// File name associated with the registration.
file_name: String,
/// Canonical path of the file, or the raw request path when the file was not found.
file_path: String,
/// `"video"`, `"audio"`, or `None` when neither stream kind was found.
file_type: Option<String>,
/// Duration reported by the probe (0.0 when unknown).
duration: f64,
/// Width of the first video stream (0 when unknown).
width: u32,
/// Height of the first video stream (0 when unknown).
height: u32,
/// Frame rate of the first video stream (0.0 when unknown).
fps: f64,
/// Frame count of the first video stream (0 when unknown).
total_frames: u64,
/// Registration timestamp; the code in this part of the file always sets `None`.
registration_time: Option<String>,
/// `true` when identical content had already been registered.
already_exists: bool,
/// Human-readable description of the outcome.
message: String,
}
/// One search hit, listed in `SearchResponse::results`.
#[derive(Debug, Serialize, Deserialize)]
struct SearchResult {
/// UUID the hit belongs to — presumably the file UUID (TODO confirm).
uuid: String,
/// Identifier of the matching chunk.
chunk_id: String,
/// Kind of the matching chunk.
chunk_type: String,
/// Start of the chunk — presumably seconds into the media (TODO confirm).
start_time: f64,
/// End of the chunk, same unit as `start_time`.
end_time: f64,
/// Text content of the chunk.
text: String,
/// Relevance score of the hit.
score: f32,
}
/// Response body for a search call. It also derives `Deserialize`, so it can be
/// read back from stored JSON — presumably a cache (TODO confirm).
#[derive(Debug, Serialize, Deserialize)]
struct SearchResponse {
/// Matching chunks.
results: Vec<SearchResult>,
/// The query the results belong to.
query: String,
}
/// JSON body returned by the `probe_by_uuid` handler.
#[derive(Debug, Serialize)]
struct ProbeResponse {
/// UUID of the probed file.
file_uuid: String,
/// Stored name of the probed file.
file_name: String,
/// Duration parsed from the probe's format section.
duration: f64,
/// Width of the video stream.
width: u32,
/// Height of the video stream.
height: u32,
/// Frame rate of the video stream.
fps: f64,
/// Frame count of the video stream.
total_frames: i64,
/// Whether the probe data came from a cached `probe.json` — TODO confirm against
/// the remainder of `probe_by_uuid`.
cached: bool,
/// Format section of the probe result.
format: crate::core::probe::FormatInfo,
/// Stream sections of the probe result.
streams: Vec<crate::core::probe::StreamInfo>,
}
// --- P0 API Structs ---
/// Request body for starting processing.
/// The handler that consumes it is outside this part of the file.
#[derive(Debug, Deserialize)]
struct ProcessRequest {
/// Optional list of rule names to apply.
rules: Option<Vec<String>>,
/// Optional list of processor identifiers to run.
processors: Option<Vec<String>>,
}
/// Frame-level progress figures embedded in the asset and job status responses.
#[derive(Debug, Serialize)]
struct FrameProgress {
/// Total number of frames.
total_frames: i64,
/// Number of frames processed so far.
processed_frames: i64,
/// Progress as a percentage — presumably 0–100 (TODO confirm).
progress_percent: f64,
}
/// Status of one registered asset.
/// The handler that builds it is outside this part of the file.
#[derive(Debug, Serialize)]
struct AssetStatusResponse {
/// UUID of the asset.
uuid: String,
/// Name of the asset's file.
file_name: String,
/// Registration timestamp, already formatted as a string.
registration_time: String,
/// Processing status string.
processing_status: String,
/// Identifier of the job currently attached to the asset, if any.
current_job_id: Option<String>,
/// Frame-level progress, when available.
frame_progress: Option<FrameProgress>,
}
/// Status of one processing job; also listed in `RuleStatusResponse::active_jobs`.
#[derive(Debug, Serialize)]
struct JobStatusResponse {
/// Identifier of the job.
job_id: String,
/// UUID of the file the job works on.
file_uuid: String,
/// Rule the job belongs to.
rule: String,
/// Job status string.
status: String,
/// Identifier of the processor currently running, if any.
current_processor_id: Option<String>,
/// Frame-level progress of the job.
frame_progress: FrameProgress,
}
/// Status of one processing rule and its jobs.
#[derive(Debug, Serialize)]
struct RuleStatusResponse {
/// Name of the rule.
rule: String,
/// Processor identifiers the rule supports.
supported_processor_ids: Vec<String>,
/// Jobs reported as active for this rule.
active_jobs: Vec<JobStatusResponse>,
}
// --- End P0 API Structs ---
/// Request body for a hybrid (vector + BM25) search.
/// Defaults for the optional fields are applied by the consuming handler, which is
/// outside this part of the file.
#[derive(Debug, Deserialize)]
struct HybridSearchRequest {
/// Free-text query.
query: String,
/// Optional cap on the number of results.
limit: Option<usize>,
/// Optional UUID restricting the search — presumably to one file (TODO confirm).
uuid: Option<String>,
/// Optional weight for the vector-similarity score.
vector_weight: Option<f32>,
/// Optional weight for the BM25 score.
bm25_weight: Option<f32>,
}
/// One hybrid-search hit, carrying both component scores and their combination.
#[derive(Debug, Serialize, Deserialize)]
struct HybridSearchResult {
/// UUID the hit belongs to — presumably the file UUID (TODO confirm).
uuid: String,
/// Identifier of the matching chunk.
chunk_id: String,
/// Kind of the matching chunk.
chunk_type: String,
/// Start of the chunk — presumably seconds into the media (TODO confirm).
start_time: f64,
/// End of the chunk, same unit as `start_time`.
end_time: f64,
/// Text content of the chunk.
text: String,
/// Vector-similarity component of the score.
vector_score: f64,
/// BM25 component of the score.
bm25_score: f64,
/// Final score after combining both components.
combined_score: f64,
}
/// Response body for a hybrid search.
#[derive(Debug, Serialize, Deserialize)]
struct HybridSearchResponse {
/// Matching chunks.
results: Vec<HybridSearchResult>,
/// The query the results belong to.
query: String,
}
/// Pulls the text out of a chunk's JSON content and runs it through
/// `tokenize_chinese_text`.
///
/// `content["data"]["text"]` is preferred; `content["text"]` is the fallback. When
/// neither is a JSON string, the empty string is tokenized instead.
fn extract_text_from_content(content: &serde_json::Value) -> String {
    let nested = content
        .get("data")
        .and_then(|data| data.get("text"))
        .and_then(|v| v.as_str());
    let raw_text = match nested {
        Some(text) => text,
        None => content.get("text").and_then(|v| v.as_str()).unwrap_or(""),
    };
    tokenize_chinese_text(raw_text)
}
/// Pulls the title out of a chunk's JSON content.
///
/// `content["data"]["title"]` is preferred; `content["title"]` is the fallback.
/// Returns an empty string when neither is a JSON string.
fn extract_title_from_content(content: &serde_json::Value) -> String {
    let nested = content
        .get("data")
        .and_then(|data| data.get("title"))
        .and_then(|v| v.as_str());
    if let Some(title) = nested {
        return title.to_string();
    }
    match content.get("title").and_then(|v| v.as_str()) {
        Some(title) => title.to_string(),
        None => String::new(),
    }
}
/// Query-string parameters for a file lookup by path or UUID.
/// The handler that consumes it is outside this part of the file.
#[derive(Debug, Deserialize)]
struct LookupQuery {
/// File path to look up, if given.
path: Option<String>,
/// File UUID to look up, if given.
uuid: Option<String>,
}
/// Response body for a file lookup by path or UUID.
#[derive(Debug, Serialize)]
struct LookupResponse {
/// UUID of the file that was found.
file_uuid: String,
/// Stored path of the file, if recorded.
file_path: Option<String>,
/// Stored name of the file, if recorded.
file_name: Option<String>,
/// Stored duration of the file, if recorded.
duration: Option<f64>,
}
/// Full description of one registered file, listed in `FilesResponse::files`.
#[derive(Debug, Serialize, Deserialize)]
struct FileInfoResponse {
/// UUID of the file.
file_uuid: String,
/// Stored path of the file.
file_path: String,
/// Stored name of the file.
file_name: String,
/// Stored file type, if recorded.
file_type: Option<String>,
/// Duration of the media.
duration: f64,
/// Frame width.
width: u32,
/// Frame height.
height: u32,
/// Status string of the file.
status: String,
/// Free-form processing status document, if any.
processing_status: Option<serde_json::Value>,
/// Free-form birth-registration document, if any.
birth_registration: Option<serde_json::Value>,
/// Creation timestamp, if recorded.
created_at: Option<String>,
/// Registration timestamp, if recorded.
registration_time: Option<String>,
/// File size, if recorded — presumably in bytes (TODO confirm).
file_size: Option<i64>,
/// Probe output as a JSON string, if recorded.
probe_json: Option<String>,
/// Frame count of the media.
total_frames: u64,
}
/// Paginated list of registered files.
#[derive(Debug, Serialize, Deserialize)]
struct FilesResponse {
/// Files on the current page.
files: Vec<FileInfoResponse>,
/// File count — presumably the total across all pages (TODO confirm).
count: i64,
/// Page number this response corresponds to.
page: usize,
/// Page size used for this response.
page_size: usize,
}
/// Optional query-string parameters for a file listing request.
/// The handler that consumes it is outside this part of the file.
#[derive(Debug, Deserialize)]
struct FilesQuery {
/// Requested page number.
page: Option<usize>,
/// Requested number of files per page.
page_size: Option<usize>,
/// Optional status filter.
status: Option<String>,
/// Optional free-text filter — presumably matched against file names (TODO confirm).
q: Option<String>,
/// Optional file UUID filter.
uuid: Option<String>,
}
/// Shared application state that handlers receive through axum's `State` extractor.
#[derive(Clone)]
pub struct AppState {
/// PostgreSQL handle; handlers in this file run queries through `db.pool()`.
pub db: std::sync::Arc<crate::core::db::PostgresDb>,
/// Shared embedder instance.
pub embedder: std::sync::Arc<crate::Embedder>,
/// Name of the embedding model — presumably the one `embedder` uses (TODO confirm).
pub embedder_model: String,
/// MongoDB-backed cache; the health handlers call `health_check()` on it.
pub mongo_cache: crate::core::cache::MongoCache,
/// Redis-backed cache; `health` records the overall status through it.
pub redis_cache: crate::core::cache::RedisCache,
/// State consumed by the API middleware.
pub api_state: super::middleware::ApiState,
}
/// JSON body returned by the `health_detailed` handler.
#[derive(Debug, Serialize)]
struct DetailedHealthResponse {
/// `"ok"` when every backing service check passed, otherwise `"degraded"`.
status: String,
/// Value of the compile-time `BUILD_VERSION` environment variable.
version: String,
/// Value of the compile-time `BUILD_GIT_HASH` environment variable.
build_git_hash: String,
/// Value of the compile-time `BUILD_TIMESTAMP` environment variable.
build_timestamp: String,
/// Result of `get_uptime_ms()` at the time of the request.
uptime_ms: u64,
/// Per-service check results.
services: ServiceHealth,
/// Snapshot of host CPU, memory and GPU figures.
resources: ResourceStatus,
/// Availability of pipeline prerequisites and auxiliary HTTP services.
pipeline: PipelineStatus,
}
/// Pipeline prerequisites reported by `health_detailed`.
#[derive(Debug, Serialize)]
struct PipelineStatus {
/// Whether the scripts directory exists.
scripts: bool,
/// Whether the models directory exists.
models: bool,
/// Whether the ffmpeg binary exists at the expected path.
ffmpeg: bool,
/// Embedding server (port 11436)
embedding_server: ServiceStatus,
/// GDINO API (port 8080)
gdino_api: ServiceStatus,
/// LLM via llama.cpp (port 8082)
llm: ServiceStatus,
}
/// Host resource figures reported by `health_detailed`; each field is copied from
/// the same-named field of `SystemResources::check()`.
#[derive(Debug, Serialize)]
struct ResourceStatus {
/// CPU usage, as a percentage.
cpu_used_percent: f64,
/// CPU idle share, as a percentage.
cpu_idle_percent: f64,
/// Available memory in megabytes.
memory_available_mb: u64,
/// Total memory in megabytes.
memory_total_mb: u64,
/// Memory usage, as a percentage.
memory_used_percent: f64,
/// Whether a GPU was detected.
gpu_available: bool,
/// GPU utilisation, when reported.
gpu_utilization: Option<f64>,
/// GPU memory usage percentage, when reported.
gpu_memory_used_pct: Option<f64>,
}
/// Check results for the backing services, reported by `health_detailed`.
#[derive(Debug, Serialize)]
struct ServiceHealth {
/// Result of `check_postgres`.
postgres: ServiceStatus,
/// Result of `check_redis`.
redis: ServiceStatus,
/// Result of `check_qdrant`.
qdrant: ServiceStatus,
/// Result of `check_mongodb`.
mongodb: ServiceStatus,
}
/// Outcome of a single service check, produced by the `check_*` functions.
#[derive(Debug, Serialize)]
struct ServiceStatus {
/// `"ok"` or `"error"`.
status: String,
/// Time the check took in milliseconds; some checks leave it `None` when the
/// service could not be reached at all.
latency_ms: Option<u64>,
/// Error description when `status` is `"error"`.
error: Option<String>,
}
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
let postgres = check_postgres().await;
let redis = check_redis().await;
let qdrant = check_qdrant().await;
let mongodb = check_mongodb(&state.mongo_cache).await;
let all_ok = postgres.status == "ok"
&& redis.status == "ok"
&& qdrant.status == "ok"
&& mongodb.status == "ok";
let status = if all_ok { "ok" } else { "degraded" };
if all_ok {
let _ = state.redis_cache.set_health(status).await;
}
Json(HealthResponse {
status: status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
uptime_ms: get_uptime_ms(),
})
}
/// Detailed health endpoint.
///
/// Reports the four backing-service checks, a snapshot of host resources, and the
/// availability of pipeline prerequisites (scripts directory, models directory,
/// ffmpeg binary, and three auxiliary HTTP services).
///
/// The scripts directory is taken from `MOMENTRY_SCRIPTS_DIR`, with the same
/// fallback that `register_single_file` uses, so the report describes the directory
/// the server actually runs scripts from.
async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthResponse> {
    let postgres = check_postgres().await;
    let redis = check_redis().await;
    let qdrant = check_qdrant().await;
    let mongodb = check_mongodb(&state.mongo_cache).await;

    let all_ok = postgres.status == "ok"
        && redis.status == "ok"
        && qdrant.status == "ok"
        && mongodb.status == "ok";
    let overall_status = if all_ok { "ok" } else { "degraded" };

    let sys = SystemResources::check();

    let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
        .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
    let has_scripts = std::path::Path::new(&scripts_dir).is_dir();
    // NOTE(review): the models directory and ffmpeg binary are still fixed,
    // machine-specific paths; no environment override for them is visible in this file.
    let has_models = std::path::Path::new("/Users/accusys/momentry_core_0.1/models").is_dir();
    let has_ffmpeg = std::path::Path::new("/opt/homebrew/opt/ffmpeg-full/bin/ffmpeg").exists();

    Json(DetailedHealthResponse {
        status: overall_status.to_string(),
        version: env!("BUILD_VERSION").to_string(),
        build_git_hash: env!("BUILD_GIT_HASH").to_string(),
        build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
        uptime_ms: get_uptime_ms(),
        services: ServiceHealth {
            postgres,
            redis,
            qdrant,
            mongodb,
        },
        resources: ResourceStatus {
            cpu_used_percent: sys.cpu_used_percent,
            cpu_idle_percent: sys.cpu_idle_percent,
            memory_available_mb: sys.memory_available_mb,
            memory_total_mb: sys.memory_total_mb,
            memory_used_percent: sys.memory_used_percent,
            gpu_available: sys.gpu_available,
            gpu_utilization: sys.gpu_utilization,
            gpu_memory_used_pct: sys.gpu_memory_used_pct,
        },
        pipeline: PipelineStatus {
            scripts: has_scripts,
            models: has_models,
            ffmpeg: has_ffmpeg,
            embedding_server: check_http("http://127.0.0.1:11436/health").await,
            gdino_api: check_http("http://127.0.0.1:8080/health").await,
            llm: check_http("http://127.0.0.1:8082/health").await,
        },
    })
}
/// Login endpoint.
///
/// Accepts only the built-in `demo` / `demo` credentials and answers with
/// `DEMO_USER_API_KEY`; any other combination yields `success: false`.
async fn login(Json(req): Json<LoginRequest>) -> Json<LoginResponse> {
    let authenticated = req.username == "demo" && req.password == "demo";

    let response = if authenticated {
        LoginResponse {
            success: true,
            message: Some("Login successful".to_string()),
            api_key: Some(DEMO_USER_API_KEY.to_string()),
            user: Some(UserInfo {
                username: "demo".to_string(),
            }),
        }
    } else {
        LoginResponse {
            success: false,
            message: Some("Invalid username or password".to_string()),
            api_key: None,
            user: None,
        }
    };
    Json(response)
}
/// Logout endpoint; always answers `{"success": true}` and changes no state.
async fn logout() -> Json<serde_json::Value> {
    let body = serde_json::json!({ "success": true });
    Json(body)
}
/// Checks PostgreSQL by calling `PostgresDb::init()` and then `list_files(1, 0)`.
///
/// `latency_ms` is `None` when initialisation itself fails; otherwise it is the
/// time from the start of the check to the end of the listing call.
async fn check_postgres() -> ServiceStatus {
    let start = Instant::now();

    let db = match PostgresDb::init().await {
        Ok(db) => db,
        Err(e) => {
            return ServiceStatus {
                status: "error".to_string(),
                latency_ms: None,
                error: Some(e.to_string()),
            };
        }
    };

    let listing = db.list_files(1, 0).await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    match listing {
        Ok(_) => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(e.to_string()),
        },
    }
}
/// Checks Redis by opening a connection and sending `PING`.
///
/// `latency_ms` is `None` when the client or the connection cannot be created;
/// otherwise it is the time from the start of the check to the `PING` reply.
async fn check_redis() -> ServiceStatus {
    let start = Instant::now();

    // Failures before the PING is sent carry no latency figure.
    let unreachable = |reason: String| ServiceStatus {
        status: "error".to_string(),
        latency_ms: None,
        error: Some(reason),
    };

    let client = match RedisClient::new() {
        Ok(client) => client,
        Err(e) => return unreachable(e.to_string()),
    };
    let mut conn = match client.get_conn().await {
        Ok(conn) => conn,
        Err(e) => return unreachable(e.to_string()),
    };

    let pong: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);
    match pong {
        Ok(_) => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(e.to_string()),
        },
    }
}
/// Checks Qdrant by requesting `<QDRANT_URL>/collections` with a 5 second timeout.
///
/// `QDRANT_URL` defaults to `http://localhost:6333`. A non-success HTTP status is
/// reported as an error with its latency; a transport failure is reported with
/// `latency_ms: None`.
///
/// NOTE(review): `QDRANT_API_KEY` falls back to a key literal embedded in source.
async fn check_qdrant() -> ServiceStatus {
    let start = Instant::now();

    let base_url =
        std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string());
    let api_key =
        std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());
    let url = format!("{}/collections", base_url);

    let outcome = reqwest::Client::new()
        .get(&url)
        .header("api-key", api_key)
        .timeout(std::time::Duration::from_secs(5))
        .send()
        .await;

    match outcome {
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms: None,
            error: Some(e.to_string()),
        },
        Ok(resp) => {
            let latency_ms = Some(start.elapsed().as_millis() as u64);
            if resp.status().is_success() {
                ServiceStatus {
                    status: "ok".to_string(),
                    latency_ms,
                    error: None,
                }
            } else {
                ServiceStatus {
                    status: "error".to_string(),
                    latency_ms,
                    error: Some(format!("HTTP {}", resp.status())),
                }
            }
        }
    }
}
/// Checks MongoDB through `cache.health_check()`; latency is reported on both
/// success and failure.
async fn check_mongodb(cache: &MongoCache) -> ServiceStatus {
    let start = Instant::now();
    let outcome = cache.health_check().await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);

    let (status, error) = match outcome {
        Ok(_) => ("ok", None),
        Err(e) => ("error", Some(e.to_string())),
    };
    ServiceStatus {
        status: status.to_string(),
        latency_ms,
        error,
    }
}
/// Checks an HTTP service by issuing a `GET` to `url`.
///
/// The request is capped at 5 seconds, matching `check_qdrant`, so an unresponsive
/// service cannot stall the health endpoint indefinitely. Any 2xx response counts
/// as `"ok"`; other statuses and transport errors (including the timeout) are
/// reported as `"error"`. Latency is reported in every case.
async fn check_http(url: &str) -> ServiceStatus {
    let start = Instant::now();
    let outcome = reqwest::Client::new()
        .get(url)
        .timeout(std::time::Duration::from_secs(5))
        .send()
        .await;
    let latency_ms = Some(start.elapsed().as_millis() as u64);

    match outcome {
        Ok(resp) if resp.status().is_success() => ServiceStatus {
            status: "ok".to_string(),
            latency_ms,
            error: None,
        },
        Ok(resp) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(format!("HTTP {}", resp.status())),
        },
        Err(e) => ServiceStatus {
            status: "error".to_string(),
            latency_ms,
            error: Some(e.to_string()),
        },
    }
}
/// Derives a short identifier for a search request.
///
/// The query, optional UUID and limit are serialized as a JSON object, hashed with
/// SHA-256, and the first 16 hex characters of the digest are returned.
fn generate_query_hash(query: &str, uuid: Option<&str>, limit: usize) -> String {
    let payload = serde_json::json!({
        "query": query,
        "uuid": uuid,
        "limit": limit,
    })
    .to_string();
    let digest_hex = format!("{:x}", Sha256::digest(payload.as_bytes()));
    digest_hex[..16].to_string()
}
/// Derives a short identifier for a visual chunk search.
///
/// The UUID and search criteria are serialized as a JSON object, hashed with
/// SHA-256, and the first 16 hex characters of the digest are returned.
fn generate_visual_search_hash(
    uuid: &str,
    criteria: &visual_chunk_search::VisualChunkSearchCriteria,
) -> String {
    let payload = serde_json::json!({
        "uuid": uuid,
        "criteria": criteria,
    })
    .to_string();
    let digest_hex = format!("{:x}", Sha256::digest(payload.as_bytes()));
    digest_hex[..16].to_string()
}
/// Hashes the file at `path` with `compute_sha256`, for de-duplication.
/// Returns `None` when hashing fails; the underlying error is discarded.
fn sha256_file(path: &std::path::Path) -> Option<String> {
    match crate::core::storage::content_hash::compute_sha256(path) {
        Ok(hex) => Some(hex),
        Err(_) => None,
    }
}
/// Resolve name conflict: if file_name collides with existing but content differs,
/// append ` (N)` suffix. Returns the resolved file_name.
async fn resolve_filename(
db: &PostgresDb,
file_name: &str,
content_hash: &str,
) -> String {
let table = schema::table_name("videos");
let base = file_name.to_string();
let dot_pos = base.rfind('.');
let (stem, ext) = match dot_pos {
Some(p) => (base[..p].to_string(), base[p..].to_string()),
None => (base.clone(), String::new()),
};
let mut candidate = base.clone();
let mut attempt = 0usize;
loop {
// Check if candidate name exists with a DIFFERENT hash (same content = OK)
let conflict: Option<String> = sqlx::query_scalar(
&format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table)
)
.bind(&candidate)
.bind(content_hash)
.fetch_optional(db.pool())
.await
.unwrap_or(None);
if conflict.is_none() {
return candidate;
}
attempt += 1;
candidate = format!("{} ({}){}", stem, attempt, ext);
}
}
/// 註冊單一檔案(內部函數,不處理 pattern
async fn register_single_file(
state: &AppState,
file_path: &str,
_user_id: Option<i64>,
) -> RegisterFileResponse {
tracing::info!("[REGISTER] Starting registration for: {}", file_path);
let path = std::path::Path::new(file_path);
if !path.exists() {
return RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name: String::new(),
file_path: file_path.to_string(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!("File not found: {}", file_path),
};
}
let canonical_path = path
.canonicalize()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| file_path.to_string());
let file_name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let db = match PostgresDb::init().await {
Ok(db) => db,
Err(e) => {
tracing::error!("[REGISTER] DB init failed: {}", e);
return RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name,
file_path: canonical_path,
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!("DB init failed: {}", e),
};
}
};
// Step 1: Compute SHA256 of full file
let content_hash = sha256_file(&path).unwrap_or_default();
// Step 2: Hash check — same content = already registered (regardless of name)
let videos_table = schema::table_name("videos");
if !content_hash.is_empty() {
if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>(
&format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table)
)
.bind(&content_hash)
.fetch_optional(db.pool())
.await
{
tracing::info!("[REGISTER] Content hash collision → already registered: {}", existing_uuid);
let existing_name: Option<String> = sqlx::query_scalar(
&format!("SELECT file_name FROM {} WHERE file_uuid = $1", videos_table)
).bind(&existing_uuid).fetch_optional(db.pool()).await.unwrap_or(None);
return RegisterFileResponse {
success: true,
file_uuid: existing_uuid,
file_name: existing_name.unwrap_or(file_name),
file_path: canonical_path,
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: true,
message: "Content already registered (identical file)".to_string(),
};
}
}
// Step 3: Name check — same name but different content → auto-rename
let final_name = resolve_filename(&db, &file_name, &content_hash).await;
// Step 4: Compute UUID (using final resolved name)
let videos_table = schema::table_name("videos");
let birthday = sqlx::query_scalar::<_, chrono::DateTime<chrono::Utc>>(
&format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table)
)
.bind(&final_name)
.fetch_optional(db.pool())
.await
.unwrap_or(None)
.map(|t| t.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
let mac_address = crate::core::storage::uuid::get_mac_address();
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac_address,
&birthday,
&canonical_path,
&final_name,
);
// Step 5: Probe
let probe_result = match crate::core::probe::probe_video(&canonical_path) {
Ok(r) => r,
Err(e) => {
return RegisterFileResponse {
success: false,
file_uuid,
file_name,
file_path: canonical_path,
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!("Probe failed: {}", e),
};
}
};
let has_video = probe_result
.streams
.iter()
.any(|s| s.codec_type.as_deref() == Some("video"));
let has_audio = probe_result
.streams
.iter()
.any(|s| s.codec_type.as_deref() == Some("audio"));
let final_file_type = if has_video {
Some("video".to_string())
} else if has_audio {
Some("audio".to_string())
} else {
None
};
let duration = probe_result
.format
.duration
.as_ref()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let mut width = 0u32;
let mut height = 0u32;
let mut fps = 0.0;
let mut total_frames = 0u64;
if let Some(s) = probe_result
.streams
.iter()
.find(|s| s.codec_type.as_deref() == Some("video"))
{
width = s.width.unwrap_or(0);
height = s.height.unwrap_or(0);
if let Some(fps_str) = &s.r_frame_rate {
if let Some((num, den)) = fps_str.split_once('/') {
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
if d > 0.0 {
fps = n / d;
}
}
}
}
total_frames = s
.nb_frames
.as_ref()
.and_then(|s| s.parse().ok())
.unwrap_or((duration * fps) as u64);
}
let videos_table = schema::table_name("videos");
let probe_json = serde_json::to_value(&probe_result).ok();
let status = "pending";
let _ = sqlx::query(&format!(
"INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, content_hash, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status, content_hash = EXCLUDED.content_hash",
videos_table
))
.bind(&file_uuid).bind(&canonical_path).bind(&final_name).bind(&final_file_type)
.bind(duration).bind(width as i32).bind(height as i32).bind(fps)
.bind(&probe_json).bind(status).bind(&content_hash)
.execute(db.pool()).await;
// 若是 video 類型,同步執行 CUT + Scene 分類
let mut cut_done = false;
let mut scene_done = false;
if has_video && total_frames > 0 && fps > 0.0 {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
// CUT: 場景檢測PySceneDetect0.08s 即可完成)
let cut_path = std::path::Path::new(&output_dir).join(format!("{}.cut.json", file_uuid));
if !cut_path.exists() {
let cut_script = std::path::Path::new(&scripts_dir).join("cut_processor.py");
if cut_script.exists() {
let cut_output = std::process::Command::new(&python_path)
.arg(&cut_script)
.arg(&canonical_path)
.arg(&cut_path)
.output();
if let Ok(output) = cut_output {
if output.status.success() {
cut_done = true;
tracing::info!("[REGISTER] CUT completed for {}", file_uuid);
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::error!("[REGISTER] CUT failed for {}: {}", file_uuid, stderr);
}
} else {
tracing::error!("[REGISTER] CUT execution error for {}", file_uuid);
}
}
} else {
cut_done = true;
// 讀取現有 CUT JSON 取得場景數
if let Ok(content) = std::fs::read_to_string(&cut_path) {
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
let scenes = cut_data
.get("scenes")
.and_then(|s| s.as_array())
.map(|a| a.len() as i32)
.unwrap_or(0);
tracing::info!(
"[REGISTER] CUT already exists: {} scenes for {}",
scenes,
file_uuid
);
}
}
}
// Scene: 場景分類MIT Places365取樣間隔 2s
let scene_path =
std::path::Path::new(&output_dir).join(format!("{}.scene.json", file_uuid));
if !scene_path.exists() {
let scene_script = std::path::Path::new(&scripts_dir).join("scene_classifier.py");
if scene_script.exists() {
let scene_output = std::process::Command::new(&python_path)
.arg(&scene_script)
.arg(&canonical_path)
.arg(&scene_path)
.arg("--sample-interval")
.arg("2")
.output();
if let Ok(output) = scene_output {
if output.status.success() {
scene_done = true;
tracing::info!(
"[REGISTER] Scene classification completed for {}",
file_uuid
);
}
}
}
} else {
scene_done = true;
}
}
// 更新 DB: cut_done, scene_done, audio_tracks
let audio_tracks: Vec<serde_json::Value> = probe_result.streams.iter()
.filter(|s| s.codec_type.as_deref() == Some("audio"))
.map(|s| {
serde_json::json!({
"index": s.index,
"codec": s.codec_name,
"channels": s.channels,
"sample_rate": s.sample_rate,
"language": s.tags.as_ref().and_then(|t| t.get("language")).unwrap_or(&serde_json::Value::Null),
})
})
.collect();
let audio_tracks_json = serde_json::to_value(&audio_tracks).ok();
// 計算 cut_count 與 cut_max_duration
let cut_path = std::path::Path::new(
&std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
)
.join(format!("{}.cut.json", file_uuid));
let mut cut_count = 0i32;
let mut cut_max_duration = 0.0f64;
if let Ok(content) = std::fs::read_to_string(&cut_path) {
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
if let Some(scenes) = cut_data.get("scenes").and_then(|s| s.as_array()) {
cut_count = scenes.len() as i32;
cut_max_duration = scenes
.iter()
.filter_map(|s| {
let start = s.get("start_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
let end = s.get("end_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
if end > start {
Some(end - start)
} else {
None
}
})
.fold(0.0_f64, f64::max);
}
}
}
let videos_table = schema::table_name("videos");
let _ = sqlx::query(
&format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
)
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
.execute(db.pool()).await;
// 寫入 {file_uuid}.probe.json含音軌資訊
if let Some(json_val) = probe_json {
let probe_path = std::path::Path::new(
&std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
)
.join(format!("{}.probe.json", file_uuid));
let json_str = serde_json::to_string_pretty(&json_val).unwrap_or_default();
let _ = std::fs::write(&probe_path, json_str);
}
RegisterFileResponse {
success: true,
file_uuid,
file_name,
file_path: canonical_path,
file_type: final_file_type,
duration,
width,
height,
fps,
total_frames,
registration_time: None,
already_exists: false,
message: "File registered successfully".to_string(),
}
}
async fn register_file(
State(state): State<AppState>,
Json(req): Json<RegisterFileRequest>,
) -> Result<Json<RegisterFileResponse>, StatusCode> {
let file_path = req.file_path.clone();
let pattern = req.pattern;
// 如果有 pattern掃描目錄下所有符合的檔案逐一註冊
if let Some(ref pat) = pattern {
let dir = std::path::Path::new(&file_path);
if !dir.is_dir() {
return Ok(Json(RegisterFileResponse {
success: false,
file_uuid: String::new(),
file_name: String::new(),
file_path: file_path.clone(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!(
"Pattern requires a directory, but path is not a dir: {}",
file_path
),
}));
}
let re = regex::Regex::new(pat).map_err(|e| {
tracing::error!("[REGISTER] Invalid regex pattern: {}", e);
StatusCode::BAD_REQUEST
})?;
let mut registered = 0u32;
let mut failed = 0u32;
let mut skipped = 0u32;
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let entry_path = entry.path();
if !entry_path.is_file() {
continue;
}
let fname = entry_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
if !re.is_match(fname) {
continue;
}
let result = register_single_file(
&state,
&entry_path.to_string_lossy().to_string(),
req.user_id,
)
.await;
if result.success {
registered += 1;
} else if result.already_exists {
skipped += 1;
} else {
failed += 1;
}
}
}
return Ok(Json(RegisterFileResponse {
success: true,
file_uuid: format!("batch_{}_registered_{}_failed", registered, failed),
file_name: format!(
"{} files registered, {} skipped, {} failed",
registered, skipped, failed
),
file_path: file_path.clone(),
file_type: None,
duration: 0.0,
width: 0,
height: 0,
fps: 0.0,
total_frames: 0,
registration_time: None,
already_exists: false,
message: format!(
"Batch register: {} registered, {} skipped, {} failed",
registered, skipped, failed
),
}));
}
// 單一檔案註冊
let resp = register_single_file(&state, &file_path, req.user_id).await;
return Ok(Json(resp));
}
/// Returns probe metadata for the video registered under `file_uuid`.
///
/// The handler looks the video up in the `videos` table, reuses
/// `<OUTPUT_DIR>/<file_uuid>.probe.json` when that file can be read, and otherwise
/// runs `probe_video` on the stored path and saves the result via
/// `FileManager::save_json`. Duration, dimensions, frame rate and frame count are
/// then written back to the `videos` row on a best-effort basis.
///
/// # Errors
/// - `404 NOT_FOUND` when no row matches `file_uuid`.
/// - `500 INTERNAL_SERVER_ERROR` when the lookup query, parsing the cached JSON,
///   the probe itself, or saving the probe JSON fails.
async fn probe_by_uuid(
    State(state): State<AppState>,
    Path(file_uuid): Path<String>,
) -> Result<Json<ProbeResponse>, StatusCode> {
    // 1. Resolve the registered file name and path.
    let table = schema::table_name("videos");
    let row: Option<(String, String)> = sqlx::query_as(&format!(
        "SELECT file_name, file_path FROM {} WHERE file_uuid = $1",
        table
    ))
    .bind(&file_uuid)
    .fetch_optional(state.db.pool())
    .await
    .map_err(|e| {
        tracing::error!("DB error fetching video: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    let (file_name, path) = row.ok_or_else(|| {
        tracing::warn!("Video not found: {}", file_uuid);
        StatusCode::NOT_FOUND
    })?;

    // 2. Check for cached probe.json
    let probe_path = format!(
        "{}/{}.probe.json",
        crate::core::config::OUTPUT_DIR.as_str(),
        file_uuid
    );
    let (probe_result, cached) = if let Ok(content) = std::fs::read_to_string(&probe_path) {
        tracing::info!("Using cached probe.json: {}", probe_path);
        let result: crate::core::probe::ProbeResult =
            serde_json::from_str(&content).map_err(|e| {
                tracing::error!("Failed to parse cached probe.json: {}", e);
                StatusCode::INTERNAL_SERVER_ERROR
            })?;
        (result, true)
    } else {
        tracing::info!("Running ffprobe for: {}", path);
        let result = crate::core::probe::probe_video(&path).map_err(|e| {
            tracing::error!("ffprobe failed: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
        // Save probe.json to OUTPUT_DIR
        let file_manager = FileManager::new(std::path::PathBuf::from(
            crate::core::config::OUTPUT_DIR.as_str(),
        ));
        let json_str = serde_json::to_string(&result).map_err(|e| {
            tracing::error!("Failed to serialize probe result: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
        file_manager
            .save_json(&file_uuid, "probe", &json_str)
            .map_err(|e| {
                tracing::error!("Failed to save probe.json: {}", e);
                StatusCode::INTERNAL_SERVER_ERROR
            })?;
        (result, false)
    };

    // 3. Extract video info
    let duration = probe_result
        .format
        .duration
        .as_ref()
        .and_then(|s| s.parse::<f64>().ok())
        .unwrap_or(0.0);
    let mut width = 0u32;
    let mut height = 0u32;
    let mut fps = 0.0_f64;
    for stream in &probe_result.streams {
        if stream.codec_type.as_deref() != Some("video") {
            continue;
        }
        width = stream.width.unwrap_or(0);
        height = stream.height.unwrap_or(0);
        if let Some(fps_str) = &stream.r_frame_rate {
            // The rate is either a plain number or a "num/den" fraction; anything
            // else (or a non-positive denominator) is reported as 0.
            let parts: Vec<&str> = fps_str.split('/').collect();
            fps = match parts.as_slice() {
                [plain] => plain.parse().unwrap_or(0.0),
                [num, den] => {
                    let num: f64 = num.parse().unwrap_or(0.0);
                    let den: f64 = den.parse().unwrap_or(1.0);
                    if den > 0.0 {
                        num / den
                    } else {
                        0.0
                    }
                }
                _ => 0.0,
            };
        }
    }

    // 4. Calculate total_frames: prefer the stream's own frame count, otherwise
    //    estimate it from duration * fps.
    let total_frames = probe_result
        .streams
        .iter()
        .find(|s| s.codec_type.as_deref() == Some("video"))
        .and_then(|s| s.nb_frames.as_ref())
        .and_then(|n| n.parse::<i64>().ok())
        .unwrap_or_else(|| (duration * fps).floor() as i64);

    // Update the videos table with probe metadata. This stays best-effort (the
    // probe result is still returned), but a failure is logged instead of being
    // silently dropped.
    let update_result = sqlx::query(&format!(
        "UPDATE {} SET duration = $1, width = $2, height = $3, fps = $4, total_frames = $5 WHERE file_uuid = $6",
        table
    ))
    .bind(duration)
    .bind(width as i32)
    .bind(height as i32)
    .bind(fps)
    .bind(total_frames)
    .bind(&file_uuid)
    .execute(state.db.pool())
    .await;
    if let Err(e) = update_result {
        tracing::warn!("Failed to store probe metadata for {}: {}", file_uuid, e);
    }

    Ok(Json(ProbeResponse {
        file_uuid,
        file_name,
        duration,
        width,
        height,
        fps,
        total_frames,
        cached,
        format: probe_result.format,
        streams: probe_result.streams,
    }))
}
// --- P0 Core API Handlers ---
/// Queues processing for the video identified by `file_uuid`.
///
/// Steps performed:
/// 1. Load the video's name, frame count and path from the `videos` table.
/// 2. Create a monitor job (the inline comment in the original notes that the
///    worker polls this table) and, when the request names processors, store that
///    list on the job row.
/// 3. Mark the video row as `processing` and initialise per-processor status.
/// 4. Collect any processor PIDs already present in Redis (best-effort).
///
/// # Errors
/// Returns `404` when the video does not exist and `500` with a descriptive
/// message when any database step fails.
async fn trigger_processing(
    State(state): State<AppState>,
    Path(file_uuid): Path<String>,
    Json(req): Json<ProcessRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    // 1. Fetch everything needed from the video row in a single round-trip.
    let videos_table = schema::table_name("videos");
    let asset: Option<(String, i64, Option<String>)> = sqlx::query_as(&format!(
        "SELECT file_name, COALESCE(total_frames, 0), file_path FROM {} WHERE file_uuid = $1",
        videos_table
    ))
    .bind(&file_uuid)
    .fetch_optional(state.db.pool())
    .await
    .map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("DB Error: {}", e),
        )
    })?;
    let (file_name, total_frames, video_path) =
        asset.ok_or_else(|| (StatusCode::NOT_FOUND, "File not found".to_string()))?;
    // A negative frame count must not wrap around into a huge unsigned value.
    let total_frames = u64::try_from(total_frames).unwrap_or(0);

    // 2. Create Monitor Job (Worker polls this table)
    let monitor_job = state
        .db
        .create_monitor_job(&file_uuid, video_path.as_deref())
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to create monitor job: {}", e),
            )
        })?;

    // Update processors if specified
    let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors {
        let jobs_table = crate::core::db::schema::table_name("monitor_jobs");
        sqlx::query(&format!(
            "UPDATE {} SET processors = $1 WHERE id = $2",
            jobs_table
        ))
        .bind(procs)
        .bind(monitor_job.id)
        .execute(state.db.pool())
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to update processors: {}", e),
            )
        })?;
        procs.iter().map(|s| s.as_str()).collect()
    } else {
        // No explicit list means all processors
        vec![
            "asr",
            "cut",
            "yolo",
            "ocr",
            "face",
            "pose",
            "asrx",
            "visual_chunk",
        ]
    };

    // 3. Update Asset Status
    sqlx::query(&format!(
        "UPDATE {} SET status = 'processing' WHERE file_uuid = $1",
        videos_table
    ))
    .bind(&file_uuid)
    .execute(state.db.pool())
    .await
    .map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to update asset status: {}", e),
        )
    })?;

    // Initialize processing_status with all processors and total_frames
    state
        .db
        .init_processing_status(&file_uuid, processors_to_run, total_frames)
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to init processing status: {}", e),
            )
        })?;
    tracing::info!("Job {} created for asset {}", monitor_job.id, file_uuid);

    // 4. Best-effort: report PIDs of processors that already registered in Redis.
    //    Any Redis failure simply yields an empty list.
    let prefix = REDIS_KEY_PREFIX.as_str();
    let mut processor_pids: Vec<i32> = Vec::new();
    if let Ok(redis_client) = RedisClient::new() {
        if let Ok(mut conn) = redis_client.get_conn().await {
            for name in ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"] {
                let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name);
                let pid: Option<i32> = redis::cmd("HGET")
                    .arg(&key)
                    .arg("pid")
                    .query_async(&mut conn)
                    .await
                    .ok()
                    .flatten();
                if let Some(p) = pid {
                    processor_pids.push(p);
                }
            }
        }
    }

    Ok(Json(serde_json::json!({
        "job_id": monitor_job.id,
        "file_uuid": file_uuid,
        "status": "PENDING",
        "pids": processor_pids,
        "message": format!("Processing triggered for {}", file_name)
    })))
}
async fn get_chunk_by_path(
Path((file_uuid, chunk_id)): Path<(String, String)>,
State(state): State<AppState>,
) -> Result<Json<Chunk>, StatusCode> {
let chunk = state
.db
.get_chunk_by_chunk_id_and_uuid(&chunk_id, &file_uuid)
.await
.map_err(|_| {
tracing::error!("[get_chunk_by_path] DB error: {}:{}", file_uuid, chunk_id);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| {
tracing::warn!("[get_chunk_by_path] Not found: {}:{}", file_uuid, chunk_id);
StatusCode::NOT_FOUND
})?;
Ok(Json(chunk))
}
/// Reports the registration and processing state of one video.
///
/// The video row supplies the name, creation time and processing status. The
/// most recent monitor job for the same `file_uuid` (if any) supplies frame
/// progress, which is only reported while that job is `RUNNING` or `QUEUED` and
/// has a positive total frame count.
///
/// # Errors
/// `500` when the video query fails, `404` when the video is unknown. A failure
/// of the job query is tolerated and treated as "no job".
async fn get_asset_status(
    State(state): State<AppState>,
    Path(uuid): Path<String>,
) -> Result<Json<AssetStatusResponse>, StatusCode> {
    let videos_table = schema::table_name("videos");
    let asset_sql = format!(
        "SELECT file_uuid, file_name, created_at AT TIME ZONE 'UTC', COALESCE(processing_status, 'REGISTERED'), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1",
        videos_table
    );
    let asset: Option<(String, String, chrono::DateTime<chrono::Utc>, String, i64)> =
        sqlx::query_as(&asset_sql)
            .bind(&uuid)
            .fetch_optional(state.db.pool())
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let (uuid, file_name, registered_at, processing_status, _video_total_frames) = match asset {
        Some(found) => found,
        None => return Err(StatusCode::NOT_FOUND),
    };

    let jobs_table = schema::table_name("monitor_jobs");
    let job_sql = format!(
        "SELECT id::text, COALESCE(status, 'QUEUED'), COALESCE(processed_frames, 0), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1 ORDER BY created_at DESC LIMIT 1",
        jobs_table
    );
    let latest_job: Option<(String, String, i64, i64)> = sqlx::query_as(&job_sql)
        .bind(&uuid)
        .fetch_optional(state.db.pool())
        .await
        .ok()
        .flatten();

    // Progress is only meaningful for an active job with a known frame total.
    let (current_job_id, frame_progress) = match latest_job {
        Some((job_id, job_status, processed, total))
            if total > 0 && (job_status == "RUNNING" || job_status == "QUEUED") =>
        {
            let progress = FrameProgress {
                total_frames: total,
                processed_frames: processed,
                progress_percent: (processed as f64 / total as f64) * 100.0,
            };
            (Some(job_id), Some(progress))
        }
        _ => (None, None),
    };

    Ok(Json(AssetStatusResponse {
        uuid,
        file_name,
        registration_time: registered_at.to_rfc3339(),
        processing_status,
        current_job_id,
        frame_progress,
    }))
}
async fn get_job_status(
State(state): State<AppState>,
Path(job_id): Path<String>,
) -> Result<Json<JobStatusResponse>, StatusCode> {
let jobs_table = schema::table_name("monitor_jobs");
let row: Option<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
&format!(
"SELECT j.id::text, j.file_uuid, COALESCE(j.rule, 'unknown'), COALESCE(j.status, 'QUEUED'), j.assigned_processor_id::text, j.processed_frames, j.total_frames FROM {} j WHERE j.id = $1::uuid",
jobs_table
)
)
.bind(&job_id)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
eprintln!("DB Error in get_job_status: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let (id, asset, rule, status, proc_id, pf, tf) = row.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(JobStatusResponse {
job_id: id,
file_uuid: asset,
rule,
status,
current_processor_id: proc_id,
frame_progress: FrameProgress {
total_frames: tf,
processed_frames: pf,
progress_percent: if tf > 0 {
(pf as f64 / tf as f64) * 100.0
} else {
0.0
},
},
}))
}
async fn get_rule_status(
State(state): State<AppState>,
Path(rule): Path<String>,
) -> Result<Json<RuleStatusResponse>, StatusCode> {
let procs: Vec<String> = sqlx::query_scalar(
"SELECT id::text FROM processors WHERE supported_rules @> ARRAY[$1]::TEXT[]",
)
.bind(&rule)
.fetch_all(state.db.pool())
.await
.unwrap_or_default();
let jobs_table = schema::table_name("monitor_jobs");
let jobs: Vec<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
&format!(
"SELECT id::text, file_uuid, COALESCE(rule, 'unknown'), status, assigned_processor_id::text, processed_frames, total_frames FROM {} WHERE rule = $1 AND status IN ('QUEUED','RUNNING')",
jobs_table
)
)
.bind(&rule)
.fetch_all(state.db.pool())
.await
.unwrap_or_default();
let active = jobs
.into_iter()
.map(|(id, asset, r, s, p, pf, tf)| JobStatusResponse {
job_id: id,
file_uuid: asset,
rule: r,
status: s,
current_processor_id: p,
frame_progress: FrameProgress {
total_frames: tf,
processed_frames: pf,
progress_percent: if tf > 0 {
(pf as f64 / tf as f64) * 100.0
} else {
0.0
},
},
})
.collect();
Ok(Json(RuleStatusResponse {
rule,
supported_processor_ids: procs,
active_jobs: active,
}))
}
// --- End P0 Core API Handlers ---
async fn search(
State(state): State<AppState>,
Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
let mode = req.mode.unwrap_or(SearchMode::Smart);
let limit = req.limit.unwrap_or(10);
let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
let cache_key = keys::search(&format!("{:?}", mode));
let ttl = state.mongo_cache.ttl_search();
let response = state
.mongo_cache
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
let pg = PostgresDb::init()
.await
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
let results: Vec<SearchResult> = match mode {
SearchMode::Vector => {
let query_vector = state
.embedder
.embed_query(&req.query)
.await
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
let qdrant = QdrantDb::new();
let search_results = if let Some(ref uuid) = req.uuid {
qdrant.search_in_uuid(&query_vector, uuid, limit).await?
} else {
qdrant.search(&query_vector, limit).await?
};
let mut results = Vec::new();
for r in search_results {
if let Some(chunk) = pg
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
.await
.ok()
.flatten()
{
let text = extract_text_from_content(&chunk.content);
results.push(SearchResult {
uuid: r.uuid,
chunk_id: r.chunk_id,
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time().seconds(),
end_time: chunk.end_time().seconds(),
text,
score: r.score,
});
}
}
results
}
SearchMode::Smart => {
// Vector search + BM25 reranking
let query_vector = state
.embedder
.embed_query(&req.query)
.await
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
let qdrant = QdrantDb::new();
let search_results = if let Some(ref uuid) = req.uuid {
qdrant
.search_in_uuid(&query_vector, uuid, limit * 2)
.await?
} else {
qdrant.search(&query_vector, limit * 2).await?
};
// 取得所有 chunk 並用 BM25 重新排序
let mut results_with_bm25: Vec<(SearchResult, f32)> = Vec::new();
for r in search_results {
if let Some(chunk) = pg
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
.await
.ok()
.flatten()
{
let text = extract_text_from_content(&chunk.content);
let vector_score = r.score;
results_with_bm25.push((
SearchResult {
uuid: r.uuid,
chunk_id: r.chunk_id,
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time().seconds(),
end_time: chunk.end_time().seconds(),
text,
score: vector_score,
},
vector_score,
));
}
}
// 依 vector score 排序後取前 limit 個
results_with_bm25
.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
results_with_bm25
.into_iter()
.take(limit)
.map(|(r, _)| r)
.collect()
}
};
Ok::<SearchResponse, anyhow::Error>(SearchResponse {
results,
query: req.query.clone(),
})
})
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(response))
}
/// Runs a BM25 search through `state.db` and returns the hits, using each hit's
/// BM25 score as the result score. Responses are not cached.
///
/// # Errors
/// `500` when the search fails (the cause is logged).
async fn search_bm25(
    State(state): State<AppState>,
    Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let hits = match state
        .db
        .search_bm25(&req.query, req.uuid.as_deref(), limit)
        .await
    {
        Ok(hits) => hits,
        Err(e) => {
            tracing::error!("BM25 search failed: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let mut results: Vec<SearchResult> = Vec::new();
    for hit in hits {
        results.push(SearchResult {
            uuid: hit.uuid,
            chunk_id: hit.chunk_id,
            chunk_type: hit.chunk_type,
            start_time: hit.start_time,
            end_time: hit.end_time,
            text: hit.text,
            score: hit.bm25_score,
        });
    }

    let query = req.query.clone();
    Ok(Json(SearchResponse { results, query }))
}
/// Cached BM25 search.
///
/// The cache key combines the query hash (query text, uuid filter, limit) with a
/// `smart` suffix. On a cache miss the query text is passed as the search terms
/// to `PostgresDb::search_bm25`.
///
/// # Errors
/// `500` when database initialisation, the search, or the cache layer fails.
async fn search_smart(
    State(state): State<AppState>,
    Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
    let cache_key = keys::search(&format!("{}smart", query_hash));
    let ttl = state.mongo_cache.ttl_search();

    let fetched = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
            let pg = match PostgresDb::init().await {
                Ok(db) => db,
                Err(e) => return Err(anyhow::anyhow!("PG init failed: {}", e)),
            };
            // The only keyword is the raw query, so the joined search terms are
            // simply a copy of it.
            let search_terms = req.query.clone();
            let hits = pg
                .search_bm25(&search_terms, req.uuid.as_deref(), limit)
                .await?;
            let mut results: Vec<SearchResult> = Vec::new();
            for hit in hits {
                results.push(SearchResult {
                    uuid: hit.uuid,
                    chunk_id: hit.chunk_id,
                    chunk_type: hit.chunk_type,
                    start_time: hit.start_time,
                    end_time: hit.end_time,
                    text: hit.text,
                    score: hit.bm25_score,
                });
            }
            Ok::<SearchResponse, anyhow::Error>(SearchResponse {
                results,
                query: req.query.clone(),
            })
        })
        .await;

    match fetched {
        Ok(response) => Ok(Json(response)),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}
/// Cached hybrid (vector + BM25) search.
///
/// `vector_weight` defaults to `0.7` and `bm25_weight` to `0.3`; both are passed
/// to `PostgresDb::hybrid_search` together with the embedded query.
///
/// The cache key covers the query hash (query text, uuid filter, limit) and both
/// weights, because the weights change the combined ranking and must not share a
/// cached response.
///
/// # Errors
/// `500` when embedding, database initialisation, the search, or the cache layer
/// fails.
async fn hybrid_search(
    State(state): State<AppState>,
    Json(req): Json<HybridSearchRequest>,
) -> Result<Json<HybridSearchResponse>, StatusCode> {
    let limit = req.limit.unwrap_or(10);
    let vector_weight = req.vector_weight.unwrap_or(0.7);
    let bm25_weight = req.bm25_weight.unwrap_or(0.3);
    let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
    // Include the weights: two requests differing only in weights previously
    // collided on the same cache entry.
    let cache_key = keys::hybrid_search(&format!(
        "{}:{}:{}",
        query_hash, vector_weight, bm25_weight
    ));
    let ttl = state.mongo_cache.ttl_hybrid_search();
    let response = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_HYBRID_SEARCH, || async {
            let query_vector = state
                .embedder
                .embed_query(&req.query)
                .await
                .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
            let pg = PostgresDb::init()
                .await
                .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
            let results = pg
                .hybrid_search(
                    &req.query,
                    &query_vector,
                    req.uuid.as_deref(),
                    limit,
                    vector_weight,
                    bm25_weight,
                )
                .await?;
            let search_results: Vec<HybridSearchResult> = results
                .into_iter()
                .map(|r| HybridSearchResult {
                    uuid: r.uuid,
                    chunk_id: r.chunk_id,
                    chunk_type: r.chunk_type,
                    start_time: r.start_time,
                    end_time: r.end_time,
                    text: r.text,
                    vector_score: r.vector_score,
                    bm25_score: r.bm25_score,
                    combined_score: r.combined_score,
                })
                .collect();
            Ok::<HybridSearchResponse, anyhow::Error>(HybridSearchResponse {
                results: search_results,
                query: req.query.clone(),
            })
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(response))
}
/// Resolves a file either by path or by uuid.
///
/// * With `path`: returns the uuid computed from that path; no database access,
///   and the other response fields are `None`.
/// * With `uuid` (and no `path`): loads the video record through the metadata
///   cache and returns its path, name and duration.
///
/// # Errors
/// `404` when neither parameter is given or the uuid is unknown, `500` when the
/// cached fetch fails.
async fn lookup(
    State(state): State<AppState>,
    Query(query): Query<LookupQuery>,
) -> Result<Json<LookupResponse>, StatusCode> {
    // `path` takes precedence over `uuid`.
    let uuid = match (query.path, query.uuid) {
        (Some(path), _) => {
            let file_uuid = crate::uuid::compute_uuid_from_path(&path);
            return Ok(Json(LookupResponse {
                file_uuid,
                file_path: None,
                file_name: None,
                duration: None,
            }));
        }
        (None, Some(uuid)) => uuid,
        (None, None) => return Err(StatusCode::NOT_FOUND),
    };

    let cache_key = keys::video_meta(&uuid);
    let ttl = state.mongo_cache.ttl_video_meta();
    let video = state
        .mongo_cache
        .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEO_META, || async {
            let db = PostgresDb::init()
                .await
                .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
            db.get_video_by_uuid(&uuid)
                .await
                .map_err(|e| anyhow::anyhow!("{}", e))
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    match video {
        Some(record) => Ok(Json(LookupResponse {
            file_uuid: record.file_uuid,
            file_path: Some(record.file_path),
            file_name: Some(record.file_name),
            duration: Some(record.duration),
        })),
        None => Err(StatusCode::NOT_FOUND),
    }
}
/// One media file found on disk by `scan_directory_recursive`, together with its
/// registration state.
#[derive(Debug, Serialize, Deserialize)]
struct ScannedFileInfo {
// Final path component of the file.
file_name: String,
// Path relative to the scan root; the scanner falls back to the full path when
// the root prefix cannot be stripped.
relative_path: String,
// Full path as produced by the directory walk; used as the lookup key against
// registered files.
file_path: String,
// File length in bytes, taken from the directory entry's metadata.
file_size: u64,
// Last-modified time as an RFC 3339 string, or empty when unavailable.
modified_time: String,
// Registration info: `is_registered` is true when `file_path` matched a
// registered path; the remaining fields are `None` for unregistered files.
is_registered: bool,
file_uuid: Option<String>,
status: Option<String>,
registration_time: Option<String>,
}
/// Response body of `scan_files`: every scanned file plus summary counts.
#[derive(Debug, Serialize, Deserialize)]
struct ScanFilesResponse {
// Scanned files; `scan_files` sorts registered files first, then by relative path.
files: Vec<ScannedFileInfo>,
// `registered_count + unregistered_count`.
total: usize,
// Number of entries with `is_registered == true`.
registered_count: usize,
// Number of entries with `is_registered == false`.
unregistered_count: usize,
}
/// Recursively collects media files below `dir` into `files`.
///
/// * `dir` - directory to scan in this call.
/// * `root` - scan root, used to compute each file's relative path.
/// * `allowed_extensions` - lower-case extensions (without the dot) to include;
///   the file's extension is lower-cased before comparison.
/// * `registered_paths` - map from full file path to
///   `(file_uuid, status, registration_time)` for already-registered files.
/// * `files` - output vector; one `ScannedFileInfo` is appended per matching file.
///
/// Sub-directories whose name starts with `.` are skipped. Unreadable
/// directories, entries without metadata and files without a UTF-8 extension are
/// silently ignored.
fn scan_directory_recursive(
    dir: &std::path::Path,
    root: &std::path::Path,
    allowed_extensions: &[&str],
    registered_paths: &std::collections::HashMap<String, (String, String, Option<String>)>,
    files: &mut Vec<ScannedFileInfo>,
) {
    let entries = match std::fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();

        if path.is_dir() {
            let hidden = path
                .file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |name| name.starts_with('.'));
            // Skip only the hidden directory itself. Returning here (as the code
            // previously did) dropped every remaining entry of `dir`.
            if hidden {
                continue;
            }
            scan_directory_recursive(&path, root, allowed_extensions, registered_paths, files);
            continue;
        }
        if !path.is_file() {
            continue;
        }

        let extension_allowed = path
            .extension()
            .and_then(|e| e.to_str())
            .map_or(false, |ext| {
                allowed_extensions.contains(&ext.to_lowercase().as_str())
            });
        if !extension_allowed {
            continue;
        }
        let meta = match entry.metadata() {
            Ok(meta) => meta,
            Err(_) => continue,
        };

        let abs_path = path.to_string_lossy().to_string();
        let rel_path = path
            .strip_prefix(root)
            .map(|p| p.to_string_lossy().to_string())
            .unwrap_or_else(|_| abs_path.clone());
        let file_name = path
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        // Whole-second modification time rendered as RFC 3339; empty on any failure.
        let modified_time = meta
            .modified()
            .ok()
            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
            .map(|d| {
                chrono::DateTime::from_timestamp(d.as_secs() as i64, 0)
                    .map(|dt| dt.to_rfc3339())
                    .unwrap_or_default()
            })
            .unwrap_or_default();

        // Check registration
        let registration = registered_paths.get(&abs_path);
        files.push(ScannedFileInfo {
            file_name,
            relative_path: rel_path,
            file_size: meta.len(),
            modified_time,
            is_registered: registration.is_some(),
            file_uuid: registration.map(|(uuid, _, _)| uuid.clone()),
            status: registration.map(|(_, status, _)| status.clone()),
            registration_time: registration.and_then(|(_, _, reg_time)| reg_time.clone()),
            file_path: abs_path,
        });
    }
}
/// Lists media files under the SFTP root and marks which are registered.
///
/// The root comes from `MOMENTRY_SFTP_ROOT`, with a built-in default path. Files
/// are matched to rows of the `videos` table by their full path. The result is
/// sorted with registered files first and then by relative path.
///
/// # Errors
/// `500` when the registered-files query fails. A missing root directory simply
/// yields an empty list.
async fn scan_files(State(state): State<AppState>) -> Result<Json<ScanFilesResponse>, StatusCode> {
    let root_str = match std::env::var("MOMENTRY_SFTP_ROOT") {
        Ok(value) => value,
        Err(_) => "/Users/accusys/momentry/var/sftpgo/data/demo".to_string(),
    };
    let root = std::path::Path::new(&root_str);
    let allowed_extensions = [
        "mp4", "mov", "mkv", "avi", "webm", "jpg", "jpeg", "png", "gif", "webp",
    ];

    // 1. Get registered files from DB (Map key: absolute file_path)
    let sql = format!(
        "SELECT file_path, file_name, file_uuid, status, registration_time::text FROM {} ORDER BY id",
        schema::table_name("videos")
    );
    let rows: Vec<(String, String, String, String, Option<String>)> = sqlx::query_as(&sql)
        .fetch_all(state.db.pool())
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let mut registered_paths: std::collections::HashMap<String, (String, String, Option<String>)> =
        std::collections::HashMap::new();
    for (path, _name, uuid, status, reg_time) in rows {
        // Later rows overwrite earlier ones that share the same path.
        registered_paths.insert(path, (uuid, status, reg_time));
    }

    // 2. Scan filesystem recursively
    let mut files = Vec::new();
    if root.exists() {
        scan_directory_recursive(
            root,
            root,
            &allowed_extensions,
            &registered_paths,
            &mut files,
        );
    }

    // 3. Sort: registered first, then by name
    files.sort_by(|a, b| {
        b.is_registered
            .cmp(&a.is_registered)
            .then_with(|| a.relative_path.cmp(&b.relative_path))
    });

    let total = files.len();
    let registered_count = files.iter().filter(|f| f.is_registered).count();
    let unregistered_count = total - registered_count;
    Ok(Json(ScanFilesResponse {
        files,
        total,
        registered_count,
        unregistered_count,
    }))
}
/// Response body of `get_progress`: per-processor progress for one file plus
/// resource usage figures.
#[derive(Debug, Serialize)]
struct ProgressResponse {
file_uuid: String,
// `user` / `group` fields of the job's Redis hash; `None` when absent or empty.
user: Option<String>,
group: Option<String>,
// Name and duration of the video record, when it could be loaded.
file_name: Option<String>,
duration: Option<f64>,
// Percentage of processors whose status is "complete".
overall_progress: u32,
// The four values returned by `get_system_stats` for the API server process
// (cpu, gpu, memory percent, resident memory), in that order of origin.
cpu_percent: Option<f64>,
gpu_percent: Option<f64>,
memory_percent: Option<f64>,
memory_mb: Option<u64>,
// Host-level health snapshot read from Redis; `None` when a required field is
// missing or unparsable.
system: Option<SystemHealthInfo>,
// One entry per known processor.
processors: Vec<ProcessorProgressInfo>,
}
/// Host health snapshot parsed by `get_progress` from the `<prefix>health` Redis
/// hash (a comment in `get_progress` states the worker writes it). Each field is
/// parsed from the hash field of the same name.
#[derive(Debug, Serialize)]
struct SystemHealthInfo {
cpu_idle_pct: f64,
memory_available_mb: u64,
memory_total_mb: u64,
memory_used_pct: f64,
// True only when the stored value is exactly "true"; false when absent.
gpu_available: bool,
// GPU figures are optional: a missing or unparsable value becomes `None`.
gpu_utilization_pct: Option<f64>,
gpu_memory_used_pct: Option<f64>,
dynamic_concurrency: u32,
config_concurrency: u32,
running_processors: u32,
}
/// Progress of a single processor for one file, as read by `get_progress` from
/// the processor's Redis hash.
#[derive(Debug, Serialize)]
struct ProcessorProgressInfo {
// Processor name, e.g. "asr" or "yolo".
name: String,
// Status string from Redis; "pending" when it cannot be read.
status: String,
// `current` / `total` counters from Redis; 0 when missing or unparsable.
current: u32,
total: u32,
// Percentage derived from current/total, or 100 when total is 0 and the
// status is "complete", otherwise 0.
progress: u32,
// Free-form message from Redis; empty when it cannot be read.
message: String,
// Additional counters from Redis; 0 when missing or unparsable.
frames_processed: i32,
chunks_produced: i32,
retry_count: i32,
}
/// Reads the number of processed frames for `processor` from its `.json` output
/// file.
///
/// The file is `<output dir>/<file_uuid>.<processor>.json`, where the output
/// directory comes from `MOMENTRY_OUTPUT_DIR` (with a built-in default). Returns
/// the integer `frame_count` field, or `0` when the file cannot be read, is not
/// valid JSON, or has no integer `frame_count`.
fn get_json_frame_count(file_uuid: &str, processor: &str) -> i32 {
    let output_dir = match std::env::var("MOMENTRY_OUTPUT_DIR") {
        Ok(dir) => dir,
        Err(_) => "/Users/accusys/momentry/output_dev".to_string(),
    };
    let mut path = std::path::PathBuf::from(output_dir);
    path.push(format!("{}.{}.json", file_uuid, processor));

    let frame_count = std::fs::read_to_string(&path)
        .ok()
        .and_then(|content| serde_json::from_str::<serde_json::Value>(&content).ok())
        .and_then(|json| json.get("frame_count").and_then(|v| v.as_i64()))
        .unwrap_or(0);
    frame_count as i32
}
/// Samples resource usage of the current process by shelling out to `ps` and
/// `nvidia-smi`.
///
/// Returns `(cpu_percent, gpu_percent, memory_percent, rss)`:
/// * `cpu_percent`, `memory_percent`, `rss` - the `%cpu`, `%mem` and `rss`
///   columns `ps` prints for this process id. `rss` is returned exactly as `ps`
///   prints it (the `ps` manuals document it in kilobyte units); no conversion is
///   done here.
/// * `gpu_percent` - the output of
///   `nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits`,
///   which only parses when it is a single number.
///
/// Every element is `None` when its command is missing, fails, or prints
/// something that does not parse.
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
    use std::process::Command;
    let pid = std::process::id().to_string();

    // One `ps` call for all three columns. Each column gets its own `-o` option
    // with an empty header ("name="): a combined "-o %mem=,rss=" is parsed
    // differently by different `ps` implementations and may produce one column.
    let ps_fields: Vec<String> = Command::new("ps")
        .args(["-p", &pid, "-o", "%cpu=", "-o", "%mem=", "-o", "rss="])
        .output()
        .ok()
        .map(|o| {
            String::from_utf8_lossy(&o.stdout)
                .split_whitespace()
                .map(str::to_owned)
                .collect()
        })
        .unwrap_or_default();
    let cpu = ps_fields.first().and_then(|s| s.parse::<f64>().ok());
    let mem_percent = ps_fields.get(1).and_then(|s| s.parse::<f64>().ok());
    let mem_rss = ps_fields.get(2).and_then(|s| s.parse::<u64>().ok());

    let gpu = Command::new("nvidia-smi")
        .args([
            "--query-gpu=utilization.gpu",
            "--format=csv,noheader,nounits",
        ])
        .output()
        .ok()
        .and_then(|o| {
            String::from_utf8_lossy(&o.stdout)
                .trim()
                .parse::<f64>()
                .ok()
        });

    (cpu, gpu, mem_percent, mem_rss)
}
/// Reports processing progress for one file.
///
/// For each known processor the handler reads its Redis hash
/// (`<prefix>worker:job:<file_uuid>:processor:<name>`); unreadable fields fall
/// back to `"pending"`, `0` or an empty message. Overall progress is the share of
/// processors whose status is `complete`. The response also carries the job's
/// `user` / `group`, the video's name and duration, resource usage of this
/// process, and the host health snapshot stored in the `<prefix>health` hash.
///
/// # Errors
/// `500` when the Redis client, the Redis connection or the Postgres handle
/// cannot be created. Individual missing values never fail the request.
async fn get_progress(
    axum::extract::Path(file_uuid): axum::extract::Path<String>,
) -> Result<Json<ProgressResponse>, StatusCode> {
    let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let pg = PostgresDb::init()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let mut conn = redis
        .get_conn()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"];
    let mut processors = Vec::new();
    let mut completed_count = 0u32;
    for name in processor_names {
        let prefix = REDIS_KEY_PREFIX.as_str();
        let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name);
        let status: String = redis::cmd("HGET")
            .arg(&key)
            .arg("status")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "pending".to_string());
        let current: u32 = redis::cmd("HGET")
            .arg(&key)
            .arg("current")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .unwrap_or(0);
        let total: u32 = redis::cmd("HGET")
            .arg(&key)
            .arg("total")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .unwrap_or(0);
        let message: String = redis::cmd("HGET")
            .arg(&key)
            .arg("message")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "".to_string());
        // Without a total, a processor is either done (100) or not started (0).
        let progress = if total > 0 {
            ((current as f64 / total as f64) * 100.0) as u32
        } else if status == "complete" {
            100
        } else {
            0
        };
        // Read frames_processed / chunks_produced / retry_count from Redis
        let frames_processed: i32 = redis::cmd("HGET")
            .arg(&key)
            .arg("frames_processed")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .unwrap_or(0);
        let chunks_produced: i32 = redis::cmd("HGET")
            .arg(&key)
            .arg("chunks_produced")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .unwrap_or(0);
        let retry_count: i32 = redis::cmd("HGET")
            .arg(&key)
            .arg("retry_count")
            .query_async(&mut conn)
            .await
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .unwrap_or(0);
        if status == "complete" {
            completed_count += 1;
        }
        processors.push(ProcessorProgressInfo {
            name: name.to_string(),
            status,
            current,
            total,
            progress,
            message,
            frames_processed,
            chunks_produced,
            retry_count,
        });
    }
    let overall_progress = (completed_count as f64 / processor_names.len() as f64 * 100.0) as u32;

    let job_key = format!("{}worker:job:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
    let user: Option<String> = redis::cmd("HGET")
        .arg(&job_key)
        .arg("user")
        .query_async(&mut conn)
        .await
        .ok()
        .filter(|s: &String| !s.is_empty());
    let group: Option<String> = redis::cmd("HGET")
        .arg(&job_key)
        .arg("group")
        .query_async(&mut conn)
        .await
        .ok()
        .filter(|s: &String| !s.is_empty());

    let (file_name, duration) = pg
        .get_video_by_uuid(&file_uuid)
        .await
        .ok()
        .flatten()
        .map(|v| (Some(v.file_name), Some(v.duration)))
        .unwrap_or((None, None));

    let (cpu_percent, gpu_percent, memory_percent, memory_rss_kb) = get_system_stats();
    // `ps` reports RSS in kilobytes, while the response field is megabytes.
    let memory_mb = memory_rss_kb.map(|kb| kb / 1024);

    // Read system health information from Redis (written by the Worker)
    let health_key = format!("{}health", REDIS_KEY_PREFIX.as_str());
    let cpu_idle: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("cpu_idle_pct")
        .query_async(&mut conn)
        .await
        .ok();
    let mem_avail: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("memory_available_mb")
        .query_async(&mut conn)
        .await
        .ok();
    let mem_total: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("memory_total_mb")
        .query_async(&mut conn)
        .await
        .ok();
    let mem_used: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("memory_used_pct")
        .query_async(&mut conn)
        .await
        .ok();
    let gpu_avail: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("gpu_available")
        .query_async(&mut conn)
        .await
        .ok();
    let gpu_util: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("gpu_utilization_pct")
        .query_async(&mut conn)
        .await
        .ok();
    let gpu_mem: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("gpu_memory_used_pct")
        .query_async(&mut conn)
        .await
        .ok();
    let dyn_conc: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("dynamic_concurrency")
        .query_async(&mut conn)
        .await
        .ok();
    let cfg_conc: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("config_concurrency")
        .query_async(&mut conn)
        .await
        .ok();
    let run_proc: Option<String> = redis::cmd("HGET")
        .arg(&health_key)
        .arg("running_processors")
        .query_async(&mut conn)
        .await
        .ok();
    // The snapshot is all-or-nothing for the mandatory fields: any missing or
    // unparsable one makes `system` None. Only the GPU fields are optional.
    let system = cpu_idle.and_then(|idle| {
        Some(SystemHealthInfo {
            cpu_idle_pct: idle.parse().ok()?,
            memory_available_mb: mem_avail?.parse().ok()?,
            memory_total_mb: mem_total?.parse().ok()?,
            memory_used_pct: mem_used?.parse().ok()?,
            gpu_available: gpu_avail.map(|v| v == "true").unwrap_or(false),
            gpu_utilization_pct: gpu_util.and_then(|v| v.parse().ok()),
            gpu_memory_used_pct: gpu_mem.and_then(|v| v.parse().ok()),
            dynamic_concurrency: dyn_conc?.parse().ok()?,
            config_concurrency: cfg_conc?.parse().ok()?,
            running_processors: run_proc?.parse().ok()?,
        })
    });

    Ok(Json(ProgressResponse {
        file_uuid,
        user,
        group,
        file_name,
        duration,
        overall_progress,
        cpu_percent,
        gpu_percent,
        memory_percent,
        memory_mb,
        system,
        processors,
    }))
}
/// Lists monitor jobs filtered by status, newest first, with pagination.
///
/// Query parameters:
/// * `page` - 1-based page number (default `1`; values below `1` are treated as `1`).
/// * `page_size` - rows per page (default `20`).
/// * `status` - comma-separated status names (default `"pending,running"`).
///
/// The status list comes straight from the request, so it is passed to Postgres
/// as a bound array parameter and never spliced into the SQL text.
///
/// # Errors
/// `500` when the database handle cannot be created or a query fails.
async fn list_jobs(Query(params): Query<JobsQuery>) -> Result<Json<JobListResponse>, StatusCode> {
    // Clamp the page so `page - 1` can neither underflow nor go negative.
    let page = params.page.unwrap_or(1).max(1);
    let page_size = params.page_size.unwrap_or(20);
    let status_filter = params
        .status
        .unwrap_or_else(|| "pending,running".to_string());
    let limit = page_size as i64;
    let offset = ((page - 1) as i64).saturating_mul(limit);

    let pg = PostgresDb::init()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let table = crate::core::db::schema::table_name("monitor_jobs");

    // Requested statuses, trimmed, with empty entries dropped.
    let statuses: Vec<String> = status_filter
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect();

    // `status::text` keeps the comparison valid whether the column is a text
    // type or an enum.
    let query = format!(
        "SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
                error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
                processors, completed_processors, failed_processors, video_id
         FROM {}
         WHERE status::text = ANY($1)
         ORDER BY created_at DESC
         LIMIT $2 OFFSET $3",
        table
    );
    let count_query = format!(
        "SELECT COUNT(*) FROM {} WHERE status::text = ANY($1)",
        table
    );

    let total_count: i64 = sqlx::query_scalar(&count_query)
        .bind(&statuses)
        .fetch_one(pg.pool())
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    use crate::core::db::MonitorJobStatus;
    let rows = sqlx::query(&query)
        .bind(&statuses)
        .bind(limit)
        .bind(offset)
        .fetch_all(pg.pool())
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    let job_infos: Vec<JobInfoResponse> = rows
        .into_iter()
        .map(|r| {
            // Unknown or unreadable statuses are reported as pending.
            let status_str: String = r.try_get("status").unwrap_or_default();
            let status =
                MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending);
            JobInfoResponse {
                id: r.try_get("id").unwrap_or(0),
                uuid: r.try_get("uuid").unwrap_or_default(),
                status: status.as_str().to_string(),
                current_processor: r.try_get("current_processor").ok(),
                progress_current: r.try_get("progress_current").unwrap_or(0),
                progress_total: r.try_get("progress_total").unwrap_or(0),
                created_at: r.try_get::<String, _>("created_at").unwrap_or_default(),
                started_at: r.try_get::<String, _>("started_at").ok(),
            }
        })
        .collect();

    Ok(Json(JobListResponse {
        jobs: job_infos,
        count: total_count,
        page,
        page_size,
    }))
}
/// Returns one monitor job, looked up by uuid, together with its per-processor
/// results. Each step is logged with a `[get_job]` prefix.
///
/// # Errors
/// `500` when the database handle, the job lookup or the results lookup fails;
/// `404` when no job matches `uuid`.
async fn get_job(
    axum::extract::Path(uuid): axum::extract::Path<String>,
) -> Result<Json<JobDetailResponse>, StatusCode> {
    tracing::info!("[get_job] START - uuid: {}", uuid);

    let pg = match PostgresDb::init().await {
        Ok(db) => db,
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to init PostgresDb: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] PostgresDb initialized");

    let job = match pg.get_monitor_job_by_uuid(&uuid).await {
        Ok(Some(job)) => job,
        Ok(None) => {
            tracing::warn!("[get_job] Job not found: {}", uuid);
            return Err(StatusCode::NOT_FOUND);
        }
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to get monitor job: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] Found job: id={}, uuid={}", job.id, job.uuid);

    let results = match pg.get_processor_results_by_job(job.id).await {
        Ok(results) => results,
        Err(e) => {
            tracing::error!("[get_job] ERROR - Failed to get processor results: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    tracing::info!("[get_job] Got {} processor results", results.len());

    let mut processors: Vec<ProcessorInfoResponse> = Vec::new();
    for result in results {
        processors.push(ProcessorInfoResponse {
            processor_type: result.processor_type.as_str().to_string(),
            status: result.status.as_str().to_string(),
            started_at: result.started_at,
            completed_at: result.completed_at,
            duration_secs: result.duration_secs,
            error_message: result.error_message,
        });
    }
    tracing::info!("[get_job] Mapped {} processors", processors.len());

    let status = job.status.as_str().to_string();
    let created_at = job.created_at.to_string();
    let started_at = job.started_at.map(|t| t.to_string());
    let updated_at = job.updated_at.map(|t| t.to_string());
    let response = JobDetailResponse {
        id: job.id,
        uuid: job.uuid,
        status,
        current_processor: job.current_processor,
        progress_current: job.progress_current,
        progress_total: job.progress_total,
        processors,
        created_at,
        started_at,
        updated_at,
    };
    tracing::info!("[get_job] SUCCESS - returning response");
    Ok(Json(response))
}
/// Switches the process-wide cache flag on or off.
///
/// The requested value is handed to `crate::core::config::set_cache_enabled`
/// and echoed back in the response together with a human-readable message.
/// The application state is extracted but not used.
async fn cache_toggle(
    State(_state): State<AppState>,
    Json(req): Json<CacheToggleRequest>,
) -> Result<Json<CacheToggleResponse>, StatusCode> {
    let enabled = req.enabled;
    tracing::info!("[cache_toggle] Setting cache enabled to: {}", enabled);
    crate::core::config::set_cache_enabled(enabled);

    let message = if enabled {
        "Cache enabled"
    } else {
        "Cache disabled"
    };
    let response = CacheToggleResponse {
        success: true,
        cache_enabled: enabled,
        message: message.to_string(),
    };

    tracing::info!("[cache_toggle] SUCCESS");
    Ok(Json(response))
}
/// JSON body returned by the `unregister` handler.
#[derive(Debug, Serialize)]
struct UnregisterResponse {
/// `false` when the request was rejected (missing UUID, or `file_path` is not a directory).
success: bool,
/// The unregistered UUID in single mode, `batch_<count>` in pattern mode,
/// or an empty string when the request was rejected.
file_uuid: String,
/// Human-readable outcome description.
message: String,
}
/// JSON body accepted by the `unregister` handler.
///
/// Two modes are supported: a single `file_uuid`, or `file_path` + `pattern`
/// (pattern mode takes precedence when both of those are present).
#[derive(Debug, Deserialize)]
struct UnregisterRequest {
/// UUID of the single video to unregister.
file_uuid: Option<String>,
/// Directory whose entries are scanned in pattern mode.
file_path: Option<String>,
/// Regular expression matched against each directory entry's file name in pattern mode.
pattern: Option<String>,
}
async fn unregister(
State(state): State<AppState>,
Json(req): Json<UnregisterRequest>,
) -> Result<Json<UnregisterResponse>, StatusCode> {
let db = &state.db;
// Pattern mode: unregister all matching files in a directory
if let (Some(ref dir_path), Some(ref pat)) = (&req.file_path, &req.pattern) {
let dir = std::path::Path::new(dir_path);
if !dir.is_dir() {
return Ok(Json(UnregisterResponse {
success: false,
file_uuid: String::new(),
message: format!("Not a directory: {}", dir_path),
}));
}
let re = regex::Regex::new(pat).map_err(|_| StatusCode::BAD_REQUEST)?;
let mut count = 0u32;
let table = crate::core::db::schema::table_name("videos");
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let fname = entry.file_name().to_string_lossy().to_string();
if !re.is_match(&fname) {
continue;
}
// Find file_uuid by file_name and delete
let rows: Vec<(String,)> = sqlx::query_as(&format!(
"SELECT file_uuid FROM {} WHERE file_name = $1",
table
))
.bind(&fname)
.fetch_all(db.pool())
.await
.unwrap_or_default();
for (uuid,) in rows {
let _ = db.delete_video(&uuid).await;
count += 1;
}
}
}
let _ = state.mongo_cache.invalidate_videos_list().await;
return Ok(Json(UnregisterResponse {
success: true,
file_uuid: format!("batch_{}", count),
message: format!("Unregistered {} files matching pattern", count),
}));
}
// Single UUID mode
let uuid = req.file_uuid.as_deref().unwrap_or("");
if uuid.is_empty() {
return Ok(Json(UnregisterResponse {
success: false,
file_uuid: String::new(),
message: "Either file_uuid or file_path+pattern is required".to_string(),
}));
}
tracing::info!("[unregister] Unregistering file: {}", uuid);
match db.delete_video(uuid).await {
Ok(_) => {
let _ = state.mongo_cache.invalidate_videos_list().await;
Ok(Json(UnregisterResponse {
success: true,
file_uuid: uuid.to_string(),
message: "File unregistered successfully".to_string(),
}))
}
Err(e) => {
tracing::error!("[unregister] Failed: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
let _ = SERVER_START.set(Instant::now());
let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string()));
let mongo_cache = MongoCache::init().await?;
let redis_cache = RedisCache::new()?;
let db = PostgresDb::init().await?;
let db = std::sync::Arc::new(db);
let api_state = super::middleware::ApiState { db: db.clone() };
let state = AppState {
db,
embedder,
embedder_model: "nomic-embed-text-v2-moe:latest".to_string(),
mongo_cache,
redis_cache,
api_state,
};
let protected_routes = Router::new()
.route("/api/v1/files/register", post(register_file))
.route("/api/v1/files/lookup", get(lookup_file_by_name))
.route("/api/v1/unregister", post(unregister))
.route("/api/v1/files/scan", get(scan_files))
.route("/api/v1/file/:file_uuid/probe", get(probe_by_uuid))
.route("/api/v1/file/:file_uuid/process", post(trigger_processing))
.route("/api/v1/file/:file_uuid/chunk/:chunk_id", get(get_chunk_by_path))
.route("/api/v1/progress/:file_uuid", get(get_progress))
.route("/api/v1/jobs", get(list_jobs))
.route("/api/v1/config/cache", post(cache_toggle))
// .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed)
.merge(identity_binding::identity_binding_routes())
.merge(identities::identity_routes())
.layer(axum::middleware::from_fn_with_state(
state.api_state.clone(),
api_key_validation,
))
.with_state(state.clone());
let cors = CorsLayer::new()
.allow_origin(tower_http::cors::AllowOrigin::any())
.allow_methods(Any)
.allow_headers(Any);
let app = Router::new()
.route("/health", get(health))
.route("/health/detailed", get(health_detailed))
.route("/api/v1/auth/login", post(login))
.route("/api/v1/auth/logout", post(logout))
.route("/api/v1/stats/ingest", get(get_ingest_stats))
.route("/api/v1/stats/sftpgo", get(get_sftpgo_status))
.route("/api/v1/stats/inference", get(get_inference_health))
.route("/api/v1/search/visual", post(search_visual_chunks))
.route(
"/api/v1/search/visual/class",
post(search_visual_chunks_by_class),
)
.route(
"/api/v1/search/visual/density",
post(search_visual_chunks_by_density),
)
.route("/api/v1/search/visual/stats", post(get_visual_chunk_stats))
.route(
"/api/v1/search/visual/combination",
post(search_visual_chunks_by_combination),
)
.merge(identity_api::identity_routes()) // Phase 3 Routes
.merge(agent_api::agent_routes()) // Phase 6 Routes
.merge(super::identity_agent_api::identity_agent_routes()) // Phase 5 Routes
.merge(five_w1h_agent_api::five_w1h_agent_routes()) // Phase 3 Routes (5W1H Agent)
.merge(super::media_api::bbox_routes()) // Media: video/bbox/thumbnail
.merge(super::trace_agent_api::trace_agent_routes()) // Trace listing
.merge(search_routes()) // Smart search drill-down
.merge(universal_search_routes()) // Universal / frames / persons search
.merge(protected_routes)
.layer(cors)
.with_state(state);
let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse().unwrap();
tracing::info!("Starting API server at http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
/// Row counts reported by the `get_ingest_stats` handler.
#[derive(Debug, Serialize)]
struct IngestStatsResponse {
/// Number of rows in the `videos` table.
total_videos: i64,
/// Number of rows in the `chunk` table.
total_chunks: i64,
/// Chunks with `chunk_type = 'sentence'`.
sentence_chunks: i64,
/// Chunks with `chunk_type = 'cut'`.
cut_chunks: i64,
/// Chunks with `chunk_type = 'time'`.
time_chunks: i64,
/// Chunks whose `vector_id` is not NULL.
searchable_chunks: i64,
/// Chunks whose `visual_stats` is neither NULL nor the empty JSON object.
chunks_with_visual: i64,
/// Chunks whose `summary_text` is not NULL.
chunks_with_summary: i64,
/// Videos with `status = 'pending'`.
pending_videos: i64,
}
async fn get_ingest_stats(
State(state): State<AppState>,
) -> Result<Json<IngestStatsResponse>, StatusCode> {
let table_videos = schema::table_name("videos");
let table_chunks = schema::table_name("chunk");
let total_videos: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_videos))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let total_chunks: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_chunks))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let sentence_chunks: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'sentence'",
table_chunks
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let cut_chunks: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'cut'",
table_chunks
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let time_chunks: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'time'",
table_chunks
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let searchable_chunks: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE vector_id IS NOT NULL",
table_chunks
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks_with_visual: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE visual_stats IS NOT NULL AND visual_stats != '{}'::jsonb",
table_chunks, "{}"
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks_with_summary: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE summary_text IS NOT NULL",
table_chunks
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let pending_videos: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {} WHERE status = 'pending'",
table_videos
))
.fetch_one(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(IngestStatsResponse {
total_videos: total_videos.0,
total_chunks: total_chunks.0,
sentence_chunks: sentence_chunks.0,
cut_chunks: cut_chunks.0,
time_chunks: time_chunks.0,
searchable_chunks: searchable_chunks.0,
chunks_with_visual: chunks_with_visual.0,
chunks_with_summary: chunks_with_summary.0,
pending_videos: pending_videos.0,
}))
}
/// JSON body returned by the `get_sftpgo_status` handler.
#[derive(Debug, Serialize)]
struct SftpgoStatusResponse {
/// Account name; the handler always reports `"demo"`.
username: String,
/// Directory whose entries are counted for `files_count`.
home_dir: String,
/// Number of directory entries found in `home_dir` (0 when it cannot be read).
files_count: i64,
/// Videos from the `videos` table whose `file_path` contains `demo`.
registered_videos: Vec<RegisteredVideo>,
/// Last login time; the handler currently always sets this to `None`.
last_login: Option<String>,
}
/// One video row listed in `SftpgoStatusResponse::registered_videos`.
#[derive(Debug, Serialize)]
struct RegisteredVideo {
/// Value of the row's `file_uuid` column.
uuid: String,
/// Value of the row's `file_name` column.
file_name: String,
/// Value of the row's `status` column.
status: String,
}
async fn get_sftpgo_status(
State(state): State<AppState>,
) -> Result<Json<SftpgoStatusResponse>, StatusCode> {
let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo";
let files_count: i64 = std::fs::read_dir(demo_dir)
.map(|entries| entries.count() as i64)
.unwrap_or(0);
let table_videos = schema::table_name("videos");
let registered_videos: Vec<(String, String, String)> = sqlx::query_as(&format!(
"SELECT file_uuid, file_name, status FROM {} WHERE file_path LIKE '%demo%' ORDER BY id",
table_videos
))
.fetch_all(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let registered_videos = registered_videos
.into_iter()
.map(|(uuid, file_name, status)| RegisteredVideo {
uuid,
file_name,
status,
})
.collect();
Ok(Json(SftpgoStatusResponse {
username: "demo".to_string(),
home_dir: demo_dir.to_string(),
files_count,
registered_videos,
last_login: None,
}))
}
/// Health probe result for one inference engine, as built by `get_inference_health`.
#[derive(Debug, Serialize)]
struct InferenceEngineStatus {
/// Display name of the engine (`"Ollama"` or `"llama-server"`).
engine: String,
/// Model name reported for the engine (a fixed label, not read from the engine's reply).
model: String,
/// `"ok"` for a successful HTTP status, otherwise `"error"`.
status: String,
/// Round-trip time in milliseconds; `None` when the request itself failed.
latency_ms: Option<u64>,
/// `HTTP <status>` for a non-success reply, or the request error text; `None` when healthy.
error: Option<String>,
}
/// JSON body returned by the `get_inference_health` handler.
#[derive(Debug, Serialize)]
struct InferenceHealthResponse {
/// Probe result for the Ollama endpoint.
ollama: InferenceEngineStatus,
/// Probe result for the llama-server endpoint.
llama_server: InferenceEngineStatus,
}
async fn get_inference_health() -> Result<Json<InferenceHealthResponse>, StatusCode> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap();
let ollama_start = std::time::Instant::now();
let ollama_status = match client.get("http://localhost:11434/api/tags").send().await {
Ok(resp) if resp.status().is_success() => {
let latency = ollama_start.elapsed().as_millis() as u64;
InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "ok".to_string(),
latency_ms: Some(latency),
error: None,
}
}
Ok(resp) => InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "error".to_string(),
latency_ms: Some(ollama_start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
},
Err(e) => InferenceEngineStatus {
engine: "Ollama".to_string(),
model: "nomic-embed-text".to_string(),
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
};
let llama_start = std::time::Instant::now();
let llama_status = match client.get("http://localhost:8081/v1/models").send().await {
Ok(resp) if resp.status().is_success() => {
let latency = llama_start.elapsed().as_millis() as u64;
InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "ok".to_string(),
latency_ms: Some(latency),
error: None,
}
}
Ok(resp) => InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "error".to_string(),
latency_ms: Some(llama_start.elapsed().as_millis() as u64),
error: Some(format!("HTTP {}", resp.status())),
},
Err(e) => InferenceEngineStatus {
engine: "llama-server".to_string(),
model: "gemma4_e4b_q5".to_string(),
status: "error".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
};
Ok(Json(InferenceHealthResponse {
ollama: ollama_status,
llama_server: llama_status,
}))
}
/// Query-string parameters accepted by the `video_details` handler.
#[derive(Debug, Deserialize)]
struct VideoDetailsQuery {
/// Identifier of the chunk to describe; the handler answers 400 when it is absent.
chunk_id: Option<String>,
/// NOTE(review): deserialized but not read by `video_details` in the visible code.
parent_id: Option<String>,
}
/// JSON body returned by the `video_details` handler.
#[derive(Debug, Serialize)]
struct VideoDetailsResponse {
/// UUID stored on the chunk row that was found.
uuid: String,
/// Detail payload; its fields are flattened into the top-level JSON object.
#[serde(flatten)]
details: VideoDetailsResult,
}
/// Detail payload of a `VideoDetailsResponse`.
///
/// Serialized untagged, i.e. as the bare fields of whichever variant is held.
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum VideoDetailsResult {
/// Details of a single chunk.
Chunk(ChunkDetailResponse),
/// Details of a parent chunk.
/// NOTE(review): not constructed anywhere in the visible part of this file.
Parent(ParentChunkResponse),
}
/// Position of a chunk expressed in video frames.
#[derive(Debug, Serialize)]
struct FrameRange {
/// First frame of the range.
start_frame: i64,
/// Last frame of the range.
end_frame: i64,
/// `end_frame - start_frame`, as computed by `video_details`.
duration_frames: i64,
/// Frame rate used to convert frames to time.
fps: f64,
}
/// Position of a chunk expressed as time offsets.
///
/// `video_details` fills these with `frame / fps`.
#[derive(Debug, Serialize)]
struct ReferenceTime {
/// Start offset (`start_frame / fps`).
start: f64,
/// End offset (`end_frame / fps`).
end: f64,
}
/// Details of one chunk as returned by `video_details`.
#[derive(Debug, Serialize)]
struct ChunkDetailResponse {
/// Value of the chunk row's `chunk_id` column.
chunk_id: String,
/// Value of the chunk row's `chunk_type` column, read as text.
chunk_type: String,
/// Frame-based position of the chunk.
frame_range: FrameRange,
/// Time-based position of the chunk.
reference_time: ReferenceTime,
/// Value of the `text_content` column.
text_content: Option<String>,
/// Value of the `content` JSON column.
content: Option<serde_json::Value>,
/// Value of the `parent_chunk_id` column.
parent_id: Option<String>,
/// The chunk's own summary, or the parent's summary when the chunk has none.
summary_text: Option<String>,
/// Chunk metadata, merged with the parent's metadata when a parent row is found.
metadata: Option<serde_json::Value>,
/// Value of the `visual_stats` JSON column.
visual_stats: Option<serde_json::Value>,
/// Entries of the `speaker_ids` array; `None` when the array is empty.
speaker_ids: Option<Vec<String>>,
/// Entries of the `face_ids` array, each prefixed with `face_`; `None` when empty.
person_ids: Option<Vec<String>>,
}
/// Details of a parent chunk, carried by `VideoDetailsResult::Parent`.
///
/// NOTE(review): no code in the visible part of this file builds this type.
#[derive(Debug, Serialize)]
struct ParentChunkResponse {
/// Identifier of the parent chunk.
parent_id: i32,
/// Metadata JSON of the parent chunk, if any.
metadata: Option<serde_json::Value>,
/// Summary text of the parent chunk, if any.
summary_text: Option<String>,
/// Frame-based position of the parent chunk.
frame_range: FrameRange,
/// Time-based position of the parent chunk.
reference_time: ReferenceTime,
}
/// Request body for `search_visual_chunks`: search the visual chunks of one
/// video using a set of criteria.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchRequest {
/// UUID of the video to search.
uuid: String,
/// Search criteria, passed through to `visual_chunk_search::search_visual_chunks`.
criteria: visual_chunk_search::VisualChunkSearchCriteria,
}
/// Response body shared by all visual chunk search handlers.
#[derive(Debug, Serialize)]
struct VisualChunkSearchResponse {
/// Matching chunks.
chunks: Vec<Chunk>,
/// Number of entries in `chunks`.
total: usize,
}
async fn search_visual_chunks(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let criteria_hash = generate_visual_search_hash(&req.uuid, &req.criteria);
let cache_key = keys::visual_search(&req.uuid, &criteria_hash);
let ttl = state.mongo_cache.ttl_visual_search();
let chunks = state
.mongo_cache
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_VISUAL_SEARCH, || async {
let db = PostgresDb::init()
.await
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
visual_chunk_search::search_visual_chunks(&db, &req.uuid, &req.criteria)
.await
.map_err(|e| anyhow::anyhow!("Visual search failed: {}", e))
})
.await
.map_err(|e| {
tracing::error!("Visual chunk search failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
/// Request body for `search_visual_chunks_by_class`: search the visual chunks
/// of one video by detected object class.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByClassRequest {
/// UUID of the video to search.
uuid: String,
/// Object class to look for.
object_class: String,
/// Optional lower bound, forwarded unchanged to the search function.
min_count: Option<u32>,
/// Optional upper bound, forwarded unchanged to the search function.
max_count: Option<u32>,
}
/// Request body for `search_visual_chunks_by_density`: search the visual
/// chunks of one video by density.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByDensityRequest {
/// UUID of the video to search.
uuid: String,
/// Required lower density bound, forwarded unchanged to the search function.
min_density: f32,
/// Optional upper density bound, forwarded unchanged to the search function.
max_density: Option<f32>,
}
/// Request body for `get_visual_chunk_stats`: fetch visual chunk statistics
/// for one video.
#[derive(Debug, Deserialize)]
struct VisualChunkStatsRequest {
/// UUID of the video whose statistics are requested.
uuid: String,
}
async fn search_visual_chunks_by_class(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByClassRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks = visual_chunk_search::search_visual_chunks_by_class(
&db,
&req.uuid,
&req.object_class,
req.min_count,
req.max_count,
)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by class failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
async fn search_visual_chunks_by_density(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByDensityRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let chunks = visual_chunk_search::search_visual_chunks_by_density(
&db,
&req.uuid,
req.min_density,
req.max_density,
)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by density failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
/// JSON body returned by the `get_visual_chunk_stats` handler.
#[derive(Debug, Serialize)]
struct VisualChunkStatsResponse {
/// UUID of the video, echoed from the request.
uuid: String,
/// Statistics as returned by `visual_chunk_search::get_visual_chunk_statistics`.
stats: std::collections::HashMap<String, serde_json::Value>,
}
async fn get_visual_chunk_stats(
State(state): State<AppState>,
Json(req): Json<VisualChunkStatsRequest>,
) -> Result<Json<VisualChunkStatsResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let stats = visual_chunk_search::get_visual_chunk_statistics(&db, &req.uuid)
.await
.map_err(|e| {
tracing::error!("Get visual chunk stats failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkStatsResponse {
uuid: req.uuid,
stats,
}))
}
/// Request body for `search_visual_chunks_by_combination`: search the visual
/// chunks of one video by a combination of object classes.
#[derive(Debug, Deserialize)]
struct VisualChunkSearchByCombinationRequest {
/// UUID of the video to search.
uuid: String,
/// Required object classes, each paired with a count.
combination: Vec<(String, u32)>, // (object_class, min_count)
}
async fn search_visual_chunks_by_combination(
State(state): State<AppState>,
Json(req): Json<VisualChunkSearchByCombinationRequest>,
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
let db = PostgresDb::init()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let combination: Vec<(&str, u32)> = req
.combination
.iter()
.map(|(c, n)| (c.as_str(), *n))
.collect();
let chunks =
visual_chunk_search::search_visual_chunks_by_combination(&db, &req.uuid, &combination)
.await
.map_err(|e| {
tracing::error!("Visual chunk search by combination failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(VisualChunkSearchResponse {
total: chunks.len(),
chunks,
}))
}
async fn video_details(
Path(uuid): Path<String>,
Query(query): Query<VideoDetailsQuery>,
State(state): State<AppState>,
) -> Result<Json<VideoDetailsResponse>, StatusCode> {
let table = schema::table_name("chunk");
if let Some(chunk_id) = query.chunk_id {
let row: Option<(
i32, String, String, String, f64, i64, i64,
Option<String>, serde_json::Value, Option<serde_json::Value>,
Option<String>, i32, Option<String>, Option<serde_json::Value>, Option<String>,
)> = sqlx::query_as(&format!(
"SELECT file_id, uuid, chunk_id, chunk_type::text, fps, start_frame, end_frame,
text_content, content, metadata, vector_id, frame_count,
parent_chunk_id, visual_stats, summary_text
FROM {} WHERE chunk_id = $1 AND uuid = $2",
table
))
.bind(&chunk_id)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let speaker_row: Option<(String, String)> = sqlx::query_as(&format!(
"SELECT COALESCE(speaker_ids, '{{}}'::text[]), COALESCE(face_ids, '{{}}'::integer[])::text[] FROM {} WHERE chunk_id = $1 AND uuid = $2",
table
))
.bind(&chunk_id)
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
let row = row.ok_or(StatusCode::NOT_FOUND)?;
let fps = if row.4 > 0.0 { row.4 } else { 24.0 };
let start_frame = row.5;
let end_frame = row.6;
let duration_frames = end_frame - start_frame;
let start_time = start_frame as f64 / fps;
let end_time = end_frame as f64 / fps;
let row_metadata = row.9.clone();
let mut summary_text = row.14.clone();
let mut metadata = None;
if let Some(ref pid_str) = row.12 {
if !pid_str.is_empty() {
if let Ok(pid) = pid_str.parse::<i32>() {
let parent_table = schema::table_name("parent_chunks");
let parent: Option<(Option<String>, Option<serde_json::Value>)> =
sqlx::query_as(&format!(
"SELECT summary_text, metadata FROM {} WHERE id = $1",
parent_table
))
.bind(pid)
.fetch_optional(state.db.pool())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if let Some((s, m)) = parent {
if summary_text.is_none() {
summary_text = s;
}
let mut merged: serde_json::Value = serde_json::json!({});
if let Some(ref cm) = row_metadata {
if let Some(obj) = cm.as_object() {
for (k, v) in obj {
merged[k] = v.clone();
}
}
}
if let Some(pm) = &m {
if let Some(obj) = pm.as_object() {
for (k, v) in obj {
merged[k] = v.clone();
}
}
}
metadata = Some(merged);
}
}
}
} else if let Some(ref cm) = row_metadata {
metadata = Some(cm.clone());
}
let parse_pg_array = |s: &str| -> Vec<String> {
if s.is_empty() || s == "{}" {
return vec![];
}
s.trim_start_matches('{')
.trim_end_matches('}')
.split(',')
.map(|s| s.trim_matches('"').to_string())
.collect()
};
let (speaker_str, face_str) = speaker_row.unwrap_or(("{}".to_string(), "{}".to_string()));
let speaker_vec: Vec<String> = parse_pg_array(&speaker_str);
let speaker_ids: Option<Vec<String>> = if speaker_vec.is_empty() {
None
} else {
Some(speaker_vec)
};
let face_vec: Vec<String> = parse_pg_array(&face_str);
let person_ids: Option<Vec<String>> = if face_vec.is_empty() {
None
} else {
Some(face_vec.iter().map(|id| format!("face_{}", id)).collect())
};
return Ok(Json(VideoDetailsResponse {
uuid: row.1.clone(),
details: VideoDetailsResult::Chunk(ChunkDetailResponse {
chunk_id: row.2.clone(),
chunk_type: row.3.clone(),
frame_range: FrameRange {
start_frame,
end_frame,
duration_frames,
fps,
},
reference_time: ReferenceTime {
start: start_time,
end: end_time,
},
text_content: row.7.clone(),
content: Some(row.8.clone()),
parent_id: row.12.clone(),
summary_text,
metadata,
visual_stats: row.13.clone(),
speaker_ids,
person_ids,
}),
}));
}
Err(StatusCode::BAD_REQUEST)
}
/// Response describing the outcome of a video deletion.
///
/// NOTE(review): not referenced in the visible part of this file; the
/// `delete_video` handler returns `UnregisterFileResponse` instead — confirm
/// whether this type is still needed.
#[derive(Debug, Serialize)]
struct DeleteVideoResponse {
/// Whether the deletion succeeded.
success: bool,
/// Human-readable outcome description.
message: String,
/// Number of face detection rows removed.
deleted_face_detections: u64,
/// Number of processor result rows removed.
deleted_processor_results: u64,
}
/// JSON body returned by the `delete_video` handler.
#[derive(Debug, Serialize)]
struct UnregisterFileResponse {
/// Whether the file was unregistered.
success: bool,
/// Human-readable outcome description.
message: String,
/// UUID of the file that was unregistered.
file_uuid: String,
/// Number of rows removed from the `face_detections` table.
deleted_face_detections: u64,
/// Number of rows removed from the `processor_results` table.
deleted_processor_results: u64,
/// Number of rows removed from the `chunk` and `parent_chunks` tables combined.
deleted_chunks: u64,
}
/// Classifies a file as `"video"`, `"audio"`, `"image"` or `"unknown"`.
///
/// Three sources are consulted in order; the first one that yields an answer wins:
/// 1. the ffprobe result, when one is supplied: a video stream means `"video"`,
///    otherwise an audio stream means `"audio"`, otherwise any stream at all
///    means `"image"`;
/// 2. the MIME type printed by `file --mime-type -b <path>`;
/// 3. the lower-cased file extension.
///
/// A failure to run the `file` command is ignored and the extension is used.
fn detect_file_type(
    path: &std::path::Path,
    probe_result: Option<&crate::core::probe::ffprobe::ProbeResult>,
) -> String {
    // 1. ffprobe
    if let Some(probe) = probe_result {
        let has_stream = |kind: &str| {
            probe
                .streams
                .iter()
                .any(|stream| stream.codec_type.as_deref() == Some(kind))
        };
        if has_stream("video") {
            return "video".to_string();
        }
        if has_stream("audio") {
            return "audio".to_string();
        }
        if !probe.streams.is_empty() {
            return "image".to_string();
        }
    }

    // 2. file command
    let file_cmd = std::process::Command::new("file")
        .args(["--mime-type", "-b"])
        .arg(path)
        .output();
    if let Ok(output) = file_cmd {
        let mime = String::from_utf8_lossy(&output.stdout)
            .trim()
            .to_lowercase();
        // Only the top-level media type (the part before the first '/') matters.
        if let Some((major, _subtype)) = mime.split_once('/') {
            if matches!(major, "video" | "audio" | "image") {
                return major.to_string();
            }
        }
    }

    // 3. Extension
    let extension = path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.to_lowercase());
    let kind = match extension.as_deref() {
        Some("mp4" | "mov" | "mkv" | "avi" | "webm" | "flv" | "wmv") => "video",
        Some("mp3" | "wav" | "flac" | "aac" | "ogg" | "m4a") => "audio",
        Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "svg") => "image",
        _ => "unknown",
    };
    kind.to_string()
}
async fn delete_video(
Path(uuid): Path<String>,
State(state): State<AppState>,
) -> Result<Json<UnregisterFileResponse>, StatusCode> {
tracing::info!("[UNREGISTER] Unregistering file: {}", uuid);
let videos_table = schema::table_name("videos");
let face_table = schema::table_name("face_detections");
let processor_table = schema::table_name("processor_results");
let chunks_table = schema::table_name("chunk");
let parent_chunks_table = schema::table_name("parent_chunks");
// Check if video exists first
let exists: Option<String> = sqlx::query_scalar(&format!(
"SELECT file_uuid FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(&uuid)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
tracing::error!("[UNREGISTER] DB check failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Delete associated data regardless of video existence (cleanup orphans)
let deleted_faces: i64 =
sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", face_table))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!("[UNREGISTER] Deleted {} face_detections", deleted_faces);
let deleted_processors: i64 = sqlx::query(&format!(
"DELETE FROM {} WHERE file_uuid = $1",
processor_table
))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!(
"[UNREGISTER] Deleted {} processor_results",
deleted_processors
);
let deleted_parent_chunks: i64 = sqlx::query(&format!(
"DELETE FROM {} WHERE uuid = $1",
parent_chunks_table
))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks_table))
.bind(&uuid)
.execute(state.db.pool())
.await
.map(|r| r.rows_affected() as i64)
.unwrap_or(0);
tracing::info!("[UNREGISTER] Deleted {} chunks", deleted_chunks);
if exists.is_none() {
tracing::warn!(
"[UNREGISTER] Video not found: {}. (Maybe already deleted or UUID mismatch)",
uuid
);
return Err(StatusCode::NOT_FOUND);
}
Ok(Json(UnregisterFileResponse {
success: true,
message: format!(
"File {} unregistered successfully. Record deleted from database.",
uuid
),
file_uuid: uuid,
deleted_face_detections: deleted_faces as u64,
deleted_processor_results: deleted_processors as u64,
deleted_chunks: (deleted_chunks + deleted_parent_chunks) as u64,
}))
}