use axum::{ extract::{Path, Query, State}, http::StatusCode, response::Json, routing::{delete, get, post}, Router, }; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use sqlx::Row; use std::time::Instant; use tower_http::cors::{Any, CorsLayer}; use crate::core::cache::{keys, MongoCache, RedisCache}; use crate::core::config::REDIS_KEY_PREFIX; use crate::core::db::schema; use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord, VideoStatus}; use crate::core::text::tokenizer::tokenize_chinese_text; use crate::{Embedder, FileManager}; use super::agent_api; use super::five_w1h_agent_api; use super::identities; use super::identity_api; use super::identity_binding; use super::middleware::api_key_validation; use super::search::search_routes; use super::trace_agent_api; use super::universal_search::universal_search_routes; use super::visual_chunk_search; use crate::core::chunk::types::Chunk; static DEMO_USER_API_KEY: &str = "muser_test_001"; fn hash_password(password: &str) -> String { let mut hasher = Sha256::new(); hasher.update(password.as_bytes()); format!("{:x}", hasher.finalize()) } #[derive(Debug, Deserialize)] struct LoginRequest { username: String, password: String, } #[derive(Debug, Serialize)] struct LoginResponse { success: bool, message: Option, api_key: Option, user: Option, } #[derive(Debug, Serialize)] struct UserInfo { username: String, } // Global State static SERVER_START: OnceCell = OnceCell::new(); fn get_uptime_ms() -> u64 { SERVER_START .get() .map(|i| i.elapsed().as_millis() as u64) .unwrap_or(0) } #[derive(Debug, Serialize)] struct HealthResponse { status: String, version: String, uptime_ms: u64, } #[derive(Debug, Serialize)] struct JobListResponse { jobs: Vec, count: i64, page: usize, page_size: usize, } #[derive(Debug, Deserialize)] struct JobsQuery { page: Option, page_size: Option, status: Option, } #[derive(Debug, Serialize)] struct JobInfoResponse { id: i32, uuid: String, status: String, current_processor: Option, progress_current: i32, progress_total: i32, created_at: String, started_at: Option, } #[derive(Debug, Serialize)] struct JobDetailResponse { id: i32, uuid: String, status: String, current_processor: Option, progress_current: i32, progress_total: i32, processors: Vec, created_at: String, started_at: Option, updated_at: Option, } #[derive(Debug, Serialize)] struct ProcessorInfoResponse { processor_type: String, status: String, started_at: Option, completed_at: Option, duration_secs: Option, error_message: Option, } #[derive(Debug, Deserialize)] #[serde(rename_all = "snake_case")] enum SearchMode { Vector, Smart, } #[derive(Debug, Deserialize)] struct SearchRequest { query: String, mode: Option, collection: Option, uuid: Option, limit: Option, vector_weight: Option, bm25_weight: Option, } #[derive(Debug, Deserialize)] struct CacheToggleRequest { enabled: bool, } #[derive(Debug, Serialize)] struct CacheToggleResponse { success: bool, cache_enabled: bool, message: String, } // Missing structs added #[derive(Debug, Deserialize)] struct RegisterFileRequest { file_path: String, pattern: Option, user_id: Option, } #[derive(Debug, Serialize)] struct RegisterFileResponse { success: bool, file_uuid: String, file_name: String, file_path: String, file_type: Option, duration: f64, width: u32, height: u32, fps: f64, total_frames: u64, registration_time: Option, already_exists: bool, message: String, } #[derive(Debug, Serialize, Deserialize)] struct SearchResult { uuid: String, chunk_id: String, chunk_type: String, start_time: f64, end_time: f64, text: String, score: f32, } #[derive(Debug, Serialize, Deserialize)] struct SearchResponse { results: Vec, query: String, } #[derive(Debug, Serialize)] struct ProbeResponse { file_uuid: String, file_name: String, duration: f64, width: u32, height: u32, fps: f64, total_frames: i64, cached: bool, format: crate::core::probe::FormatInfo, streams: Vec, } // --- P0 API Structs --- #[derive(Debug, Deserialize)] struct ProcessRequest { rules: Option>, processors: Option>, } #[derive(Debug, Serialize)] struct FrameProgress { total_frames: i64, processed_frames: i64, progress_percent: f64, } #[derive(Debug, Serialize)] struct AssetStatusResponse { uuid: String, file_name: String, registration_time: String, processing_status: String, current_job_id: Option, frame_progress: Option, } #[derive(Debug, Serialize)] struct JobStatusResponse { job_id: String, file_uuid: String, rule: String, status: String, current_processor_id: Option, frame_progress: FrameProgress, } #[derive(Debug, Serialize)] struct RuleStatusResponse { rule: String, supported_processor_ids: Vec, active_jobs: Vec, } // --- End P0 API Structs --- #[derive(Debug, Deserialize)] struct HybridSearchRequest { query: String, limit: Option, uuid: Option, vector_weight: Option, bm25_weight: Option, } #[derive(Debug, Serialize, Deserialize)] struct HybridSearchResult { uuid: String, chunk_id: String, chunk_type: String, start_time: f64, end_time: f64, text: String, vector_score: f64, bm25_score: f64, combined_score: f64, } #[derive(Debug, Serialize, Deserialize)] struct HybridSearchResponse { results: Vec, query: String, } fn extract_text_from_content(content: &serde_json::Value) -> String { let raw_text = content .get("data") .and_then(|data| data.get("text")) .and_then(|v| v.as_str()) .or_else(|| content.get("text").and_then(|v| v.as_str())) .unwrap_or(""); tokenize_chinese_text(raw_text) } fn extract_title_from_content(content: &serde_json::Value) -> String { content .get("data") .and_then(|data| data.get("title")) .and_then(|v| v.as_str()) .or_else(|| content.get("title").and_then(|v| v.as_str())) .unwrap_or("") .to_string() } #[derive(Debug, Deserialize)] struct LookupQuery { path: Option, uuid: Option, } #[derive(Debug, Serialize)] struct LookupResponse { file_uuid: String, file_path: Option, file_name: Option, duration: Option, } #[derive(Debug, Serialize, Deserialize)] struct FileInfoResponse { file_uuid: String, file_path: String, file_name: String, file_type: Option, duration: f64, width: u32, height: u32, status: String, processing_status: Option, birth_registration: Option, created_at: Option, registration_time: Option, file_size: Option, probe_json: Option, total_frames: u64, } #[derive(Debug, Serialize, Deserialize)] struct FilesResponse { files: Vec, count: i64, page: usize, page_size: usize, } #[derive(Debug, Deserialize)] struct FilesQuery { page: Option, page_size: Option, status: Option, q: Option, uuid: Option, } #[derive(Clone)] pub struct AppState { pub db: std::sync::Arc, pub embedder: std::sync::Arc, pub embedder_model: String, pub mongo_cache: crate::core::cache::MongoCache, pub redis_cache: crate::core::cache::RedisCache, pub api_state: super::middleware::ApiState, } #[derive(Debug, Serialize)] struct DetailedHealthResponse { status: String, version: String, uptime_ms: u64, services: ServiceHealth, } #[derive(Debug, Serialize)] struct ServiceHealth { postgres: ServiceStatus, redis: ServiceStatus, qdrant: ServiceStatus, mongodb: ServiceStatus, } #[derive(Debug, Serialize)] struct ServiceStatus { status: String, latency_ms: Option, error: Option, } async fn health(State(state): State) -> Json { let postgres = check_postgres().await; let redis = check_redis().await; let qdrant = check_qdrant().await; let mongodb = check_mongodb(&state.mongo_cache).await; let all_ok = postgres.status == "ok" && redis.status == "ok" && qdrant.status == "ok" && mongodb.status == "ok"; let status = if all_ok { "ok" } else { "degraded" }; if all_ok { let _ = state.redis_cache.set_health(status).await; } Json(HealthResponse { status: status.to_string(), version: env!("BUILD_VERSION").to_string(), uptime_ms: get_uptime_ms(), }) } async fn health_detailed(State(state): State) -> Json { let postgres = check_postgres().await; let redis = check_redis().await; let qdrant = check_qdrant().await; let mongodb = check_mongodb(&state.mongo_cache).await; let overall_status = if postgres.status == "ok" && redis.status == "ok" && qdrant.status == "ok" && mongodb.status == "ok" { "ok" } else { "degraded" }; Json(DetailedHealthResponse { status: overall_status.to_string(), version: env!("BUILD_VERSION").to_string(), uptime_ms: get_uptime_ms(), services: ServiceHealth { postgres, redis, qdrant, mongodb, }, }) } async fn login(Json(req): Json) -> Json { if req.username == "demo" && req.password == "demo" { Json(LoginResponse { success: true, message: Some("Login successful".to_string()), api_key: Some(DEMO_USER_API_KEY.to_string()), user: Some(UserInfo { username: "demo".to_string(), }), }) } else { Json(LoginResponse { success: false, message: Some("Invalid username or password".to_string()), api_key: None, user: None, }) } } async fn logout() -> Json { Json(serde_json::json!({ "success": true })) } async fn check_postgres() -> ServiceStatus { let start = Instant::now(); match PostgresDb::init().await { Ok(db) => match db.list_files(1, 0).await { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_redis() -> ServiceStatus { let start = Instant::now(); match RedisClient::new() { Ok(redis) => match redis.get_conn().await { Ok(mut conn) => { let result: Result = redis::cmd("PING").query_async(&mut conn).await; match result { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, } } Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_qdrant() -> ServiceStatus { let start = Instant::now(); let base_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string()); let api_key = std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string()); let url = format!("{}/collections", base_url); let client = reqwest::Client::new(); match client .get(&url) .header("api-key", api_key) .timeout(std::time::Duration::from_secs(5)) .send() .await { Ok(resp) if resp.status().is_success() => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Ok(resp) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(format!("HTTP {}", resp.status())), }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_mongodb(cache: &MongoCache) -> ServiceStatus { let start = Instant::now(); match cache.health_check().await { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, } } fn generate_query_hash(query: &str, uuid: Option<&str>, limit: usize) -> String { let data = serde_json::json!({ "query": query, "uuid": uuid, "limit": limit, }); let mut hasher = Sha256::new(); hasher.update(data.to_string().as_bytes()); format!("{:x}", hasher.finalize())[..16].to_string() } fn generate_visual_search_hash( uuid: &str, criteria: &visual_chunk_search::VisualChunkSearchCriteria, ) -> String { let data = serde_json::json!({ "uuid": uuid, "criteria": criteria, }); let mut hasher = Sha256::new(); hasher.update(data.to_string().as_bytes()); format!("{:x}", hasher.finalize())[..16].to_string() } /// 註冊單一檔案(內部函數,不處理 pattern) async fn register_single_file( state: &AppState, file_path: &str, _user_id: Option, ) -> RegisterFileResponse { tracing::info!("[REGISTER] Starting registration for: {}", file_path); let path = std::path::Path::new(file_path); if !path.exists() { return RegisterFileResponse { success: false, file_uuid: String::new(), file_name: String::new(), file_path: file_path.to_string(), file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: false, message: format!("File not found: {}", file_path), }; } let canonical_path = path .canonicalize() .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| file_path.to_string()); let file_name = path .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { tracing::error!("[REGISTER] DB init failed: {}", e); return RegisterFileResponse { success: false, file_uuid: String::new(), file_name, file_path: canonical_path, file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: false, message: format!("DB init failed: {}", e), }; } }; let birthday = sqlx::query_scalar::<_, chrono::DateTime>( "SELECT registration_time FROM dev.videos WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1" ) .bind(&file_name) .fetch_optional(db.pool()) .await .unwrap_or(None) .map(|t| t.to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); let mac_address = crate::core::storage::uuid::get_mac_address(); let file_uuid = crate::core::storage::uuid::compute_birth_uuid( &mac_address, &birthday, &canonical_path, &file_name, ); // Check if already exists if let Ok(Some(_)) = db.get_video_by_uuid(&file_uuid).await { tracing::info!("[REGISTER] File already registered: {}", file_uuid); return RegisterFileResponse { success: true, file_uuid, file_name, file_path: canonical_path, file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: true, message: "File already registered".to_string(), }; } // Probe let probe_result = match crate::core::probe::probe_video(&canonical_path) { Ok(r) => r, Err(e) => { return RegisterFileResponse { success: false, file_uuid, file_name, file_path: canonical_path, file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: false, message: format!("Probe failed: {}", e), }; } }; let has_video = probe_result .streams .iter() .any(|s| s.codec_type.as_deref() == Some("video")); let has_audio = probe_result .streams .iter() .any(|s| s.codec_type.as_deref() == Some("audio")); let final_file_type = if has_video { Some("video".to_string()) } else if has_audio { Some("audio".to_string()) } else { None }; let duration = probe_result .format .duration .as_ref() .and_then(|s| s.parse::().ok()) .unwrap_or(0.0); let mut width = 0u32; let mut height = 0u32; let mut fps = 0.0; let mut total_frames = 0u64; if let Some(s) = probe_result .streams .iter() .find(|s| s.codec_type.as_deref() == Some("video")) { width = s.width.unwrap_or(0); height = s.height.unwrap_or(0); if let Some(fps_str) = &s.r_frame_rate { if let Some((num, den)) = fps_str.split_once('/') { if let (Ok(n), Ok(d)) = (num.parse::(), den.parse::()) { if d > 0.0 { fps = n / d; } } } } total_frames = s .nb_frames .as_ref() .and_then(|s| s.parse().ok()) .unwrap_or((duration * fps) as u64); } let videos_table = schema::table_name("videos"); let probe_json = serde_json::to_value(&probe_result).ok(); let status = "pending"; let _ = sqlx::query(&format!( "INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status", videos_table )) .bind(&file_uuid).bind(&canonical_path).bind(&file_name).bind(&final_file_type) .bind(duration).bind(width as i32).bind(height as i32).bind(fps) .bind(&probe_json).bind(status) .execute(db.pool()).await; // 若是 video 類型,同步執行 CUT + Scene 分類 let mut cut_done = false; let mut scene_done = false; if has_video && total_frames > 0 && fps > 0.0 { let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string()); let python_path = std::env::var("MOMENTRY_PYTHON_PATH") .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); // CUT: 場景檢測(PySceneDetect,0.08s 即可完成) let cut_path = std::path::Path::new(&output_dir).join(format!("{}.cut.json", file_uuid)); if !cut_path.exists() { let cut_script = std::path::Path::new(&scripts_dir).join("cut_processor.py"); if cut_script.exists() { let cut_output = std::process::Command::new(&python_path) .arg(&cut_script) .arg(&canonical_path) .arg(&cut_path) .output(); if let Ok(output) = cut_output { if output.status.success() { cut_done = true; tracing::info!("[REGISTER] CUT completed for {}", file_uuid); } else { let stderr = String::from_utf8_lossy(&output.stderr); tracing::error!("[REGISTER] CUT failed for {}: {}", file_uuid, stderr); } } else { tracing::error!("[REGISTER] CUT execution error for {}", file_uuid); } } } else { cut_done = true; // 讀取現有 CUT JSON 取得場景數 if let Ok(content) = std::fs::read_to_string(&cut_path) { if let Ok(cut_data) = serde_json::from_str::(&content) { let scenes = cut_data .get("scenes") .and_then(|s| s.as_array()) .map(|a| a.len() as i32) .unwrap_or(0); tracing::info!( "[REGISTER] CUT already exists: {} scenes for {}", scenes, file_uuid ); } } } // Scene: 場景分類(MIT Places365,取樣間隔 2s) let scene_path = std::path::Path::new(&output_dir).join(format!("{}.scene.json", file_uuid)); if !scene_path.exists() { let scene_script = std::path::Path::new(&scripts_dir).join("scene_classifier.py"); if scene_script.exists() { let scene_output = std::process::Command::new(&python_path) .arg(&scene_script) .arg(&canonical_path) .arg(&scene_path) .arg("--sample-interval") .arg("2") .output(); if let Ok(output) = scene_output { if output.status.success() { scene_done = true; tracing::info!( "[REGISTER] Scene classification completed for {}", file_uuid ); } } } } else { scene_done = true; } } // 更新 DB: cut_done, scene_done, audio_tracks let audio_tracks: Vec = probe_result.streams.iter() .filter(|s| s.codec_type.as_deref() == Some("audio")) .map(|s| { serde_json::json!({ "index": s.index, "codec": s.codec_name, "channels": s.channels, "sample_rate": s.sample_rate, "language": s.tags.as_ref().and_then(|t| t.get("language")).unwrap_or(&serde_json::Value::Null), }) }) .collect(); let audio_tracks_json = serde_json::to_value(&audio_tracks).ok(); // 計算 cut_count 與 cut_max_duration let cut_path = std::path::Path::new( &std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()), ) .join(format!("{}.cut.json", file_uuid)); let mut cut_count = 0i32; let mut cut_max_duration = 0.0f64; if let Ok(content) = std::fs::read_to_string(&cut_path) { if let Ok(cut_data) = serde_json::from_str::(&content) { if let Some(scenes) = cut_data.get("scenes").and_then(|s| s.as_array()) { cut_count = scenes.len() as i32; cut_max_duration = scenes .iter() .filter_map(|s| { let start = s.get("start_time").and_then(|v| v.as_f64()).unwrap_or(0.0); let end = s.get("end_time").and_then(|v| v.as_f64()).unwrap_or(0.0); if end > start { Some(end - start) } else { None } }) .fold(0.0_f64, f64::max); } } } let _ = sqlx::query( "UPDATE dev.videos SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6" ) .bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid) .execute(db.pool()).await; // 寫入 {file_uuid}.probe.json(含音軌資訊) if let Some(json_val) = probe_json { let probe_path = std::path::Path::new( &std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()), ) .join(format!("{}.probe.json", file_uuid)); let json_str = serde_json::to_string_pretty(&json_val).unwrap_or_default(); let _ = std::fs::write(&probe_path, json_str); } RegisterFileResponse { success: true, file_uuid, file_name, file_path: canonical_path, file_type: final_file_type, duration, width, height, fps, total_frames, registration_time: None, already_exists: false, message: "File registered successfully".to_string(), } } async fn register_file( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let file_path = req.file_path.clone(); let pattern = req.pattern; // 如果有 pattern,掃描目錄下所有符合的檔案逐一註冊 if let Some(ref pat) = pattern { let dir = std::path::Path::new(&file_path); if !dir.is_dir() { return Ok(Json(RegisterFileResponse { success: false, file_uuid: String::new(), file_name: String::new(), file_path: file_path.clone(), file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: false, message: format!( "Pattern requires a directory, but path is not a dir: {}", file_path ), })); } let re = regex::Regex::new(pat).map_err(|e| { tracing::error!("[REGISTER] Invalid regex pattern: {}", e); StatusCode::BAD_REQUEST })?; let mut registered = 0u32; let mut failed = 0u32; let mut skipped = 0u32; if let Ok(entries) = std::fs::read_dir(dir) { for entry in entries.flatten() { let entry_path = entry.path(); if !entry_path.is_file() { continue; } let fname = entry_path .file_name() .and_then(|n| n.to_str()) .unwrap_or(""); if !re.is_match(fname) { continue; } let result = register_single_file( &state, &entry_path.to_string_lossy().to_string(), req.user_id, ) .await; if result.success { registered += 1; } else if result.already_exists { skipped += 1; } else { failed += 1; } } } return Ok(Json(RegisterFileResponse { success: true, file_uuid: format!("batch_{}_registered_{}_failed", registered, failed), file_name: format!( "{} files registered, {} skipped, {} failed", registered, skipped, failed ), file_path: file_path.clone(), file_type: None, duration: 0.0, width: 0, height: 0, fps: 0.0, total_frames: 0, registration_time: None, already_exists: false, message: format!( "Batch register: {} registered, {} skipped, {} failed", registered, skipped, failed ), })); } // 單一檔案註冊 let resp = register_single_file(&state, &file_path, req.user_id).await; return Ok(Json(resp)); } async fn probe_by_uuid( State(state): State, Path(file_uuid): Path, ) -> Result, StatusCode> { let table = schema::table_name("videos"); let row: Option<(String, String)> = sqlx::query_as(&format!( "SELECT file_name, file_path FROM {} WHERE file_uuid = $1", table )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| { tracing::error!("DB error fetching video: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let (file_name, path) = row.ok_or_else(|| { tracing::warn!("Video not found: {}", file_uuid); StatusCode::NOT_FOUND })?; // 2. Check for cached probe.json let probe_path = format!( "{}/{}.probe.json", crate::core::config::OUTPUT_DIR.as_str(), file_uuid ); let (probe_result, cached) = if let Ok(content) = std::fs::read_to_string(&probe_path) { tracing::info!("Using cached probe.json: {}", probe_path); let result: crate::core::probe::ProbeResult = serde_json::from_str(&content).map_err(|e| { tracing::error!("Failed to parse cached probe.json: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; (result, true) } else { tracing::info!("Running ffprobe for: {}", path); let result = crate::core::probe::probe_video(&path).map_err(|e| { tracing::error!("ffprobe failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; // Save probe.json to OUTPUT_DIR let file_manager = FileManager::new(std::path::PathBuf::from( crate::core::config::OUTPUT_DIR.as_str(), )); let json_str = serde_json::to_string(&result).map_err(|e| { tracing::error!("Failed to serialize probe result: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; file_manager .save_json(&file_uuid, "probe", &json_str) .map_err(|e| { tracing::error!("Failed to save probe.json: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; (result, false) }; // 3. Extract video info let duration = probe_result .format .duration .as_ref() .and_then(|s| s.parse::().ok()) .unwrap_or(0.0); let mut width = 0u32; let mut height = 0u32; let mut fps = 0.0; for stream in &probe_result.streams { if stream.codec_type.as_deref() == Some("video") { width = stream.width.unwrap_or(0); height = stream.height.unwrap_or(0); if let Some(fps_str) = &stream.r_frame_rate { fps = if fps_str.contains('/') { let parts: Vec<&str> = fps_str.split('/').collect(); if parts.len() == 2 { let num: f64 = parts[0].parse().unwrap_or(0.0); let den: f64 = parts[1].parse().unwrap_or(1.0); if den > 0.0 { num / den } else { 0.0 } } else { 0.0 } } else { fps_str.parse().unwrap_or(0.0) }; } } } // 4. Calculate total_frames and update DB let total_frames = probe_result .streams .iter() .find(|s| s.codec_type.as_deref() == Some("video")) .and_then(|s| s.nb_frames.as_ref()) .and_then(|n| n.parse::().ok()) .unwrap_or_else(|| (duration * fps).floor() as i64); // Update videos table with probe metadata let _ = sqlx::query(&format!( "UPDATE {} SET duration = $1, width = $2, height = $3, fps = $4, total_frames = $5 WHERE file_uuid = $6", schema::table_name("videos") )) .bind(duration) .bind(width as i32) .bind(height as i32) .bind(fps) .bind(total_frames) .bind(&file_uuid) .execute(state.db.pool()) .await; Ok(Json(ProbeResponse { file_uuid, file_name, duration, width, height, fps, total_frames, cached, format: probe_result.format, streams: probe_result.streams, })) } // --- P0 Core API Handlers --- async fn trigger_processing( State(state): State, Path(file_uuid): Path, Json(req): Json, ) -> Result, (StatusCode, String)> { let table = schema::table_name("videos"); let asset: Option<(String, i64)> = sqlx::query_as(&format!( "SELECT file_name, COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1", table )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("DB Error: {}", e), ) })?; let (file_name, total_frames) = asset.ok_or((StatusCode::NOT_FOUND, "File not found".to_string()))?; // Get video file_path let video_path: Option = sqlx::query_scalar(&format!( "SELECT file_path FROM {} WHERE file_uuid = $1", table )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get video path: {}", e), ) })?; // 2. Create Monitor Job (Worker polls this table) let monitor_job = state .db .create_monitor_job(&file_uuid, video_path.as_deref()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create monitor job: {}", e), ) })?; // Update processors if specified let processors_to_run: Vec<&str> = if let Some(procs) = &req.processors { let table = crate::core::db::schema::table_name("monitor_jobs"); sqlx::query(&format!( "UPDATE {} SET processors = $1 WHERE id = $2", table )) .bind(procs) .bind(monitor_job.id) .execute(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to update processors: {}", e), ) })?; procs.iter().map(|s| s.as_str()).collect() } else { // Empty means all processors vec![ "asr", "cut", "yolo", "ocr", "face", "pose", "asrx", "visual_chunk", ] }; // 3. Update Asset Status let table = schema::table_name("videos"); sqlx::query(&format!( "UPDATE {} SET status = 'processing' WHERE file_uuid = $1", table )) .bind(&file_uuid) .execute(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to update asset status: {}", e), ) })?; // Initialize processing_status with all processors and total_frames state .db .init_processing_status(&file_uuid, processors_to_run, total_frames as u64) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to init processing status: {}", e), ) })?; tracing::info!("Job {} created for asset {}", monitor_job.id, file_uuid); let prefix = REDIS_KEY_PREFIX.as_str(); let mut processor_pids: Vec = Vec::new(); if let Ok(redis_client) = RedisClient::new() { if let Ok(mut conn) = redis_client.get_conn().await { for name in ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"] { let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name); let pid: Option = redis::cmd("HGET") .arg(&key) .arg("pid") .query_async(&mut conn) .await .ok() .flatten(); if let Some(p) = pid { processor_pids.push(p); } } } } Ok(Json(serde_json::json!({ "job_id": monitor_job.id, "file_uuid": file_uuid, "status": "PENDING", "pids": processor_pids, "message": format!("Processing triggered for {}", file_name) }))) } async fn get_asset_status( State(state): State, Path(uuid): Path, ) -> Result, StatusCode> { let videos_table = schema::table_name("videos"); let row: Option<(String, String, chrono::DateTime, String, i64)> = sqlx::query_as( &format!( "SELECT file_uuid, file_name, created_at AT TIME ZONE 'UTC', COALESCE(processing_status, 'REGISTERED'), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1", videos_table ) ) .bind(&uuid) .fetch_optional(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let (uuid, file_name, time, status, total) = row.ok_or(StatusCode::NOT_FOUND)?; let jobs_table = schema::table_name("monitor_jobs"); let job: Option<(String, String, i64, i64)> = sqlx::query_as( &format!( "SELECT id::text, COALESCE(status, 'QUEUED'), COALESCE(processed_frames, 0), COALESCE(total_frames, 0) FROM {} WHERE file_uuid = $1 ORDER BY created_at DESC LIMIT 1", jobs_table ) ) .bind(&uuid) .fetch_optional(state.db.pool()) .await .ok() .flatten(); let progress = if let Some((jid, jstatus, pf, tf)) = job { if tf > 0 && (jstatus == "RUNNING" || jstatus == "QUEUED") { Some(( jid, FrameProgress { total_frames: tf, processed_frames: pf, progress_percent: (pf as f64 / tf as f64) * 100.0, }, )) } else { None } } else { None }; Ok(Json(AssetStatusResponse { uuid, file_name, registration_time: time.to_rfc3339(), processing_status: status, current_job_id: progress.as_ref().map(|(id, _)| id.clone()), frame_progress: progress.map(|(_, p)| p), })) } async fn get_job_status( State(state): State, Path(job_id): Path, ) -> Result, StatusCode> { let jobs_table = schema::table_name("monitor_jobs"); let row: Option<(String, String, String, String, Option, i64, i64)> = sqlx::query_as( &format!( "SELECT j.id::text, j.file_uuid, COALESCE(j.rule, 'unknown'), COALESCE(j.status, 'QUEUED'), j.assigned_processor_id::text, j.processed_frames, j.total_frames FROM {} j WHERE j.id = $1::uuid", jobs_table ) ) .bind(&job_id) .fetch_optional(state.db.pool()) .await .map_err(|e| { eprintln!("DB Error in get_job_status: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let (id, asset, rule, status, proc_id, pf, tf) = row.ok_or(StatusCode::NOT_FOUND)?; Ok(Json(JobStatusResponse { job_id: id, file_uuid: asset, rule, status, current_processor_id: proc_id, frame_progress: FrameProgress { total_frames: tf, processed_frames: pf, progress_percent: if tf > 0 { (pf as f64 / tf as f64) * 100.0 } else { 0.0 }, }, })) } async fn get_rule_status( State(state): State, Path(rule): Path, ) -> Result, StatusCode> { let procs: Vec = sqlx::query_scalar( "SELECT id::text FROM processors WHERE supported_rules @> ARRAY[$1]::TEXT[]", ) .bind(&rule) .fetch_all(state.db.pool()) .await .unwrap_or_default(); let jobs_table = schema::table_name("monitor_jobs"); let jobs: Vec<(String, String, String, String, Option, i64, i64)> = sqlx::query_as( &format!( "SELECT id::text, file_uuid, COALESCE(rule, 'unknown'), status, assigned_processor_id::text, processed_frames, total_frames FROM {} WHERE rule = $1 AND status IN ('QUEUED','RUNNING')", jobs_table ) ) .bind(&rule) .fetch_all(state.db.pool()) .await .unwrap_or_default(); let active = jobs .into_iter() .map(|(id, asset, r, s, p, pf, tf)| JobStatusResponse { job_id: id, file_uuid: asset, rule: r, status: s, current_processor_id: p, frame_progress: FrameProgress { total_frames: tf, processed_frames: pf, progress_percent: if tf > 0 { (pf as f64 / tf as f64) * 100.0 } else { 0.0 }, }, }) .collect(); Ok(Json(RuleStatusResponse { rule, supported_processor_ids: procs, active_jobs: active, })) } // --- End P0 Core API Handlers --- async fn search( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let mode = req.mode.unwrap_or(SearchMode::Smart); let limit = req.limit.unwrap_or(10); let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); let cache_key = keys::search(&format!("{:?}", mode)); let ttl = state.mongo_cache.ttl_search(); let response = state .mongo_cache .get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async { let pg = PostgresDb::init() .await .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; let results: Vec = match mode { SearchMode::Vector => { let query_vector = state .embedder .embed_query(&req.query) .await .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; let qdrant = QdrantDb::new(); let search_results = if let Some(ref uuid) = req.uuid { qdrant.search_in_uuid(&query_vector, uuid, limit).await? } else { qdrant.search(&query_vector, limit).await? }; let mut results = Vec::new(); for r in search_results { if let Some(chunk) = pg .get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid) .await .ok() .flatten() { let text = extract_text_from_content(&chunk.content); results.push(SearchResult { uuid: r.uuid, chunk_id: r.chunk_id, chunk_type: chunk.chunk_type.as_str().to_string(), start_time: chunk.start_time().seconds(), end_time: chunk.end_time().seconds(), text, score: r.score, }); } } results } SearchMode::Smart => { // Vector search + BM25 reranking let query_vector = state .embedder .embed_query(&req.query) .await .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; let qdrant = QdrantDb::new(); let search_results = if let Some(ref uuid) = req.uuid { qdrant .search_in_uuid(&query_vector, uuid, limit * 2) .await? } else { qdrant.search(&query_vector, limit * 2).await? }; // 取得所有 chunk 並用 BM25 重新排序 let mut results_with_bm25: Vec<(SearchResult, f32)> = Vec::new(); for r in search_results { if let Some(chunk) = pg .get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid) .await .ok() .flatten() { let text = extract_text_from_content(&chunk.content); let vector_score = r.score; results_with_bm25.push(( SearchResult { uuid: r.uuid, chunk_id: r.chunk_id, chunk_type: chunk.chunk_type.as_str().to_string(), start_time: chunk.start_time().seconds(), end_time: chunk.end_time().seconds(), text, score: vector_score, }, vector_score, )); } } // 依 vector score 排序後取前 limit 個 results_with_bm25 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); results_with_bm25 .into_iter() .take(limit) .map(|(r, _)| r) .collect() } }; Ok::(SearchResponse { results, query: req.query.clone(), }) }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(response)) } async fn search_bm25( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let limit = req.limit.unwrap_or(10); let bm25_results = state .db .search_bm25(&req.query, req.uuid.as_deref(), limit) .await .map_err(|e| { tracing::error!("BM25 search failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let results: Vec = bm25_results .into_iter() .map(|r| SearchResult { uuid: r.uuid, chunk_id: r.chunk_id, chunk_type: r.chunk_type, start_time: r.start_time, end_time: r.end_time, text: r.text, score: r.bm25_score, }) .collect(); Ok(Json(SearchResponse { results, query: req.query.clone(), })) } async fn search_smart( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let limit = req.limit.unwrap_or(10); let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); let cache_key = keys::search(&format!("{}smart", query_hash)); let ttl = state.mongo_cache.ttl_search(); let response = state .mongo_cache .get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async { let pg = PostgresDb::init() .await .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; let keywords = vec![req.query.clone()]; let search_terms = keywords.join(" "); let bm25_results = pg .search_bm25(&search_terms, req.uuid.as_deref(), limit) .await?; let results: Vec = bm25_results .into_iter() .map(|r| SearchResult { uuid: r.uuid, chunk_id: r.chunk_id, chunk_type: r.chunk_type, start_time: r.start_time, end_time: r.end_time, text: r.text, score: r.bm25_score, }) .collect(); Ok::(SearchResponse { results, query: req.query.clone(), }) }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(response)) } async fn hybrid_search( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let limit = req.limit.unwrap_or(10); let vector_weight = req.vector_weight.unwrap_or(0.7); let bm25_weight = req.bm25_weight.unwrap_or(0.3); let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); let cache_key = keys::hybrid_search(&query_hash); let ttl = state.mongo_cache.ttl_hybrid_search(); let response = state .mongo_cache .get_or_fetch(&cache_key, ttl, keys::CATEGORY_HYBRID_SEARCH, || async { let query_vector = state .embedder .embed_query(&req.query) .await .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; let pg = PostgresDb::init() .await .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; let results = pg .hybrid_search( &req.query, &query_vector, req.uuid.as_deref(), limit, vector_weight, bm25_weight, ) .await?; let search_results: Vec = results .into_iter() .map(|r| HybridSearchResult { uuid: r.uuid, chunk_id: r.chunk_id, chunk_type: r.chunk_type, start_time: r.start_time, end_time: r.end_time, text: r.text, vector_score: r.vector_score, bm25_score: r.bm25_score, combined_score: r.combined_score, }) .collect(); Ok::(HybridSearchResponse { results: search_results, query: req.query.clone(), }) }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(response)) } async fn lookup( State(state): State, Query(query): Query, ) -> Result, StatusCode> { if let Some(path) = query.path { let uuid = crate::uuid::compute_uuid_from_path(&path); return Ok(Json(LookupResponse { file_uuid: uuid, file_path: None, file_name: None, duration: None, })); } if let Some(uuid) = query.uuid { let cache_key = keys::video_meta(&uuid); let ttl = state.mongo_cache.ttl_video_meta(); let video = state .mongo_cache .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEO_META, || async { let db = PostgresDb::init() .await .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; db.get_video_by_uuid(&uuid) .await .map_err(|e| anyhow::anyhow!("{}", e)) }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if let Some(v) = video { return Ok(Json(LookupResponse { file_uuid: v.file_uuid, file_path: Some(v.file_path), file_name: Some(v.file_name), duration: Some(v.duration), })); } } Err(StatusCode::NOT_FOUND) } #[derive(Debug, Serialize, Deserialize)] struct ScannedFileInfo { file_name: String, relative_path: String, file_path: String, file_size: u64, modified_time: String, // Registration info is_registered: bool, file_uuid: Option, status: Option, registration_time: Option, } #[derive(Debug, Serialize, Deserialize)] struct ScanFilesResponse { files: Vec, total: usize, registered_count: usize, unregistered_count: usize, } fn scan_directory_recursive( dir: &std::path::Path, root: &std::path::Path, allowed_extensions: &[&str], registered_paths: &std::collections::HashMap)>, files: &mut Vec, ) { if let Ok(entries) = std::fs::read_dir(dir) { for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if name.starts_with('.') { return; } } scan_directory_recursive(&path, root, allowed_extensions, registered_paths, files); } else if path.is_file() { if let Some(ext) = path.extension().and_then(|e| e.to_str()) { if allowed_extensions.contains(&ext.to_lowercase().as_str()) { if let Ok(meta) = entry.metadata() { let abs_path = path.to_string_lossy().to_string(); let rel_path = path .strip_prefix(root) .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| abs_path.clone()); let file_name = path .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); let modified_time = meta .modified() .ok() .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .map(|d| { chrono::DateTime::from_timestamp(d.as_secs() as i64, 0) .map(|dt| dt.to_rfc3339()) .unwrap_or_default() }) .unwrap_or_default(); // Check registration if let Some((uuid, status, reg_time)) = registered_paths.get(&abs_path) { files.push(ScannedFileInfo { file_name, relative_path: rel_path, file_path: abs_path, file_size: meta.len(), modified_time, is_registered: true, file_uuid: Some(uuid.clone()), status: Some(status.clone()), registration_time: reg_time.clone(), }); } else { files.push(ScannedFileInfo { file_name, relative_path: rel_path, file_path: abs_path, file_size: meta.len(), modified_time, is_registered: false, file_uuid: None, status: None, registration_time: None, }); } } } } } } } } async fn scan_files(State(state): State) -> Result, StatusCode> { let demo_dir_str = std::env::var("MOMENTRY_SFTP_ROOT") .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo".to_string()); let demo_dir = std::path::Path::new(&demo_dir_str); let allowed_extensions = vec!["mp4", "mov", "mkv", "avi", "webm"]; // 1. Get registered files from DB (Map key: absolute file_path) let table = schema::table_name("videos"); let registered_db: Vec<(String, String, String, String, Option)> = sqlx::query_as(&format!( "SELECT file_path, file_name, file_uuid, status, registration_time::text FROM {} ORDER BY id", table )) .fetch_all(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let registered_paths: std::collections::HashMap)> = registered_db .into_iter() .map(|(path, _name, uuid, status, reg_time)| (path, (uuid, status, reg_time))) .collect(); // 2. Scan filesystem recursively let mut result_files = Vec::new(); if demo_dir.exists() { scan_directory_recursive( demo_dir, demo_dir, &allowed_extensions, ®istered_paths, &mut result_files, ); } // 3. Sort: registered first, then by name result_files.sort_by(|a, b| { b.is_registered .cmp(&a.is_registered) .then(a.relative_path.cmp(&b.relative_path)) }); let registered_count = result_files.iter().filter(|f| f.is_registered).count(); let unregistered_count = result_files.iter().filter(|f| !f.is_registered).count(); Ok(Json(ScanFilesResponse { files: result_files, total: registered_count + unregistered_count, registered_count, unregistered_count, })) } #[derive(Debug, Serialize)] struct ProgressResponse { file_uuid: String, user: Option, group: Option, file_name: Option, duration: Option, overall_progress: u32, cpu_percent: Option, gpu_percent: Option, memory_percent: Option, memory_mb: Option, system: Option, processors: Vec, } #[derive(Debug, Serialize)] struct SystemHealthInfo { cpu_idle_pct: f64, memory_available_mb: u64, memory_total_mb: u64, memory_used_pct: f64, gpu_available: bool, gpu_utilization_pct: Option, gpu_memory_used_pct: Option, dynamic_concurrency: u32, config_concurrency: u32, running_processors: u32, } #[derive(Debug, Serialize)] struct ProcessorProgressInfo { name: String, status: String, current: u32, total: u32, progress: u32, message: String, frames_processed: i32, chunks_produced: i32, retry_count: i32, } /// 從 .json 輸出檔讀取 processor 的已處理幀數 fn get_json_frame_count(file_uuid: &str, processor: &str) -> i32 { let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); let path = std::path::Path::new(&output_dir).join(format!("{}.{}.json", file_uuid, processor)); match std::fs::read_to_string(&path) { Ok(content) => match serde_json::from_str::(&content) { Ok(val) => val.get("frame_count").and_then(|v| v.as_i64()).unwrap_or(0) as i32, Err(_) => 0, }, Err(_) => 0, } } fn get_system_stats() -> (Option, Option, Option, Option) { use std::process::Command; let pid = std::process::id().to_string(); let cpu = Command::new("ps") .args(["-p", &pid, "-o", "%cpu="]) .output() .ok() .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok()); let (mem_percent, mem_rss) = Command::new("ps") .args(["-p", &pid, "-o", "%mem=,rss="]) .output() .ok() .map(|o| { let output = String::from_utf8_lossy(&o.stdout); let parts: Vec<&str> = output.split_whitespace().collect(); let percent = parts.first().and_then(|s| s.parse().ok()); let rss = parts.get(1).and_then(|s| s.parse().ok()); (percent, rss) }) .unwrap_or((None, None)); let gpu = Command::new("nvidia-smi") .args([ "--query-gpu=utilization.gpu", "--format=csv,noheader,nounits", ]) .output() .ok() .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok()); (cpu, gpu, mem_percent, mem_rss) } async fn get_progress( axum::extract::Path(file_uuid): axum::extract::Path, ) -> Result, StatusCode> { let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let pg = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let mut conn = redis .get_conn() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"]; let mut processors = Vec::new(); let mut completed_count = 0u32; for name in processor_names { let prefix = REDIS_KEY_PREFIX.as_str(); let key = format!("{}worker:job:{}:processor:{}", prefix, file_uuid, name); let status: String = redis::cmd("HGET") .arg(&key) .arg("status") .query_async(&mut conn) .await .unwrap_or_else(|_| "pending".to_string()); let current: u32 = redis::cmd("HGET") .arg(&key) .arg("current") .query_async(&mut conn) .await .unwrap_or_else(|_| "0".to_string()) .parse() .unwrap_or(0); let total: u32 = redis::cmd("HGET") .arg(&key) .arg("total") .query_async(&mut conn) .await .unwrap_or_else(|_| "0".to_string()) .parse() .unwrap_or(0); let message: String = redis::cmd("HGET") .arg(&key) .arg("message") .query_async(&mut conn) .await .unwrap_or_else(|_| "".to_string()); let progress = if total > 0 { ((current as f64 / total as f64) * 100.0) as u32 } else if status == "complete" { 100 } else { 0 }; // 從 Redis 讀取 frames_processed / chunks_produced / retry_count let frames_processed: i32 = redis::cmd("HGET") .arg(&key) .arg("frames_processed") .query_async(&mut conn) .await .unwrap_or_else(|_| "0".to_string()) .parse() .unwrap_or(0); let chunks_produced: i32 = redis::cmd("HGET") .arg(&key) .arg("chunks_produced") .query_async(&mut conn) .await .unwrap_or_else(|_| "0".to_string()) .parse() .unwrap_or(0); let retry_count: i32 = redis::cmd("HGET") .arg(&key) .arg("retry_count") .query_async(&mut conn) .await .unwrap_or_else(|_| "0".to_string()) .parse() .unwrap_or(0); if status == "complete" { completed_count += 1; } processors.push(ProcessorProgressInfo { name: name.to_string(), status, current, total, progress, message, frames_processed, chunks_produced, retry_count, }); } let overall_progress = (completed_count as f64 / processor_names.len() as f64 * 100.0) as u32; let job_key = format!("{}worker:job:{}", REDIS_KEY_PREFIX.as_str(), file_uuid); let user: Option = redis::cmd("HGET") .arg(&job_key) .arg("user") .query_async(&mut conn) .await .ok() .filter(|s: &String| !s.is_empty()); let group: Option = redis::cmd("HGET") .arg(&job_key) .arg("group") .query_async(&mut conn) .await .ok() .filter(|s: &String| !s.is_empty()); let (file_name, duration) = pg .get_video_by_uuid(&file_uuid) .await .ok() .flatten() .map(|v| (Some(v.file_name), Some(v.duration))) .unwrap_or((None, None)); let (cpu_percent, gpu_percent, memory_percent, memory_mb) = get_system_stats(); // 從 Redis 讀取系統健康資訊(由 Worker 寫入) let health_key = format!("{}health", REDIS_KEY_PREFIX.as_str()); let cpu_idle: Option = redis::cmd("HGET") .arg(&health_key) .arg("cpu_idle_pct") .query_async(&mut conn) .await .ok(); let mem_avail: Option = redis::cmd("HGET") .arg(&health_key) .arg("memory_available_mb") .query_async(&mut conn) .await .ok(); let mem_total: Option = redis::cmd("HGET") .arg(&health_key) .arg("memory_total_mb") .query_async(&mut conn) .await .ok(); let mem_used: Option = redis::cmd("HGET") .arg(&health_key) .arg("memory_used_pct") .query_async(&mut conn) .await .ok(); let gpu_avail: Option = redis::cmd("HGET") .arg(&health_key) .arg("gpu_available") .query_async(&mut conn) .await .ok(); let gpu_util: Option = redis::cmd("HGET") .arg(&health_key) .arg("gpu_utilization_pct") .query_async(&mut conn) .await .ok(); let gpu_mem: Option = redis::cmd("HGET") .arg(&health_key) .arg("gpu_memory_used_pct") .query_async(&mut conn) .await .ok(); let dyn_conc: Option = redis::cmd("HGET") .arg(&health_key) .arg("dynamic_concurrency") .query_async(&mut conn) .await .ok(); let cfg_conc: Option = redis::cmd("HGET") .arg(&health_key) .arg("config_concurrency") .query_async(&mut conn) .await .ok(); let run_proc: Option = redis::cmd("HGET") .arg(&health_key) .arg("running_processors") .query_async(&mut conn) .await .ok(); let system = cpu_idle.and_then(|idle| { Some(SystemHealthInfo { cpu_idle_pct: idle.parse().ok()?, memory_available_mb: mem_avail?.parse().ok()?, memory_total_mb: mem_total?.parse().ok()?, memory_used_pct: mem_used?.parse().ok()?, gpu_available: gpu_avail.map(|v| v == "true").unwrap_or(false), gpu_utilization_pct: gpu_util.and_then(|v| v.parse().ok()), gpu_memory_used_pct: gpu_mem.and_then(|v| v.parse().ok()), dynamic_concurrency: dyn_conc?.parse().ok()?, config_concurrency: cfg_conc?.parse().ok()?, running_processors: run_proc?.parse().ok()?, }) }); Ok(Json(ProgressResponse { file_uuid, user, group, file_name, duration, overall_progress, cpu_percent, gpu_percent, memory_percent, memory_mb, system, processors, })) } async fn list_jobs(Query(params): Query) -> Result, StatusCode> { let page = params.page.unwrap_or(1); let page_size = params.page_size.unwrap_or(20); let status_filter = params .status .unwrap_or_else(|| "pending,running".to_string()); let offset = ((page - 1) as i64) * (page_size as i64); let pg = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let table = crate::core::db::schema::table_name("monitor_jobs"); // Build status IN clause let statuses: Vec = status_filter .split(',') .map(|s| format!("'{}'", s.trim())) .collect(); let status_clause = statuses.join(","); let query = format!( "SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id FROM {} WHERE status IN ({}) ORDER BY created_at DESC LIMIT {} OFFSET {}", table, status_clause, page_size, offset ); let count_query = format!( "SELECT COUNT(*) FROM {} WHERE status IN ({})", table, status_clause ); let total_count: i64 = sqlx::query_scalar(&count_query) .fetch_one(pg.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; use crate::core::db::MonitorJobStatus; let rows = sqlx::query(&query) .fetch_all(pg.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let job_infos: Vec = rows .into_iter() .map(|r| { let status_str: String = r.try_get("status").unwrap_or_default(); let status = MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending); JobInfoResponse { id: r.try_get("id").unwrap_or(0), uuid: r.try_get("uuid").unwrap_or_default(), status: status.as_str().to_string(), current_processor: r.try_get("current_processor").ok(), progress_current: r.try_get("progress_current").unwrap_or(0), progress_total: r.try_get("progress_total").unwrap_or(0), created_at: r.try_get::("created_at").unwrap_or_default(), started_at: r.try_get::("started_at").ok(), } }) .collect(); Ok(Json(JobListResponse { jobs: job_infos, count: total_count, page, page_size, })) } async fn get_job( axum::extract::Path(uuid): axum::extract::Path, ) -> Result, StatusCode> { tracing::info!("[get_job] START - uuid: {}", uuid); let pg = PostgresDb::init().await.map_err(|e| { tracing::error!("[get_job] ERROR - Failed to init PostgresDb: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; tracing::info!("[get_job] PostgresDb initialized"); let job = pg .get_monitor_job_by_uuid(&uuid) .await .map_err(|e| { tracing::error!("[get_job] ERROR - Failed to get monitor job: {}", e); StatusCode::INTERNAL_SERVER_ERROR })? .ok_or_else(|| { tracing::warn!("[get_job] Job not found: {}", uuid); StatusCode::NOT_FOUND })?; tracing::info!("[get_job] Found job: id={}, uuid={}", job.id, job.uuid); let results = pg.get_processor_results_by_job(job.id).await.map_err(|e| { tracing::error!("[get_job] ERROR - Failed to get processor results: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; tracing::info!("[get_job] Got {} processor results", results.len()); let processors: Vec = results .into_iter() .map(|r| ProcessorInfoResponse { processor_type: r.processor_type.as_str().to_string(), status: r.status.as_str().to_string(), started_at: r.started_at, completed_at: r.completed_at, duration_secs: r.duration_secs, error_message: r.error_message, }) .collect(); tracing::info!("[get_job] Mapped {} processors", processors.len()); let response = JobDetailResponse { id: job.id, uuid: job.uuid, status: job.status.as_str().to_string(), current_processor: job.current_processor, progress_current: job.progress_current, progress_total: job.progress_total, processors, created_at: job.created_at.to_string(), started_at: job.started_at.map(|t| t.to_string()), updated_at: job.updated_at.map(|t| t.to_string()), }; tracing::info!("[get_job] SUCCESS - returning response"); Ok(Json(response)) } async fn cache_toggle( State(_state): State, Json(req): Json, ) -> Result, StatusCode> { tracing::info!("[cache_toggle] Setting cache enabled to: {}", req.enabled); crate::core::config::set_cache_enabled(req.enabled); let response = CacheToggleResponse { success: true, cache_enabled: req.enabled, message: if req.enabled { "Cache enabled".to_string() } else { "Cache disabled".to_string() }, }; tracing::info!("[cache_toggle] SUCCESS"); Ok(Json(response)) } #[derive(Debug, Serialize)] struct UnregisterResponse { success: bool, uuid: String, message: String, } #[derive(Debug, Deserialize)] struct UnregisterRequest { uuid: Option, file_path: Option, pattern: Option, } async fn unregister( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let db = &state.db; // Pattern mode: unregister all matching files in a directory if let (Some(ref dir_path), Some(ref pat)) = (&req.file_path, &req.pattern) { let dir = std::path::Path::new(dir_path); if !dir.is_dir() { return Ok(Json(UnregisterResponse { success: false, uuid: String::new(), message: format!("Not a directory: {}", dir_path), })); } let re = regex::Regex::new(pat).map_err(|_| StatusCode::BAD_REQUEST)?; let mut count = 0u32; let table = crate::core::db::schema::table_name("videos"); if let Ok(entries) = std::fs::read_dir(dir) { for entry in entries.flatten() { let fname = entry.file_name().to_string_lossy().to_string(); if !re.is_match(&fname) { continue; } // Find file_uuid by file_name and delete let rows: Vec<(String,)> = sqlx::query_as(&format!( "SELECT file_uuid FROM {} WHERE file_name = $1", table )) .bind(&fname) .fetch_all(db.pool()) .await .unwrap_or_default(); for (uuid,) in rows { let _ = db.delete_video(&uuid).await; count += 1; } } } let _ = state.mongo_cache.invalidate_videos_list().await; return Ok(Json(UnregisterResponse { success: true, uuid: format!("batch_{}", count), message: format!("Unregistered {} files matching pattern", count), })); } // Single UUID mode let uuid = req.uuid.as_deref().unwrap_or(""); if uuid.is_empty() { return Ok(Json(UnregisterResponse { success: false, uuid: String::new(), message: "Either uuid or file_path+pattern is required".to_string(), })); } tracing::info!("[unregister] Unregistering file: {}", uuid); match db.delete_video(uuid).await { Ok(_) => { let _ = state.mongo_cache.invalidate_videos_list().await; Ok(Json(UnregisterResponse { success: true, uuid: uuid.to_string(), message: "File unregistered successfully".to_string(), })) } Err(e) => { tracing::error!("[unregister] Failed: {}", e); Err(StatusCode::INTERNAL_SERVER_ERROR) } } } pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { let _ = SERVER_START.set(Instant::now()); let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string())); let mongo_cache = MongoCache::init().await?; let redis_cache = RedisCache::new()?; let db = PostgresDb::init().await?; let db = std::sync::Arc::new(db); let api_state = super::middleware::ApiState { db: db.clone() }; let state = AppState { db, embedder, embedder_model: "nomic-embed-text-v2-moe:latest".to_string(), mongo_cache, redis_cache, api_state, }; let protected_routes = Router::new() .route("/api/v1/files/register", post(register_file)) .route("/api/v1/unregister", post(unregister)) .route("/api/v1/files/scan", get(scan_files)) .route("/api/v1/file/:file_uuid/probe", get(probe_by_uuid)) .route("/api/v1/file/:file_uuid/process", post(trigger_processing)) .route("/api/v1/file/:file_uuid/chunks", get(list_pre_chunks)) .route("/api/v1/progress/:uuid", get(get_progress)) .route("/api/v1/jobs", get(list_jobs)) .route("/api/v1/config/cache", post(cache_toggle)) // .merge(person_identity::person_identity_routes()) // V4.0: DISABLED (person_identities table removed) .merge(identity_binding::identity_binding_routes()) .merge(identities::identity_routes()) .layer(axum::middleware::from_fn_with_state( state.api_state.clone(), api_key_validation, )) .with_state(state.clone()); let cors = CorsLayer::new() .allow_origin(tower_http::cors::AllowOrigin::any()) .allow_methods(Any) .allow_headers(Any); let app = Router::new() .route("/health", get(health)) .route("/health/detailed", get(health_detailed)) .route("/api/v1/auth/login", post(login)) .route("/api/v1/auth/logout", post(logout)) .route("/api/v1/stats/ingest", get(get_ingest_stats)) .route("/api/v1/stats/sftpgo", get(get_sftpgo_status)) .route("/api/v1/stats/inference", get(get_inference_health)) .route("/api/v1/search/visual", post(search_visual_chunks)) .route( "/api/v1/search/visual/class", post(search_visual_chunks_by_class), ) .route( "/api/v1/search/visual/density", post(search_visual_chunks_by_density), ) .route("/api/v1/search/visual/stats", post(get_visual_chunk_stats)) .route( "/api/v1/search/visual/combination", post(search_visual_chunks_by_combination), ) .merge(identity_api::identity_routes()) // Phase 3 Routes .merge(agent_api::agent_routes()) // Phase 6 Routes .merge(super::identity_agent_api::identity_agent_routes()) // Phase 5 Routes .merge(five_w1h_agent_api::five_w1h_agent_routes()) // Phase 3 Routes (5W1H Agent) .merge(super::media_api::bbox_routes()) // Media: video/bbox/thumbnail .merge(super::trace_agent_api::trace_agent_routes()) // Trace listing .merge(search_routes()) // Smart search drill-down .merge(universal_search_routes()) // Universal / frames / persons search .merge(protected_routes) .layer(cors) .with_state(state); let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse().unwrap(); tracing::info!("Starting API server at http://{}", addr); let listener = tokio::net::TcpListener::bind(addr).await?; axum::serve(listener, app).await?; Ok(()) } #[derive(Debug, Serialize)] struct IngestStatsResponse { total_videos: i64, total_chunks: i64, sentence_chunks: i64, cut_chunks: i64, time_chunks: i64, searchable_chunks: i64, chunks_with_visual: i64, chunks_with_summary: i64, pending_videos: i64, } async fn get_ingest_stats( State(state): State, ) -> Result, StatusCode> { let table_videos = schema::table_name("videos"); let table_chunks = schema::table_name("chunks"); let total_videos: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_videos)) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let total_chunks: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}", table_chunks)) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let sentence_chunks: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE chunk_type = 'sentence'", table_chunks )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let cut_chunks: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE chunk_type = 'cut'", table_chunks )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let time_chunks: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE chunk_type = 'time'", table_chunks )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let searchable_chunks: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE vector_id IS NOT NULL", table_chunks )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let chunks_with_visual: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE visual_stats IS NOT NULL AND visual_stats != '{}'::jsonb", table_chunks, "{}" )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let chunks_with_summary: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE summary_text IS NOT NULL", table_chunks )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let pending_videos: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE status = 'pending'", table_videos )) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(IngestStatsResponse { total_videos: total_videos.0, total_chunks: total_chunks.0, sentence_chunks: sentence_chunks.0, cut_chunks: cut_chunks.0, time_chunks: time_chunks.0, searchable_chunks: searchable_chunks.0, chunks_with_visual: chunks_with_visual.0, chunks_with_summary: chunks_with_summary.0, pending_videos: pending_videos.0, })) } #[derive(Debug, Serialize)] struct SftpgoStatusResponse { username: String, home_dir: String, files_count: i64, registered_videos: Vec, last_login: Option, } #[derive(Debug, Serialize)] struct RegisteredVideo { uuid: String, file_name: String, status: String, } async fn get_sftpgo_status( State(state): State, ) -> Result, StatusCode> { let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo"; let files_count: i64 = std::fs::read_dir(demo_dir) .map(|entries| entries.count() as i64) .unwrap_or(0); let table_videos = schema::table_name("videos"); let registered_videos: Vec<(String, String, String)> = sqlx::query_as(&format!( "SELECT file_uuid, file_name, status FROM {} WHERE file_path LIKE '%demo%' ORDER BY id", table_videos )) .fetch_all(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let registered_videos = registered_videos .into_iter() .map(|(uuid, file_name, status)| RegisteredVideo { uuid, file_name, status, }) .collect(); Ok(Json(SftpgoStatusResponse { username: "demo".to_string(), home_dir: demo_dir.to_string(), files_count, registered_videos, last_login: None, })) } #[derive(Debug, Serialize)] struct InferenceEngineStatus { engine: String, model: String, status: String, latency_ms: Option, error: Option, } #[derive(Debug, Serialize)] struct InferenceHealthResponse { ollama: InferenceEngineStatus, llama_server: InferenceEngineStatus, } async fn get_inference_health() -> Result, StatusCode> { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(5)) .build() .unwrap(); let ollama_start = std::time::Instant::now(); let ollama_status = match client.get("http://localhost:11434/api/tags").send().await { Ok(resp) if resp.status().is_success() => { let latency = ollama_start.elapsed().as_millis() as u64; InferenceEngineStatus { engine: "Ollama".to_string(), model: "nomic-embed-text".to_string(), status: "ok".to_string(), latency_ms: Some(latency), error: None, } } Ok(resp) => InferenceEngineStatus { engine: "Ollama".to_string(), model: "nomic-embed-text".to_string(), status: "error".to_string(), latency_ms: Some(ollama_start.elapsed().as_millis() as u64), error: Some(format!("HTTP {}", resp.status())), }, Err(e) => InferenceEngineStatus { engine: "Ollama".to_string(), model: "nomic-embed-text".to_string(), status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, }; let llama_start = std::time::Instant::now(); let llama_status = match client.get("http://localhost:8081/v1/models").send().await { Ok(resp) if resp.status().is_success() => { let latency = llama_start.elapsed().as_millis() as u64; InferenceEngineStatus { engine: "llama-server".to_string(), model: "gemma4_e4b_q5".to_string(), status: "ok".to_string(), latency_ms: Some(latency), error: None, } } Ok(resp) => InferenceEngineStatus { engine: "llama-server".to_string(), model: "gemma4_e4b_q5".to_string(), status: "error".to_string(), latency_ms: Some(llama_start.elapsed().as_millis() as u64), error: Some(format!("HTTP {}", resp.status())), }, Err(e) => InferenceEngineStatus { engine: "llama-server".to_string(), model: "gemma4_e4b_q5".to_string(), status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, }; Ok(Json(InferenceHealthResponse { ollama: ollama_status, llama_server: llama_status, })) } #[derive(Debug, Deserialize)] struct VideoDetailsQuery { chunk_id: Option, parent_id: Option, } #[derive(Debug, Serialize)] struct VideoDetailsResponse { uuid: String, #[serde(flatten)] details: VideoDetailsResult, } #[derive(Debug, Serialize)] #[serde(untagged)] enum VideoDetailsResult { Chunk(ChunkDetailResponse), Parent(ParentChunkResponse), } #[derive(Debug, Serialize)] struct FrameRange { start_frame: i64, end_frame: i64, duration_frames: i64, fps: f64, } #[derive(Debug, Serialize)] struct ReferenceTime { start: f64, end: f64, } #[derive(Debug, Serialize)] struct ChunkDetailResponse { chunk_id: String, chunk_type: String, frame_range: FrameRange, reference_time: ReferenceTime, text_content: Option, content: Option, parent_id: Option, summary_text: Option, metadata: Option, visual_stats: Option, speaker_ids: Option>, person_ids: Option>, } #[derive(Debug, Serialize)] struct ParentChunkResponse { parent_id: i32, metadata: Option, summary_text: Option, frame_range: FrameRange, reference_time: ReferenceTime, } /// Search visual chunks based on criteria #[derive(Debug, Deserialize)] struct VisualChunkSearchRequest { uuid: String, criteria: visual_chunk_search::VisualChunkSearchCriteria, } #[derive(Debug, Serialize)] struct VisualChunkSearchResponse { chunks: Vec, total: usize, } async fn search_visual_chunks( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let criteria_hash = generate_visual_search_hash(&req.uuid, &req.criteria); let cache_key = keys::visual_search(&req.uuid, &criteria_hash); let ttl = state.mongo_cache.ttl_visual_search(); let chunks = state .mongo_cache .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VISUAL_SEARCH, || async { let db = PostgresDb::init() .await .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; visual_chunk_search::search_visual_chunks(&db, &req.uuid, &req.criteria) .await .map_err(|e| anyhow::anyhow!("Visual search failed: {}", e)) }) .await .map_err(|e| { tracing::error!("Visual chunk search failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(VisualChunkSearchResponse { total: chunks.len(), chunks, })) } /// Request for searching visual chunks by object class #[derive(Debug, Deserialize)] struct VisualChunkSearchByClassRequest { uuid: String, object_class: String, min_count: Option, max_count: Option, } /// Request for searching visual chunks by density #[derive(Debug, Deserialize)] struct VisualChunkSearchByDensityRequest { uuid: String, min_density: f32, max_density: Option, } /// Request for getting visual chunk statistics #[derive(Debug, Deserialize)] struct VisualChunkStatsRequest { uuid: String, } async fn search_visual_chunks_by_class( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let db = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let chunks = visual_chunk_search::search_visual_chunks_by_class( &db, &req.uuid, &req.object_class, req.min_count, req.max_count, ) .await .map_err(|e| { tracing::error!("Visual chunk search by class failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(VisualChunkSearchResponse { total: chunks.len(), chunks, })) } async fn search_visual_chunks_by_density( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let db = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let chunks = visual_chunk_search::search_visual_chunks_by_density( &db, &req.uuid, req.min_density, req.max_density, ) .await .map_err(|e| { tracing::error!("Visual chunk search by density failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(VisualChunkSearchResponse { total: chunks.len(), chunks, })) } #[derive(Debug, Serialize)] struct VisualChunkStatsResponse { uuid: String, stats: std::collections::HashMap, } async fn get_visual_chunk_stats( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let db = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let stats = visual_chunk_search::get_visual_chunk_statistics(&db, &req.uuid) .await .map_err(|e| { tracing::error!("Get visual chunk stats failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(VisualChunkStatsResponse { uuid: req.uuid, stats, })) } /// Request for searching visual chunks by object combination #[derive(Debug, Deserialize)] struct VisualChunkSearchByCombinationRequest { uuid: String, combination: Vec<(String, u32)>, // (object_class, min_count) } async fn search_visual_chunks_by_combination( State(state): State, Json(req): Json, ) -> Result, StatusCode> { let db = PostgresDb::init() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let combination: Vec<(&str, u32)> = req .combination .iter() .map(|(c, n)| (c.as_str(), *n)) .collect(); let chunks = visual_chunk_search::search_visual_chunks_by_combination(&db, &req.uuid, &combination) .await .map_err(|e| { tracing::error!("Visual chunk search by combination failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(VisualChunkSearchResponse { total: chunks.len(), chunks, })) } async fn video_details( Path(uuid): Path, Query(query): Query, State(state): State, ) -> Result, StatusCode> { let table = schema::table_name("chunks"); if let Some(chunk_id) = query.chunk_id { let row: Option<( i32, String, String, i32, String, f64, i64, i64, Option, serde_json::Value, Option, Option, i32, Option, Option, Option, )> = sqlx::query_as(&format!( "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type::text, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, parent_chunk_id, visual_stats, summary_text FROM {} WHERE chunk_id = $1 AND uuid = $2", table )) .bind(&chunk_id) .bind(&uuid) .fetch_optional(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let speaker_row: Option<(String, String)> = sqlx::query_as(&format!( "SELECT COALESCE(speaker_ids, '{{}}'::text[]), COALESCE(face_ids, '{{}}'::integer[])::text[] FROM {} WHERE chunk_id = $1 AND uuid = $2", table )) .bind(&chunk_id) .bind(&uuid) .fetch_optional(state.db.pool()) .await .ok() .flatten(); let row = row.ok_or(StatusCode::NOT_FOUND)?; let fps = if row.5 > 0.0 { row.5 } else { 24.0 }; let start_frame = row.6; let end_frame = row.7; let duration_frames = end_frame - start_frame; let start_time = start_frame as f64 / fps; let end_time = end_frame as f64 / fps; let row_metadata = row.10.clone(); let mut summary_text = row.15.clone(); let mut metadata = None; if let Some(ref pid_str) = row.13 { if !pid_str.is_empty() { if let Ok(pid) = pid_str.parse::() { let parent_table = schema::table_name("parent_chunks"); let parent: Option<(Option, Option)> = sqlx::query_as(&format!( "SELECT summary_text, metadata FROM {} WHERE id = $1", parent_table )) .bind(pid) .fetch_optional(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if let Some((s, m)) = parent { if summary_text.is_none() { summary_text = s; } let mut merged: serde_json::Value = serde_json::json!({}); if let Some(ref cm) = row_metadata { if let Some(obj) = cm.as_object() { for (k, v) in obj { merged[k] = v.clone(); } } } if let Some(pm) = &m { if let Some(obj) = pm.as_object() { for (k, v) in obj { merged[k] = v.clone(); } } } metadata = Some(merged); } } } } else if let Some(ref cm) = row_metadata { metadata = Some(cm.clone()); } let parse_pg_array = |s: &str| -> Vec { if s.is_empty() || s == "{}" { return vec![]; } s.trim_start_matches('{') .trim_end_matches('}') .split(',') .map(|s| s.trim_matches('"').to_string()) .collect() }; let (speaker_str, face_str) = speaker_row.unwrap_or(("{}".to_string(), "{}".to_string())); let speaker_vec: Vec = parse_pg_array(&speaker_str); let speaker_ids: Option> = if speaker_vec.is_empty() { None } else { Some(speaker_vec) }; let face_vec: Vec = parse_pg_array(&face_str); let person_ids: Option> = if face_vec.is_empty() { None } else { Some(face_vec.iter().map(|id| format!("face_{}", id)).collect()) }; return Ok(Json(VideoDetailsResponse { uuid: row.1.clone(), details: VideoDetailsResult::Chunk(ChunkDetailResponse { chunk_id: row.2.clone(), chunk_type: row.4.clone(), frame_range: FrameRange { start_frame, end_frame, duration_frames, fps, }, reference_time: ReferenceTime { start: start_time, end: end_time, }, text_content: row.8.clone(), content: Some(row.9.clone()), parent_id: row.13.clone(), summary_text, metadata, visual_stats: row.14.clone(), speaker_ids, person_ids, }), })); } Err(StatusCode::BAD_REQUEST) } #[derive(Debug, Deserialize)] struct PreChunksQuery { processor_type: Option, page: Option, page_size: Option, } #[derive(Debug, Serialize)] struct PreChunksResponse { pre_chunks: Vec, count: i64, page: usize, page_size: usize, } #[derive(Debug, Serialize)] struct PreChunkItem { id: i64, processor_type: String, coordinate_type: String, coordinate_index: i64, start_frame: Option, end_frame: Option, start_time: Option, end_time: Option, fps: Option, data: serde_json::Value, identity_id: Option, confidence: Option, created_at: String, } async fn list_pre_chunks( Path(uuid): Path, Query(query): Query, State(state): State, ) -> Result, StatusCode> { let table = schema::table_name("pre_chunks"); let page = query.page.unwrap_or(1); let page_size = query.page_size.unwrap_or(20); let offset = (page - 1) * page_size; let processor_filter = if let Some(pt) = &query.processor_type { format!("AND processor_type = '{}'", pt.to_lowercase()) } else { "".to_string() }; let count_query = format!( "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 {}", table, processor_filter ); let count: i64 = sqlx::query(&count_query) .bind(&uuid) .fetch_one(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .try_get(0) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let data_query = format!( "SELECT id, processor_type, coordinate_type, coordinate_index, start_frame, end_frame, start_time, end_time, fps, data, created_at FROM {} WHERE file_uuid = $1 {} ORDER BY coordinate_index ASC LIMIT {} OFFSET {}", table, processor_filter, page_size, offset ); let rows: Vec<( i64, String, String, i64, Option, Option, Option, Option, Option, serde_json::Value, chrono::DateTime, )> = sqlx::query_as(&data_query) .bind(&uuid) .fetch_all(state.db.pool()) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let pre_chunks = rows .iter() .map(|row| PreChunkItem { id: row.0, processor_type: row.1.clone(), coordinate_type: row.2.clone(), coordinate_index: row.3, start_frame: row.4, end_frame: row.5, start_time: row.6, end_time: row.7, fps: row.8, data: row.9.clone(), identity_id: None, confidence: None, created_at: row.10.to_rfc3339(), }) .collect(); Ok(Json(PreChunksResponse { pre_chunks, count, page, page_size, })) } #[derive(Debug, Serialize)] struct DeleteVideoResponse { success: bool, message: String, deleted_face_detections: u64, deleted_processor_results: u64, } #[derive(Debug, Serialize)] struct UnregisterFileResponse { success: bool, message: String, file_uuid: String, deleted_face_detections: u64, deleted_processor_results: u64, deleted_chunks: u64, } /// Detects file type using a tiered strategy: /// 1. ffprobe (if available) /// 2. `file` command (MIME type) /// 3. File extension fn detect_file_type( path: &std::path::Path, probe_result: Option<&crate::core::probe::ffprobe::ProbeResult>, ) -> String { // 1. ffprobe if let Some(probe) = probe_result { let has_video = probe .streams .iter() .any(|s| s.codec_type.as_deref() == Some("video")); let has_audio = probe .streams .iter() .any(|s| s.codec_type.as_deref() == Some("audio")); if has_video { return "video".to_string(); } if has_audio { return "audio".to_string(); } if !probe.streams.is_empty() { return "image".to_string(); } } // 2. file command if let Ok(output) = std::process::Command::new("file") .args(["--mime-type", "-b"]) .arg(path) .output() { let mime = String::from_utf8_lossy(&output.stdout) .trim() .to_lowercase(); if mime.starts_with("video/") { return "video".to_string(); } if mime.starts_with("audio/") { return "audio".to_string(); } if mime.starts_with("image/") { return "image".to_string(); } } // 3. Extension match path .extension() .and_then(|e| e.to_str()) .map(|s| s.to_lowercase()) .as_deref() { Some("mp4" | "mov" | "mkv" | "avi" | "webm" | "flv" | "wmv") => "video".to_string(), Some("mp3" | "wav" | "flac" | "aac" | "ogg" | "m4a") => "audio".to_string(), Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "svg") => { "image".to_string() } _ => "unknown".to_string(), } } async fn delete_video( Path(uuid): Path, State(state): State, ) -> Result, StatusCode> { tracing::info!("[UNREGISTER] Unregistering file: {}", uuid); let videos_table = schema::table_name("videos"); let face_table = schema::table_name("face_detections"); let processor_table = schema::table_name("processor_results"); let chunks_table = schema::table_name("chunks"); let parent_chunks_table = schema::table_name("parent_chunks"); // Check if video exists first let exists: Option = sqlx::query_scalar(&format!( "SELECT file_uuid FROM {} WHERE file_uuid = $1", videos_table )) .bind(&uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| { tracing::error!("[UNREGISTER] DB check failed: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; // Delete associated data regardless of video existence (cleanup orphans) let deleted_faces: i64 = sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", face_table)) .bind(&uuid) .execute(state.db.pool()) .await .map(|r| r.rows_affected() as i64) .unwrap_or(0); tracing::info!("[UNREGISTER] Deleted {} face_detections", deleted_faces); let deleted_processors: i64 = sqlx::query(&format!( "DELETE FROM {} WHERE file_uuid = $1", processor_table )) .bind(&uuid) .execute(state.db.pool()) .await .map(|r| r.rows_affected() as i64) .unwrap_or(0); tracing::info!( "[UNREGISTER] Deleted {} processor_results", deleted_processors ); let deleted_parent_chunks: i64 = sqlx::query(&format!( "DELETE FROM {} WHERE uuid = $1", parent_chunks_table )) .bind(&uuid) .execute(state.db.pool()) .await .map(|r| r.rows_affected() as i64) .unwrap_or(0); let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks_table)) .bind(&uuid) .execute(state.db.pool()) .await .map(|r| r.rows_affected() as i64) .unwrap_or(0); tracing::info!("[UNREGISTER] Deleted {} chunks", deleted_chunks); if exists.is_none() { tracing::warn!( "[UNREGISTER] Video not found: {}. (Maybe already deleted or UUID mismatch)", uuid ); return Err(StatusCode::NOT_FOUND); } Ok(Json(UnregisterFileResponse { success: true, message: format!( "File {} unregistered successfully. Record deleted from database.", uuid ), file_uuid: uuid, deleted_face_detections: deleted_faces as u64, deleted_processor_results: deleted_processors as u64, deleted_chunks: (deleted_chunks + deleted_parent_chunks) as u64, })) }