3699 lines
119 KiB
Rust
3699 lines
119 KiB
Rust
use axum::{
|
||
extract::{Path, Query, State},
|
||
http::StatusCode,
|
||
response::Json,
|
||
routing::{delete, get, post},
|
||
Router,
|
||
};
|
||
use once_cell::sync::{Lazy, OnceCell};
|
||
use serde::{Deserialize, Serialize};
|
||
use std::collections::HashMap;
|
||
use sha2::{Digest, Sha256};
|
||
use sqlx::Row;
|
||
use std::time::Instant;
|
||
use tower_http::cors::{Any, CorsLayer};
|
||
|
||
use crate::core::cache::{keys, MongoCache, RedisCache};
|
||
use crate::core::config::REDIS_KEY_PREFIX;
|
||
use crate::core::db::schema;
|
||
use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord, VideoStatus};
|
||
use crate::worker::resources::SystemResources;
|
||
use crate::core::text::tokenizer::tokenize_chinese_text;
|
||
use crate::{Embedder, FileManager};
|
||
|
||
use super::agent_api;
|
||
use super::five_w1h_agent_api;
|
||
use super::identities;
|
||
use super::identity_api;
|
||
use super::identity_binding;
|
||
use super::middleware::api_key_validation;
|
||
use super::search::search_routes;
|
||
use super::trace_agent_api;
|
||
use super::universal_search::universal_search_routes;
|
||
use super::visual_chunk_search;
|
||
use crate::core::chunk::types::Chunk;
|
||
|
||
/// API key handed out by the demo `login` endpoint.
///
/// Read lazily from `MOMENTRY_DEMO_API_KEY`; when that variable is unset (or not
/// valid Unicode) the built-in key below is used instead.
// NOTE(review): the fallback is a credential committed to source control; consider
// requiring the environment variable outside of demo deployments.
static DEMO_USER_API_KEY: Lazy<String> = Lazy::new(|| match std::env::var("MOMENTRY_DEMO_API_KEY") {
    Ok(configured) => configured,
    Err(_) => "muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69".to_string(),
});
|
||
|
||
fn hash_password(password: &str) -> String {
|
||
let mut hasher = Sha256::new();
|
||
hasher.update(password.as_bytes());
|
||
format!("{:x}", hasher.finalize())
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct LoginRequest {
|
||
username: String,
|
||
password: String,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct LoginResponse {
|
||
success: bool,
|
||
message: Option<String>,
|
||
api_key: Option<String>,
|
||
user: Option<UserInfo>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct UserInfo {
|
||
username: String,
|
||
}
|
||
|
||
// Global State
|
||
static SERVER_START: OnceCell<Instant> = OnceCell::new();
|
||
|
||
fn get_uptime_ms() -> u64 {
|
||
SERVER_START
|
||
.get()
|
||
.map(|i| i.elapsed().as_millis() as u64)
|
||
.unwrap_or(0)
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct HealthResponse {
|
||
status: String,
|
||
version: String,
|
||
build_git_hash: String,
|
||
build_timestamp: String,
|
||
uptime_ms: u64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct JobListResponse {
|
||
jobs: Vec<JobInfoResponse>,
|
||
count: i64,
|
||
page: usize,
|
||
page_size: usize,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct JobsQuery {
|
||
page: Option<usize>,
|
||
page_size: Option<usize>,
|
||
status: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct JobInfoResponse {
|
||
id: i32,
|
||
uuid: String,
|
||
status: String,
|
||
current_processor: Option<String>,
|
||
progress_current: i32,
|
||
progress_total: i32,
|
||
created_at: String,
|
||
started_at: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct JobDetailResponse {
|
||
id: i32,
|
||
uuid: String,
|
||
status: String,
|
||
current_processor: Option<String>,
|
||
progress_current: i32,
|
||
progress_total: i32,
|
||
processors: Vec<ProcessorInfoResponse>,
|
||
created_at: String,
|
||
started_at: Option<String>,
|
||
updated_at: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ProcessorInfoResponse {
|
||
processor_type: String,
|
||
status: String,
|
||
started_at: Option<String>,
|
||
completed_at: Option<String>,
|
||
duration_secs: Option<f64>,
|
||
error_message: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
#[serde(rename_all = "snake_case")]
|
||
enum SearchMode {
|
||
Vector,
|
||
Smart,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct SearchRequest {
|
||
query: String,
|
||
mode: Option<SearchMode>,
|
||
collection: Option<String>,
|
||
uuid: Option<String>,
|
||
limit: Option<usize>,
|
||
vector_weight: Option<f32>,
|
||
bm25_weight: Option<f32>,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct CacheToggleRequest {
|
||
enabled: bool,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct CacheToggleResponse {
|
||
success: bool,
|
||
cache_enabled: bool,
|
||
message: String,
|
||
}
|
||
|
||
// File-name lookup types (used by `lookup_file_by_name`)
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
#[derive(Serialize)]
|
||
struct FileLookupMatch {
|
||
file_uuid: String,
|
||
file_name: String,
|
||
file_type: Option<String>,
|
||
status: String,
|
||
content_hash: Option<String>,
|
||
file_size: Option<i64>,
|
||
duration: Option<f64>,
|
||
width: Option<i32>,
|
||
height: Option<i32>,
|
||
}
|
||
|
||
#[derive(Serialize)]
|
||
struct FileLookupResponse {
|
||
file_name: String,
|
||
exists: bool,
|
||
matches: Vec<FileLookupMatch>,
|
||
next_name: String,
|
||
}
|
||
|
||
async fn lookup_file_by_name(
|
||
State(state): State<AppState>,
|
||
Query(params): Query<HashMap<String, String>>,
|
||
) -> Result<Json<FileLookupResponse>, StatusCode> {
|
||
let base = params.get("file_name").map(|s| s.trim().to_string()).unwrap_or_default();
|
||
if base.is_empty() {
|
||
return Ok(Json(FileLookupResponse {
|
||
file_name: String::new(),
|
||
exists: false,
|
||
matches: vec![],
|
||
next_name: String::new(),
|
||
}));
|
||
}
|
||
let table = schema::table_name("videos");
|
||
let dot_pos = base.rfind('.');
|
||
let (stem, ext) = match dot_pos {
|
||
Some(p) => (base[..p].to_string(), base[p..].to_string()),
|
||
None => (base.clone(), String::new()),
|
||
};
|
||
let pattern = format!("{}%%", &stem);
|
||
|
||
let query_sql = format!("SELECT file_uuid, file_name, file_type, status, content_hash, duration, width, height FROM {} WHERE file_name = $1 OR file_name LIKE $2 ORDER BY file_name", table);
|
||
let rows = sqlx::query(&query_sql)
|
||
.bind(&base)
|
||
.bind(&pattern)
|
||
.fetch_all(state.db.pool())
|
||
.await
|
||
.map_err(|e| { tracing::error!("lookup query error: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?;
|
||
|
||
let exists = rows.iter().any(|r| r.get::<String, _>("file_name") == base);
|
||
let matches: Vec<FileLookupMatch> = rows.iter().map(|r| {
|
||
FileLookupMatch {
|
||
file_uuid: r.get("file_uuid"),
|
||
file_name: r.get("file_name"),
|
||
file_type: r.get("file_type"),
|
||
status: r.get("status"),
|
||
content_hash: r.get("content_hash"),
|
||
file_size: None,
|
||
duration: r.get("duration"),
|
||
width: r.get("width"),
|
||
height: r.get("height"),
|
||
}
|
||
}).collect();
|
||
|
||
let max_n: usize = rows.iter().filter_map(|r| {
|
||
let n: String = r.get("file_name");
|
||
if n == base { return Some(0usize); }
|
||
let rest = n.strip_prefix(&stem).and_then(|r| r.strip_suffix(&ext))?;
|
||
let inner = rest.trim().strip_prefix('(').and_then(|r| r.strip_suffix(')'))?;
|
||
inner.parse::<usize>().ok()
|
||
}).max().unwrap_or(0);
|
||
let next_name = if max_n == 0 && !exists {
|
||
base.clone()
|
||
} else {
|
||
format!("{} ({}){}", stem, max_n + 1, ext)
|
||
};
|
||
|
||
Ok(Json(FileLookupResponse {
|
||
file_name: base,
|
||
exists,
|
||
matches,
|
||
next_name,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct RegisterFileRequest {
|
||
file_path: String,
|
||
pattern: Option<String>,
|
||
user_id: Option<i64>,
|
||
content_hash: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct RegisterFileResponse {
|
||
success: bool,
|
||
file_uuid: String,
|
||
file_name: String,
|
||
file_path: String,
|
||
file_type: Option<String>,
|
||
duration: f64,
|
||
width: u32,
|
||
height: u32,
|
||
fps: f64,
|
||
total_frames: u64,
|
||
registration_time: Option<String>,
|
||
already_exists: bool,
|
||
message: String,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct SearchResult {
|
||
uuid: String,
|
||
chunk_id: String,
|
||
chunk_type: String,
|
||
start_time: f64,
|
||
end_time: f64,
|
||
text: String,
|
||
score: f32,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct SearchResponse {
|
||
results: Vec<SearchResult>,
|
||
query: String,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ProbeResponse {
|
||
file_uuid: String,
|
||
file_name: String,
|
||
duration: f64,
|
||
width: u32,
|
||
height: u32,
|
||
fps: f64,
|
||
total_frames: i64,
|
||
cached: bool,
|
||
format: crate::core::probe::FormatInfo,
|
||
streams: Vec<crate::core::probe::StreamInfo>,
|
||
}
|
||
|
||
// --- P0 API Structs ---
|
||
#[derive(Debug, Deserialize)]
|
||
struct ProcessRequest {
|
||
rules: Option<Vec<String>>,
|
||
processors: Option<Vec<String>>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct FrameProgress {
|
||
total_frames: i64,
|
||
processed_frames: i64,
|
||
progress_percent: f64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct AssetStatusResponse {
|
||
uuid: String,
|
||
file_name: String,
|
||
registration_time: String,
|
||
processing_status: String,
|
||
current_job_id: Option<String>,
|
||
frame_progress: Option<FrameProgress>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct JobStatusResponse {
|
||
job_id: String,
|
||
file_uuid: String,
|
||
rule: String,
|
||
status: String,
|
||
current_processor_id: Option<String>,
|
||
frame_progress: FrameProgress,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct RuleStatusResponse {
|
||
rule: String,
|
||
supported_processor_ids: Vec<String>,
|
||
active_jobs: Vec<JobStatusResponse>,
|
||
}
|
||
|
||
// --- End P0 API Structs ---
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct HybridSearchRequest {
|
||
query: String,
|
||
limit: Option<usize>,
|
||
uuid: Option<String>,
|
||
vector_weight: Option<f32>,
|
||
bm25_weight: Option<f32>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct HybridSearchResult {
|
||
uuid: String,
|
||
chunk_id: String,
|
||
chunk_type: String,
|
||
start_time: f64,
|
||
end_time: f64,
|
||
text: String,
|
||
vector_score: f64,
|
||
bm25_score: f64,
|
||
combined_score: f64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct HybridSearchResponse {
|
||
results: Vec<HybridSearchResult>,
|
||
query: String,
|
||
}
|
||
|
||
fn extract_text_from_content(content: &serde_json::Value) -> String {
|
||
let raw_text = content
|
||
.get("data")
|
||
.and_then(|data| data.get("text"))
|
||
.and_then(|v| v.as_str())
|
||
.or_else(|| content.get("text").and_then(|v| v.as_str()))
|
||
.unwrap_or("");
|
||
|
||
tokenize_chinese_text(raw_text)
|
||
}
|
||
|
||
/// Returns the title from a chunk's JSON content.
///
/// Prefers `content.data.title`; if that is missing or not a string, falls back to
/// `content.title`, and finally to an empty string.
fn extract_title_from_content(content: &serde_json::Value) -> String {
    let nested = content
        .get("data")
        .and_then(|data| data.get("title"))
        .and_then(serde_json::Value::as_str);
    let title = match nested {
        Some(t) => t,
        None => content
            .get("title")
            .and_then(serde_json::Value::as_str)
            .unwrap_or(""),
    };
    title.to_string()
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct LookupQuery {
|
||
path: Option<String>,
|
||
uuid: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct LookupResponse {
|
||
file_uuid: String,
|
||
file_path: Option<String>,
|
||
file_name: Option<String>,
|
||
duration: Option<f64>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct FileInfoResponse {
|
||
file_uuid: String,
|
||
file_path: String,
|
||
file_name: String,
|
||
file_type: Option<String>,
|
||
duration: f64,
|
||
width: u32,
|
||
height: u32,
|
||
status: String,
|
||
processing_status: Option<serde_json::Value>,
|
||
birth_registration: Option<serde_json::Value>,
|
||
created_at: Option<String>,
|
||
registration_time: Option<String>,
|
||
file_size: Option<i64>,
|
||
probe_json: Option<String>,
|
||
total_frames: u64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct FilesResponse {
|
||
files: Vec<FileInfoResponse>,
|
||
count: i64,
|
||
page: usize,
|
||
page_size: usize,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct FilesQuery {
|
||
page: Option<usize>,
|
||
page_size: Option<usize>,
|
||
status: Option<String>,
|
||
q: Option<String>,
|
||
uuid: Option<String>,
|
||
}
|
||
|
||
#[derive(Clone)]
|
||
pub struct AppState {
|
||
pub db: std::sync::Arc<crate::core::db::PostgresDb>,
|
||
pub embedder: std::sync::Arc<crate::Embedder>,
|
||
pub embedder_model: String,
|
||
pub mongo_cache: crate::core::cache::MongoCache,
|
||
pub redis_cache: crate::core::cache::RedisCache,
|
||
pub api_state: super::middleware::ApiState,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct DetailedHealthResponse {
|
||
status: String,
|
||
version: String,
|
||
build_git_hash: String,
|
||
build_timestamp: String,
|
||
uptime_ms: u64,
|
||
services: ServiceHealth,
|
||
resources: ResourceStatus,
|
||
pipeline: PipelineStatus,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct PipelineStatus {
|
||
scripts: bool,
|
||
models: bool,
|
||
ffmpeg: bool,
|
||
/// Embedding server (port 11436)
|
||
embedding_server: ServiceStatus,
|
||
/// GDINO API (port 8080)
|
||
gdino_api: ServiceStatus,
|
||
/// LLM via llama.cpp (port 8082)
|
||
llm: ServiceStatus,
|
||
/// rsync file sync tool
|
||
rsync: ServiceStatus,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ResourceStatus {
|
||
cpu_used_percent: f64,
|
||
cpu_idle_percent: f64,
|
||
memory_available_mb: u64,
|
||
memory_total_mb: u64,
|
||
memory_used_percent: f64,
|
||
gpu_available: bool,
|
||
gpu_utilization: Option<f64>,
|
||
gpu_memory_used_pct: Option<f64>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ServiceHealth {
|
||
postgres: ServiceStatus,
|
||
redis: ServiceStatus,
|
||
qdrant: ServiceStatus,
|
||
mongodb: ServiceStatus,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ServiceStatus {
|
||
status: String,
|
||
latency_ms: Option<u64>,
|
||
error: Option<String>,
|
||
}
|
||
|
||
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
|
||
let postgres = check_postgres().await;
|
||
let redis = check_redis().await;
|
||
let qdrant = check_qdrant().await;
|
||
let mongodb = check_mongodb(&state.mongo_cache).await;
|
||
|
||
let all_ok = postgres.status == "ok"
|
||
&& redis.status == "ok"
|
||
&& qdrant.status == "ok"
|
||
&& mongodb.status == "ok";
|
||
|
||
let status = if all_ok { "ok" } else { "degraded" };
|
||
|
||
if all_ok {
|
||
let _ = state.redis_cache.set_health(status).await;
|
||
}
|
||
|
||
Json(HealthResponse {
|
||
status: status.to_string(),
|
||
version: env!("BUILD_VERSION").to_string(),
|
||
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
|
||
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
|
||
uptime_ms: get_uptime_ms(),
|
||
})
|
||
}
|
||
|
||
async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthResponse> {
|
||
let postgres = check_postgres().await;
|
||
let redis = check_redis().await;
|
||
let qdrant = check_qdrant().await;
|
||
let mongodb = check_mongodb(&state.mongo_cache).await;
|
||
|
||
let overall_status = if postgres.status == "ok"
|
||
&& redis.status == "ok"
|
||
&& qdrant.status == "ok"
|
||
&& mongodb.status == "ok"
|
||
{
|
||
"ok"
|
||
} else {
|
||
"degraded"
|
||
};
|
||
|
||
let sys = SystemResources::check();
|
||
|
||
let scripts_dir = std::path::Path::new("/Users/accusys/momentry_core_0.1/scripts");
|
||
let models_dir = std::path::Path::new("/Users/accusys/momentry_core_0.1/models");
|
||
let ffmpeg_full = std::path::Path::new("/opt/homebrew/opt/ffmpeg-full/bin/ffmpeg");
|
||
let has_scripts = scripts_dir.is_dir();
|
||
let has_models = models_dir.is_dir();
|
||
let has_ffmpeg = ffmpeg_full.exists();
|
||
|
||
Json(DetailedHealthResponse {
|
||
status: overall_status.to_string(),
|
||
version: env!("BUILD_VERSION").to_string(),
|
||
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
|
||
build_timestamp: env!("BUILD_TIMESTAMP").to_string(),
|
||
uptime_ms: get_uptime_ms(),
|
||
services: ServiceHealth {
|
||
postgres,
|
||
redis,
|
||
qdrant,
|
||
mongodb,
|
||
},
|
||
resources: ResourceStatus {
|
||
cpu_used_percent: sys.cpu_used_percent,
|
||
cpu_idle_percent: sys.cpu_idle_percent,
|
||
memory_available_mb: sys.memory_available_mb,
|
||
memory_total_mb: sys.memory_total_mb,
|
||
memory_used_percent: sys.memory_used_percent,
|
||
gpu_available: sys.gpu_available,
|
||
gpu_utilization: sys.gpu_utilization,
|
||
gpu_memory_used_pct: sys.gpu_memory_used_pct,
|
||
},
|
||
pipeline: PipelineStatus {
|
||
scripts: has_scripts,
|
||
models: has_models,
|
||
ffmpeg: has_ffmpeg,
|
||
embedding_server: check_http("http://127.0.0.1:11436/health").await,
|
||
gdino_api: check_http("http://127.0.0.1:8080/health").await,
|
||
llm: check_http("http://127.0.0.1:8082/health").await,
|
||
rsync: check_rsync().await,
|
||
},
|
||
})
|
||
}
|
||
|
||
async fn login(Json(req): Json<LoginRequest>) -> Json<LoginResponse> {
|
||
if req.username == "demo" && req.password == "demo" {
|
||
Json(LoginResponse {
|
||
success: true,
|
||
message: Some("Login successful".to_string()),
|
||
api_key: Some(DEMO_USER_API_KEY.clone()),
|
||
user: Some(UserInfo {
|
||
username: "demo".to_string(),
|
||
}),
|
||
})
|
||
} else {
|
||
Json(LoginResponse {
|
||
success: false,
|
||
message: Some("Invalid username or password".to_string()),
|
||
api_key: None,
|
||
user: None,
|
||
})
|
||
}
|
||
}
|
||
|
||
async fn logout() -> Json<serde_json::Value> {
|
||
Json(serde_json::json!({ "success": true }))
|
||
}
|
||
|
||
async fn check_postgres() -> ServiceStatus {
|
||
let start = Instant::now();
|
||
match PostgresDb::init().await {
|
||
Ok(db) => match db.list_files(1, 0).await {
|
||
Ok(_) => ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(e.to_string()),
|
||
},
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
|
||
async fn check_redis() -> ServiceStatus {
|
||
let start = Instant::now();
|
||
match RedisClient::new() {
|
||
Ok(redis) => match redis.get_conn().await {
|
||
Ok(mut conn) => {
|
||
let result: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
|
||
match result {
|
||
Ok(_) => ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
|
||
async fn check_qdrant() -> ServiceStatus {
|
||
let start = Instant::now();
|
||
let base_url =
|
||
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string());
|
||
let api_key =
|
||
std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());
|
||
let url = format!("{}/collections", base_url);
|
||
|
||
let client = reqwest::Client::new();
|
||
match client
|
||
.get(&url)
|
||
.header("api-key", api_key)
|
||
.timeout(std::time::Duration::from_secs(5))
|
||
.send()
|
||
.await
|
||
{
|
||
Ok(resp) if resp.status().is_success() => ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
},
|
||
Ok(resp) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(format!("HTTP {}", resp.status())),
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
|
||
async fn check_mongodb(cache: &MongoCache) -> ServiceStatus {
|
||
let start = Instant::now();
|
||
match cache.health_check().await {
|
||
Ok(_) => ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
},
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
|
||
async fn check_rsync() -> ServiceStatus {
|
||
let start = Instant::now();
|
||
let paths = [
|
||
std::path::Path::new("/Users/accusys/bin/rsync"),
|
||
std::path::Path::new("/opt/homebrew/bin/rsync"),
|
||
];
|
||
for p in &paths {
|
||
if p.exists() {
|
||
return ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
};
|
||
}
|
||
}
|
||
ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some("rsync not found (built from source expected at ~/bin/rsync)".to_string()),
|
||
}
|
||
}
|
||
|
||
async fn check_binary(name: &str) -> ServiceStatus {
|
||
let start = Instant::now();
|
||
match std::process::Command::new("which").arg(name).output() {
|
||
Ok(output) if output.status.success() => ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
},
|
||
_ => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(format!("{} not found in PATH", name)),
|
||
},
|
||
}
|
||
}
|
||
|
||
async fn check_http(url: &str) -> ServiceStatus {
|
||
let start = Instant::now();
|
||
match reqwest::get(url).await {
|
||
Ok(resp) => {
|
||
if resp.status().is_success() {
|
||
ServiceStatus {
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: None,
|
||
}
|
||
} else {
|
||
ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(format!("HTTP {}", resp.status())),
|
||
}
|
||
}
|
||
}
|
||
Err(e) => ServiceStatus {
|
||
status: "error".to_string(),
|
||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
||
error: Some(e.to_string()),
|
||
},
|
||
}
|
||
}
|
||
|
||
fn generate_query_hash(query: &str, uuid: Option<&str>, limit: usize) -> String {
|
||
let data = serde_json::json!({
|
||
"query": query,
|
||
"uuid": uuid,
|
||
"limit": limit,
|
||
});
|
||
let mut hasher = Sha256::new();
|
||
hasher.update(data.to_string().as_bytes());
|
||
format!("{:x}", hasher.finalize())[..16].to_string()
|
||
}
|
||
|
||
fn generate_visual_search_hash(
|
||
uuid: &str,
|
||
criteria: &visual_chunk_search::VisualChunkSearchCriteria,
|
||
) -> String {
|
||
let data = serde_json::json!({
|
||
"uuid": uuid,
|
||
"criteria": criteria,
|
||
});
|
||
let mut hasher = Sha256::new();
|
||
hasher.update(data.to_string().as_bytes());
|
||
format!("{:x}", hasher.finalize())[..16].to_string()
|
||
}
|
||
|
||
/// Compute SHA256 for dedup. Returns hex string.
|
||
fn sha256_file(path: &std::path::Path) -> Option<String> {
|
||
crate::core::storage::content_hash::compute_sha256(path).ok()
|
||
}
|
||
|
||
/// Resolve name conflict: if file_name collides with existing but content differs,
|
||
/// append ` (N)` suffix. Returns the resolved file_name.
|
||
async fn resolve_filename(
|
||
db: &PostgresDb,
|
||
file_name: &str,
|
||
content_hash: &str,
|
||
) -> String {
|
||
let table = schema::table_name("videos");
|
||
let base = file_name.to_string();
|
||
let dot_pos = base.rfind('.');
|
||
let (stem, ext) = match dot_pos {
|
||
Some(p) => (base[..p].to_string(), base[p..].to_string()),
|
||
None => (base.clone(), String::new()),
|
||
};
|
||
let mut candidate = base.clone();
|
||
let mut attempt = 0usize;
|
||
loop {
|
||
// Check if candidate name exists with a DIFFERENT hash (same content = OK)
|
||
let conflict: Option<String> = sqlx::query_scalar(
|
||
&format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table)
|
||
)
|
||
.bind(&candidate)
|
||
.bind(content_hash)
|
||
.fetch_optional(db.pool())
|
||
.await
|
||
.unwrap_or(None);
|
||
if conflict.is_none() {
|
||
return candidate;
|
||
}
|
||
attempt += 1;
|
||
candidate = format!("{} ({}){}", stem, attempt, ext);
|
||
}
|
||
}
|
||
|
||
/// 註冊單一檔案(內部函數,不處理 pattern)
|
||
async fn register_single_file(
|
||
state: &AppState,
|
||
file_path: &str,
|
||
_user_id: Option<i64>,
|
||
provided_hash: Option<String>,
|
||
) -> RegisterFileResponse {
|
||
tracing::info!("[REGISTER] Starting registration for: {}", file_path);
|
||
|
||
let path = std::path::Path::new(file_path);
|
||
if !path.exists() {
|
||
return RegisterFileResponse {
|
||
success: false,
|
||
file_uuid: String::new(),
|
||
file_name: String::new(),
|
||
file_path: file_path.to_string(),
|
||
file_type: None,
|
||
duration: 0.0,
|
||
width: 0,
|
||
height: 0,
|
||
fps: 0.0,
|
||
total_frames: 0,
|
||
registration_time: None,
|
||
already_exists: false,
|
||
message: format!("File not found: {}", file_path),
|
||
};
|
||
}
|
||
|
||
let canonical_path = path
|
||
.canonicalize()
|
||
.map(|p| p.to_string_lossy().to_string())
|
||
.unwrap_or_else(|_| file_path.to_string());
|
||
|
||
let file_name = path
|
||
.file_name()
|
||
.map(|n| n.to_string_lossy().to_string())
|
||
.unwrap_or_default();
|
||
|
||
let db = match PostgresDb::init().await {
|
||
Ok(db) => db,
|
||
Err(e) => {
|
||
tracing::error!("[REGISTER] DB init failed: {}", e);
|
||
return RegisterFileResponse {
|
||
success: false,
|
||
file_uuid: String::new(),
|
||
file_name,
|
||
file_path: canonical_path,
|
||
file_type: None,
|
||
duration: 0.0,
|
||
width: 0,
|
||
height: 0,
|
||
fps: 0.0,
|
||
total_frames: 0,
|
||
registration_time: None,
|
||
already_exists: false,
|
||
message: format!("DB init failed: {}", e),
|
||
};
|
||
}
|
||
};
|
||
|
||
// Step 1: Try to load pre-computed data from .pre.json
|
||
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
|
||
|
||
let birthday = std::fs::metadata(&path)
|
||
.ok()
|
||
.and_then(|m| m.modified().ok())
|
||
.map(|t| {
|
||
let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
|
||
chrono::DateTime::from_timestamp(secs as i64, 0)
|
||
.map(|dt| dt.to_rfc3339())
|
||
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
|
||
})
|
||
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
|
||
|
||
let mac_address = crate::core::storage::uuid::get_mac_address();
|
||
let pre_file_uuid = crate::core::storage::uuid::compute_birth_uuid(
|
||
&mac_address, &birthday, &canonical_path, &file_name,
|
||
);
|
||
let pre_path = std::path::Path::new(&output_dir).join(format!("{}.pre.json", pre_file_uuid));
|
||
let pre_data: Option<serde_json::Value> = std::fs::read_to_string(&pre_path).ok()
|
||
.and_then(|s| serde_json::from_str(&s).ok());
|
||
|
||
// Extract content_hash from pre.json or compute fresh
|
||
let (content_hash, birthday, _pre_file_uuid) = if let Some(ref pre) = pre_data {
|
||
let h = pre.get("content_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||
let b = pre.get("birthday").and_then(|v| v.as_str()).unwrap_or(&birthday).to_string();
|
||
let u = pre.get("file_uuid").and_then(|v| v.as_str()).unwrap_or(&pre_file_uuid).to_string();
|
||
(h, b, u)
|
||
} else {
|
||
let h = provided_hash.filter(|h| !h.is_empty()).unwrap_or_else(|| sha256_file(&path).unwrap_or_default());
|
||
(h, birthday, pre_file_uuid)
|
||
};
|
||
// Recompute UUID with the resolved birthday
|
||
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
|
||
&mac_address, &birthday, &canonical_path, &file_name,
|
||
);
|
||
tracing::info!("[REGISTER] UUID inputs: mac={} birthday={} path={} name={} pre_found={} → {}", mac_address, birthday, canonical_path, file_name, pre_data.is_some(), file_uuid);
|
||
|
||
// Step 2: Hash check — same content = already registered (regardless of name)
|
||
let videos_table = schema::table_name("videos");
|
||
if !content_hash.is_empty() {
|
||
if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>(
|
||
&format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table)
|
||
)
|
||
.bind(&content_hash)
|
||
.fetch_optional(db.pool())
|
||
.await
|
||
{
|
||
tracing::info!("[REGISTER] Content hash collision → already registered: {}", existing_uuid);
|
||
let existing_name: Option<String> = sqlx::query_scalar(
|
||
&format!("SELECT file_name FROM {} WHERE file_uuid = $1", videos_table)
|
||
).bind(&existing_uuid).fetch_optional(db.pool()).await.unwrap_or(None);
|
||
return RegisterFileResponse {
|
||
success: true,
|
||
file_uuid: existing_uuid,
|
||
file_name: existing_name.unwrap_or(file_name),
|
||
file_path: canonical_path,
|
||
file_type: None,
|
||
duration: 0.0,
|
||
width: 0,
|
||
height: 0,
|
||
fps: 0.0,
|
||
total_frames: 0,
|
||
registration_time: None,
|
||
already_exists: true,
|
||
message: "Content already registered (identical file)".to_string(),
|
||
};
|
||
}
|
||
}
|
||
|
||
// Step 3: Name check — same name but different content → auto-rename
|
||
let final_name = resolve_filename(&db, &file_name, &content_hash).await;
|
||
|
||
// Step 4: Compute UUID using birthday from pre.json or file creation time (never DB registration_time)
|
||
let mac_address = crate::core::storage::uuid::get_mac_address();
|
||
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
|
||
&mac_address,
|
||
&birthday,
|
||
&canonical_path,
|
||
&final_name,
|
||
);
|
||
|
||
// Step 5: Probe — use pre.json if available, otherwise run ffprobe
|
||
let cached_probe = pre_data.as_ref()
|
||
.and_then(|p| p.get("probe_json"))
|
||
.and_then(|v| serde_json::from_value::<crate::core::probe::ProbeResult>(v.clone()).ok());
|
||
|
||
let probe_result = cached_probe.or_else(|| crate::core::probe::probe_video(&canonical_path).ok());
|
||
let file_meta = std::fs::metadata(&canonical_path).ok();
|
||
|
||
let probe_json: Option<serde_json::Value> = if let Some(ref pre) = pre_data {
|
||
pre.get("probe_json").cloned()
|
||
} else {
|
||
probe_result.as_ref().map(|r| serde_json::to_value(r)).and_then(|r| r.ok()).or_else(|| {
|
||
file_meta.map(|m| serde_json::json!({
|
||
"format": {"size": m.len().to_string(), "filename": &canonical_path, "format_name": "unknown"},
|
||
"streams": []
|
||
}))
|
||
})
|
||
};
|
||
|
||
let has_video = probe_result.as_ref().map_or(false, |r| r.streams.iter().any(|s| s.codec_type.as_deref() == Some("video")));
|
||
let has_audio = probe_result.as_ref().map_or(false, |r| r.streams.iter().any(|s| s.codec_type.as_deref() == Some("audio")));
|
||
|
||
// Determine file_type: check ffprobe result, then extension
|
||
let final_file_type = if has_video {
|
||
Some("video".to_string())
|
||
} else if has_audio {
|
||
Some("audio".to_string())
|
||
} else {
|
||
let ext = std::path::Path::new(&canonical_path).extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase());
|
||
match ext.as_deref() {
|
||
Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "svg") => Some("image".to_string()),
|
||
Some("pdf") => Some("document".to_string()),
|
||
Some("doc" | "docx") => Some("document".to_string()),
|
||
Some("pages") => Some("document".to_string()),
|
||
Some("xls" | "xlsx" | "numbers") => Some("spreadsheet".to_string()),
|
||
Some("ppt" | "pptx" | "key") => Some("presentation".to_string()),
|
||
_ => probe_result.as_ref().and_then(|r| {
|
||
if r.streams.is_empty() && r.format.duration.is_some() { Some("unknown".to_string()) } else { None }
|
||
}),
|
||
}
|
||
};
|
||
|
||
let duration = probe_result.as_ref()
|
||
.and_then(|r| r.format.duration.as_ref())
|
||
.and_then(|s| s.parse::<f64>().ok())
|
||
.unwrap_or(0.0);
|
||
let mut width = 0u32;
|
||
let mut height = 0u32;
|
||
let mut fps = 0.0;
|
||
let mut total_frames = 0u64;
|
||
if let Some(ref probe) = probe_result {
|
||
if let Some(s) = probe.streams.iter().find(|s| s.codec_type.as_deref() == Some("video")) {
|
||
width = s.width.unwrap_or(0);
|
||
height = s.height.unwrap_or(0);
|
||
if let Some(fps_str) = &s.r_frame_rate {
|
||
if let Some((num, den)) = fps_str.split_once('/') {
|
||
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
|
||
if d > 0.0 {
|
||
fps = n / d;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
total_frames = s.nb_frames.as_ref().and_then(|s| s.parse().ok()).unwrap_or((duration * fps) as u64);
|
||
}
|
||
}
|
||
|
||
let videos_table = schema::table_name("videos");
|
||
let status = "registered";
|
||
let _ = sqlx::query(&format!(
|
||
"INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, content_hash, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status, content_hash = EXCLUDED.content_hash",
|
||
videos_table
|
||
))
|
||
.bind(&file_uuid).bind(&canonical_path).bind(&final_name).bind(&final_file_type)
|
||
.bind(duration).bind(width as i32).bind(height as i32).bind(fps)
|
||
.bind(&probe_json).bind(status).bind(&content_hash)
|
||
.execute(db.pool()).await;
|
||
|
||
// 若是 video 類型,同步執行 CUT + Scene 分類
|
||
let mut cut_done = false;
|
||
let mut scene_done = false;
|
||
if has_video && total_frames > 0 && fps > 0.0 {
|
||
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
|
||
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
|
||
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
|
||
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
|
||
|
||
// CUT: 場景檢測(PySceneDetect,0.08s 即可完成)
|
||
let cut_path = std::path::Path::new(&output_dir).join(format!("{}.cut.json", file_uuid));
|
||
if !cut_path.exists() {
|
||
let cut_script = std::path::Path::new(&scripts_dir).join("cut_processor.py");
|
||
if cut_script.exists() {
|
||
let cut_output = std::process::Command::new(&python_path)
|
||
.arg(&cut_script)
|
||
.arg(&canonical_path)
|
||
.arg(&cut_path)
|
||
.output();
|
||
if let Ok(output) = cut_output {
|
||
if output.status.success() {
|
||
cut_done = true;
|
||
tracing::info!("[REGISTER] CUT completed for {}", file_uuid);
|
||
} else {
|
||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||
tracing::error!("[REGISTER] CUT failed for {}: {}", file_uuid, stderr);
|
||
}
|
||
} else {
|
||
tracing::error!("[REGISTER] CUT execution error for {}", file_uuid);
|
||
}
|
||
}
|
||
} else {
|
||
cut_done = true;
|
||
// 讀取現有 CUT JSON 取得場景數
|
||
if let Ok(content) = std::fs::read_to_string(&cut_path) {
|
||
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
|
||
let scenes = cut_data
|
||
.get("scenes")
|
||
.and_then(|s| s.as_array())
|
||
.map(|a| a.len() as i32)
|
||
.unwrap_or(0);
|
||
tracing::info!(
|
||
"[REGISTER] CUT already exists: {} scenes for {}",
|
||
scenes,
|
||
file_uuid
|
||
);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Scene: 場景分類(MIT Places365,取樣間隔 2s)
|
||
let scene_path =
|
||
std::path::Path::new(&output_dir).join(format!("{}.scene.json", file_uuid));
|
||
if !scene_path.exists() {
|
||
let scene_script = std::path::Path::new(&scripts_dir).join("scene_classifier.py");
|
||
if scene_script.exists() {
|
||
let scene_output = std::process::Command::new(&python_path)
|
||
.arg(&scene_script)
|
||
.arg(&canonical_path)
|
||
.arg(&scene_path)
|
||
.arg("--sample-interval")
|
||
.arg("2")
|
||
.output();
|
||
if let Ok(output) = scene_output {
|
||
if output.status.success() {
|
||
scene_done = true;
|
||
tracing::info!(
|
||
"[REGISTER] Scene classification completed for {}",
|
||
file_uuid
|
||
);
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
scene_done = true;
|
||
}
|
||
}
|
||
|
||
// 更新 DB: cut_done, scene_done, audio_tracks
|
||
let audio_tracks: Vec<serde_json::Value> = probe_result.as_ref().map_or(vec![], |pr| {
|
||
pr.streams.iter()
|
||
.filter(|s| s.codec_type.as_deref() == Some("audio"))
|
||
.map(|s| {
|
||
serde_json::json!({
|
||
"index": s.index,
|
||
"codec": s.codec_name,
|
||
"channels": s.channels,
|
||
"sample_rate": s.sample_rate,
|
||
"language": s.tags.as_ref().and_then(|t| t.get("language")).unwrap_or(&serde_json::Value::Null),
|
||
})
|
||
})
|
||
.collect()
|
||
});
|
||
let audio_tracks_json = serde_json::to_value(&audio_tracks).ok();
|
||
// 計算 cut_count 與 cut_max_duration
|
||
let cut_path = std::path::Path::new(
|
||
&std::env::var("MOMENTRY_OUTPUT_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
|
||
)
|
||
.join(format!("{}.cut.json", file_uuid));
|
||
let mut cut_count = 0i32;
|
||
let mut cut_max_duration = 0.0f64;
|
||
if let Ok(content) = std::fs::read_to_string(&cut_path) {
|
||
if let Ok(cut_data) = serde_json::from_str::<serde_json::Value>(&content) {
|
||
if let Some(scenes) = cut_data.get("scenes").and_then(|s| s.as_array()) {
|
||
cut_count = scenes.len() as i32;
|
||
cut_max_duration = scenes
|
||
.iter()
|
||
.filter_map(|s| {
|
||
let start = s.get("start_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||
let end = s.get("end_time").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
||
if end > start {
|
||
Some(end - start)
|
||
} else {
|
||
None
|
||
}
|
||
})
|
||
.fold(0.0_f64, f64::max);
|
||
}
|
||
}
|
||
}
|
||
let videos_table = schema::table_name("videos");
|
||
let _ = sqlx::query(
|
||
&format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
|
||
)
|
||
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
|
||
.execute(db.pool()).await;
|
||
|
||
// 寫入 {file_uuid}.probe.json(含音軌資訊)
|
||
if let Some(json_val) = probe_json {
|
||
let probe_path = std::path::Path::new(
|
||
&std::env::var("MOMENTRY_OUTPUT_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()),
|
||
)
|
||
.join(format!("{}.probe.json", file_uuid));
|
||
let json_str = serde_json::to_string_pretty(&json_val).unwrap_or_default();
|
||
let _ = std::fs::write(&probe_path, json_str);
|
||
}
|
||
|
||
RegisterFileResponse {
|
||
success: true,
|
||
file_uuid,
|
||
file_name,
|
||
file_path: canonical_path,
|
||
file_type: final_file_type,
|
||
duration,
|
||
width,
|
||
height,
|
||
fps,
|
||
total_frames,
|
||
registration_time: None,
|
||
already_exists: false,
|
||
message: "File registered successfully".to_string(),
|
||
}
|
||
}
|
||
|
||
async fn register_file(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<RegisterFileRequest>,
|
||
) -> Result<Json<RegisterFileResponse>, StatusCode> {
|
||
let file_path = req.file_path.clone();
|
||
let pattern = req.pattern;
|
||
|
||
// 如果有 pattern,掃描目錄下所有符合的檔案逐一註冊
|
||
if let Some(ref pat) = pattern {
|
||
let dir = std::path::Path::new(&file_path);
|
||
if !dir.is_dir() {
|
||
return Ok(Json(RegisterFileResponse {
|
||
success: false,
|
||
file_uuid: String::new(),
|
||
file_name: String::new(),
|
||
file_path: file_path.clone(),
|
||
file_type: None,
|
||
duration: 0.0,
|
||
width: 0,
|
||
height: 0,
|
||
fps: 0.0,
|
||
total_frames: 0,
|
||
registration_time: None,
|
||
already_exists: false,
|
||
message: format!(
|
||
"Pattern requires a directory, but path is not a dir: {}",
|
||
file_path
|
||
),
|
||
}));
|
||
}
|
||
let re = regex::Regex::new(pat).map_err(|e| {
|
||
tracing::error!("[REGISTER] Invalid regex pattern: {}", e);
|
||
StatusCode::BAD_REQUEST
|
||
})?;
|
||
|
||
let mut registered = 0u32;
|
||
let mut failed = 0u32;
|
||
let mut skipped = 0u32;
|
||
|
||
if let Ok(entries) = std::fs::read_dir(dir) {
|
||
for entry in entries.flatten() {
|
||
let entry_path = entry.path();
|
||
if !entry_path.is_file() {
|
||
continue;
|
||
}
|
||
let fname = entry_path
|
||
.file_name()
|
||
.and_then(|n| n.to_str())
|
||
.unwrap_or("");
|
||
if !re.is_match(fname) {
|
||
continue;
|
||
}
|
||
|
||
let result = register_single_file(
|
||
&state,
|
||
&entry_path.to_string_lossy().to_string(),
|
||
req.user_id,
|
||
None,
|
||
)
|
||
.await;
|
||
if result.success {
|
||
registered += 1;
|
||
} else if result.already_exists {
|
||
skipped += 1;
|
||
} else {
|
||
failed += 1;
|
||
}
|
||
}
|
||
}
|
||
|
||
return Ok(Json(RegisterFileResponse {
|
||
success: true,
|
||
file_uuid: format!("batch_{}_registered_{}_failed", registered, failed),
|
||
file_name: format!(
|
||
"{} files registered, {} skipped, {} failed",
|
||
registered, skipped, failed
|
||
),
|
||
file_path: file_path.clone(),
|
||
file_type: None,
|
||
duration: 0.0,
|
||
width: 0,
|
||
height: 0,
|
||
fps: 0.0,
|
||
total_frames: 0,
|
||
registration_time: None,
|
||
already_exists: false,
|
||
message: format!(
|
||
"Batch register: {} registered, {} skipped, {} failed",
|
||
registered, skipped, failed
|
||
),
|
||
}));
|
||
}
|
||
|
||
// 單一檔案註冊
|
||
let resp = register_single_file(&state, &file_path, req.user_id, req.content_hash).await;
|
||
return Ok(Json(resp));
|
||
}
|
||
|
||
async fn probe_by_uuid(
|
||
State(state): State<AppState>,
|
||
Path(file_uuid): Path<String>,
|
||
) -> Result<Json<ProbeResponse>, StatusCode> {
|
||
let table = schema::table_name("videos");
|
||
let row: Option<(String, String)> = sqlx::query_as(&format!(
|
||
"SELECT file_name, file_path FROM {} WHERE file_uuid = $1",
|
||
table
|
||
))
|
||
.bind(&file_uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("DB error fetching video: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
let (file_name, path) = row.ok_or_else(|| {
|
||
tracing::warn!("Video not found: {}", file_uuid);
|
||
StatusCode::NOT_FOUND
|
||
})?;
|
||
|
||
// 2. Check for cached probe.json
|
||
let probe_path = format!(
|
||
"{}/{}.probe.json",
|
||
crate::core::config::OUTPUT_DIR.as_str(),
|
||
file_uuid
|
||
);
|
||
|
||
let (probe_result, cached) = if let Ok(content) = std::fs::read_to_string(&probe_path) {
|
||
tracing::info!("Using cached probe.json: {}", probe_path);
|
||
let result: crate::core::probe::ProbeResult =
|
||
serde_json::from_str(&content).map_err(|e| {
|
||
tracing::error!("Failed to parse cached probe.json: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
(result, true)
|
||
} else {
|
||
tracing::info!("Running ffprobe for: {}", path);
|
||
let result = crate::core::probe::probe_video(&path).map_err(|e| {
|
||
tracing::error!("ffprobe failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
// Save probe.json to OUTPUT_DIR
|
||
let file_manager = FileManager::new(std::path::PathBuf::from(
|
||
crate::core::config::OUTPUT_DIR.as_str(),
|
||
));
|
||
let json_str = serde_json::to_string(&result).map_err(|e| {
|
||
tracing::error!("Failed to serialize probe result: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
file_manager
|
||
.save_json(&file_uuid, "probe", &json_str)
|
||
.map_err(|e| {
|
||
tracing::error!("Failed to save probe.json: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
(result, false)
|
||
};
|
||
|
||
// 3. Extract video info
|
||
let duration = probe_result
|
||
.format
|
||
.duration
|
||
.as_ref()
|
||
.and_then(|s| s.parse::<f64>().ok())
|
||
.unwrap_or(0.0);
|
||
|
||
let mut width = 0u32;
|
||
let mut height = 0u32;
|
||
let mut fps = 0.0;
|
||
|
||
for stream in &probe_result.streams {
|
||
if stream.codec_type.as_deref() == Some("video") {
|
||
width = stream.width.unwrap_or(0);
|
||
height = stream.height.unwrap_or(0);
|
||
if let Some(fps_str) = &stream.r_frame_rate {
|
||
fps = if fps_str.contains('/') {
|
||
let parts: Vec<&str> = fps_str.split('/').collect();
|
||
if parts.len() == 2 {
|
||
let num: f64 = parts[0].parse().unwrap_or(0.0);
|
||
let den: f64 = parts[1].parse().unwrap_or(1.0);
|
||
if den > 0.0 {
|
||
num / den
|
||
} else {
|
||
0.0
|
||
}
|
||
} else {
|
||
0.0
|
||
}
|
||
} else {
|
||
fps_str.parse().unwrap_or(0.0)
|
||
};
|
||
}
|
||
}
|
||
}
|
||
|
||
// 4. Calculate total_frames and update DB
|
||
let total_frames = probe_result
|
||
.streams
|
||
.iter()
|
||
.find(|s| s.codec_type.as_deref() == Some("video"))
|
||
.and_then(|s| s.nb_frames.as_ref())
|
||
.and_then(|n| n.parse::<i64>().ok())
|
||
.unwrap_or_else(|| (duration * fps).floor() as i64);
|
||
|
||
// Update videos table with probe metadata
|
||
let _ = sqlx::query(&format!(
|
||
"UPDATE {} SET duration = $1, width = $2, height = $3, fps = $4, total_frames = $5 WHERE file_uuid = $6",
|
||
schema::table_name("videos")
|
||
))
|
||
.bind(duration)
|
||
.bind(width as i32)
|
||
.bind(height as i32)
|
||
.bind(fps)
|
||
.bind(total_frames)
|
||
.bind(&file_uuid)
|
||
.execute(state.db.pool())
|
||
.await;
|
||
|
||
Ok(Json(ProbeResponse {
|
||
file_uuid,
|
||
file_name,
|
||
duration,
|
||
width,
|
||
height,
|
||
fps,
|
||
total_frames,
|
||
cached,
|
||
format: probe_result.format,
|
||
streams: probe_result.streams,
|
||
}))
|
||
}
|
||
|
||
// --- P0 Core API Handlers ---
|
||
|
||
async fn trigger_processing(
|
||
State(state): State<AppState>,
|
||
Path(file_uuid): Path<String>,
|
||
Json(req): Json<ProcessRequest>,
|
||
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
|
||
let table = schema::table_name("videos");
|
||
let asset: Option<(String, i64)> = sqlx::query_as(&format!(
|
||
"SELECT file_name, COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1",
|
||
table
|
||
))
|
||
.bind(&file_uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("DB Error: {}", e),
|
||
)
|
||
})?;
|
||
|
||
let (file_name, total_frames) =
|
||
asset.ok_or((StatusCode::NOT_FOUND, "File not found".to_string()))?;
|
||
|
||
// Get video file_path
|
||
let video_path: Option<String> = sqlx::query_scalar(&format!(
|
||
"SELECT file_path FROM {} WHERE file_uuid = $1",
|
||
table
|
||
))
|
||
.bind(&file_uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("Failed to get video path: {}", e),
|
||
)
|
||
})?;
|
||
|
||
// 2. Create Monitor Job (Worker polls this table)
|
||
let monitor_job = state
|
||
.db
|
||
.create_monitor_job(&file_uuid, video_path.as_deref())
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("Failed to create monitor job: {}", e),
|
||
)
|
||
})?;
|
||
|
||
// Update processors if specified
|
||
let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors {
|
||
let table = crate::core::db::schema::table_name("monitor_jobs");
|
||
sqlx::query(&format!(
|
||
"UPDATE {} SET processors = $1 WHERE id = $2",
|
||
table
|
||
))
|
||
.bind(procs)
|
||
.bind(monitor_job.id)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("Failed to update processors: {}", e),
|
||
)
|
||
})?;
|
||
procs.iter().map(|s| s.as_str()).collect()
|
||
} else {
|
||
// Empty means all processors
|
||
vec![
|
||
"asr",
|
||
"cut",
|
||
"yolo",
|
||
"ocr",
|
||
"face",
|
||
"pose",
|
||
"asrx",
|
||
"visual_chunk",
|
||
]
|
||
};
|
||
|
||
// 3. Update Asset Status
|
||
let table = schema::table_name("videos");
|
||
sqlx::query(&format!(
|
||
"UPDATE {} SET status = 'processing' WHERE file_uuid = $1",
|
||
table
|
||
))
|
||
.bind(&file_uuid)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("Failed to update asset status: {}", e),
|
||
)
|
||
})?;
|
||
|
||
// Initialize processing_status with all processors and total_frames
|
||
state
|
||
.db
|
||
.init_processing_status(&file_uuid, processors_to_run, total_frames as u64)
|
||
.await
|
||
.map_err(|e| {
|
||
(
|
||
StatusCode::INTERNAL_SERVER_ERROR,
|
||
format!("Failed to init processing status: {}", e),
|
||
)
|
||
})?;
|
||
|
||
tracing::info!("Job {} created for asset {}", monitor_job.id, file_uuid);
|
||
|
||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||
let mut processor_pids: Vec<i32> = Vec::new();
|
||
if let Ok(redis_client) = RedisClient::new() {
|
||
if let Ok(mut conn) = redis_client.get_conn().await {
|
||
for name in ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"] {
|
||
let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name);
|
||
let pid: Option<i32> = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("pid")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
if let Some(p) = pid {
|
||
processor_pids.push(p);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(Json(serde_json::json!({
|
||
"job_id": monitor_job.id,
|
||
"file_uuid": file_uuid,
|
||
"status": "PENDING",
|
||
"pids": processor_pids,
|
||
"message": format!("Processing triggered for {}", file_name)
|
||
})))
|
||
}
|
||
|
||
async fn get_chunk_by_path(
|
||
Path((file_uuid, chunk_id)): Path<(String, String)>,
|
||
State(state): State<AppState>,
|
||
) -> Result<Json<Chunk>, StatusCode> {
|
||
let chunk = state
|
||
.db
|
||
.get_chunk_by_chunk_id_and_uuid(&chunk_id, &file_uuid)
|
||
.await
|
||
.map_err(|_| {
|
||
tracing::error!("[get_chunk_by_path] DB error: {}:{}", file_uuid, chunk_id);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?
|
||
.ok_or_else(|| {
|
||
tracing::warn!("[get_chunk_by_path] Not found: {}:{}", file_uuid, chunk_id);
|
||
StatusCode::NOT_FOUND
|
||
})?;
|
||
Ok(Json(chunk))
|
||
}
|
||
|
||
async fn get_asset_status(
|
||
State(state): State<AppState>,
|
||
Path(uuid): Path<String>,
|
||
) -> Result<Json<AssetStatusResponse>, StatusCode> {
|
||
let videos_table = schema::table_name("videos");
|
||
let row: Option<(String, String, chrono::DateTime<chrono::Utc>, String, i64)> = sqlx::query_as(
|
||
&format!(
|
||
"SELECT file_uuid, file_name, created_at AT TIME ZONE 'UTC', COALESCE(processing_status, 'REGISTERED'), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1",
|
||
videos_table
|
||
)
|
||
)
|
||
.bind(&uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let (uuid, file_name, time, status, total) = row.ok_or(StatusCode::NOT_FOUND)?;
|
||
|
||
let jobs_table = schema::table_name("monitor_jobs");
|
||
let job: Option<(String, String, i64, i64)> = sqlx::query_as(
|
||
&format!(
|
||
"SELECT id::text, COALESCE(status, 'QUEUED'), COALESCE(processed_frames, 0), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1 ORDER BY created_at DESC LIMIT 1",
|
||
jobs_table
|
||
)
|
||
)
|
||
.bind(&uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
let progress = if let Some((jid, jstatus, pf, tf)) = job {
|
||
if tf > 0 && (jstatus == "RUNNING" || jstatus == "QUEUED") {
|
||
Some((
|
||
jid,
|
||
FrameProgress {
|
||
total_frames: tf,
|
||
processed_frames: pf,
|
||
progress_percent: (pf as f64 / tf as f64) * 100.0,
|
||
},
|
||
))
|
||
} else {
|
||
None
|
||
}
|
||
} else {
|
||
None
|
||
};
|
||
|
||
Ok(Json(AssetStatusResponse {
|
||
uuid,
|
||
file_name,
|
||
registration_time: time.to_rfc3339(),
|
||
processing_status: status,
|
||
current_job_id: progress.as_ref().map(|(id, _)| id.clone()),
|
||
frame_progress: progress.map(|(_, p)| p),
|
||
}))
|
||
}
|
||
|
||
async fn get_job_status(
|
||
State(state): State<AppState>,
|
||
Path(job_id): Path<String>,
|
||
) -> Result<Json<JobStatusResponse>, StatusCode> {
|
||
let jobs_table = schema::table_name("monitor_jobs");
|
||
let row: Option<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
|
||
&format!(
|
||
"SELECT j.id::text, j.file_uuid, COALESCE(j.rule, 'unknown'), COALESCE(j.status, 'QUEUED'), j.assigned_processor_id::text, j.processed_frames, j.total_frames FROM {} j WHERE j.id = $1::uuid",
|
||
jobs_table
|
||
)
|
||
)
|
||
.bind(&job_id)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
eprintln!("DB Error in get_job_status: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
let (id, asset, rule, status, proc_id, pf, tf) = row.ok_or(StatusCode::NOT_FOUND)?;
|
||
|
||
Ok(Json(JobStatusResponse {
|
||
job_id: id,
|
||
file_uuid: asset,
|
||
rule,
|
||
status,
|
||
current_processor_id: proc_id,
|
||
frame_progress: FrameProgress {
|
||
total_frames: tf,
|
||
processed_frames: pf,
|
||
progress_percent: if tf > 0 {
|
||
(pf as f64 / tf as f64) * 100.0
|
||
} else {
|
||
0.0
|
||
},
|
||
},
|
||
}))
|
||
}
|
||
|
||
async fn get_rule_status(
|
||
State(state): State<AppState>,
|
||
Path(rule): Path<String>,
|
||
) -> Result<Json<RuleStatusResponse>, StatusCode> {
|
||
let procs: Vec<String> = sqlx::query_scalar(
|
||
"SELECT id::text FROM processors WHERE supported_rules @> ARRAY[$1]::TEXT[]",
|
||
)
|
||
.bind(&rule)
|
||
.fetch_all(state.db.pool())
|
||
.await
|
||
.unwrap_or_default();
|
||
|
||
let jobs_table = schema::table_name("monitor_jobs");
|
||
let jobs: Vec<(String, String, String, String, Option<String>, i64, i64)> = sqlx::query_as(
|
||
&format!(
|
||
"SELECT id::text, file_uuid, COALESCE(rule, 'unknown'), status, assigned_processor_id::text, processed_frames, total_frames FROM {} WHERE rule = $1 AND status IN ('QUEUED','RUNNING')",
|
||
jobs_table
|
||
)
|
||
)
|
||
.bind(&rule)
|
||
.fetch_all(state.db.pool())
|
||
.await
|
||
.unwrap_or_default();
|
||
|
||
let active = jobs
|
||
.into_iter()
|
||
.map(|(id, asset, r, s, p, pf, tf)| JobStatusResponse {
|
||
job_id: id,
|
||
file_uuid: asset,
|
||
rule: r,
|
||
status: s,
|
||
current_processor_id: p,
|
||
frame_progress: FrameProgress {
|
||
total_frames: tf,
|
||
processed_frames: pf,
|
||
progress_percent: if tf > 0 {
|
||
(pf as f64 / tf as f64) * 100.0
|
||
} else {
|
||
0.0
|
||
},
|
||
},
|
||
})
|
||
.collect();
|
||
|
||
Ok(Json(RuleStatusResponse {
|
||
rule,
|
||
supported_processor_ids: procs,
|
||
active_jobs: active,
|
||
}))
|
||
}
|
||
|
||
// --- End P0 Core API Handlers ---
|
||
|
||
async fn search(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<SearchRequest>,
|
||
) -> Result<Json<SearchResponse>, StatusCode> {
|
||
let mode = req.mode.unwrap_or(SearchMode::Smart);
|
||
let limit = req.limit.unwrap_or(10);
|
||
let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
|
||
let cache_key = keys::search(&format!("{:?}", mode));
|
||
let ttl = state.mongo_cache.ttl_search();
|
||
|
||
let response = state
|
||
.mongo_cache
|
||
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
|
||
let pg = PostgresDb::init()
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
|
||
|
||
let results: Vec<SearchResult> = match mode {
|
||
SearchMode::Vector => {
|
||
let query_vector = state
|
||
.embedder
|
||
.embed_query(&req.query)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
|
||
|
||
let qdrant = QdrantDb::new();
|
||
let search_results = if let Some(ref uuid) = req.uuid {
|
||
qdrant.search_in_uuid(&query_vector, uuid, limit).await?
|
||
} else {
|
||
qdrant.search(&query_vector, limit).await?
|
||
};
|
||
|
||
let mut results = Vec::new();
|
||
for r in search_results {
|
||
if let Some(chunk) = pg
|
||
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
|
||
.await
|
||
.ok()
|
||
.flatten()
|
||
{
|
||
let text = extract_text_from_content(&chunk.content);
|
||
results.push(SearchResult {
|
||
uuid: r.uuid,
|
||
chunk_id: r.chunk_id,
|
||
chunk_type: chunk.chunk_type.as_str().to_string(),
|
||
start_time: chunk.start_time().seconds(),
|
||
end_time: chunk.end_time().seconds(),
|
||
text,
|
||
score: r.score,
|
||
});
|
||
}
|
||
}
|
||
results
|
||
}
|
||
SearchMode::Smart => {
|
||
// Vector search + BM25 reranking
|
||
let query_vector = state
|
||
.embedder
|
||
.embed_query(&req.query)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
|
||
|
||
let qdrant = QdrantDb::new();
|
||
let search_results = if let Some(ref uuid) = req.uuid {
|
||
qdrant
|
||
.search_in_uuid(&query_vector, uuid, limit * 2)
|
||
.await?
|
||
} else {
|
||
qdrant.search(&query_vector, limit * 2).await?
|
||
};
|
||
|
||
// 取得所有 chunk 並用 BM25 重新排序
|
||
let mut results_with_bm25: Vec<(SearchResult, f32)> = Vec::new();
|
||
for r in search_results {
|
||
if let Some(chunk) = pg
|
||
.get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid)
|
||
.await
|
||
.ok()
|
||
.flatten()
|
||
{
|
||
let text = extract_text_from_content(&chunk.content);
|
||
let vector_score = r.score;
|
||
results_with_bm25.push((
|
||
SearchResult {
|
||
uuid: r.uuid,
|
||
chunk_id: r.chunk_id,
|
||
chunk_type: chunk.chunk_type.as_str().to_string(),
|
||
start_time: chunk.start_time().seconds(),
|
||
end_time: chunk.end_time().seconds(),
|
||
text,
|
||
score: vector_score,
|
||
},
|
||
vector_score,
|
||
));
|
||
}
|
||
}
|
||
|
||
// 依 vector score 排序後取前 limit 個
|
||
results_with_bm25
|
||
.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
|
||
results_with_bm25
|
||
.into_iter()
|
||
.take(limit)
|
||
.map(|(r, _)| r)
|
||
.collect()
|
||
}
|
||
};
|
||
|
||
Ok::<SearchResponse, anyhow::Error>(SearchResponse {
|
||
results,
|
||
query: req.query.clone(),
|
||
})
|
||
})
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
Ok(Json(response))
|
||
}
|
||
|
||
async fn search_bm25(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<SearchRequest>,
|
||
) -> Result<Json<SearchResponse>, StatusCode> {
|
||
let limit = req.limit.unwrap_or(10);
|
||
|
||
let bm25_results = state
|
||
.db
|
||
.search_bm25(&req.query, req.uuid.as_deref(), limit)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("BM25 search failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
let results: Vec<SearchResult> = bm25_results
|
||
.into_iter()
|
||
.map(|r| SearchResult {
|
||
uuid: r.uuid,
|
||
chunk_id: r.chunk_id,
|
||
chunk_type: r.chunk_type,
|
||
start_time: r.start_time,
|
||
end_time: r.end_time,
|
||
text: r.text,
|
||
score: r.bm25_score,
|
||
})
|
||
.collect();
|
||
|
||
Ok(Json(SearchResponse {
|
||
results,
|
||
query: req.query.clone(),
|
||
}))
|
||
}
|
||
|
||
async fn search_smart(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<SearchRequest>,
|
||
) -> Result<Json<SearchResponse>, StatusCode> {
|
||
let limit = req.limit.unwrap_or(10);
|
||
let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
|
||
let cache_key = keys::search(&format!("{}smart", query_hash));
|
||
let ttl = state.mongo_cache.ttl_search();
|
||
|
||
let response = state
|
||
.mongo_cache
|
||
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async {
|
||
let pg = PostgresDb::init()
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
|
||
|
||
let keywords = vec![req.query.clone()];
|
||
|
||
let search_terms = keywords.join(" ");
|
||
|
||
let bm25_results = pg
|
||
.search_bm25(&search_terms, req.uuid.as_deref(), limit)
|
||
.await?;
|
||
|
||
let results: Vec<SearchResult> = bm25_results
|
||
.into_iter()
|
||
.map(|r| SearchResult {
|
||
uuid: r.uuid,
|
||
chunk_id: r.chunk_id,
|
||
chunk_type: r.chunk_type,
|
||
start_time: r.start_time,
|
||
end_time: r.end_time,
|
||
text: r.text,
|
||
score: r.bm25_score,
|
||
})
|
||
.collect();
|
||
|
||
Ok::<SearchResponse, anyhow::Error>(SearchResponse {
|
||
results,
|
||
query: req.query.clone(),
|
||
})
|
||
})
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
Ok(Json(response))
|
||
}
|
||
|
||
async fn hybrid_search(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<HybridSearchRequest>,
|
||
) -> Result<Json<HybridSearchResponse>, StatusCode> {
|
||
let limit = req.limit.unwrap_or(10);
|
||
let vector_weight = req.vector_weight.unwrap_or(0.7);
|
||
let bm25_weight = req.bm25_weight.unwrap_or(0.3);
|
||
|
||
let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit);
|
||
let cache_key = keys::hybrid_search(&query_hash);
|
||
let ttl = state.mongo_cache.ttl_hybrid_search();
|
||
|
||
let response = state
|
||
.mongo_cache
|
||
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_HYBRID_SEARCH, || async {
|
||
let query_vector = state
|
||
.embedder
|
||
.embed_query(&req.query)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
|
||
|
||
let pg = PostgresDb::init()
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
|
||
|
||
let results = pg
|
||
.hybrid_search(
|
||
&req.query,
|
||
&query_vector,
|
||
req.uuid.as_deref(),
|
||
limit,
|
||
vector_weight,
|
||
bm25_weight,
|
||
)
|
||
.await?;
|
||
|
||
let search_results: Vec<HybridSearchResult> = results
|
||
.into_iter()
|
||
.map(|r| HybridSearchResult {
|
||
uuid: r.uuid,
|
||
chunk_id: r.chunk_id,
|
||
chunk_type: r.chunk_type,
|
||
start_time: r.start_time,
|
||
end_time: r.end_time,
|
||
text: r.text,
|
||
vector_score: r.vector_score,
|
||
bm25_score: r.bm25_score,
|
||
combined_score: r.combined_score,
|
||
})
|
||
.collect();
|
||
|
||
Ok::<HybridSearchResponse, anyhow::Error>(HybridSearchResponse {
|
||
results: search_results,
|
||
query: req.query.clone(),
|
||
})
|
||
})
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
Ok(Json(response))
|
||
}
|
||
|
||
async fn lookup(
|
||
State(state): State<AppState>,
|
||
Query(query): Query<LookupQuery>,
|
||
) -> Result<Json<LookupResponse>, StatusCode> {
|
||
if let Some(path) = query.path {
|
||
let uuid = crate::uuid::compute_uuid_from_path(&path);
|
||
return Ok(Json(LookupResponse {
|
||
file_uuid: uuid,
|
||
file_path: None,
|
||
file_name: None,
|
||
duration: None,
|
||
}));
|
||
}
|
||
|
||
if let Some(uuid) = query.uuid {
|
||
let cache_key = keys::video_meta(&uuid);
|
||
let ttl = state.mongo_cache.ttl_video_meta();
|
||
|
||
let video = state
|
||
.mongo_cache
|
||
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEO_META, || async {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
|
||
db.get_video_by_uuid(&uuid)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||
})
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
if let Some(v) = video {
|
||
return Ok(Json(LookupResponse {
|
||
file_uuid: v.file_uuid,
|
||
file_path: Some(v.file_path),
|
||
file_name: Some(v.file_name),
|
||
duration: Some(v.duration),
|
||
}));
|
||
}
|
||
}
|
||
|
||
Err(StatusCode::NOT_FOUND)
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct ScannedFileInfo {
|
||
file_name: String,
|
||
relative_path: String,
|
||
file_path: String,
|
||
file_size: u64,
|
||
modified_time: String,
|
||
// Registration info
|
||
is_registered: bool,
|
||
file_uuid: Option<String>,
|
||
status: Option<String>,
|
||
registration_time: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize, Deserialize)]
|
||
struct ScanFilesResponse {
|
||
files: Vec<ScannedFileInfo>,
|
||
total: usize,
|
||
registered_count: usize,
|
||
unregistered_count: usize,
|
||
}
|
||
|
||
fn scan_directory_recursive(
|
||
dir: &std::path::Path,
|
||
root: &std::path::Path,
|
||
allowed_extensions: &[&str],
|
||
registered_paths: &std::collections::HashMap<String, (String, String, Option<String>)>,
|
||
files: &mut Vec<ScannedFileInfo>,
|
||
) {
|
||
if let Ok(entries) = std::fs::read_dir(dir) {
|
||
for entry in entries.flatten() {
|
||
let path = entry.path();
|
||
if path.is_dir() {
|
||
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
|
||
if name.starts_with('.') {
|
||
return;
|
||
}
|
||
}
|
||
scan_directory_recursive(&path, root, allowed_extensions, registered_paths, files);
|
||
} else if path.is_file() {
|
||
if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
|
||
if allowed_extensions.contains(&ext.to_lowercase().as_str()) {
|
||
if let Ok(meta) = entry.metadata() {
|
||
let abs_path = path.to_string_lossy().to_string();
|
||
let rel_path = path
|
||
.strip_prefix(root)
|
||
.map(|p| p.to_string_lossy().to_string())
|
||
.unwrap_or_else(|_| abs_path.clone());
|
||
|
||
let file_name = path
|
||
.file_name()
|
||
.map(|n| n.to_string_lossy().to_string())
|
||
.unwrap_or_default();
|
||
|
||
let modified_time = meta
|
||
.modified()
|
||
.ok()
|
||
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
|
||
.map(|d| {
|
||
chrono::DateTime::from_timestamp(d.as_secs() as i64, 0)
|
||
.map(|dt| dt.to_rfc3339())
|
||
.unwrap_or_default()
|
||
})
|
||
.unwrap_or_default();
|
||
|
||
// Check registration
|
||
if let Some((uuid, status, reg_time)) = registered_paths.get(&abs_path)
|
||
{
|
||
files.push(ScannedFileInfo {
|
||
file_name,
|
||
relative_path: rel_path,
|
||
file_path: abs_path,
|
||
file_size: meta.len(),
|
||
modified_time,
|
||
is_registered: true,
|
||
file_uuid: Some(uuid.clone()),
|
||
status: Some(status.clone()),
|
||
registration_time: reg_time.clone(),
|
||
});
|
||
} else {
|
||
files.push(ScannedFileInfo {
|
||
file_name,
|
||
relative_path: rel_path,
|
||
file_path: abs_path,
|
||
file_size: meta.len(),
|
||
modified_time,
|
||
is_registered: false,
|
||
file_uuid: None,
|
||
status: None,
|
||
registration_time: None,
|
||
});
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
async fn scan_files(State(state): State<AppState>) -> Result<Json<ScanFilesResponse>, StatusCode> {
|
||
let demo_dir_str = std::env::var("MOMENTRY_SFTP_ROOT")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo".to_string());
|
||
let demo_dir = std::path::Path::new(&demo_dir_str);
|
||
|
||
let allowed_extensions = vec!["mp4", "mov", "mkv", "avi", "webm", "jpg", "jpeg", "png", "gif", "webp"];
|
||
|
||
// 1. Get registered files from DB (Map key: absolute file_path)
|
||
let table = schema::table_name("videos");
|
||
let registered_db: Vec<(String, String, String, String, Option<String>)> = sqlx::query_as(&format!(
|
||
"SELECT file_path, file_name, file_uuid, status, registration_time::text FROM {} ORDER BY id",
|
||
table
|
||
))
|
||
.fetch_all(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let registered_paths: std::collections::HashMap<String, (String, String, Option<String>)> =
|
||
registered_db
|
||
.into_iter()
|
||
.map(|(path, _name, uuid, status, reg_time)| (path, (uuid, status, reg_time)))
|
||
.collect();
|
||
|
||
// 2. Scan filesystem recursively
|
||
let mut result_files = Vec::new();
|
||
|
||
if demo_dir.exists() {
|
||
scan_directory_recursive(
|
||
demo_dir,
|
||
demo_dir,
|
||
&allowed_extensions,
|
||
®istered_paths,
|
||
&mut result_files,
|
||
);
|
||
}
|
||
|
||
// 3. Sort: registered first, then by name
|
||
result_files.sort_by(|a, b| {
|
||
b.is_registered
|
||
.cmp(&a.is_registered)
|
||
.then(a.relative_path.cmp(&b.relative_path))
|
||
});
|
||
|
||
let registered_count = result_files.iter().filter(|f| f.is_registered).count();
|
||
let unregistered_count = result_files.iter().filter(|f| !f.is_registered).count();
|
||
|
||
Ok(Json(ScanFilesResponse {
|
||
files: result_files,
|
||
total: registered_count + unregistered_count,
|
||
registered_count,
|
||
unregistered_count,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ProgressResponse {
|
||
file_uuid: String,
|
||
user: Option<String>,
|
||
group: Option<String>,
|
||
file_name: Option<String>,
|
||
duration: Option<f64>,
|
||
overall_progress: u32,
|
||
cpu_percent: Option<f64>,
|
||
gpu_percent: Option<f64>,
|
||
memory_percent: Option<f64>,
|
||
memory_mb: Option<u64>,
|
||
system: Option<SystemHealthInfo>,
|
||
processors: Vec<ProcessorProgressInfo>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct SystemHealthInfo {
|
||
cpu_idle_pct: f64,
|
||
memory_available_mb: u64,
|
||
memory_total_mb: u64,
|
||
memory_used_pct: f64,
|
||
gpu_available: bool,
|
||
gpu_utilization_pct: Option<f64>,
|
||
gpu_memory_used_pct: Option<f64>,
|
||
dynamic_concurrency: u32,
|
||
config_concurrency: u32,
|
||
running_processors: u32,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ProcessorProgressInfo {
|
||
name: String,
|
||
status: String,
|
||
current: u32,
|
||
total: u32,
|
||
progress: u32,
|
||
message: String,
|
||
frames_processed: i32,
|
||
chunks_produced: i32,
|
||
retry_count: i32,
|
||
}
|
||
|
||
/// 從 .json 輸出檔讀取 processor 的已處理幀數
|
||
fn get_json_frame_count(file_uuid: &str, processor: &str) -> i32 {
|
||
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
|
||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
|
||
let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor));
|
||
match std::fs::read_to_string(&path) {
|
||
Ok(content) => match serde_json::from_str::<serde_json::Value>(&content) {
|
||
Ok(val) => val.get("frame_count").and_then(|v| v.as_i64()).unwrap_or(0) as i32,
|
||
Err(_) => 0,
|
||
},
|
||
Err(_) => 0,
|
||
}
|
||
}
|
||
|
||
/// Samples resource usage of the current (API server) process plus GPU load.
///
/// Returns `(cpu_percent, gpu_percent, memory_percent, rss_kib)`:
/// * `cpu_percent` / `memory_percent` — `ps -o %cpu=` / `-o %mem=` for this PID;
/// * `gpu_percent` — mean `utilization.gpu` over every GPU listed by `nvidia-smi`;
/// * `rss_kib` — `ps -o rss=`, which both Linux and macOS `ps` report in kilobytes.
///
/// Each element is `None` when its command cannot be spawned or its output does not
/// parse. The child processes run synchronously, so this blocks the calling thread.
fn get_system_stats() -> (Option<f64>, Option<f64>, Option<f64>, Option<u64>) {
    // Runs `program` and returns its stdout (lossily decoded), or `None` if it could not start.
    fn capture(program: &str, args: &[&str]) -> Option<String> {
        std::process::Command::new(program)
            .args(args)
            .output()
            .ok()
            .map(|o| String::from_utf8_lossy(&o.stdout).into_owned())
    }

    let pid = std::process::id().to_string();

    let cpu = capture("ps", &["-p", &pid, "-o", "%cpu="]).and_then(|out| out.trim().parse().ok());

    let (mem_percent, mem_rss) = capture("ps", &["-p", &pid, "-o", "%mem=,rss="])
        .map(|out| {
            let mut parts = out.split_whitespace();
            let percent = parts.next().and_then(|s| s.parse().ok());
            let rss = parts.next().and_then(|s| s.parse().ok());
            (percent, rss)
        })
        .unwrap_or((None, None));

    // nvidia-smi prints one line per GPU. Parsing the whole multi-line output as a
    // single number always failed on multi-GPU hosts, so average the per-GPU lines.
    let gpu = capture(
        "nvidia-smi",
        &["--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"],
    )
    .and_then(|out| {
        let readings: Vec<f64> = out
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .map(|line| line.parse::<f64>().ok())
            .collect::<Option<Vec<f64>>>()?;
        if readings.is_empty() {
            None
        } else {
            Some(readings.iter().sum::<f64>() / readings.len() as f64)
        }
    });

    (cpu, gpu, mem_percent, mem_rss)
}
|
||
|
||
async fn get_progress(
|
||
axum::extract::Path(file_uuid): axum::extract::Path<String>,
|
||
) -> Result<Json<ProgressResponse>, StatusCode> {
|
||
let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
let pg = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let mut conn = redis
|
||
.get_conn()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"];
|
||
let mut processors = Vec::new();
|
||
let mut completed_count = 0u32;
|
||
|
||
for name in processor_names {
|
||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||
let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name);
|
||
let status: String = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("status")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "pending".to_string());
|
||
let current: u32 = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("current")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "0".to_string())
|
||
.parse()
|
||
.unwrap_or(0);
|
||
let total: u32 = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("total")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "0".to_string())
|
||
.parse()
|
||
.unwrap_or(0);
|
||
let message: String = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("message")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "".to_string());
|
||
|
||
let progress = if total > 0 {
|
||
((current as f64 / total as f64) * 100.0) as u32
|
||
} else if status == "complete" {
|
||
100
|
||
} else {
|
||
0
|
||
};
|
||
|
||
// 從 Redis 讀取 frames_processed / chunks_produced / retry_count
|
||
let frames_processed: i32 = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("frames_processed")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "0".to_string())
|
||
.parse()
|
||
.unwrap_or(0);
|
||
let chunks_produced: i32 = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("chunks_produced")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "0".to_string())
|
||
.parse()
|
||
.unwrap_or(0);
|
||
let retry_count: i32 = redis::cmd("HGET")
|
||
.arg(&key)
|
||
.arg("retry_count")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.unwrap_or_else(|_| "0".to_string())
|
||
.parse()
|
||
.unwrap_or(0);
|
||
|
||
if status == "complete" {
|
||
completed_count += 1;
|
||
}
|
||
|
||
processors.push(ProcessorProgressInfo {
|
||
name: name.to_string(),
|
||
status,
|
||
current,
|
||
total,
|
||
progress,
|
||
message,
|
||
frames_processed,
|
||
chunks_produced,
|
||
retry_count,
|
||
});
|
||
}
|
||
|
||
let overall_progress = (completed_count as f64 / processor_names.len() as f64 * 100.0) as u32;
|
||
|
||
let job_key = format!("{}worker:job:{}", REDIS_KEY_PREFIX.as_str(), file_uuid);
|
||
let user: Option<String> = redis::cmd("HGET")
|
||
.arg(&job_key)
|
||
.arg("user")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok()
|
||
.filter(|s: &String| !s.is_empty());
|
||
|
||
let group: Option<String> = redis::cmd("HGET")
|
||
.arg(&job_key)
|
||
.arg("group")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok()
|
||
.filter(|s: &String| !s.is_empty());
|
||
|
||
let (file_name, duration) = pg
|
||
.get_video_by_uuid(&file_uuid)
|
||
.await
|
||
.ok()
|
||
.flatten()
|
||
.map(|v| (Some(v.file_name), Some(v.duration)))
|
||
.unwrap_or((None, None));
|
||
|
||
let (cpu_percent, gpu_percent, memory_percent, memory_mb) = get_system_stats();
|
||
|
||
// 從 Redis 讀取系統健康資訊(由 Worker 寫入)
|
||
let health_key = format!("{}health", REDIS_KEY_PREFIX.as_str());
|
||
let cpu_idle: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("cpu_idle_pct")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let mem_avail: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("memory_available_mb")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let mem_total: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("memory_total_mb")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let mem_used: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("memory_used_pct")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let gpu_avail: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("gpu_available")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let gpu_util: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("gpu_utilization_pct")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let gpu_mem: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("gpu_memory_used_pct")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let dyn_conc: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("dynamic_concurrency")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let cfg_conc: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("config_concurrency")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
let run_proc: Option<String> = redis::cmd("HGET")
|
||
.arg(&health_key)
|
||
.arg("running_processors")
|
||
.query_async(&mut conn)
|
||
.await
|
||
.ok();
|
||
|
||
let system = cpu_idle.and_then(|idle| {
|
||
Some(SystemHealthInfo {
|
||
cpu_idle_pct: idle.parse().ok()?,
|
||
memory_available_mb: mem_avail?.parse().ok()?,
|
||
memory_total_mb: mem_total?.parse().ok()?,
|
||
memory_used_pct: mem_used?.parse().ok()?,
|
||
gpu_available: gpu_avail.map(|v| v == "true").unwrap_or(false),
|
||
gpu_utilization_pct: gpu_util.and_then(|v| v.parse().ok()),
|
||
gpu_memory_used_pct: gpu_mem.and_then(|v| v.parse().ok()),
|
||
dynamic_concurrency: dyn_conc?.parse().ok()?,
|
||
config_concurrency: cfg_conc?.parse().ok()?,
|
||
running_processors: run_proc?.parse().ok()?,
|
||
})
|
||
});
|
||
|
||
Ok(Json(ProgressResponse {
|
||
file_uuid,
|
||
user,
|
||
group,
|
||
file_name,
|
||
duration,
|
||
overall_progress,
|
||
cpu_percent,
|
||
gpu_percent,
|
||
memory_percent,
|
||
memory_mb,
|
||
system,
|
||
processors,
|
||
}))
|
||
}
|
||
|
||
async fn list_jobs(Query(params): Query<JobsQuery>) -> Result<Json<JobListResponse>, StatusCode> {
|
||
let page = params.page.unwrap_or(1);
|
||
let page_size = params.page_size.unwrap_or(20);
|
||
let status_filter = params
|
||
.status
|
||
.unwrap_or_else(|| "pending,running".to_string());
|
||
let offset = ((page - 1) as i64) * (page_size as i64);
|
||
|
||
let pg = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let table = crate::core::db::schema::table_name("monitor_jobs");
|
||
|
||
// Build status IN clause
|
||
let statuses: Vec<String> = status_filter
|
||
.split(',')
|
||
.map(|s| format!("'{}'", s.trim()))
|
||
.collect();
|
||
let status_clause = statuses.join(",");
|
||
|
||
let query = format!(
|
||
"SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
|
||
error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
|
||
processors, completed_processors, failed_processors, video_id
|
||
FROM {}
|
||
WHERE status IN ({})
|
||
ORDER BY created_at DESC
|
||
LIMIT {} OFFSET {}",
|
||
table, status_clause, page_size, offset
|
||
);
|
||
|
||
let count_query = format!(
|
||
"SELECT COUNT(*) FROM {} WHERE status IN ({})",
|
||
table, status_clause
|
||
);
|
||
|
||
let total_count: i64 = sqlx::query_scalar(&count_query)
|
||
.fetch_one(pg.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
use crate::core::db::MonitorJobStatus;
|
||
|
||
let rows = sqlx::query(&query)
|
||
.fetch_all(pg.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let job_infos: Vec<JobInfoResponse> = rows
|
||
.into_iter()
|
||
.map(|r| {
|
||
let status_str: String = r.try_get("status").unwrap_or_default();
|
||
let status =
|
||
MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending);
|
||
JobInfoResponse {
|
||
id: r.try_get("id").unwrap_or(0),
|
||
uuid: r.try_get("uuid").unwrap_or_default(),
|
||
status: status.as_str().to_string(),
|
||
current_processor: r.try_get("current_processor").ok(),
|
||
progress_current: r.try_get("progress_current").unwrap_or(0),
|
||
progress_total: r.try_get("progress_total").unwrap_or(0),
|
||
created_at: r.try_get::<String, _>("created_at").unwrap_or_default(),
|
||
started_at: r.try_get::<String, _>("started_at").ok(),
|
||
}
|
||
})
|
||
.collect();
|
||
|
||
Ok(Json(JobListResponse {
|
||
jobs: job_infos,
|
||
count: total_count,
|
||
page,
|
||
page_size,
|
||
}))
|
||
}
|
||
|
||
async fn get_job(
|
||
axum::extract::Path(uuid): axum::extract::Path<String>,
|
||
) -> Result<Json<JobDetailResponse>, StatusCode> {
|
||
tracing::info!("[get_job] START - uuid: {}", uuid);
|
||
|
||
let pg = PostgresDb::init().await.map_err(|e| {
|
||
tracing::error!("[get_job] ERROR - Failed to init PostgresDb: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
tracing::info!("[get_job] PostgresDb initialized");
|
||
|
||
let job = pg
|
||
.get_monitor_job_by_uuid(&uuid)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("[get_job] ERROR - Failed to get monitor job: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?
|
||
.ok_or_else(|| {
|
||
tracing::warn!("[get_job] Job not found: {}", uuid);
|
||
StatusCode::NOT_FOUND
|
||
})?;
|
||
|
||
tracing::info!("[get_job] Found job: id={}, uuid={}", job.id, job.uuid);
|
||
|
||
let results = pg.get_processor_results_by_job(job.id).await.map_err(|e| {
|
||
tracing::error!("[get_job] ERROR - Failed to get processor results: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
tracing::info!("[get_job] Got {} processor results", results.len());
|
||
|
||
let processors: Vec<ProcessorInfoResponse> = results
|
||
.into_iter()
|
||
.map(|r| ProcessorInfoResponse {
|
||
processor_type: r.processor_type.as_str().to_string(),
|
||
status: r.status.as_str().to_string(),
|
||
started_at: r.started_at,
|
||
completed_at: r.completed_at,
|
||
duration_secs: r.duration_secs,
|
||
error_message: r.error_message,
|
||
})
|
||
.collect();
|
||
|
||
tracing::info!("[get_job] Mapped {} processors", processors.len());
|
||
|
||
let response = JobDetailResponse {
|
||
id: job.id,
|
||
uuid: job.uuid,
|
||
status: job.status.as_str().to_string(),
|
||
current_processor: job.current_processor,
|
||
progress_current: job.progress_current,
|
||
progress_total: job.progress_total,
|
||
processors,
|
||
created_at: job.created_at.to_string(),
|
||
started_at: job.started_at.map(|t| t.to_string()),
|
||
updated_at: job.updated_at.map(|t| t.to_string()),
|
||
};
|
||
|
||
tracing::info!("[get_job] SUCCESS - returning response");
|
||
Ok(Json(response))
|
||
}
|
||
|
||
async fn cache_toggle(
|
||
State(_state): State<AppState>,
|
||
Json(req): Json<CacheToggleRequest>,
|
||
) -> Result<Json<CacheToggleResponse>, StatusCode> {
|
||
tracing::info!("[cache_toggle] Setting cache enabled to: {}", req.enabled);
|
||
|
||
crate::core::config::set_cache_enabled(req.enabled);
|
||
|
||
let response = CacheToggleResponse {
|
||
success: true,
|
||
cache_enabled: req.enabled,
|
||
message: if req.enabled {
|
||
"Cache enabled".to_string()
|
||
} else {
|
||
"Cache disabled".to_string()
|
||
},
|
||
};
|
||
|
||
tracing::info!("[cache_toggle] SUCCESS");
|
||
Ok(Json(response))
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct UnregisterResponse {
|
||
success: bool,
|
||
file_uuid: String,
|
||
message: String,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct UnregisterRequest {
|
||
file_uuid: Option<String>,
|
||
file_path: Option<String>,
|
||
pattern: Option<String>,
|
||
}
|
||
|
||
async fn unregister(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<UnregisterRequest>,
|
||
) -> Result<Json<UnregisterResponse>, StatusCode> {
|
||
let db = &state.db;
|
||
|
||
// Pattern mode: unregister all matching files in a directory
|
||
if let (Some(ref dir_path), Some(ref pat)) = (&req.file_path, &req.pattern) {
|
||
let dir = std::path::Path::new(dir_path);
|
||
if !dir.is_dir() {
|
||
return Ok(Json(UnregisterResponse {
|
||
success: false,
|
||
file_uuid: String::new(),
|
||
message: format!("Not a directory: {}", dir_path),
|
||
}));
|
||
}
|
||
let re = regex::Regex::new(pat).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||
let mut count = 0u32;
|
||
let table = crate::core::db::schema::table_name("videos");
|
||
if let Ok(entries) = std::fs::read_dir(dir) {
|
||
for entry in entries.flatten() {
|
||
let fname = entry.file_name().to_string_lossy().to_string();
|
||
if !re.is_match(&fname) {
|
||
continue;
|
||
}
|
||
// Find file_uuid by file_name and delete
|
||
let rows: Vec<(String,)> = sqlx::query_as(&format!(
|
||
"SELECT file_uuid FROM {} WHERE file_name = $1",
|
||
table
|
||
))
|
||
.bind(&fname)
|
||
.fetch_all(db.pool())
|
||
.await
|
||
.unwrap_or_default();
|
||
for (uuid,) in rows {
|
||
let _ = db.delete_video(&uuid).await;
|
||
count += 1;
|
||
}
|
||
}
|
||
}
|
||
let _ = state.mongo_cache.invalidate_videos_list().await;
|
||
return Ok(Json(UnregisterResponse {
|
||
success: true,
|
||
file_uuid: format!("batch_{}", count),
|
||
message: format!("Unregistered {} files matching pattern", count),
|
||
}));
|
||
}
|
||
|
||
// Single UUID mode
|
||
let uuid = req.file_uuid.as_deref().unwrap_or("");
|
||
if uuid.is_empty() {
|
||
return Ok(Json(UnregisterResponse {
|
||
success: false,
|
||
file_uuid: String::new(),
|
||
message: "Either file_uuid or file_path+pattern is required".to_string(),
|
||
}));
|
||
}
|
||
|
||
tracing::info!("[unregister] Unregistering file: {}", uuid);
|
||
match db.delete_video(uuid).await {
|
||
Ok(_) => {
|
||
let _ = state.mongo_cache.invalidate_videos_list().await;
|
||
Ok(Json(UnregisterResponse {
|
||
success: true,
|
||
file_uuid: uuid.to_string(),
|
||
message: "File unregistered successfully".to_string(),
|
||
}))
|
||
}
|
||
Err(e) => {
|
||
tracing::error!("[unregister] Failed: {}", e);
|
||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||
}
|
||
}
|
||
}
|
||
|
||
pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
|
||
let _ = SERVER_START.set(Instant::now());
|
||
|
||
let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string()));
|
||
let mongo_cache = MongoCache::init().await?;
|
||
let redis_cache = RedisCache::new()?;
|
||
let db = PostgresDb::init().await?;
|
||
let db = std::sync::Arc::new(db);
|
||
let api_state = super::middleware::ApiState { db: db.clone() };
|
||
|
||
let state = AppState {
|
||
db,
|
||
embedder,
|
||
embedder_model: "nomic-embed-text-v2-moe:latest".to_string(),
|
||
mongo_cache,
|
||
redis_cache,
|
||
api_state,
|
||
};
|
||
|
||
let protected_routes = Router::new()
|
||
.route("/api/v1/files/register", post(register_file))
|
||
.route("/api/v1/files/lookup", get(lookup_file_by_name))
|
||
.route("/api/v1/unregister", post(unregister))
|
||
.route("/api/v1/files/scan", get(scan_files))
|
||
.route("/api/v1/file/:file_uuid/probe", get(probe_by_uuid))
|
||
.route("/api/v1/file/:file_uuid/process", post(trigger_processing))
|
||
.route("/api/v1/file/:file_uuid/chunk/:chunk_id", get(get_chunk_by_path))
|
||
|
||
.route("/api/v1/progress/:file_uuid", get(get_progress))
|
||
.route("/api/v1/jobs", get(list_jobs))
|
||
.route("/api/v1/config/cache", post(cache_toggle))
|
||
// .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed)
|
||
.merge(identity_binding::identity_binding_routes())
|
||
.merge(identities::identity_routes())
|
||
.layer(axum::middleware::from_fn_with_state(
|
||
state.api_state.clone(),
|
||
api_key_validation,
|
||
))
|
||
.with_state(state.clone());
|
||
|
||
let cors = CorsLayer::new()
|
||
.allow_origin(tower_http::cors::AllowOrigin::any())
|
||
.allow_methods(Any)
|
||
.allow_headers(Any);
|
||
|
||
let app = Router::new()
|
||
.route("/health", get(health))
|
||
.route("/health/detailed", get(health_detailed))
|
||
.route("/api/v1/auth/login", post(login))
|
||
.route("/api/v1/auth/logout", post(logout))
|
||
.route("/api/v1/stats/ingest", get(get_ingest_stats))
|
||
.route("/api/v1/stats/sftpgo", get(get_sftpgo_status))
|
||
.route("/api/v1/stats/inference", get(get_inference_health))
|
||
.route("/api/v1/search/visual", post(search_visual_chunks))
|
||
.route(
|
||
"/api/v1/search/visual/class",
|
||
post(search_visual_chunks_by_class),
|
||
)
|
||
.route(
|
||
"/api/v1/search/visual/density",
|
||
post(search_visual_chunks_by_density),
|
||
)
|
||
.route("/api/v1/search/visual/stats", post(get_visual_chunk_stats))
|
||
.route(
|
||
"/api/v1/search/visual/combination",
|
||
post(search_visual_chunks_by_combination),
|
||
)
|
||
.merge(identity_api::identity_routes()) // Phase 3 Routes
|
||
.merge(agent_api::agent_routes()) // Phase 6 Routes
|
||
.merge(super::identity_agent_api::identity_agent_routes()) // Phase 5 Routes
|
||
.merge(five_w1h_agent_api::five_w1h_agent_routes()) // Phase 3 Routes (5W1H Agent)
|
||
.merge(super::media_api::bbox_routes()) // Media: video/bbox/thumbnail
|
||
.merge(super::trace_agent_api::trace_agent_routes()) // Trace listing
|
||
.merge(search_routes()) // Smart search drill-down
|
||
.merge(universal_search_routes()) // Universal / frames / persons search
|
||
.merge(protected_routes)
|
||
.layer(cors)
|
||
.with_state(state);
|
||
|
||
let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse().unwrap();
|
||
tracing::info!("Starting API server at http://{}", addr);
|
||
|
||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||
axum::serve(listener, app).await?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct IngestStatsResponse {
|
||
total_videos: i64,
|
||
total_chunks: i64,
|
||
sentence_chunks: i64,
|
||
cut_chunks: i64,
|
||
time_chunks: i64,
|
||
searchable_chunks: i64,
|
||
chunks_with_visual: i64,
|
||
chunks_with_summary: i64,
|
||
pending_videos: i64,
|
||
}
|
||
|
||
async fn get_ingest_stats(
|
||
State(state): State<AppState>,
|
||
) -> Result<Json<IngestStatsResponse>, StatusCode> {
|
||
let table_videos = schema::table_name("videos");
|
||
let table_chunks = schema::table_name("chunk");
|
||
|
||
let total_videos: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_videos))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let total_chunks: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_chunks))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let sentence_chunks: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'sentence'",
|
||
table_chunks
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let cut_chunks: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'cut'",
|
||
table_chunks
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let time_chunks: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE chunk_type = 'time'",
|
||
table_chunks
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let searchable_chunks: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE vector_id IS NOT NULL",
|
||
table_chunks
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let chunks_with_visual: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE visual_stats IS NOT NULL AND visual_stats != '{}'::jsonb",
|
||
table_chunks, "{}"
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let chunks_with_summary: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE summary_text IS NOT NULL",
|
||
table_chunks
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let pending_videos: (i64,) = sqlx::query_as(&format!(
|
||
"SELECT COUNT(*) FROM {} WHERE status = 'pending'",
|
||
table_videos
|
||
))
|
||
.fetch_one(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
Ok(Json(IngestStatsResponse {
|
||
total_videos: total_videos.0,
|
||
total_chunks: total_chunks.0,
|
||
sentence_chunks: sentence_chunks.0,
|
||
cut_chunks: cut_chunks.0,
|
||
time_chunks: time_chunks.0,
|
||
searchable_chunks: searchable_chunks.0,
|
||
chunks_with_visual: chunks_with_visual.0,
|
||
chunks_with_summary: chunks_with_summary.0,
|
||
pending_videos: pending_videos.0,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct SftpgoStatusResponse {
|
||
username: String,
|
||
home_dir: String,
|
||
files_count: i64,
|
||
registered_videos: Vec<RegisteredVideo>,
|
||
last_login: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct RegisteredVideo {
|
||
uuid: String,
|
||
file_name: String,
|
||
status: String,
|
||
}
|
||
|
||
async fn get_sftpgo_status(
|
||
State(state): State<AppState>,
|
||
) -> Result<Json<SftpgoStatusResponse>, StatusCode> {
|
||
let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo";
|
||
|
||
let files_count: i64 = std::fs::read_dir(demo_dir)
|
||
.map(|entries| entries.count() as i64)
|
||
.unwrap_or(0);
|
||
|
||
let table_videos = schema::table_name("videos");
|
||
|
||
let registered_videos: Vec<(String, String, String)> = sqlx::query_as(&format!(
|
||
"SELECT file_uuid, file_name, status FROM {} WHERE file_path LIKE '%demo%' ORDER BY id",
|
||
table_videos
|
||
))
|
||
.fetch_all(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let registered_videos = registered_videos
|
||
.into_iter()
|
||
.map(|(uuid, file_name, status)| RegisteredVideo {
|
||
uuid,
|
||
file_name,
|
||
status,
|
||
})
|
||
.collect();
|
||
|
||
Ok(Json(SftpgoStatusResponse {
|
||
username: "demo".to_string(),
|
||
home_dir: demo_dir.to_string(),
|
||
files_count,
|
||
registered_videos,
|
||
last_login: None,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct InferenceEngineStatus {
|
||
engine: String,
|
||
model: String,
|
||
status: String,
|
||
latency_ms: Option<u64>,
|
||
error: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct InferenceHealthResponse {
|
||
ollama: InferenceEngineStatus,
|
||
llama_server: InferenceEngineStatus,
|
||
}
|
||
|
||
async fn get_inference_health() -> Result<Json<InferenceHealthResponse>, StatusCode> {
|
||
let client = reqwest::Client::builder()
|
||
.timeout(std::time::Duration::from_secs(5))
|
||
.build()
|
||
.unwrap();
|
||
|
||
let ollama_start = std::time::Instant::now();
|
||
let ollama_status = match client.get("http://localhost:11434/api/tags").send().await {
|
||
Ok(resp) if resp.status().is_success() => {
|
||
let latency = ollama_start.elapsed().as_millis() as u64;
|
||
InferenceEngineStatus {
|
||
engine: "Ollama".to_string(),
|
||
model: "nomic-embed-text".to_string(),
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(latency),
|
||
error: None,
|
||
}
|
||
}
|
||
Ok(resp) => InferenceEngineStatus {
|
||
engine: "Ollama".to_string(),
|
||
model: "nomic-embed-text".to_string(),
|
||
status: "error".to_string(),
|
||
latency_ms: Some(ollama_start.elapsed().as_millis() as u64),
|
||
error: Some(format!("HTTP {}", resp.status())),
|
||
},
|
||
Err(e) => InferenceEngineStatus {
|
||
engine: "Ollama".to_string(),
|
||
model: "nomic-embed-text".to_string(),
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
};
|
||
|
||
let llama_start = std::time::Instant::now();
|
||
let llama_status = match client.get("http://localhost:8081/v1/models").send().await {
|
||
Ok(resp) if resp.status().is_success() => {
|
||
let latency = llama_start.elapsed().as_millis() as u64;
|
||
InferenceEngineStatus {
|
||
engine: "llama-server".to_string(),
|
||
model: "gemma4_e4b_q5".to_string(),
|
||
status: "ok".to_string(),
|
||
latency_ms: Some(latency),
|
||
error: None,
|
||
}
|
||
}
|
||
Ok(resp) => InferenceEngineStatus {
|
||
engine: "llama-server".to_string(),
|
||
model: "gemma4_e4b_q5".to_string(),
|
||
status: "error".to_string(),
|
||
latency_ms: Some(llama_start.elapsed().as_millis() as u64),
|
||
error: Some(format!("HTTP {}", resp.status())),
|
||
},
|
||
Err(e) => InferenceEngineStatus {
|
||
engine: "llama-server".to_string(),
|
||
model: "gemma4_e4b_q5".to_string(),
|
||
status: "error".to_string(),
|
||
latency_ms: None,
|
||
error: Some(e.to_string()),
|
||
},
|
||
};
|
||
|
||
Ok(Json(InferenceHealthResponse {
|
||
ollama: ollama_status,
|
||
llama_server: llama_status,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct VideoDetailsQuery {
|
||
chunk_id: Option<String>,
|
||
parent_id: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct VideoDetailsResponse {
|
||
uuid: String,
|
||
#[serde(flatten)]
|
||
details: VideoDetailsResult,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
#[serde(untagged)]
|
||
enum VideoDetailsResult {
|
||
Chunk(ChunkDetailResponse),
|
||
Parent(ParentChunkResponse),
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct FrameRange {
|
||
start_frame: i64,
|
||
end_frame: i64,
|
||
duration_frames: i64,
|
||
fps: f64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ReferenceTime {
|
||
start: f64,
|
||
end: f64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ChunkDetailResponse {
|
||
chunk_id: String,
|
||
chunk_type: String,
|
||
frame_range: FrameRange,
|
||
reference_time: ReferenceTime,
|
||
text_content: Option<String>,
|
||
content: Option<serde_json::Value>,
|
||
parent_id: Option<String>,
|
||
summary_text: Option<String>,
|
||
metadata: Option<serde_json::Value>,
|
||
visual_stats: Option<serde_json::Value>,
|
||
speaker_ids: Option<Vec<String>>,
|
||
person_ids: Option<Vec<String>>,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct ParentChunkResponse {
|
||
parent_id: i32,
|
||
metadata: Option<serde_json::Value>,
|
||
summary_text: Option<String>,
|
||
frame_range: FrameRange,
|
||
reference_time: ReferenceTime,
|
||
}
|
||
|
||
/// Search visual chunks based on criteria
|
||
#[derive(Debug, Deserialize)]
|
||
struct VisualChunkSearchRequest {
|
||
uuid: String,
|
||
criteria: visual_chunk_search::VisualChunkSearchCriteria,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct VisualChunkSearchResponse {
|
||
chunks: Vec<Chunk>,
|
||
total: usize,
|
||
}
|
||
|
||
async fn search_visual_chunks(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<VisualChunkSearchRequest>,
|
||
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
|
||
let criteria_hash = generate_visual_search_hash(&req.uuid, &req.criteria);
|
||
let cache_key = keys::visual_search(&req.uuid, &criteria_hash);
|
||
let ttl = state.mongo_cache.ttl_visual_search();
|
||
|
||
let chunks = state
|
||
.mongo_cache
|
||
.get_or_fetch(&cache_key, ttl, keys::CATEGORY_VISUAL_SEARCH, || async {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
|
||
|
||
visual_chunk_search::search_visual_chunks(&db, &req.uuid, &req.criteria)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("Visual search failed: {}", e))
|
||
})
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("Visual chunk search failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
Ok(Json(VisualChunkSearchResponse {
|
||
total: chunks.len(),
|
||
chunks,
|
||
}))
|
||
}
|
||
|
||
/// Request for searching visual chunks by object class
|
||
#[derive(Debug, Deserialize)]
|
||
struct VisualChunkSearchByClassRequest {
|
||
uuid: String,
|
||
object_class: String,
|
||
min_count: Option<u32>,
|
||
max_count: Option<u32>,
|
||
}
|
||
|
||
/// Request for searching visual chunks by density
|
||
#[derive(Debug, Deserialize)]
|
||
struct VisualChunkSearchByDensityRequest {
|
||
uuid: String,
|
||
min_density: f32,
|
||
max_density: Option<f32>,
|
||
}
|
||
|
||
/// Request for getting visual chunk statistics
|
||
#[derive(Debug, Deserialize)]
|
||
struct VisualChunkStatsRequest {
|
||
uuid: String,
|
||
}
|
||
|
||
async fn search_visual_chunks_by_class(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<VisualChunkSearchByClassRequest>,
|
||
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let chunks = visual_chunk_search::search_visual_chunks_by_class(
|
||
&db,
|
||
&req.uuid,
|
||
&req.object_class,
|
||
req.min_count,
|
||
req.max_count,
|
||
)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("Visual chunk search by class failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
Ok(Json(VisualChunkSearchResponse {
|
||
total: chunks.len(),
|
||
chunks,
|
||
}))
|
||
}
|
||
|
||
async fn search_visual_chunks_by_density(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<VisualChunkSearchByDensityRequest>,
|
||
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let chunks = visual_chunk_search::search_visual_chunks_by_density(
|
||
&db,
|
||
&req.uuid,
|
||
req.min_density,
|
||
req.max_density,
|
||
)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("Visual chunk search by density failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
Ok(Json(VisualChunkSearchResponse {
|
||
total: chunks.len(),
|
||
chunks,
|
||
}))
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct VisualChunkStatsResponse {
|
||
uuid: String,
|
||
stats: std::collections::HashMap<String, serde_json::Value>,
|
||
}
|
||
|
||
async fn get_visual_chunk_stats(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<VisualChunkStatsRequest>,
|
||
) -> Result<Json<VisualChunkStatsResponse>, StatusCode> {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let stats = visual_chunk_search::get_visual_chunk_statistics(&db, &req.uuid)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("Get visual chunk stats failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
Ok(Json(VisualChunkStatsResponse {
|
||
uuid: req.uuid,
|
||
stats,
|
||
}))
|
||
}
|
||
|
||
/// Request for searching visual chunks by object combination
|
||
#[derive(Debug, Deserialize)]
|
||
struct VisualChunkSearchByCombinationRequest {
|
||
uuid: String,
|
||
combination: Vec<(String, u32)>, // (object_class, min_count)
|
||
}
|
||
|
||
async fn search_visual_chunks_by_combination(
|
||
State(state): State<AppState>,
|
||
Json(req): Json<VisualChunkSearchByCombinationRequest>,
|
||
) -> Result<Json<VisualChunkSearchResponse>, StatusCode> {
|
||
let db = PostgresDb::init()
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let combination: Vec<(&str, u32)> = req
|
||
.combination
|
||
.iter()
|
||
.map(|(c, n)| (c.as_str(), *n))
|
||
.collect();
|
||
|
||
let chunks =
|
||
visual_chunk_search::search_visual_chunks_by_combination(&db, &req.uuid, &combination)
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("Visual chunk search by combination failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
Ok(Json(VisualChunkSearchResponse {
|
||
total: chunks.len(),
|
||
chunks,
|
||
}))
|
||
}
|
||
|
||
async fn video_details(
|
||
Path(uuid): Path<String>,
|
||
Query(query): Query<VideoDetailsQuery>,
|
||
State(state): State<AppState>,
|
||
) -> Result<Json<VideoDetailsResponse>, StatusCode> {
|
||
let table = schema::table_name("chunk");
|
||
|
||
if let Some(chunk_id) = query.chunk_id {
|
||
let row: Option<(
|
||
i32, String, String, String, f64, i64, i64,
|
||
Option<String>, serde_json::Value, Option<serde_json::Value>,
|
||
Option<String>, i32, Option<String>, Option<serde_json::Value>, Option<String>,
|
||
)> = sqlx::query_as(&format!(
|
||
"SELECT file_id, uuid, chunk_id, chunk_type::text, fps, start_frame, end_frame,
|
||
text_content, content, metadata, vector_id, frame_count,
|
||
parent_chunk_id, visual_stats, summary_text
|
||
FROM {} WHERE chunk_id = $1 AND uuid = $2",
|
||
table
|
||
))
|
||
.bind(&chunk_id)
|
||
.bind(&uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
let speaker_row: Option<(String, String)> = sqlx::query_as(&format!(
|
||
"SELECT COALESCE(speaker_ids, '{{}}'::text[]), COALESCE(face_ids, '{{}}'::integer[])::text[] FROM {} WHERE chunk_id = $1 AND uuid = $2",
|
||
table
|
||
))
|
||
.bind(&chunk_id)
|
||
.bind(&uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
let row = row.ok_or(StatusCode::NOT_FOUND)?;
|
||
|
||
let fps = if row.4 > 0.0 { row.4 } else { 24.0 };
|
||
let start_frame = row.5;
|
||
let end_frame = row.6;
|
||
let duration_frames = end_frame - start_frame;
|
||
|
||
let start_time = start_frame as f64 / fps;
|
||
let end_time = end_frame as f64 / fps;
|
||
|
||
let row_metadata = row.9.clone();
|
||
|
||
let mut summary_text = row.14.clone();
|
||
let mut metadata = None;
|
||
|
||
if let Some(ref pid_str) = row.12 {
|
||
if !pid_str.is_empty() {
|
||
if let Ok(pid) = pid_str.parse::<i32>() {
|
||
let parent_table = schema::table_name("parent_chunks");
|
||
let parent: Option<(Option<String>, Option<serde_json::Value>)> =
|
||
sqlx::query_as(&format!(
|
||
"SELECT summary_text, metadata FROM {} WHERE id = $1",
|
||
parent_table
|
||
))
|
||
.bind(pid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||
|
||
if let Some((s, m)) = parent {
|
||
if summary_text.is_none() {
|
||
summary_text = s;
|
||
}
|
||
|
||
let mut merged: serde_json::Value = serde_json::json!({});
|
||
|
||
if let Some(ref cm) = row_metadata {
|
||
if let Some(obj) = cm.as_object() {
|
||
for (k, v) in obj {
|
||
merged[k] = v.clone();
|
||
}
|
||
}
|
||
}
|
||
|
||
if let Some(pm) = &m {
|
||
if let Some(obj) = pm.as_object() {
|
||
for (k, v) in obj {
|
||
merged[k] = v.clone();
|
||
}
|
||
}
|
||
}
|
||
|
||
metadata = Some(merged);
|
||
}
|
||
}
|
||
}
|
||
} else if let Some(ref cm) = row_metadata {
|
||
metadata = Some(cm.clone());
|
||
}
|
||
|
||
let parse_pg_array = |s: &str| -> Vec<String> {
|
||
if s.is_empty() || s == "{}" {
|
||
return vec![];
|
||
}
|
||
s.trim_start_matches('{')
|
||
.trim_end_matches('}')
|
||
.split(',')
|
||
.map(|s| s.trim_matches('"').to_string())
|
||
.collect()
|
||
};
|
||
|
||
let (speaker_str, face_str) = speaker_row.unwrap_or(("{}".to_string(), "{}".to_string()));
|
||
let speaker_vec: Vec<String> = parse_pg_array(&speaker_str);
|
||
let speaker_ids: Option<Vec<String>> = if speaker_vec.is_empty() {
|
||
None
|
||
} else {
|
||
Some(speaker_vec)
|
||
};
|
||
let face_vec: Vec<String> = parse_pg_array(&face_str);
|
||
let person_ids: Option<Vec<String>> = if face_vec.is_empty() {
|
||
None
|
||
} else {
|
||
Some(face_vec.iter().map(|id| format!("face_{}", id)).collect())
|
||
};
|
||
|
||
return Ok(Json(VideoDetailsResponse {
|
||
uuid: row.1.clone(),
|
||
details: VideoDetailsResult::Chunk(ChunkDetailResponse {
|
||
chunk_id: row.2.clone(),
|
||
chunk_type: row.3.clone(),
|
||
frame_range: FrameRange {
|
||
start_frame,
|
||
end_frame,
|
||
duration_frames,
|
||
fps,
|
||
},
|
||
reference_time: ReferenceTime {
|
||
start: start_time,
|
||
end: end_time,
|
||
},
|
||
text_content: row.7.clone(),
|
||
content: Some(row.8.clone()),
|
||
parent_id: row.12.clone(),
|
||
summary_text,
|
||
metadata,
|
||
visual_stats: row.13.clone(),
|
||
speaker_ids,
|
||
person_ids,
|
||
}),
|
||
}));
|
||
}
|
||
|
||
Err(StatusCode::BAD_REQUEST)
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct DeleteVideoResponse {
|
||
success: bool,
|
||
message: String,
|
||
deleted_face_detections: u64,
|
||
deleted_processor_results: u64,
|
||
}
|
||
|
||
#[derive(Debug, Serialize)]
|
||
struct UnregisterFileResponse {
|
||
success: bool,
|
||
message: String,
|
||
file_uuid: String,
|
||
deleted_face_detections: u64,
|
||
deleted_processor_results: u64,
|
||
deleted_chunks: u64,
|
||
}
|
||
|
||
/// Detects file type using a tiered strategy:
|
||
/// 1. ffprobe (if available)
|
||
/// 2. `file` command (MIME type)
|
||
/// 3. File extension
|
||
fn detect_file_type(
|
||
path: &std::path::Path,
|
||
probe_result: Option<&crate::core::probe::ffprobe::ProbeResult>,
|
||
) -> String {
|
||
// 1. ffprobe
|
||
if let Some(probe) = probe_result {
|
||
let has_video = probe
|
||
.streams
|
||
.iter()
|
||
.any(|s| s.codec_type.as_deref() == Some("video"));
|
||
let has_audio = probe
|
||
.streams
|
||
.iter()
|
||
.any(|s| s.codec_type.as_deref() == Some("audio"));
|
||
|
||
if has_video {
|
||
return "video".to_string();
|
||
}
|
||
if has_audio {
|
||
return "audio".to_string();
|
||
}
|
||
if !probe.streams.is_empty() {
|
||
return "image".to_string();
|
||
}
|
||
}
|
||
|
||
// 2. file command
|
||
if let Ok(output) = std::process::Command::new("file")
|
||
.args(["--mime-type", "-b"])
|
||
.arg(path)
|
||
.output()
|
||
{
|
||
let mime = String::from_utf8_lossy(&output.stdout)
|
||
.trim()
|
||
.to_lowercase();
|
||
if mime.starts_with("video/") {
|
||
return "video".to_string();
|
||
}
|
||
if mime.starts_with("audio/") {
|
||
return "audio".to_string();
|
||
}
|
||
if mime.starts_with("image/") {
|
||
return "image".to_string();
|
||
}
|
||
}
|
||
|
||
// 3. Extension
|
||
match path
|
||
.extension()
|
||
.and_then(|e| e.to_str())
|
||
.map(|s| s.to_lowercase())
|
||
.as_deref()
|
||
{
|
||
Some("mp4" | "mov" | "mkv" | "avi" | "webm" | "flv" | "wmv") => "video".to_string(),
|
||
Some("mp3" | "wav" | "flac" | "aac" | "ogg" | "m4a") => "audio".to_string(),
|
||
Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "svg") => {
|
||
"image".to_string()
|
||
}
|
||
_ => "unknown".to_string(),
|
||
}
|
||
}
|
||
|
||
async fn delete_video(
|
||
Path(uuid): Path<String>,
|
||
State(state): State<AppState>,
|
||
) -> Result<Json<UnregisterFileResponse>, StatusCode> {
|
||
tracing::info!("[UNREGISTER] Unregistering file: {}", uuid);
|
||
|
||
let videos_table = schema::table_name("videos");
|
||
let face_table = schema::table_name("face_detections");
|
||
let processor_table = schema::table_name("processor_results");
|
||
let chunks_table = schema::table_name("chunk");
|
||
let parent_chunks_table = schema::table_name("parent_chunks");
|
||
|
||
// Check if video exists first
|
||
let exists: Option<String> = sqlx::query_scalar(&format!(
|
||
"SELECT file_uuid FROM {} WHERE file_uuid = $1",
|
||
videos_table
|
||
))
|
||
.bind(&uuid)
|
||
.fetch_optional(state.db.pool())
|
||
.await
|
||
.map_err(|e| {
|
||
tracing::error!("[UNREGISTER] DB check failed: {}", e);
|
||
StatusCode::INTERNAL_SERVER_ERROR
|
||
})?;
|
||
|
||
// Delete associated data regardless of video existence (cleanup orphans)
|
||
let deleted_faces: i64 =
|
||
sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", face_table))
|
||
.bind(&uuid)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map(|r| r.rows_affected() as i64)
|
||
.unwrap_or(0);
|
||
|
||
tracing::info!("[UNREGISTER] Deleted {} face_detections", deleted_faces);
|
||
|
||
let deleted_processors: i64 = sqlx::query(&format!(
|
||
"DELETE FROM {} WHERE file_uuid = $1",
|
||
processor_table
|
||
))
|
||
.bind(&uuid)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map(|r| r.rows_affected() as i64)
|
||
.unwrap_or(0);
|
||
|
||
tracing::info!(
|
||
"[UNREGISTER] Deleted {} processor_results",
|
||
deleted_processors
|
||
);
|
||
|
||
let deleted_parent_chunks: i64 = sqlx::query(&format!(
|
||
"DELETE FROM {} WHERE uuid = $1",
|
||
parent_chunks_table
|
||
))
|
||
.bind(&uuid)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map(|r| r.rows_affected() as i64)
|
||
.unwrap_or(0);
|
||
|
||
let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks_table))
|
||
.bind(&uuid)
|
||
.execute(state.db.pool())
|
||
.await
|
||
.map(|r| r.rows_affected() as i64)
|
||
.unwrap_or(0);
|
||
|
||
tracing::info!("[UNREGISTER] Deleted {} chunks", deleted_chunks);
|
||
|
||
if exists.is_none() {
|
||
tracing::warn!(
|
||
"[UNREGISTER] Video not found: {}. (Maybe already deleted or UUID mismatch)",
|
||
uuid
|
||
);
|
||
return Err(StatusCode::NOT_FOUND);
|
||
}
|
||
|
||
Ok(Json(UnregisterFileResponse {
|
||
success: true,
|
||
message: format!(
|
||
"File {} unregistered successfully. Record deleted from database.",
|
||
uuid
|
||
),
|
||
file_uuid: uuid,
|
||
deleted_face_detections: deleted_faces as u64,
|
||
deleted_processor_results: deleted_processors as u64,
|
||
deleted_chunks: (deleted_chunks + deleted_parent_chunks) as u64,
|
||
}))
|
||
}
|