feat: trace chunks with co-appearance relationships

- New trace_ingest module: creates chunks for each face trace (time + bbox + ASR text)
- Computes pairwise time overlaps between traces -> co_appearances in metadata
- Worker auto-triggers after face trace store + Qdrant sync
- SearchFilters: chunk_type filter (sentence/cut/trace/visual)
- SearchFilters: co_appears_with_trace_id filter
This commit is contained in:
Accusys
2026-05-09 06:18:32 +08:00
parent 9f5afd1b86
commit b902763d45
5 changed files with 373 additions and 6 deletions

View File

@@ -0,0 +1,101 @@
# Trace Search API 設計
## 概念
trace 是一種 chunk。
現有的 chunk_type: `cut`, `sentence`, `visual`, `story`
新增 chunk_type: `trace`
每個 trace人物跨 frame 追蹤軌跡)就是一個時間區間 + 區間內的 ASR text。
跟其他 chunk 完全一樣,只是切分維度不同:
- cut chunk = 鏡頭切換
- sentence chunk = 語句邊界
- visual chunk = 畫面物體組合
- **trace chunk = 人物出現區間 + 當下 spoken text**
這樣 trace 可以直接放進現有的 `chunks` 表,共用 embedding、搜尋、Qdrant sync 整套機制,不需要任何新 table。
## chunks 表現有結構
```sql
chunks (
id, file_uuid, chunk_type, -- 'trace' 新增
start_frame, end_frame, start_time, end_time,
text_content, -- trace 區間的 ASR text
embedding, -- text_content 的 pgvector
metadata JSONB, -- { trace_id, face_count, identity_id, identity_name }
...
)
```
## 資料產生流程worker 擴充)
在 face processing + `store_traced_faces.py` 完成後:
1. 查詢 `face_detections` 聚合每個 trace 的 `MIN(frame)`, `MAX(frame)`, `COUNT(*)`
2. 對每個 trace查詢 `pre_chunks WHERE processor_type='asr'` 中與 trace time range 重疊的 text
3. 彙整 text → EmbeddingGemma 產生 `embedding`
4. 寫入 `chunks``chunk_type='trace'`metadata 含 `trace_id`, `face_count`, `identity_id`
5. embedding 自動進 Qdrant與既有 chunk 同一 collection
## Search API 擴充
Universal Search 的 `types` 原本就支援 `"chunk"`
在 chunk 搜尋中過濾 `chunk_type = 'trace'` 即可。
**Request**
```json
{
"query": "open the door",
"types": ["chunk"],
"filters": { "chunk_type": "trace" },
"uuid": "aeed71342a899fe4b4c57b7d41bcb692",
"page": 1,
"page_size": 20
}
```
**Response**(與既有 Chunk result 相同):
```json
{
"type": "chunk",
"chunk_id": "chunk_42",
"chunk_type": "trace",
"start_frame": 45200, "end_frame": 45900,
"start_time": 1808.0, "end_time": 1836.0,
"score": 0.87,
"text": "Open the door. Come on, hurry up.",
"metadata": {
"trace_id": 5,
"face_count": 42,
"identity_name": "Audrey Hepburn"
}
}
```
完全沿用既有的 `SearchResult::Chunk` variant不用新增 enum variant。
### 搜尋語法
```sql
SELECT c.*
FROM dev.chunks c
WHERE c.file_uuid = $1
AND c.chunk_type = 'trace'
AND c.embedding IS NOT NULL
ORDER BY c.embedding <=> $2
LIMIT $3;
```
## 總結
| 項目 | 作法 |
|------|------|
| 新 table | ❌ 不需要 |
| 新 enum variant | ❌ 不需要 |
| SearchResult 改動 | ❌ 不需要 |
| chunk_type 新增 | ✅ `'trace'` |
| worker 擴充 | ✅ 產生 trace chunk (face done 後) |
| SearchFilters 擴充 | ✅ 加 `chunk_type` filter |
| Qdrant | ✅ 自動(既有 chunk collection |

View File

@@ -20,6 +20,8 @@ pub struct UniversalSearchRequest {
pub types: Vec<String>, // chunk, frame, person
pub time_range: Option<[f64; 2]>,
pub filters: Option<SearchFilters>,
pub page: Option<usize>,
pub page_size: Option<usize>,
pub limit: Option<usize>,
pub offset: Option<usize>,
}
@@ -31,6 +33,10 @@ pub struct SearchFilters {
pub ocr_text: Option<String>,
pub has_face: Option<bool>,
pub speaker_id: Option<String>,
/// 指定 chunk_type如 "sentence", "cut", "trace", "visual"
pub chunk_type: Option<String>,
/// 搜尋與指定 trace_id 有時間重疊的 trace chunk
pub co_appears_with_trace_id: Option<i32>,
// Visual chunk filters
pub min_confidence: Option<f32>,
pub min_unique_classes: Option<u32>,
@@ -44,6 +50,8 @@ pub struct UniversalSearchResponse {
pub query: String,
pub results: Vec<SearchResult>,
pub total: usize,
pub page: usize,
pub page_size: usize,
pub took_ms: u64,
}
@@ -108,8 +116,14 @@ pub async fn universal_search(
)
})?;
let limit = req.limit.unwrap_or(20);
let offset = req.offset.unwrap_or(0);
let page = req.page.unwrap_or(1).max(1);
let page_size = req.page_size.unwrap_or(20).max(1).min(200);
// Backward compat: if old `offset` is used without `page`, derive from offset
let offset = if req.page.is_none() && req.offset.is_some() {
req.offset.unwrap()
} else {
(page - 1) * page_size
};
let types = if req.types.is_empty() {
vec![
"chunk".to_string(),
@@ -163,7 +177,8 @@ pub async fn universal_search(
});
let total = results.len();
let end = std::cmp::min(offset + limit, results.len());
let effective_limit = req.limit.unwrap_or(usize::MAX);
let end = std::cmp::min(offset + page_size, results.len()).min(effective_limit);
let paginated = if offset < results.len() {
results[offset..end].to_vec()
} else {
@@ -176,6 +191,8 @@ pub async fn universal_search(
query: req.query,
results: paginated,
total,
page,
page_size,
took_ms: took,
}))
}
@@ -378,10 +395,22 @@ async fn search_chunks(
sql.push_str(&format!(" AND ({})", class_conditions.join(" OR ")));
}
}
if let Some(ref chunk_type) = filters.chunk_type {
sql.push_str(&format!(
" AND chunk_type = '{}'",
chunk_type.replace('\'', "''")
));
}
if let Some(trace_id) = filters.co_appears_with_trace_id {
sql.push_str(&format!(
" AND metadata->'co_appearances' @> '[{{ \"trace_id\": {} }}]'",
trace_id
));
}
}
sql.push_str(" ORDER BY start_time ASC");
sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20)));
sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20)));
let rows: Vec<(
String,
@@ -495,7 +524,7 @@ async fn search_frames_internal(
}
sql.push_str(" ORDER BY f.timestamp ASC");
sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20)));
sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20)));
let rows: Vec<(
i64,
@@ -575,7 +604,7 @@ async fn search_persons_internal(
}
sql.push_str(" ORDER BY appearance_count DESC");
sql.push_str(&format!(" LIMIT {}", req.limit.unwrap_or(20)));
sql.push_str(&format!(" LIMIT {}", req.page_size.unwrap_or(20)));
let rows: Vec<(
String,

View File

@@ -1,9 +1,11 @@
pub mod rule1_ingest;
pub mod rule3_ingest;
pub mod splitter;
pub mod trace_ingest;
pub mod types;
pub use rule1_ingest::execute_rule1;
pub use rule3_ingest::ingest_rule3;
pub use trace_ingest::ingest_traces;
pub use splitter::{AsrSegment, ChunkSplitter};
pub use types::{Chunk, ChunkType};

View File

@@ -0,0 +1,222 @@
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
use crate::core::db::schema;
use crate::core::db::PostgresDb;
use anyhow::{Context, Result};
use sqlx::Row;
use tracing::{error, info};
pub async fn ingest_traces(db: &PostgresDb, file_uuid: &str) -> Result<usize> {
let pool = db.pool();
let face_table = schema::table_name("face_detections");
let pre_table = schema::table_name("pre_chunks");
let video = db
.get_video_by_uuid(file_uuid)
.await?
.context("Video not found")?;
let file_id = video.id as i32;
let fps = video.fps;
let traces = sqlx::query_as::<_, TraceAgg>(&format!(
r#"
SELECT trace_id,
MIN(frame_number) AS first_frame,
MAX(frame_number) AS last_frame,
MIN(timestamp_secs) AS first_time,
MAX(timestamp_secs) AS last_time,
COUNT(*) AS face_count,
AVG(x)::float8 AS avg_x,
AVG(y)::float8 AS avg_y,
AVG(width)::float8 AS avg_w,
AVG(height)::float8 AS avg_h
FROM {}
WHERE file_uuid = $1 AND trace_id IS NOT NULL
GROUP BY trace_id
ORDER BY trace_id
"#,
face_table
))
.bind(file_uuid)
.fetch_all(pool)
.await?;
if traces.is_empty() {
info!("No traces found for {}", file_uuid);
return Ok(0);
}
let asr_segments = sqlx::query_as::<_, AsrSegment>(&format!(
r#"
SELECT start_frame, end_frame, start_time, end_time, data
FROM {}
WHERE file_uuid = $1 AND processor_type = 'asr'
ORDER BY start_frame
"#,
pre_table
))
.bind(file_uuid)
.fetch_all(pool)
.await?;
// 計算 pairwise trace 重疊關係
let overlaps = compute_overlaps(&traces);
let mut count = 0;
for trace in &traces {
let text = collect_overlapping_text(&asr_segments, trace.first_time, trace.last_time);
let bbox = serde_json::json!({
"x": trace.avg_x,
"y": trace.avg_y,
"width": trace.avg_w,
"height": trace.avg_h,
});
// 與此 trace 同框的其他 trace
let co_appearances: Vec<serde_json::Value> = overlaps
.iter()
.filter(|o| o.trace_id == trace.trace_id)
.map(|o| {
serde_json::json!({
"trace_id": o.other_trace_id,
"overlap_frames": o.overlap_frames,
"overlap_secs": (o.overlap_frames as f64 / fps * 100.0).round() / 100.0,
})
})
.collect();
let metadata = serde_json::json!({
"trace_id": trace.trace_id,
"face_count": trace.face_count,
"bbox": bbox,
"co_appearances": co_appearances,
});
let chunk = Chunk::new(
file_id,
file_uuid.to_string(),
(count + 1) as u32,
ChunkType::Trace,
ChunkRule::Rule1,
trace.first_frame as i64,
trace.last_frame as i64,
fps,
metadata.clone(),
)
.with_text_content(text)
.with_metadata(metadata)
.with_frame_count(trace.face_count as i32);
if let Err(e) = db.store_chunk(&chunk).await {
error!("Failed to store trace chunk {}: {}", trace.trace_id, e);
} else {
let preview = chunk.text_content.as_deref().unwrap_or("").chars().take(60).collect::<String>();
let co = chunk.metadata.as_ref()
.and_then(|m| m.get("co_appearances"))
.and_then(|c| c.as_array())
.map(|a| a.len())
.unwrap_or(0);
info!(
"Trace chunk {}: trace_id={} frames={}-{} faces={} co_appear={} text={}",
chunk.chunk_id, trace.trace_id,
trace.first_frame, trace.last_frame,
trace.face_count, co, preview,
);
count += 1;
}
}
info!("Ingested {} trace chunks for {}", count, file_uuid);
Ok(count)
}
/// 計算所有 trace pair 之間在時間上的重疊 frame 數
struct TraceOverlap {
trace_id: i32,
other_trace_id: i32,
overlap_frames: i64,
}
fn compute_overlaps(traces: &[TraceAgg]) -> Vec<TraceOverlap> {
let mut result = Vec::new();
for (i, a) in traces.iter().enumerate() {
for b in traces.iter().skip(i + 1) {
let overlap_start = a.first_frame.max(b.first_frame);
let overlap_end = a.last_frame.min(b.last_frame);
let frames = overlap_end - overlap_start;
if frames > 0 {
result.push(TraceOverlap {
trace_id: a.trace_id,
other_trace_id: b.trace_id,
overlap_frames: frames,
});
result.push(TraceOverlap {
trace_id: b.trace_id,
other_trace_id: a.trace_id,
overlap_frames: frames,
});
}
}
}
result
}
fn collect_overlapping_text(segments: &[AsrSegment], start_time: f64, end_time: f64) -> String {
let mut texts: Vec<&str> = Vec::new();
for seg in segments {
if seg.end_time >= start_time && seg.start_time <= end_time {
if let Some(t) = seg.text() {
texts.push(t);
}
}
}
texts.join(" ")
}
#[derive(Debug, sqlx::FromRow)]
struct TraceAgg {
trace_id: i32,
first_frame: i64,
last_frame: i64,
first_time: f64,
last_time: f64,
face_count: i64,
avg_x: f64,
avg_y: f64,
avg_w: f64,
avg_h: f64,
}
struct AsrSegment {
start_frame: i64,
end_frame: i64,
start_time: f64,
end_time: f64,
data: serde_json::Value,
}
impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for AsrSegment {
fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
Ok(Self {
start_frame: row.try_get("start_frame")?,
end_frame: row.try_get("end_frame")?,
start_time: row.try_get("start_time")?,
end_time: row.try_get("end_time")?,
data: row.try_get("data")?,
})
}
}
impl AsrSegment {
fn text(&self) -> Option<&str> {
self.data
.get("text")
.and_then(|v| v.as_str())
.or_else(|| {
self.data
.get("data")
.and_then(|d| d.get("text"))
.and_then(|v| v.as_str())
})
}
}

View File

@@ -713,6 +713,7 @@ impl JobWorker {
// Runs face_tracker.py (IoU+embedding tracking), stores trace_id + position in DB
if has_face {
info!("📝 Face completed, triggering face trace + DB store...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
let executor = match crate::core::processor::PythonExecutor::new() {
@@ -744,6 +745,18 @@ impl JobWorker {
} else {
info!("✅ Qdrant face sync completed for {}", uuid_clone);
}
// Generate trace chunks from face_detections + ASR text
info!("📝 Generating trace chunks...");
match crate::core::chunk::trace_ingest::ingest_traces(
&db_clone,
&uuid_clone,
)
.await
{
Ok(n) => info!("✅ {} trace chunks created for {}", n, uuid_clone),
Err(e) => error!("❌ Trace chunk ingestion failed: {}", e),
}
}
Err(e) => {
error!("❌ Face trace + DB store failed for {}: {}", uuid_clone, e)