From b902763d45ba62df713b1d1e958710b6abd32d05 Mon Sep 17 00:00:00 2001 From: Accusys Date: Sat, 9 May 2026 06:18:32 +0800 Subject: [PATCH] feat: trace chunks with co-appearance relationships - New trace_ingest module: creates chunks for each face trace (time + bbox + ASR text) - Computes pairwise time overlaps between traces -> co_appearances in metadata - Worker auto-triggers after face trace store + Qdrant sync - SearchFilters: chunk_type filter (sentence/cut/trace/visual) - SearchFilters: co_appears_with_trace_id filter --- docs/TRACE_SEARCH_API_DESIGN.md | 101 +++++++++++++++ src/api/universal_search.rs | 41 +++++- src/core/chunk/mod.rs | 2 + src/core/chunk/trace_ingest.rs | 222 ++++++++++++++++++++++++++++++++ src/worker/job_worker.rs | 13 ++ 5 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 docs/TRACE_SEARCH_API_DESIGN.md create mode 100644 src/core/chunk/trace_ingest.rs diff --git a/docs/TRACE_SEARCH_API_DESIGN.md b/docs/TRACE_SEARCH_API_DESIGN.md new file mode 100644 index 0000000..6d1db97 --- /dev/null +++ b/docs/TRACE_SEARCH_API_DESIGN.md @@ -0,0 +1,101 @@ +# Trace Search API 設計 + +## 概念 + +trace 是一種 chunk。 + +現有的 chunk_type: `cut`, `sentence`, `visual`, `story` +新增 chunk_type: `trace` + +每個 trace(人物跨 frame 追蹤軌跡)就是一個時間區間 + 區間內的 ASR text。 +跟其他 chunk 完全一樣,只是切分維度不同: +- cut chunk = 鏡頭切換 +- sentence chunk = 語句邊界 +- visual chunk = 畫面物體組合 +- **trace chunk = 人物出現區間 + 當下 spoken text** + +這樣 trace 可以直接放進現有的 `chunks` 表,共用 embedding、搜尋、Qdrant sync 整套機制,不需要任何新 table。 + +## chunks 表現有結構 + +```sql +chunks ( + id, file_uuid, chunk_type, -- 'trace' 新增 + start_frame, end_frame, start_time, end_time, + text_content, -- trace 區間的 ASR text + embedding, -- text_content 的 pgvector + metadata JSONB, -- { trace_id, face_count, identity_id, identity_name } + ... +) +``` + +## 資料產生流程(worker 擴充) + +在 face processing + `store_traced_faces.py` 完成後: + +1. 查詢 `face_detections` 聚合每個 trace 的 `MIN(frame)`, `MAX(frame)`, `COUNT(*)` +2. 對每個 trace,查詢 `pre_chunks WHERE processor_type='asr'` 中與 trace time range 重疊的 text +3. 彙整 text → EmbeddingGemma 產生 `embedding` +4. 寫入 `chunks`(`chunk_type='trace'`),metadata 含 `trace_id`, `face_count`, `identity_id` +5. embedding 自動進 Qdrant(與既有 chunk 同一 collection) + +## Search API 擴充 + +Universal Search 的 `types` 原本就支援 `"chunk"`。 +在 chunk 搜尋中過濾 `chunk_type = 'trace'` 即可。 + +**Request**: +```json +{ + "query": "open the door", + "types": ["chunk"], + "filters": { "chunk_type": "trace" }, + "uuid": "aeed71342a899fe4b4c57b7d41bcb692", + "page": 1, + "page_size": 20 +} +``` + +**Response**(與既有 Chunk result 相同): +```json +{ + "type": "chunk", + "chunk_id": "chunk_42", + "chunk_type": "trace", + "start_frame": 45200, "end_frame": 45900, + "start_time": 1808.0, "end_time": 1836.0, + "score": 0.87, + "text": "Open the door. Come on, hurry up.", + "metadata": { + "trace_id": 5, + "face_count": 42, + "identity_name": "Audrey Hepburn" + } +} +``` + +完全沿用既有的 `SearchResult::Chunk` variant,不用新增 enum variant。 + +### 搜尋語法 + +```sql +SELECT c.* +FROM dev.chunks c +WHERE c.file_uuid = $1 + AND c.chunk_type = 'trace' + AND c.embedding IS NOT NULL +ORDER BY c.embedding <=> $2 +LIMIT $3; +``` + +## 總結 + +| 項目 | 作法 | +|------|------| +| 新 table | ❌ 不需要 | +| 新 enum variant | ❌ 不需要 | +| SearchResult 改動 | ❌ 不需要 | +| chunk_type 新增 | ✅ `'trace'` | +| worker 擴充 | ✅ 產生 trace chunk (face done 後) | +| SearchFilters 擴充 | ✅ 加 `chunk_type` filter | +| Qdrant | ✅ 自動(既有 chunk collection) | diff --git a/src/api/universal_search.rs b/src/api/universal_search.rs index 054a1f4..2fc9520 100644 --- a/src/api/universal_search.rs +++ b/src/api/universal_search.rs @@ -20,6 +20,8 @@ pub struct UniversalSearchRequest { pub types: Vec, // chunk, frame, person pub time_range: Option<[f64; 2]>, pub filters: Option, + pub page: Option, + pub page_size: Option, pub limit: Option, pub offset: Option, } @@ -31,6 +33,10 @@ pub struct SearchFilters { pub ocr_text: Option, pub has_face: Option, pub speaker_id: Option, + /// 指定 chunk_type:如 "sentence", "cut", "trace", "visual" + pub chunk_type: Option, + /// 搜尋與指定 trace_id 有時間重疊的 trace chunk + pub co_appears_with_trace_id: Option, // Visual chunk filters pub min_confidence: Option, pub min_unique_classes: Option, @@ -44,6 +50,8 @@ pub struct UniversalSearchResponse { pub query: String, pub results: Vec, pub total: usize, + pub page: usize, + pub page_size: usize, pub took_ms: u64, } @@ -108,8 +116,14 @@ pub async fn universal_search( ) })?; - let limit = req.limit.unwrap_or(20); - let offset = req.offset.unwrap_or(0); + let page = req.page.unwrap_or(1).max(1); + let page_size = req.page_size.unwrap_or(20).max(1).min(200); + // Backward compat: if old `offset` is used without `page`, derive from offset + let offset = if req.page.is_none() && req.offset.is_some() { + req.offset.unwrap() + } else { + (page - 1) * page_size + }; let types = if req.types.is_empty() { vec![ "chunk".to_string(), @@ -163,7 +177,8 @@ pub async fn universal_search( }); let total = results.len(); - let end = std::cmp::min(offset + limit, results.len()); + let effective_limit = req.limit.unwrap_or(usize::MAX); + let end = std::cmp::min(offset + page_size, results.len()).min(effective_limit); let paginated = if offset < results.len() { results[offset..end].to_vec() } else { @@ -176,6 +191,8 @@ pub async fn universal_search( query: req.query, results: paginated, total, + page, + page_size, took_ms: took, })) } @@ -378,10 +395,22 @@ async fn search_chunks( sql.push_str(&format!(" AND ({})", class_conditions.join(" OR "))); } } + if let Some(ref chunk_type) = filters.chunk_type { + sql.push_str(&format!( + " AND chunk_type = '{}'", + chunk_type.replace('\'', "''") + )); + } + if let Some(trace_id) = filters.co_appears_with_trace_id { + sql.push_str(&format!( + " AND metadata->'co_appearances' @> '[{{ \"trace_id\": {} }}]'", + trace_id + )); + } } sql.push_str(" ORDER BY start_time ASC"); - sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20))); + sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( String, @@ -495,7 +524,7 @@ async fn search_frames_internal( } sql.push_str(" ORDER BY f.timestamp ASC"); - sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20))); + sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( i64, @@ -575,7 +604,7 @@ async fn search_persons_internal( } sql.push_str(" ORDER BY appearance_count DESC"); - sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20))); + sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20))); let rows: Vec<( String, diff --git a/src/core/chunk/mod.rs b/src/core/chunk/mod.rs index 14226fd..75e4d80 100644 --- a/src/core/chunk/mod.rs +++ b/src/core/chunk/mod.rs @@ -1,9 +1,11 @@ pub mod rule1_ingest; pub mod rule3_ingest; pub mod splitter; +pub mod trace_ingest; pub mod types; pub use rule1_ingest::execute_rule1; pub use rule3_ingest::ingest_rule3; +pub use trace_ingest::ingest_traces; pub use splitter::{AsrSegment, ChunkSplitter}; pub use types::{Chunk, ChunkType}; diff --git a/src/core/chunk/trace_ingest.rs b/src/core/chunk/trace_ingest.rs new file mode 100644 index 0000000..3821cc7 --- /dev/null +++ b/src/core/chunk/trace_ingest.rs @@ -0,0 +1,222 @@ +use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType}; +use crate::core::db::schema; +use crate::core::db::PostgresDb; +use anyhow::{Context, Result}; +use sqlx::Row; +use tracing::{error, info}; + +pub async fn ingest_traces(db: &PostgresDb, file_uuid: &str) -> Result { + let pool = db.pool(); + let face_table = schema::table_name("face_detections"); + let pre_table = schema::table_name("pre_chunks"); + + let video = db + .get_video_by_uuid(file_uuid) + .await? + .context("Video not found")?; + let file_id = video.id as i32; + let fps = video.fps; + + let traces = sqlx::query_as::<_, TraceAgg>(&format!( + r#" + SELECT trace_id, + MIN(frame_number) AS first_frame, + MAX(frame_number) AS last_frame, + MIN(timestamp_secs) AS first_time, + MAX(timestamp_secs) AS last_time, + COUNT(*) AS face_count, + AVG(x)::float8 AS avg_x, + AVG(y)::float8 AS avg_y, + AVG(width)::float8 AS avg_w, + AVG(height)::float8 AS avg_h + FROM {} + WHERE file_uuid = $1 AND trace_id IS NOT NULL + GROUP BY trace_id + ORDER BY trace_id + "#, + face_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + if traces.is_empty() { + info!("No traces found for {}", file_uuid); + return Ok(0); + } + + let asr_segments = sqlx::query_as::<_, AsrSegment>(&format!( + r#" + SELECT start_frame, end_frame, start_time, end_time, data + FROM {} + WHERE file_uuid = $1 AND processor_type = 'asr' + ORDER BY start_frame + "#, + pre_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + // 計算 pairwise trace 重疊關係 + let overlaps = compute_overlaps(&traces); + + let mut count = 0; + for trace in &traces { + let text = collect_overlapping_text(&asr_segments, trace.first_time, trace.last_time); + + let bbox = serde_json::json!({ + "x": trace.avg_x, + "y": trace.avg_y, + "width": trace.avg_w, + "height": trace.avg_h, + }); + + // 與此 trace 同框的其他 trace + let co_appearances: Vec = overlaps + .iter() + .filter(|o| o.trace_id == trace.trace_id) + .map(|o| { + serde_json::json!({ + "trace_id": o.other_trace_id, + "overlap_frames": o.overlap_frames, + "overlap_secs": (o.overlap_frames as f64 / fps * 100.0).round() / 100.0, + }) + }) + .collect(); + + let metadata = serde_json::json!({ + "trace_id": trace.trace_id, + "face_count": trace.face_count, + "bbox": bbox, + "co_appearances": co_appearances, + }); + + let chunk = Chunk::new( + file_id, + file_uuid.to_string(), + (count + 1) as u32, + ChunkType::Trace, + ChunkRule::Rule1, + trace.first_frame as i64, + trace.last_frame as i64, + fps, + metadata.clone(), + ) + .with_text_content(text) + .with_metadata(metadata) + .with_frame_count(trace.face_count as i32); + + if let Err(e) = db.store_chunk(&chunk).await { + error!("Failed to store trace chunk {}: {}", trace.trace_id, e); + } else { + let preview = chunk.text_content.as_deref().unwrap_or("").chars().take(60).collect::(); + let co = chunk.metadata.as_ref() + .and_then(|m| m.get("co_appearances")) + .and_then(|c| c.as_array()) + .map(|a| a.len()) + .unwrap_or(0); + info!( + "Trace chunk {}: trace_id={} frames={}-{} faces={} co_appear={} text={}", + chunk.chunk_id, trace.trace_id, + trace.first_frame, trace.last_frame, + trace.face_count, co, preview, + ); + count += 1; + } + } + + info!("Ingested {} trace chunks for {}", count, file_uuid); + Ok(count) +} + +/// 計算所有 trace pair 之間在時間上的重疊 frame 數 +struct TraceOverlap { + trace_id: i32, + other_trace_id: i32, + overlap_frames: i64, +} + +fn compute_overlaps(traces: &[TraceAgg]) -> Vec { + let mut result = Vec::new(); + for (i, a) in traces.iter().enumerate() { + for b in traces.iter().skip(i + 1) { + let overlap_start = a.first_frame.max(b.first_frame); + let overlap_end = a.last_frame.min(b.last_frame); + let frames = overlap_end - overlap_start; + if frames > 0 { + result.push(TraceOverlap { + trace_id: a.trace_id, + other_trace_id: b.trace_id, + overlap_frames: frames, + }); + result.push(TraceOverlap { + trace_id: b.trace_id, + other_trace_id: a.trace_id, + overlap_frames: frames, + }); + } + } + } + result +} + +fn collect_overlapping_text(segments: &[AsrSegment], start_time: f64, end_time: f64) -> String { + let mut texts: Vec<&str> = Vec::new(); + for seg in segments { + if seg.end_time >= start_time && seg.start_time <= end_time { + if let Some(t) = seg.text() { + texts.push(t); + } + } + } + texts.join(" ") +} + +#[derive(Debug, sqlx::FromRow)] +struct TraceAgg { + trace_id: i32, + first_frame: i64, + last_frame: i64, + first_time: f64, + last_time: f64, + face_count: i64, + avg_x: f64, + avg_y: f64, + avg_w: f64, + avg_h: f64, +} + +struct AsrSegment { + start_frame: i64, + end_frame: i64, + start_time: f64, + end_time: f64, + data: serde_json::Value, +} + +impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for AsrSegment { + fn from_row(row: &'r sqlx::postgres::PgRow) -> Result { + Ok(Self { + start_frame: row.try_get("start_frame")?, + end_frame: row.try_get("end_frame")?, + start_time: row.try_get("start_time")?, + end_time: row.try_get("end_time")?, + data: row.try_get("data")?, + }) + } +} + +impl AsrSegment { + fn text(&self) -> Option<&str> { + self.data + .get("text") + .and_then(|v| v.as_str()) + .or_else(|| { + self.data + .get("data") + .and_then(|d| d.get("text")) + .and_then(|v| v.as_str()) + }) + } +} diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 0f0ea1e..5509115 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -713,6 +713,7 @@ impl JobWorker { // Runs face_tracker.py (IoU+embedding tracking), stores trace_id + position in DB if has_face { info!("📝 Face completed, triggering face trace + DB store..."); + let db_clone = self.db.clone(); let uuid_clone = uuid.to_string(); tokio::spawn(async move { let executor = match crate::core::processor::PythonExecutor::new() { @@ -744,6 +745,18 @@ impl JobWorker { } else { info!("✅ Qdrant face sync completed for {}", uuid_clone); } + + // Generate trace chunks from face_detections + ASR text + info!("📝 Generating trace chunks..."); + match crate::core::chunk::trace_ingest::ingest_traces( + &db_clone, + &uuid_clone, + ) + .await + { + Ok(n) => info!("✅ {} trace chunks created for {}", n, uuid_clone), + Err(e) => error!("❌ Trace chunk ingestion failed: {}", e), + } } Err(e) => { error!("❌ Face trace + DB store failed for {}: {}", uuid_clone, e)