// Files
// momentry_core/src/api/universal_search.rs
// 2026-05-08 01:14:28 +08:00
//
// 814 lines
// 25 KiB
// Rust

//! Universal Search API
//! Unified search across chunks, frames, and persons.
use axum::{
extract::{Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use crate::core::db::{Database, PostgresDb};
/// JSON request body accepted by the universal search endpoint.
#[derive(Debug, Deserialize)]
pub struct UniversalSearchRequest {
/// Free-text search term; an empty string skips the text-match condition.
pub query: String,
/// Video/file identifier. The chunk search returns an error when this is `None`;
/// the frame search treats it as an optional filter.
pub uuid: Option<String>,
/// Result kinds to search: "chunk", "frame" and/or "person".
/// When omitted or empty, all three kinds are searched.
#[serde(default)]
pub types: Vec<String>, // chunk, frame, person
/// Optional `[start, end]` bounds applied to chunk start/end times and to
/// frame timestamps.
pub time_range: Option<[f64; 2]>,
/// Optional additional filters; see `SearchFilters`.
pub filters: Option<SearchFilters>,
/// Page size; the handler falls back to 20 when absent.
pub limit: Option<usize>,
/// Number of merged results to skip; the handler falls back to 0 when absent.
pub offset: Option<usize>,
}
/// Optional narrowing criteria shared by the chunk, frame and person searches.
/// Each search reads only the fields relevant to it; unset fields add no condition.
#[derive(Debug, Deserialize, Clone)]
pub struct SearchFilters {
/// Person identifier. Chunks and frames use a substring match (on `content` and
/// `face_results` text respectively); the person search uses exact equality.
pub person_id: Option<String>,
/// Frame search: every listed class must occur in the frame's `yolo_objects` text.
pub object_class: Option<Vec<String>>,
/// Frame search: case-insensitive substring matched against `ocr_results` text.
pub ocr_text: Option<String>,
/// Frame search: `Some(true)` keeps only frames with a non-empty `face_results`
/// array; `Some(false)` and `None` add no condition.
pub has_face: Option<bool>,
/// Speaker identifier; matched exactly by the chunk and person searches.
pub speaker_id: Option<String>,
// Visual chunk filters
/// Chunk search: lower bound for `content.metadata.avg_confidence`.
pub min_confidence: Option<f32>,
/// Chunk search: minimum length of the `content.metadata.unique_classes` array.
pub min_unique_classes: Option<u32>,
/// Chunk search: lower bound for `content.metadata.spatial_density`.
pub min_spatial_density: Option<f32>,
/// Chunk search: upper bound for `content.metadata.spatial_density`.
pub max_spatial_density: Option<f32>,
/// Chunk search: keeps chunks whose `content.keyframe_objects` contains at least
/// one of the listed class names (the per-class conditions are OR-ed).
pub required_object_classes: Option<Vec<String>>,
}
/// JSON response body returned by the universal search endpoint.
#[derive(Debug, Serialize)]
pub struct UniversalSearchResponse {
/// The query string echoed back from the request.
pub query: String,
/// The requested page of results, ordered by descending score.
pub results: Vec<SearchResult>,
/// Number of results gathered from the sub-searches before pagination was applied.
pub total: usize,
/// Wall-clock time spent handling the request, in milliseconds.
pub took_ms: u64,
}
/// One hit returned by the universal search.
///
/// Serialized as an internally tagged object: the variant is written into a
/// `"type"` field with the value `"chunk"`, `"frame"` or `"person"`.
#[derive(Debug, Serialize, Clone)]
#[serde(tag = "type")]
pub enum SearchResult {
/// A row from the `chunks` table.
#[serde(rename = "chunk")]
Chunk {
chunk_id: String,
chunk_type: String,
// Primary: frame-accurate position
start_frame: i64,
end_frame: i64,
// Reference: time derived from frames (subject to FPS variation)
start_time: f64,
end_time: f64,
/// Relevance score used to order the merged result list.
score: f64,
/// The `text_content` column, or the `text` entry of `content` when that is NULL.
text: Option<String>,
/// The `speaker_id` entry of `content`, when present as a string.
speaker_id: Option<String>,
/// The row's full `content` JSON document.
metadata: Option<serde_json::Value>,
},
/// A row from the `frames` table.
#[serde(rename = "frame")]
Frame {
// Primary: exact frame number
frame_number: i64,
// Reference: time derived from frame (subject to FPS variation)
timestamp: f64,
/// Relevance score used to order the merged result list.
score: f64,
/// The `objects` array of the frame's `yolo_objects` document.
objects: Option<Vec<serde_json::Value>>,
/// The `text` strings found in the `texts` array of `ocr_results`.
ocr_texts: Option<Vec<String>>,
/// The `faces` array of the frame's `face_results` document.
faces: Option<Vec<serde_json::Value>>,
/// The `persons` array of the frame's `pose_results` document.
pose_persons: Option<Vec<serde_json::Value>>,
},
/// A row from the `person_identities` table.
#[serde(rename = "person")]
Person {
person_id: String,
name: Option<String>,
speaker_id: Option<String>,
appearance_count: i32,
/// Relevance score used to order the merged result list.
score: f64,
first_appearance_time: Option<f64>,
last_appearance_time: Option<f64>,
},
}
/// Builds the router exposing the search endpoints defined in this module.
///
/// Registers two `POST` routes: `/api/v1/search/universal` and
/// `/api/v1/search/frames`.
///
/// NOTE(review): `search_persons` is not registered here; presumably it is
/// mounted elsewhere — confirm.
pub fn universal_search_routes() -> Router<crate::api::server::AppState> {
    let universal_handler = post(universal_search);
    let frames_handler = post(search_frames);

    Router::new()
        .route("/api/v1/search/universal", universal_handler)
        .route("/api/v1/search/frames", frames_handler)
}
/// Unified search across all data types
pub async fn universal_search(
State(_state): State<crate::api::server::AppState>,
Json(req): Json<UniversalSearchRequest>,
) -> Result<Json<UniversalSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
let start_time = std::time::Instant::now();
let db = PostgresDb::init().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("DB error: {}", e) })),
)
})?;
let limit = req.limit.unwrap_or(20);
let offset = req.offset.unwrap_or(0);
let types = if req.types.is_empty() {
vec![
"chunk".to_string(),
"frame".to_string(),
"person".to_string(),
]
} else {
req.types.clone()
};
let mut results = Vec::new();
// Search chunks
if types.contains(&"chunk".to_string()) {
let chunk_results = search_chunks(&db, &req).await.map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(serde_json::json!({ "error": e.to_string() })),
)
})?;
results.extend(chunk_results);
}
// Search frames
if types.contains(&"frame".to_string()) {
let frame_results = search_frames_internal(&db, &req).await.unwrap_or_default();
results.extend(frame_results);
}
// Search persons
if types.contains(&"person".to_string()) {
let person_results = search_persons_internal(&db, &req).await.unwrap_or_default();
results.extend(person_results);
}
// Sort by score descending
results.sort_by(|a, b| {
let score_a = match a {
SearchResult::Chunk { score, .. } => *score,
SearchResult::Frame { score, .. } => *score,
SearchResult::Person { score, .. } => *score,
};
let score_b = match b {
SearchResult::Chunk { score, .. } => *score,
SearchResult::Frame { score, .. } => *score,
SearchResult::Person { score, .. } => *score,
};
score_b
.partial_cmp(&score_a)
.unwrap_or(std::cmp::Ordering::Equal)
});
let total = results.len();
let end = std::cmp::min(offset + limit, results.len());
let paginated = if offset < results.len() {
results[offset..end].to_vec()
} else {
vec![]
};
let took = start_time.elapsed().as_millis() as u64;
Ok(Json(UniversalSearchResponse {
query: req.query,
results: paginated,
total,
took_ms: took,
}))
}
/// Handler that looks up frames by detected object class, OCR text, or face id.
///
/// Opens a database handle through `PostgresDb::init`, runs the frame query and
/// returns the matching frames together with their count. Failures of either
/// step are reported as `500 Internal Server Error` with a JSON `error` body.
pub async fn search_frames(
    State(_state): State<crate::api::server::AppState>,
    Json(req): Json<FrameSearchRequest>,
) -> Result<Json<FrameSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
    let db = match PostgresDb::init().await {
        Ok(handle) => handle,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": format!("DB error: {}", e) })),
            ));
        }
    };

    let frames = match search_frames_internal_v2(&db, &req).await {
        Ok(found) => found,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": format!("Search error: {}", e) })),
            ));
        }
    };

    let total = frames.len();
    Ok(Json(FrameSearchResponse { frames, total }))
}
/// Handler that looks up persons using query-string parameters.
///
/// Forwards the optional text query, `min_appearances`, `max_age` and the page
/// size (default 20) to `search_persons_by_query` and returns the matches with
/// their count. Failures to open the database or to run the query are reported
/// as `500 Internal Server Error` with a JSON `error` body.
///
/// NOTE(review): `PersonSearchQuery::file_uuid` is required by deserialization
/// but is not passed on to the query, so results are not scoped to a file.
pub async fn search_persons(
    State(_state): State<crate::api::server::AppState>,
    Query(query): Query<PersonSearchQuery>,
) -> Result<Json<PersonSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
    let db = match PostgresDb::init().await {
        Ok(handle) => handle,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": format!("DB error: {}", e) })),
            ));
        }
    };

    let page_size = query.limit.unwrap_or(20);
    let lookup = search_persons_by_query(
        &db,
        &query.query,
        query.min_appearances,
        query.max_age,
        page_size,
    )
    .await;

    let persons = match lookup {
        Ok(found) => found,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": format!("Search error: {}", e) })),
            ));
        }
    };

    let total = persons.len();
    Ok(Json(PersonSearchResponse { persons, total }))
}
// --- Endpoint request/response types and internal search functions ---
/// JSON request body of the frame search endpoint. Every field is optional and
/// an unset field adds no condition to the query.
#[derive(Debug, Deserialize)]
pub struct FrameSearchRequest {
/// Restricts the search to frames of the video with this `file_uuid`.
pub uuid: Option<String>,
/// Case-insensitive substring matched against the frame's `yolo_objects` text.
pub object_class: Option<String>,
/// Case-insensitive substring matched against the frame's `ocr_results` text.
pub ocr_text: Option<String>,
/// Case-sensitive substring matched against the frame's `face_results` text.
pub face_id: Option<String>,
/// Optional `[start, end]` bounds (inclusive) on the frame timestamp.
pub time_range: Option<[f64; 2]>,
/// Maximum number of frames returned; the query falls back to 50 when absent.
pub limit: Option<usize>,
}
/// JSON response body of the frame search endpoint.
#[derive(Debug, Serialize)]
pub struct FrameSearchResponse {
/// Matching frames, ordered by ascending timestamp.
pub frames: Vec<FrameResult>,
/// Number of entries in `frames`.
pub total: usize,
}
/// A single frame returned by the frame search endpoint.
#[derive(Debug, Serialize)]
pub struct FrameResult {
pub frame_number: i64,
pub timestamp: f64,
/// `file_uuid` of the video the frame belongs to.
pub uuid: String,
/// The `objects` array of the frame's `yolo_objects` document.
pub objects: Option<Vec<serde_json::Value>>,
/// The `text` strings found in the `texts` array of `ocr_results`.
pub ocr_texts: Option<Vec<String>>,
/// The `faces` array of the frame's `face_results` document.
pub faces: Option<Vec<serde_json::Value>>,
/// The `persons` array of the frame's `pose_results` document.
pub pose_persons: Option<Vec<serde_json::Value>>,
}
/// Query-string parameters of the person search handler.
#[derive(Debug, Deserialize)]
pub struct PersonSearchQuery {
/// Required by deserialization.
/// NOTE(review): the handler in this file does not forward it to the query.
pub file_uuid: String,
/// Optional case-insensitive substring matched against name, character name,
/// aliases, person id and speaker id.
pub query: Option<String>,
/// Keeps only persons with at least this many appearances.
pub min_appearances: Option<i32>,
/// Keeps only persons whose age is at most this value; rows with a NULL age
/// are excluded when it is set.
pub max_age: Option<i32>, // New filter for "children"
/// Maximum number of persons returned; the handler falls back to 20 when absent.
pub limit: Option<usize>,
}
/// JSON response body of the person search handler.
#[derive(Debug, Serialize)]
pub struct PersonSearchResponse {
/// Matching persons, ordered by descending appearance count.
pub persons: Vec<PersonResult>,
/// Number of entries in `persons`.
pub total: usize,
}
/// A single row of the `person_identities` table as returned by the person search.
#[derive(Debug, Serialize)]
pub struct PersonResult {
pub person_id: String,
pub name: Option<String>,
pub character_name: Option<String>,
/// String entries of the row's `aliases` JSON array; non-string entries are
/// dropped, and the field is `None` when the column is NULL or not an array.
pub aliases: Option<Vec<String>>,
pub age: Option<i32>,
pub gender: Option<String>,
pub speaker_id: Option<String>,
pub appearance_count: i32,
pub first_appearance_time: Option<f64>,
pub last_appearance_time: Option<f64>,
}
async fn search_chunks(
db: &PostgresDb,
req: &UniversalSearchRequest,
) -> Result<Vec<SearchResult>, anyhow::Error> {
// uuid is required for chunk search - chunk_id is only unique within a video
let uuid = match &req.uuid {
Some(u) => u.replace('\'', "''"),
None => return Err(anyhow::anyhow!("uuid is required for chunk search")),
};
let mut sql = format!(
"SELECT chunk_id, chunk_type, start_time, end_time, start_frame, end_frame, text_content, content FROM chunks WHERE file_uuid = '{}'",
uuid
);
if let Some(tr) = &req.time_range {
sql.push_str(&format!(
" AND start_time >= {} AND end_time <= {}",
tr[0], tr[1]
));
}
if !req.query.is_empty() {
let q = req.query.replace('\'', "''");
sql.push_str(&format!(
" AND (text_content ILIKE '%{}%' OR content::text ILIKE '%{}%')",
q, q
));
}
if let Some(ref filters) = req.filters {
if let Some(ref speaker_id) = filters.speaker_id {
sql.push_str(&format!(
" AND content->>'speaker_id' = '{}'",
speaker_id.replace('\'', "''")
));
}
if let Some(ref person_id) = filters.person_id {
sql.push_str(&format!(
" AND content::text LIKE '%{}%'",
person_id.replace('\'', "''")
));
}
// Visual chunk filters
if let Some(min_confidence) = filters.min_confidence {
sql.push_str(&format!(
" AND (content->'metadata'->>'avg_confidence')::float >= {}",
min_confidence
));
}
if let Some(min_unique_classes) = filters.min_unique_classes {
sql.push_str(&format!(
" AND jsonb_array_length(content->'metadata'->'unique_classes') >= {}",
min_unique_classes
));
}
if let Some(min_density) = filters.min_spatial_density {
sql.push_str(&format!(
" AND (content->'metadata'->>'spatial_density')::float >= {}",
min_density
));
}
if let Some(max_density) = filters.max_spatial_density {
sql.push_str(&format!(
" AND (content->'metadata'->>'spatial_density')::float <= {}",
max_density
));
}
if let Some(ref required_classes) = filters.required_object_classes {
if !required_classes.is_empty() {
let class_conditions: Vec<String> = required_classes
.iter()
.map(|class| {
format!(
"content->'keyframe_objects' @> '[{{ \"class_name\": \"{}\"}}]'",
class.replace('\'', "''")
)
})
.collect();
sql.push_str(&format!(" AND ({})", class_conditions.join(" OR ")));
}
}
}
sql.push_str(" ORDER BY start_time ASC");
sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20)));
let rows: Vec<(
String,
String,
f64,
f64,
i64,
i64,
Option<String>,
Option<serde_json::Value>,
)> = sqlx::query_as(&sql).fetch_all(db.pool()).await?;
let results: Vec<SearchResult> = rows
.into_iter()
.map(
|(
chunk_id,
chunk_type,
start_time,
end_time,
start_frame,
end_frame,
text_content,
content,
)| {
let text = text_content.or_else(|| {
content
.as_ref()
.and_then(|c| c.get("text").and_then(|v| v.as_str()).map(String::from))
});
let speaker_id = content.as_ref().and_then(|c| {
c.get("speaker_id")
.and_then(|v| v.as_str())
.map(String::from)
});
// Simple scoring: if query matches, score 0.8
let score = if !req.query.is_empty()
&& text.as_ref().map_or(false, |t| {
t.to_lowercase().contains(&req.query.to_lowercase())
}) {
0.9
} else {
0.5
};
SearchResult::Chunk {
chunk_id,
chunk_type,
start_time,
end_time,
start_frame,
end_frame,
score,
text,
speaker_id,
metadata: content,
}
},
)
.collect();
Ok(results)
}
/// Searches the `frames` table (joined to `videos`) for the universal search and
/// maps each row to a `SearchResult::Frame` with a fixed score of 0.7.
///
/// Optional conditions: video `uuid`, inclusive timestamp range, the
/// frame-related fields of `req.filters`, and the free-text query matched as a
/// case-insensitive substring of the YOLO, OCR or face JSON text. Rows are
/// ordered by ascending timestamp and capped at `req.limit` (default 20).
///
/// All caller-supplied strings are sent as bound parameters. They were
/// previously interpolated into the SQL text without any escaping.
///
/// # Errors
/// Returns an error when the query fails.
async fn search_frames_internal(
    db: &PostgresDb,
    req: &UniversalSearchRequest,
) -> Result<Vec<SearchResult>, anyhow::Error> {
    type FrameRow = (
        i64,
        f64,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        String,
    );

    /// Returns `doc[key]` as an array: `None` when the document or key is
    /// missing, an empty vector when the value is not an array.
    fn array_field(
        doc: Option<&serde_json::Value>,
        key: &str,
    ) -> Option<Vec<serde_json::Value>> {
        doc.and_then(|v| v.get(key))
            .map(|value| value.as_array().cloned().unwrap_or_default())
    }

    let mut qb: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
        "SELECT f.frame_number, f.timestamp, f.yolo_objects, f.ocr_results, f.face_results, f.pose_results, v.file_uuid \
         FROM frames f JOIN videos v ON f.file_id = v.id WHERE 1=1",
    );

    if let Some(uuid) = req.uuid.as_deref() {
        qb.push(" AND v.file_uuid = ").push_bind(uuid);
    }
    if let Some([from, to]) = req.time_range {
        qb.push(" AND f.timestamp >= ").push_bind(from);
        qb.push(" AND f.timestamp <= ").push_bind(to);
    }

    if let Some(filters) = req.filters.as_ref() {
        // Every listed class must be present (conditions are AND-ed).
        if let Some(classes) = filters.object_class.as_ref() {
            for class in classes {
                qb.push(" AND f.yolo_objects::text ILIKE ")
                    .push_bind(format!("%{}%", class));
            }
        }
        if let Some(ocr) = filters.ocr_text.as_deref() {
            qb.push(" AND f.ocr_results::text ILIKE ")
                .push_bind(format!("%{}%", ocr));
        }
        // Only an explicit `true` narrows the result; `false` adds no condition.
        if let Some(true) = filters.has_face {
            qb.push(
                " AND f.face_results IS NOT NULL AND jsonb_array_length(f.face_results) > 0",
            );
        }
        if let Some(person_id) = filters.person_id.as_deref() {
            qb.push(" AND f.face_results::text LIKE ")
                .push_bind(format!("%{}%", person_id));
        }
    }

    if !req.query.is_empty() {
        // Search across all frame data
        let pattern = format!("%{}%", req.query);
        qb.push(" AND (f.yolo_objects::text ILIKE ")
            .push_bind(pattern.clone());
        qb.push(" OR f.ocr_results::text ILIKE ")
            .push_bind(pattern.clone());
        qb.push(" OR f.face_results::text ILIKE ")
            .push_bind(pattern)
            .push(")");
    }

    // LIMIT takes a signed 64-bit value; clamp instead of sending an invalid one.
    let limit = i64::try_from(req.limit.unwrap_or(20)).unwrap_or(i64::MAX);
    qb.push(" ORDER BY f.timestamp ASC LIMIT ").push_bind(limit);

    let rows: Vec<FrameRow> = qb
        .build_query_as::<FrameRow>()
        .fetch_all(db.pool())
        .await?;

    let results: Vec<SearchResult> = rows
        .into_iter()
        .map(|(frame_number, timestamp, yolo, ocr, face, pose, _uuid)| {
            // Collect the `text` string of every entry in `ocr_results.texts`.
            let ocr_texts: Option<Vec<String>> = ocr
                .as_ref()
                .and_then(|v| v.get("texts"))
                .and_then(|t| t.as_array())
                .map(|entries| {
                    entries
                        .iter()
                        .filter_map(|item| item.get("text").and_then(|x| x.as_str()))
                        .map(String::from)
                        .collect()
                });

            SearchResult::Frame {
                frame_number,
                timestamp,
                score: 0.7,
                objects: array_field(yolo.as_ref(), "objects"),
                ocr_texts,
                faces: array_field(face.as_ref(), "faces"),
                pose_persons: array_field(pose.as_ref(), "persons"),
            }
        })
        .collect();

    Ok(results)
}
/// Searches the `person_identities` table for the universal search and maps each
/// row to a `SearchResult::Person`.
///
/// The free-text query is matched as a case-insensitive substring of `name`,
/// `person_id` or `speaker_id`. `filters.speaker_id` and `filters.person_id` are
/// matched exactly. Rows are ordered by descending appearance count and capped
/// at `req.limit` (default 20).
///
/// All caller-supplied strings are sent as bound parameters. They were
/// previously interpolated into the SQL text without any escaping.
///
/// Scoring: 0.95 when the person's name contains the query case-insensitively,
/// otherwise 0.5.
///
/// # Errors
/// Returns an error when the query fails.
async fn search_persons_internal(
    db: &PostgresDb,
    req: &UniversalSearchRequest,
) -> Result<Vec<SearchResult>, anyhow::Error> {
    type PersonRow = (
        String,
        Option<String>,
        Option<String>,
        i32,
        Option<f64>,
        Option<f64>,
    );

    let mut qb: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
        "SELECT person_id, name, speaker_id, appearance_count, first_appearance_time, last_appearance_time FROM person_identities WHERE 1=1",
    );

    if !req.query.is_empty() {
        let pattern = format!("%{}%", req.query);
        qb.push(" AND (name ILIKE ").push_bind(pattern.clone());
        qb.push(" OR person_id ILIKE ").push_bind(pattern.clone());
        qb.push(" OR speaker_id ILIKE ")
            .push_bind(pattern)
            .push(")");
    }

    if let Some(filters) = req.filters.as_ref() {
        if let Some(speaker_id) = filters.speaker_id.as_deref() {
            qb.push(" AND speaker_id = ").push_bind(speaker_id);
        }
        if let Some(person_id) = filters.person_id.as_deref() {
            qb.push(" AND person_id = ").push_bind(person_id);
        }
    }

    // LIMIT takes a signed 64-bit value; clamp instead of sending an invalid one.
    let limit = i64::try_from(req.limit.unwrap_or(20)).unwrap_or(i64::MAX);
    qb.push(" ORDER BY appearance_count DESC LIMIT ")
        .push_bind(limit);

    let rows: Vec<PersonRow> = qb
        .build_query_as::<PersonRow>()
        .fetch_all(db.pool())
        .await?;

    // Lowercase the query once instead of once per row.
    let needle = req.query.to_lowercase();

    let results: Vec<SearchResult> = rows
        .into_iter()
        .map(
            |(person_id, name, speaker_id, appearance_count, first_time, last_time)| {
                let name_matches = !req.query.is_empty()
                    && name
                        .as_deref()
                        .map_or(false, |n| n.to_lowercase().contains(&needle));
                let score = if name_matches { 0.95 } else { 0.5 };

                SearchResult::Person {
                    person_id,
                    name,
                    speaker_id,
                    appearance_count,
                    score,
                    first_appearance_time: first_time,
                    last_appearance_time: last_time,
                }
            },
        )
        .collect();

    Ok(results)
}
/// Runs the query behind the dedicated frame search endpoint and maps each row
/// to a `FrameResult`.
///
/// Optional conditions: video `uuid`, inclusive timestamp range, and substring
/// matches of `object_class` / `ocr_text` (case-insensitive) and `face_id`
/// (case-sensitive) against the corresponding JSON text. Rows are ordered by
/// ascending timestamp and capped at `req.limit` (default 50).
///
/// All caller-supplied strings are sent as bound parameters. They were
/// previously interpolated into the SQL text without any escaping.
///
/// # Errors
/// Returns an error when the query fails.
async fn search_frames_internal_v2(
    db: &PostgresDb,
    req: &FrameSearchRequest,
) -> Result<Vec<FrameResult>, anyhow::Error> {
    type FrameRow = (
        i64,
        f64,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        String,
    );

    /// Returns `doc[key]` as an array: `None` when the document or key is
    /// missing, an empty vector when the value is not an array.
    fn array_field(
        doc: Option<&serde_json::Value>,
        key: &str,
    ) -> Option<Vec<serde_json::Value>> {
        doc.and_then(|v| v.get(key))
            .map(|value| value.as_array().cloned().unwrap_or_default())
    }

    let mut qb: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
        "SELECT f.frame_number, f.timestamp, f.yolo_objects, f.ocr_results, f.face_results, f.pose_results, v.file_uuid \
         FROM frames f JOIN videos v ON f.file_id = v.id WHERE 1=1",
    );

    if let Some(uuid) = req.uuid.as_deref() {
        qb.push(" AND v.file_uuid = ").push_bind(uuid);
    }
    if let Some([from, to]) = req.time_range {
        qb.push(" AND f.timestamp >= ").push_bind(from);
        qb.push(" AND f.timestamp <= ").push_bind(to);
    }
    if let Some(class) = req.object_class.as_deref() {
        qb.push(" AND f.yolo_objects::text ILIKE ")
            .push_bind(format!("%{}%", class));
    }
    if let Some(ocr) = req.ocr_text.as_deref() {
        qb.push(" AND f.ocr_results::text ILIKE ")
            .push_bind(format!("%{}%", ocr));
    }
    if let Some(face_id) = req.face_id.as_deref() {
        qb.push(" AND f.face_results::text LIKE ")
            .push_bind(format!("%{}%", face_id));
    }

    // LIMIT takes a signed 64-bit value; clamp instead of sending an invalid one.
    let limit = i64::try_from(req.limit.unwrap_or(50)).unwrap_or(i64::MAX);
    qb.push(" ORDER BY f.timestamp ASC LIMIT ").push_bind(limit);

    let rows: Vec<FrameRow> = qb
        .build_query_as::<FrameRow>()
        .fetch_all(db.pool())
        .await?;

    let results: Vec<FrameResult> = rows
        .into_iter()
        .map(|(frame_number, timestamp, yolo, ocr, face, pose, uuid)| {
            // Collect the `text` string of every entry in `ocr_results.texts`.
            let ocr_texts: Option<Vec<String>> = ocr
                .as_ref()
                .and_then(|v| v.get("texts"))
                .and_then(|t| t.as_array())
                .map(|entries| {
                    entries
                        .iter()
                        .filter_map(|item| item.get("text").and_then(|x| x.as_str()))
                        .map(String::from)
                        .collect()
                });

            FrameResult {
                frame_number,
                timestamp,
                uuid,
                objects: array_field(yolo.as_ref(), "objects"),
                ocr_texts,
                faces: array_field(face.as_ref(), "faces"),
                pose_persons: array_field(pose.as_ref(), "persons"),
            }
        })
        .collect();

    Ok(results)
}
/// Runs the query behind the person search handler and maps each row of
/// `person_identities` to a `PersonResult`.
///
/// * `query` - when `Some`, a case-insensitive substring matched against `name`,
///   `character_name`, `aliases` (cast to text), `person_id` and `speaker_id`.
/// * `min_appearances` - when `Some`, keeps rows with `appearance_count >=` it.
/// * `max_age` - when `Some`, keeps rows with `age <=` it; rows with a NULL age
///   are excluded.
/// * `limit` - maximum number of rows, ordered by descending appearance count.
///
/// All caller-supplied values are sent as bound parameters. The text query was
/// previously interpolated into the SQL text without any escaping.
///
/// # Errors
/// Returns an error when the query fails.
async fn search_persons_by_query(
    db: &PostgresDb,
    query: &Option<String>,
    min_appearances: Option<i32>,
    max_age: Option<i32>,
    limit: usize,
) -> Result<Vec<PersonResult>, anyhow::Error> {
    type PersonRow = (
        String,
        Option<String>,
        Option<String>,
        Option<serde_json::Value>,
        Option<i32>,
        Option<String>,
        Option<String>,
        i32,
        Option<f64>,
        Option<f64>,
    );

    // Columns (or expressions) the free-text query is matched against.
    const TEXT_TARGETS: [&str; 5] = [
        "name",
        "character_name",
        "aliases::text",
        "person_id",
        "speaker_id",
    ];

    let mut qb: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
        "SELECT person_id, name, character_name, aliases, age, gender, speaker_id, appearance_count, first_appearance_time, last_appearance_time FROM person_identities WHERE 1=1",
    );

    if let Some(q) = query.as_deref() {
        // Search name, character_name, aliases (cast to text), person_id, speaker_id
        let pattern = format!("%{}%", q);
        qb.push(" AND (");
        for (idx, target) in TEXT_TARGETS.iter().enumerate() {
            if idx > 0 {
                qb.push(" OR ");
            }
            qb.push(target)
                .push(" ILIKE ")
                .push_bind(pattern.clone());
        }
        qb.push(")");
    }
    if let Some(min) = min_appearances {
        qb.push(" AND appearance_count >= ").push_bind(min);
    }
    if let Some(max_a) = max_age {
        // Strictly filter for age <= max_age.
        // Note: This excludes entries with NULL age.
        qb.push(" AND age <= ").push_bind(max_a);
    }

    // LIMIT takes a signed 64-bit value; clamp instead of sending an invalid one.
    let row_cap = i64::try_from(limit).unwrap_or(i64::MAX);
    qb.push(" ORDER BY appearance_count DESC LIMIT ")
        .push_bind(row_cap);

    let rows: Vec<PersonRow> = qb
        .build_query_as::<PersonRow>()
        .fetch_all(db.pool())
        .await?;

    let results: Vec<PersonResult> = rows
        .into_iter()
        .map(
            |(
                person_id,
                name,
                character_name,
                aliases_json,
                age,
                gender,
                speaker_id,
                appearance_count,
                first_time,
                last_time,
            )| {
                // Keep only the string entries of the aliases array.
                let aliases: Option<Vec<String>> = aliases_json.and_then(|v| {
                    v.as_array().map(|entries| {
                        entries
                            .iter()
                            .filter_map(|val| val.as_str())
                            .map(String::from)
                            .collect()
                    })
                });

                PersonResult {
                    person_id,
                    name,
                    character_name,
                    aliases,
                    age,
                    gender,
                    speaker_id,
                    appearance_count,
                    first_appearance_time: first_time,
                    last_appearance_time: last_time,
                }
            },
        )
        .collect();

    Ok(results)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SearchFilters` with only the visual chunk filters populated and
    /// checks that the confidence threshold is stored as given.
    #[test]
    fn test_search_filters_with_visual() {
        let required = vec![String::from("person")];
        let visual_only = SearchFilters {
            // Non-visual filters stay unset.
            person_id: None,
            object_class: None,
            ocr_text: None,
            has_face: None,
            speaker_id: None,
            // Visual chunk filters.
            min_confidence: Some(0.8),
            min_unique_classes: Some(3),
            min_spatial_density: Some(0.5),
            max_spatial_density: Some(0.9),
            required_object_classes: Some(required),
        };

        let expected_confidence = Some(0.8);
        assert_eq!(visual_only.min_confidence, expected_confidence);
    }
}