//! Universal Search API //! Unified search across chunks, frames, and persons. use axum::{ extract::{Query, State}, http::StatusCode, response::Json, routing::{get, post}, Router, }; use serde::{Deserialize, Serialize}; use crate::core::db::{Database, PostgresDb}; #[derive(Debug, Deserialize)] pub struct UniversalSearchRequest { pub query: String, pub uuid: Option, #[serde(default)] pub types: Vec, // chunk, frame, person pub time_range: Option<[f64; 2]>, pub filters: Option, pub page: Option, pub page_size: Option, pub limit: Option, pub offset: Option, } #[derive(Debug, Deserialize, Clone)] pub struct SearchFilters { pub person_id: Option, pub object_class: Option>, pub ocr_text: Option, pub has_face: Option, pub speaker_id: Option, /// 指定 chunk_type:如 "sentence", "cut", "trace", "visual" pub chunk_type: Option, /// 搜尋與指定 trace_id 有時間重疊的 trace chunk pub co_appears_with_trace_id: Option, // Visual chunk filters pub min_confidence: Option, pub min_unique_classes: Option, pub min_spatial_density: Option, pub max_spatial_density: Option, pub required_object_classes: Option>, } #[derive(Debug, Serialize)] pub struct UniversalSearchResponse { pub query: String, pub results: Vec, pub total: usize, pub page: usize, pub page_size: usize, pub took_ms: u64, } #[derive(Debug, Serialize, Clone)] #[serde(tag = "type")] pub enum SearchResult { #[serde(rename = "chunk")] Chunk { chunk_id: String, chunk_type: String, // Primary: frame-accurate position start_frame: i64, end_frame: i64, // Reference: time derived from frames (subject to FPS variation) start_time: f64, end_time: f64, score: f64, text: Option, speaker_id: Option, metadata: Option, }, #[serde(rename = "frame")] Frame { // Primary: exact frame number frame_number: i64, // Reference: time derived from frame (subject to FPS variation) timestamp: f64, score: f64, objects: Option>, ocr_texts: Option>, faces: Option>, pose_persons: Option>, }, #[serde(rename = "person")] Person { person_id: String, name: Option, speaker_id: Option, appearance_count: i32, score: f64, first_appearance_time: Option, last_appearance_time: Option, }, } pub fn universal_search_routes() -> Router { Router::new() .route("/api/v1/search/universal", post(universal_search)) .route("/api/v1/search/frames", post(search_frames)) } /// Unified search across all data types pub async fn universal_search( State(_state): State, Json(req): Json, ) -> Result, (StatusCode, Json)> { let start_time = std::time::Instant::now(); let db = PostgresDb::init().await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("DB error: {}", e) })), ) })?; let page = req.page.unwrap_or(1).max(1); let page_size = req.page_size.unwrap_or(20).max(1).min(200); // Backward compat: if old `offset` is used without `page`, derive from offset let offset = if req.page.is_none() && req.offset.is_some() { req.offset.unwrap() } else { (page - 1) * page_size }; let types = if req.types.is_empty() { vec![ "chunk".to_string(), "frame".to_string(), "person".to_string(), ] } else { req.types.clone() }; let mut results = Vec::new(); // Search chunks if types.contains(&"chunk".to_string()) { let chunk_results = search_chunks(&db, &req).await.map_err(|e| { ( StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": e.to_string() })), ) })?; results.extend(chunk_results); } // Search frames if types.contains(&"frame".to_string()) { let frame_results = search_frames_internal(&db, &req).await.unwrap_or_default(); results.extend(frame_results); } // Search persons if types.contains(&"person".to_string()) { let person_results = search_persons_internal(&db, &req).await.unwrap_or_default(); results.extend(person_results); } // Sort by score descending results.sort_by(|a, b| { let score_a = match a { SearchResult::Chunk { score, .. } => *score, SearchResult::Frame { score, .. } => *score, SearchResult::Person { score, .. } => *score, }; let score_b = match b { SearchResult::Chunk { score, .. } => *score, SearchResult::Frame { score, .. } => *score, SearchResult::Person { score, .. } => *score, }; score_b .partial_cmp(&score_a) .unwrap_or(std::cmp::Ordering::Equal) }); let total = results.len(); let effective_limit = req.limit.unwrap_or(usize::MAX); let end = std::cmp::min(offset + page_size, results.len()).min(effective_limit); let paginated = if offset < results.len() { results[offset..end].to_vec() } else { vec![] }; let took = start_time.elapsed().as_millis() as u64; Ok(Json(UniversalSearchResponse { query: req.query, results: paginated, total, page, page_size, took_ms: took, })) } /// Search frames by YOLO objects, OCR text, or face IDs pub async fn search_frames( State(_state): State, Json(req): Json, ) -> Result, (StatusCode, Json)> { let db = PostgresDb::init().await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("DB error: {}", e) })), ) })?; let frames = search_frames_internal_v2(&db, &req).await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("Search error: {}", e) })), ) })?; let frames_count = frames.len(); Ok(Json(FrameSearchResponse { frames, total: frames_count, })) } /// Search persons by name or speaker_id pub async fn search_persons( State(_state): State, Query(query): Query, ) -> Result, (StatusCode, Json)> { let db = PostgresDb::init().await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("DB error: {}", e) })), ) })?; let limit = query.limit.unwrap_or(20); let persons = search_persons_by_query( &db, &query.query, query.min_appearances, query.max_age, limit, ) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("Search error: {}", e) })), ) })?; let persons_count = persons.len(); Ok(Json(PersonSearchResponse { persons, total: persons_count, })) } // --- Internal search functions --- #[derive(Debug, Deserialize)] pub struct FrameSearchRequest { pub uuid: Option, pub object_class: Option, pub ocr_text: Option, pub face_id: Option, pub time_range: Option<[f64; 2]>, pub limit: Option, } #[derive(Debug, Serialize)] pub struct FrameSearchResponse { pub frames: Vec, pub total: usize, } #[derive(Debug, Serialize)] pub struct FrameResult { pub frame_number: i64, pub timestamp: f64, pub uuid: String, pub objects: Option>, pub ocr_texts: Option>, pub faces: Option>, pub pose_persons: Option>, } #[derive(Debug, Deserialize)] pub struct PersonSearchQuery { pub file_uuid: String, pub query: Option, pub min_appearances: Option, pub max_age: Option, // New filter for "children" pub limit: Option, } #[derive(Debug, Serialize)] pub struct PersonSearchResponse { pub persons: Vec, pub total: usize, } #[derive(Debug, Serialize)] pub struct PersonResult { pub person_id: String, pub name: Option, pub character_name: Option, pub aliases: Option>, pub age: Option, pub gender: Option, pub speaker_id: Option, pub appearance_count: i32, pub first_appearance_time: Option, pub last_appearance_time: Option, } async fn search_chunks( db: &PostgresDb, req: &UniversalSearchRequest, ) -> Result, anyhow::Error> { // uuid is required for chunk search - chunk_id is only unique within a video let uuid = match &req.uuid { Some(u) => u.replace('\'', "''"), None => return Err(anyhow::anyhow!("uuid is required for chunk search")), }; let mut sql = format!( "SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM dev.chunk WHERE file_uuid = '{}'", uuid ); if let Some(tr) = &req.time_range { sql.push_str(&format!( " AND start_time >= {} AND end_time <= {}", tr[0], tr[1] )); } if !req.query.is_empty() { let q = req.query.replace('\'', "''"); sql.push_str(&format!( " AND (text_content ILIKE '%{}%' OR content::text ILIKE '%{}%')", q, q )); } if let Some(ref filters) = req.filters { if let Some(ref speaker_id) = filters.speaker_id { sql.push_str(&format!( " AND content->>'speaker_id' = '{}'", speaker_id.replace('\'', "''") )); } if let Some(ref person_id) = filters.person_id { sql.push_str(&format!( " AND content::text LIKE '%{}%'", person_id.replace('\'', "''") )); } // Visual chunk filters if let Some(min_confidence) = filters.min_confidence { sql.push_str(&format!( " AND (content->'metadata'->>'avg_confidence')::float >= {}", min_confidence )); } if let Some(min_unique_classes) = filters.min_unique_classes { sql.push_str(&format!( " AND jsonb_array_length(content->'metadata'->'unique_classes') >= {}", min_unique_classes )); } if let Some(min_density) = filters.min_spatial_density { sql.push_str(&format!( " AND (content->'metadata'->>'spatial_density')::float >= {}", min_density )); } if let Some(max_density) = filters.max_spatial_density { sql.push_str(&format!( " AND (content->'metadata'->>'spatial_density')::float <= {}", max_density )); } if let Some(ref required_classes) = filters.required_object_classes { if !required_classes.is_empty() { let class_conditions: Vec = required_classes .iter() .map(|class| { format!( "content->'keyframe_objects' @> '[{{ \"class_name\": \"{}\"}}]'", class.replace('\'', "''") ) }) .collect(); sql.push_str(&format!(" AND ({})", class_conditions.join(" OR "))); } } if let Some(ref chunk_type) = filters.chunk_type { sql.push_str(&format!( " AND chunk_type = '{}'", chunk_type.replace('\'', "''") )); } if let Some(trace_id) = filters.co_appears_with_trace_id { sql.push_str(&format!( " AND metadata->'co_appearances' @> '[{{ \"trace_id\": {} }}]'", trace_id )); } } sql.push_str(" ORDER BY start_time ASC"); sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( String, String, f64, f64, i64, i64, Option, Option, )> = sqlx::query_as(&sql).fetch_all(db.pool()).await?; let results: Vec = rows .into_iter() .map( |( chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content, )| { let text = text_content.or_else(|| { content .as_ref() .and_then(|c| c.get("text").and_then(|v| v.as_str()).map(String::from)) }); let speaker_id = content.as_ref().and_then(|c| { c.get("speaker_id") .and_then(|v| v.as_str()) .map(String::from) }); // Simple scoring: if query matches, score 0.8 let score = if !req.query.is_empty() && text.as_ref().map_or(false, |t| { t.to_lowercase().contains(&req.query.to_lowercase()) }) { 0.9 } else { 0.5 }; SearchResult::Chunk { chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, score, text, speaker_id, metadata: content, } }, ) .collect(); Ok(results) } async fn search_frames_internal( db: &PostgresDb, req: &UniversalSearchRequest, ) -> Result, anyhow::Error> { let table = "frames"; let video_table = "videos"; let mut sql = format!( "SELECT f.frame_number, f.timestamp, f.yolo_objects, f.ocr_results, f.face_results, v.file_uuid FROM {} f JOIN {} v ON f.file_id = v.id WHERE 1=1", table, video_table ); if let Some(uuid) = &req.uuid { sql.push_str(&format!(" AND v.file_uuid = '{}'", uuid)); } if let Some(tr) = &req.time_range { sql.push_str(&format!( " AND f.timestamp >= {} AND f.timestamp <= {}", tr[0], tr[1] )); } if let Some(ref filters) = req.filters { if let Some(ref classes) = filters.object_class { for class in classes { sql.push_str(&format!(" AND f.yolo_objects::text ILIKE '%{}%'", class)); } } if let Some(ref ocr) = filters.ocr_text { sql.push_str(&format!(" AND f.ocr_results::text ILIKE '%{}%'", ocr)); } if let Some(true) = filters.has_face { sql.push_str( " AND f.face_results IS NOT NULL AND jsonb_array_length(f.face_results) > 0", ); } if let Some(ref person_id) = filters.person_id { sql.push_str(&format!(" AND f.face_results::text LIKE '%{}%'", person_id)); } } if !req.query.is_empty() { // Search across all frame data sql.push_str(&format!( " AND (f.yolo_objects::text ILIKE '%{}%' OR f.ocr_results::text ILIKE '%{}%' OR f.face_results::text ILIKE '%{}%')", req.query, req.query, req.query )); } sql.push_str(" ORDER BY f.timestamp ASC"); sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( i64, f64, Option, Option, Option, String, )> = sqlx::query_as(&sql).fetch_all(db.pool()).await?; let results: Vec = rows .into_iter() .map(|(frame_number, timestamp, yolo, ocr, face, _uuid)| { let objects = yolo.as_ref().and_then(|v| { v.get("objects") .map(|o| o.as_array().cloned().unwrap_or_default()) }); let ocr_texts = ocr.as_ref().and_then(|v| { v.get("texts").and_then(|t| { t.as_array().map(|arr| { arr.iter() .filter_map(|item| { item.get("text").and_then(|x| x.as_str()).map(String::from) }) .collect() }) }) }); let faces = face.as_ref().and_then(|v| { v.get("faces") .map(|f| f.as_array().cloned().unwrap_or_default()) }); SearchResult::Frame { frame_number, timestamp, score: 0.7, objects: objects.map(|arr| arr.iter().map(|v| v.clone()).collect()), ocr_texts, faces, pose_persons: None, } }) .collect(); Ok(results) } async fn search_persons_internal( db: &PostgresDb, req: &UniversalSearchRequest, ) -> Result, anyhow::Error> { let table = "person_identities"; let mut sql = format!( "SELECT person_id, name, speaker_id, appearance_count, first_appearance_time, last_appearance_time FROM {} WHERE 1=1", table ); if !req.query.is_empty() { sql.push_str(&format!( " AND (name ILIKE '%{}%' OR person_id ILIKE '%{}%' OR speaker_id ILIKE '%{}%')", req.query, req.query, req.query )); } if let Some(ref filters) = req.filters { if let Some(ref speaker_id) = filters.speaker_id { sql.push_str(&format!(" AND speaker_id = '{}'", speaker_id)); } if let Some(ref person_id) = filters.person_id { sql.push_str(&format!(" AND person_id = '{}'", person_id)); } } sql.push_str(" ORDER BY appearance_count DESC"); sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( String, Option, Option, i32, Option, Option, )> = sqlx::query_as(&sql).fetch_all(db.pool()).await?; let results: Vec = rows .into_iter() .map( |(person_id, name, speaker_id, appearance_count, first_time, last_time)| { let score = if !req.query.is_empty() && name.as_ref().map_or(false, |n| { n.to_lowercase().contains(&req.query.to_lowercase()) }) { 0.95 } else { 0.5 }; SearchResult::Person { person_id, name, speaker_id, appearance_count, score, first_appearance_time: first_time, last_appearance_time: last_time, } }, ) .collect(); Ok(results) } async fn search_frames_internal_v2( db: &PostgresDb, req: &FrameSearchRequest, ) -> Result, anyhow::Error> { let table = "frames"; let video_table = "videos"; let mut sql = format!( "SELECT f.frame_number, f.timestamp, f.yolo_objects, f.ocr_results, f.face_results, v.file_uuid FROM {} f JOIN {} v ON f.file_id = v.id WHERE 1=1", table, video_table ); if let Some(uuid) = &req.uuid { sql.push_str(&format!(" AND v.file_uuid = '{}'", uuid)); } if let Some(tr) = &req.time_range { sql.push_str(&format!( " AND f.timestamp >= {} AND f.timestamp <= {}", tr[0], tr[1] )); } if let Some(ref class) = req.object_class { sql.push_str(&format!(" AND f.yolo_objects::text ILIKE '%{}%'", class)); } if let Some(ref ocr) = req.ocr_text { sql.push_str(&format!(" AND f.ocr_results::text ILIKE '%{}%'", ocr)); } if let Some(ref face_id) = req.face_id { sql.push_str(&format!(" AND f.face_results::text LIKE '%{}%'", face_id)); } sql.push_str(" ORDER BY f.timestamp ASC"); sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(50))); let rows: Vec<( i64, f64, Option, Option, Option, String, )> = sqlx::query_as(&sql).fetch_all(db.pool()).await?; let results: Vec = rows .into_iter() .map(|(frame_number, timestamp, yolo, ocr, face, uuid)| { let objects = yolo.as_ref().and_then(|v| { v.get("objects") .map(|o| o.as_array().cloned().unwrap_or_default()) }); let ocr_texts = ocr.as_ref().and_then(|v| { v.get("texts").and_then(|t| { t.as_array().map(|arr| { arr.iter() .filter_map(|item| { item.get("text").and_then(|x| x.as_str()).map(String::from) }) .collect() }) }) }); let faces = face.as_ref().and_then(|v| { v.get("faces") .map(|f| f.as_array().cloned().unwrap_or_default()) }); FrameResult { frame_number, timestamp, uuid, objects: objects.map(|arr| arr.iter().map(|v| v.clone()).collect()), ocr_texts, faces, pose_persons: None, } }) .collect(); Ok(results) } async fn search_persons_by_query( db: &PostgresDb, query: &Option, min_appearances: Option, max_age: Option, limit: usize, ) -> Result, anyhow::Error> { let table = "person_identities"; let mut sql = format!( "SELECT person_id, name, character_name, aliases, age, gender, speaker_id, appearance_count, first_appearance_time, last_appearance_time FROM {} WHERE 1=1", table ); if let Some(ref q) = query { // Search name, character_name, aliases (cast to text), person_id, speaker_id sql.push_str(&format!( " AND (name ILIKE '%{}%' OR character_name ILIKE '%{}%' OR aliases::text ILIKE '%{}%' OR person_id ILIKE '%{}%' OR speaker_id ILIKE '%{}%')", q, q, q, q, q )); } if let Some(min) = min_appearances { sql.push_str(&format!(" AND appearance_count >= {}", min)); } if let Some(max_a) = max_age { // Strictly filter for age <= max_age. // Note: This excludes entries with NULL age. sql.push_str(&format!(" AND age <= {}", max_a)); } sql.push_str(" ORDER BY appearance_count DESC"); sql.push_str(&format!(" LIMIT {}", limit)); let rows: Vec<( String, Option, Option, Option, Option, Option, Option, i32, Option, Option, )> = sqlx::query_as(&sql).fetch_all(db.pool()).await?; let results: Vec = rows .into_iter() .map( |( person_id, name, character_name, aliases_json, age, gender, speaker_id, appearance_count, first_time, last_time, )| { let aliases = aliases_json.and_then(|v| { v.as_array().map(|arr| { arr.iter() .filter_map(|val| val.as_str().map(String::from)) .collect() }) }); PersonResult { person_id, name, character_name, aliases, age, gender, speaker_id, appearance_count, first_appearance_time: first_time, last_appearance_time: last_time, } }, ) .collect(); Ok(results) } #[cfg(test)] mod tests { use super::*; #[test] fn test_search_filters_with_visual() { let filters = SearchFilters { person_id: None, object_class: None, ocr_text: None, has_face: None, speaker_id: None, min_confidence: Some(0.8), min_unique_classes: Some(3), min_spatial_density: Some(0.5), max_spatial_density: Some(0.9), required_object_classes: Some(vec!["person".to_string()]), }; assert_eq!(filters.min_confidence, Some(0.8)); } }