feat: Rule2 TKG relationship chunks + Phase0-1 Qdrant integration

Phase 0: TKG builder populate face_detections from face.json
- Fix face.json parser for pose_angle format
- Call store_traced_faces.py to set trace_id
- Skip if trace_id already populated

Phase 1: Qdrant face embeddings integration
- Add FaceEmbeddingDb module (src/core/db/face_embedding_db.rs)
- Create dev_face_embeddings collection (dim=512)
- Store 1122 face embeddings with pose metadata
- API: init_collection, batch_upsert, search_similar

Rule2: TKG edges → relationship chunks
- Design: RULE2_TKG_RELATIONSHIP_V1.0.md
- Implementation: rule2_ingest.rs
- ChunkType::Relationship added
- Edge types: SPEAKS_AS, MUTUAL_GAZE, CO_OCCURS_WITH, HAS_APPEARANCE, WEARS
- Auto-trigger on TKG rebuild

API:
- POST /api/v1/file/:file_uuid/rule2 (vectorization)
- POST /api/v1/file/:file_uuid/tkg/rebuild (auto Rule2)

Test: 75 relationship chunks created + vectorized
This commit is contained in:
Accusys
2026-06-21 00:22:41 +08:00
parent 17e4e15860
commit 3ad6f8740a
10 changed files with 3811 additions and 30 deletions

View File

@@ -38,10 +38,17 @@ pub fn trace_agent_routes() -> Router<crate::api::types::AppState> {
get(get_cooccurrence),
)
.route("/api/v1/file/:file_uuid/tkg/rebuild", post(rebuild_tkg))
.route("/api/v1/file/:file_uuid/rule2", post(ingest_rule2))
.route(
"/api/v1/file/:file_uuid/representative-frame",
get(get_representative_frame),
)
.route("/api/v1/file/:file_uuid/tkg/nodes", post(query_tkg_nodes))
.route("/api/v1/file/:file_uuid/tkg/edges", post(query_tkg_edges))
.route(
"/api/v1/file/:file_uuid/tkg/node/:node_id",
get(get_tkg_node_detail),
)
}
#[derive(Debug, Deserialize)]
@@ -961,22 +968,52 @@ async fn rebuild_tkg(
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
) -> Json<TkgRebuildResponse> {
use crate::core::chunk::rule2_ingest::ingest_rule2;
use tracing::info;
let result = crate::core::processor::tkg::build_tkg(&state.db, &file_uuid, &OUTPUT_DIR).await;
match result {
Ok(r) => Json(TkgRebuildResponse {
success: true,
file_uuid,
result: Some(serde_json::json!({
"face_trace_nodes": r.face_trace_nodes,
"object_nodes": r.object_nodes,
"speaker_nodes": r.speaker_nodes,
"co_occurrence_edges": r.co_occurrence_edges,
"speaker_face_edges": r.speaker_face_edges,
"face_face_edges": r.face_face_edges,
})),
error: None,
}),
Ok(r) => {
let total_edges = r.speaker_face_edges
+ r.mutual_gaze_edges
+ r.face_face_edges
+ r.co_occurrence_edges
+ r.has_appearance_edges
+ r.wears_edges;
if total_edges > 0 {
info!("[TKG] {} relationship edges found, triggering Rule 2 ingestion...", total_edges);
match ingest_rule2(state.db.pool(), &file_uuid).await {
Ok(count) => info!("[TKG] Rule 2 created {} relationship chunks", count),
Err(e) => info!("[TKG] Rule 2 ingestion failed: {}", e),
}
}
Json(TkgRebuildResponse {
success: true,
file_uuid,
result: Some(serde_json::json!({
"face_trace_nodes": r.face_trace_nodes,
"gaze_trace_nodes": r.gaze_trace_nodes,
"lip_trace_nodes": r.lip_trace_nodes,
"text_trace_nodes": r.text_trace_nodes,
"appearance_trace_nodes": r.appearance_trace_nodes,
"skin_tone_trace_nodes": r.skin_tone_trace_nodes,
"accessory_nodes": r.accessory_nodes,
"object_nodes": r.object_nodes,
"speaker_nodes": r.speaker_nodes,
"co_occurrence_edges": r.co_occurrence_edges,
"speaker_face_edges": r.speaker_face_edges,
"face_face_edges": r.face_face_edges,
"mutual_gaze_edges": r.mutual_gaze_edges,
"lip_sync_edges": r.lip_sync_edges,
"has_appearance_edges": r.has_appearance_edges,
"wears_edges": r.wears_edges,
})),
error: None,
})
}
Err(e) => Json(TkgRebuildResponse {
success: false,
file_uuid,
@@ -1097,3 +1134,463 @@ async fn get_stranger_thumbnail(
get_trace_thumbnail_inner(&state, &file_uuid, trace_id).await
}
// ── TKG Node/Edge Query APIs ─────────────────────────────────────
fn t(name: &str) -> String {
let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string());
if schema == "public" {
name.to_string()
} else {
format!("{}.{}", schema, name)
}
}
#[derive(Debug, Deserialize)]
struct TkgNodesRequest {
node_type: Option<String>,
page: Option<i64>,
page_size: Option<i64>,
}
#[derive(Debug, Serialize)]
struct TkgNodeInfo {
id: i64,
node_type: String,
external_id: String,
label: String,
properties: serde_json::Value,
}
#[derive(Debug, Serialize)]
struct TkgNodesResponse {
success: bool,
file_uuid: String,
total: i64,
page: i64,
page_size: i64,
nodes: Vec<TkgNodeInfo>,
}
async fn query_tkg_nodes(
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Json(req): Json<TkgNodesRequest>,
) -> Result<Json<TkgNodesResponse>, (StatusCode, Json<serde_json::Value>)> {
let nodes_table = t("tkg_nodes");
let page = req.page.unwrap_or(1).max(1);
let page_size = req.page_size.unwrap_or(100).max(1).min(500);
let offset = (page - 1) * page_size;
let (where_clause, count_args, query_args) = if let Some(ref node_type) = req.node_type {
(
"WHERE file_uuid = $1 AND node_type = $2".to_string(),
vec![serde_json::json!([&file_uuid, node_type])],
vec![serde_json::json!([
&file_uuid, node_type, page_size, offset
])],
)
} else {
(
"WHERE file_uuid = $1".to_string(),
vec![serde_json::json!([&file_uuid])],
vec![serde_json::json!([&file_uuid, page_size, offset])],
)
};
let total: i64 = if let Some(ref node_type) = req.node_type {
sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} {}",
nodes_table, where_clause
))
.bind(&file_uuid)
.bind(node_type)
.fetch_one(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
} else {
sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} {}",
nodes_table, where_clause
))
.bind(&file_uuid)
.fetch_one(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
};
let query = format!(
"SELECT id, node_type, external_id, label, properties FROM {} {} ORDER BY id LIMIT ${} OFFSET ${}",
nodes_table, where_clause,
if req.node_type.is_some() { 3 } else { 2 },
if req.node_type.is_some() { 4 } else { 3 }
);
let rows: Vec<(i64, String, String, String, serde_json::Value)> =
if let Some(ref node_type) = req.node_type {
sqlx::query_as(&query)
.bind(&file_uuid)
.bind(node_type)
.bind(page_size)
.bind(offset)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
} else {
sqlx::query_as(&query)
.bind(&file_uuid)
.bind(page_size)
.bind(offset)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
};
let nodes = rows
.into_iter()
.map(
|(id, node_type, external_id, label, properties)| TkgNodeInfo {
id,
node_type,
external_id,
label,
properties,
},
)
.collect();
Ok(Json(TkgNodesResponse {
success: true,
file_uuid,
total,
page,
page_size,
nodes,
}))
}
#[derive(Debug, Deserialize)]
struct TkgEdgesRequest {
edge_type: Option<String>,
source_type: Option<String>,
target_type: Option<String>,
page: Option<i64>,
page_size: Option<i64>,
}
#[derive(Debug, Serialize)]
struct TkgEdgeInfo {
id: i64,
edge_type: String,
source_node_id: i64,
target_node_id: i64,
properties: serde_json::Value,
}
#[derive(Debug, Serialize)]
struct TkgEdgesResponse {
success: bool,
file_uuid: String,
total: i64,
page: i64,
page_size: i64,
edges: Vec<TkgEdgeInfo>,
}
async fn query_tkg_edges(
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
Json(req): Json<TkgEdgesRequest>,
) -> Result<Json<TkgEdgesResponse>, (StatusCode, Json<serde_json::Value>)> {
let edges_table = t("tkg_edges");
let nodes_table = t("tkg_nodes");
let page = req.page.unwrap_or(1).max(1);
let page_size = req.page_size.unwrap_or(100).max(1).min(500);
let offset = (page - 1) * page_size;
let mut conditions = vec!["e.file_uuid = $1".to_string()];
let mut param_idx = 2i32;
let mut joins = String::new();
if let Some(ref edge_type) = req.edge_type {
conditions.push(format!("e.edge_type = ${}", param_idx));
param_idx += 1;
}
if req.source_type.is_some() || req.target_type.is_some() {
joins = format!(
" JOIN {} sn ON e.source_node_id = sn.id JOIN {} tn ON e.target_node_id = tn.id",
nodes_table, nodes_table
);
}
if let Some(ref source_type) = req.source_type {
conditions.push(format!("sn.node_type = ${}", param_idx));
param_idx += 1;
}
if let Some(ref target_type) = req.target_type {
conditions.push(format!("tn.node_type = ${}", param_idx));
param_idx += 1;
}
let where_clause = conditions.join(" AND ");
let count_query = format!(
"SELECT COUNT(*) FROM {} e {} WHERE {}",
edges_table, joins, where_clause
);
let total: i64 = {
let mut q = sqlx::query_scalar::<_, i64>(&count_query).bind(&file_uuid);
if let Some(ref edge_type) = req.edge_type {
q = q.bind(edge_type);
}
if let Some(ref source_type) = req.source_type {
q = q.bind(source_type);
}
if let Some(ref target_type) = req.target_type {
q = q.bind(target_type);
}
q.fetch_one(state.db.pool()).await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
};
let query = format!(
"SELECT e.id, e.edge_type, e.source_node_id, e.target_node_id, e.properties FROM {} e {} WHERE {} ORDER BY e.id LIMIT ${} OFFSET ${}",
edges_table, joins, where_clause, param_idx, param_idx + 1
);
let rows: Vec<(i64, String, i64, i64, serde_json::Value)> = {
let mut q = sqlx::query_as(&query).bind(&file_uuid);
if let Some(ref edge_type) = req.edge_type {
q = q.bind(edge_type);
}
if let Some(ref source_type) = req.source_type {
q = q.bind(source_type);
}
if let Some(ref target_type) = req.target_type {
q = q.bind(target_type);
}
q.bind(page_size)
.bind(offset)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
};
let edges = rows
.into_iter()
.map(
|(id, edge_type, source_node_id, target_node_id, properties)| TkgEdgeInfo {
id,
edge_type,
source_node_id,
target_node_id,
properties,
},
)
.collect();
Ok(Json(TkgEdgesResponse {
success: true,
file_uuid,
total,
page,
page_size,
edges,
}))
}
#[derive(Debug, Serialize)]
struct TkgNodeWithEdges {
node: TkgNodeInfo,
incoming_edges: Vec<TkgEdgeInfo>,
outgoing_edges: Vec<TkgEdgeInfo>,
}
#[derive(Debug, Serialize)]
struct TkgNodeDetailResponse {
success: bool,
file_uuid: String,
node: Option<TkgNodeWithEdges>,
error: Option<String>,
}
async fn get_tkg_node_detail(
State(state): State<crate::api::types::AppState>,
Path((file_uuid, node_id)): Path<(String, i64)>,
) -> Json<TkgNodeDetailResponse> {
let nodes_table = t("tkg_nodes");
let edges_table = t("tkg_edges");
let node: Option<(i64, String, String, String, serde_json::Value)> = sqlx::query_as(
&format!("SELECT id, node_type, external_id, label, properties FROM {} WHERE file_uuid = $1 AND id = $2", nodes_table)
)
.bind(&file_uuid).bind(node_id)
.fetch_optional(state.db.pool()).await.ok().flatten();
match node {
Some((id, node_type, external_id, label, properties)) => {
let incoming: Vec<TkgEdgeInfo> = sqlx::query_as(
&format!("SELECT id, edge_type, source_node_id, target_node_id, properties FROM {} WHERE file_uuid = $1 AND target_node_id = $2", edges_table)
)
.bind(&file_uuid).bind(node_id)
.fetch_all(state.db.pool()).await.unwrap_or_default()
.into_iter().map(|(id, edge_type, source_node_id, target_node_id, properties)| {
TkgEdgeInfo { id, edge_type, source_node_id, target_node_id, properties }
}).collect();
let outgoing: Vec<TkgEdgeInfo> = sqlx::query_as(
&format!("SELECT id, edge_type, source_node_id, target_node_id, properties FROM {} WHERE file_uuid = $1 AND source_node_id = $2", edges_table)
)
.bind(&file_uuid).bind(node_id)
.fetch_all(state.db.pool()).await.unwrap_or_default()
.into_iter().map(|(id, edge_type, source_node_id, target_node_id, properties)| {
TkgEdgeInfo { id, edge_type, source_node_id, target_node_id, properties }
}).collect();
Json(TkgNodeDetailResponse {
success: true,
file_uuid,
node: Some(TkgNodeWithEdges {
node: TkgNodeInfo {
id,
node_type,
external_id,
label,
properties,
},
incoming_edges: incoming,
outgoing_edges: outgoing,
}),
error: None,
})
}
None => Json(TkgNodeDetailResponse {
success: false,
file_uuid,
node: None,
error: Some("Node not found".to_string()),
}),
}
}
// ── Rule 2 Ingest ───────────────────────────────────────────────────
#[derive(Serialize)]
struct IngestRule2Response {
success: bool,
file_uuid: String,
rule2_chunks: i64,
vectorized_chunks: Option<i64>,
error: Option<String>,
}
async fn ingest_rule2(
State(state): State<crate::api::types::AppState>,
Path(file_uuid): Path<String>,
) -> Result<Json<IngestRule2Response>, (StatusCode, Json<serde_json::Value>)> {
use crate::core::chunk::rule2_ingest::ingest_rule2;
use crate::core::embedding::Embedder;
use crate::core::db::schema;
use crate::core::db::qdrant_db::{QdrantDb, VectorPayload};
use tracing::info;
let result = ingest_rule2(state.db.pool(), &file_uuid).await;
match result {
Ok(rule2_chunks) => {
info!(
"[Rule2API] {} relationship chunks created for {}",
rule2_chunks, file_uuid
);
// Auto-vectorize relationship chunks
let embedder = Embedder::new("embeddinggemma-300m".to_string());
let qdrant = QdrantDb::new();
let pool = state.db.pool();
let chunk_table = schema::table_name("chunk");
let rows: Vec<(String, String, i64, i64, f64, f64)> = sqlx::query_as(&format!(
"SELECT chunk_id, text_content, start_frame, end_frame, start_time, end_time \
FROM {} WHERE file_uuid = $1 AND chunk_type = 'relationship' \
AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '')",
chunk_table
))
.bind(&file_uuid)
.fetch_all(pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let mut vectorized = 0i64;
for (chunk_id, text, start_frame, end_frame, start_time, end_time) in &rows {
if text.is_empty() {
continue;
}
if let Ok(vector) = embedder.embed_document(&text).await {
if state.db.store_vector(&chunk_id, &vector, &file_uuid).await.is_ok() {
let payload = VectorPayload {
file_uuid: file_uuid.clone(),
chunk_id: chunk_id.clone(),
chunk_type: "relationship".to_string(),
start_frame: *start_frame,
end_frame: *end_frame,
start_time: *start_time,
end_time: *end_time,
text: Some(text.clone()),
};
if qdrant.upsert_vector(&chunk_id, &vector, payload).await.is_ok() {
vectorized += 1;
}
}
}
}
Ok(Json(IngestRule2Response {
success: true,
file_uuid,
rule2_chunks: rule2_chunks as i64,
vectorized_chunks: Some(vectorized),
error: None,
}))
}
Err(e) => Ok(Json(IngestRule2Response {
success: false,
file_uuid,
rule2_chunks: 0,
vectorized_chunks: None,
error: Some(e.to_string()),
})),
}
}

View File

@@ -1,10 +1,12 @@
pub mod rule1_ingest;
pub mod rule2_ingest;
pub mod rule3_ingest;
pub mod splitter;
pub mod trace_ingest;
pub mod types;
pub use rule1_ingest::execute_rule1;
pub use rule2_ingest::ingest_rule2;
pub use rule3_ingest::ingest_rule3;
pub use splitter::{AsrSegment, ChunkSplitter};
pub use trace_ingest::ingest_traces;

View File

@@ -0,0 +1,335 @@
use crate::core::db::schema;
use anyhow::{Context, Result};
use serde_json::Value;
use sqlx::PgPool;
use tracing::{info, warn};
fn t(name: &str) -> String {
let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string());
if schema == "public" {
name.to_string()
} else {
format!("{}.{}", schema, name)
}
}
/// Executes Rule 2 Ingestion: TKG edges → relationship chunks.
///
/// 1. Query tkg_edges by priority order.
/// 2. Resolve source/target nodes and identities.
/// 3. Generate natural language description (template-based).
/// 4. Insert chunks with chunk_type='relationship'.
pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result<usize> {
let edges_table = t("tkg_edges");
let nodes_table = t("tkg_nodes");
let chunk_table = t("chunk");
let fd_table = t("face_detections");
let id_table = t("identities");
let videos_table = t("videos");
// Get video fps
let fps: f64 = sqlx::query_scalar(&format!(
"SELECT COALESCE(fps, 25.0) FROM {} WHERE file_uuid = $1",
videos_table
))
.bind(file_uuid)
.fetch_optional(pool)
.await?
.unwrap_or(25.0);
// Priority order for edge types (matching TKG edge_type values)
let edge_types = vec![
"SPEAKS_AS",
"MUTUAL_GAZE",
"CO_OCCURS_WITH",
"HAS_APPEARANCE",
"WEARS",
];
let mut count = 0;
let mut tx = pool.begin().await?;
for edge_type in &edge_types {
// Query edges of this type
let edges: Vec<(i64, String, String, Value)> = sqlx::query_as(&format!(
"SELECT id, source_node_id::text, target_node_id::text, properties \
FROM {} WHERE file_uuid = $1 AND edge_type = $2",
edges_table
))
.bind(file_uuid)
.bind(edge_type)
.fetch_all(&mut *tx)
.await?;
info!(
"Rule2: {} {} edges for file_uuid={}",
edges.len(),
edge_type,
file_uuid
);
for (edge_id, source_id_str, target_id_str, properties) in &edges {
// Parse source/target node IDs
let source_id: i64 = source_id_str.parse().context("Invalid source_node_id")?;
let target_id: i64 = target_id_str.parse().context("Invalid target_node_id")?;
// Query source node
let source_node: Option<(String, String, String, Value)> = sqlx::query_as(&format!(
"SELECT node_type, external_id, label, properties FROM {} WHERE id = $1",
nodes_table
))
.bind(source_id)
.fetch_optional(&mut *tx)
.await?;
// Query target node
let target_node: Option<(String, String, String, Value)> = sqlx::query_as(&format!(
"SELECT node_type, external_id, label, properties FROM {} WHERE id = $1",
nodes_table
))
.bind(target_id)
.fetch_optional(&mut *tx)
.await?;
if source_node.is_none() || target_node.is_none() {
warn!("Rule2: Missing node for edge {}", edge_id);
continue;
}
let (src_type, src_ext_id, src_label, _src_props) = source_node.unwrap();
let (tgt_type, tgt_ext_id, tgt_label, tgt_props) = target_node.unwrap();
// Resolve identity names for face_trace nodes (inline)
let src_identity: Option<String> = if src_type == "face_trace" {
let trace_id: i32 = src_ext_id
.replace("trace_", "")
.parse()
.context("Invalid trace_id")?;
sqlx::query_scalar(&format!(
"SELECT i.name FROM {} fd \
JOIN {} i ON i.id = fd.identity_id \
WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \
LIMIT 1",
fd_table, id_table
))
.bind(file_uuid)
.bind(trace_id)
.fetch_optional(&mut *tx)
.await?
} else {
None
};
let tgt_identity: Option<String> = if tgt_type == "face_trace" {
let trace_id: i32 = tgt_ext_id
.replace("trace_", "")
.parse()
.context("Invalid trace_id")?;
sqlx::query_scalar(&format!(
"SELECT i.name FROM {} fd \
JOIN {} i ON i.id = fd.identity_id \
WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \
LIMIT 1",
fd_table, id_table
))
.bind(file_uuid)
.bind(trace_id)
.fetch_optional(&mut *tx)
.await?
} else {
None
};
// Extract time range from properties
let first_frame = properties
.get("first_frame")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let last_frame = properties
.get("last_frame")
.or_else(|| properties.get("end_frame"))
.and_then(|v| v.as_i64())
.unwrap_or(first_frame);
let frame_count = properties
.get("frame_count")
.or_else(|| properties.get("gaze_frame_count"))
.and_then(|v| v.as_i64())
.unwrap_or(last_frame - first_frame);
// Build context for description
let context = serde_json::json!({
"edge_type": edge_type,
"source_node": {
"node_type": src_type,
"external_id": src_ext_id,
"label": src_label,
"identity_name": src_identity,
},
"target_node": {
"node_type": tgt_type,
"external_id": tgt_ext_id,
"label": tgt_label,
"identity_name": tgt_identity,
"properties": tgt_props,
},
"properties": {
"first_frame": first_frame,
"last_frame": last_frame,
"frame_count": frame_count,
"confidence": properties.get("confidence").and_then(|v| v.as_f64()),
}
});
// Generate text_content (template-based)
let text_content = generate_description(&context);
// Build chunk_id
let chunk_id = format!("rel_{}", edge_id);
// Build content JSON
let content = serde_json::json!({
"edge_type": edge_type,
"edge_id": edge_id,
"source_node": {
"id": source_id,
"node_type": src_type,
"external_id": src_ext_id,
"label": src_label,
"identity_name": src_identity,
},
"target_node": {
"id": target_id,
"node_type": tgt_type,
"external_id": tgt_ext_id,
"label": tgt_label,
"identity_name": tgt_identity,
},
"properties": properties,
});
// Build metadata
let metadata = serde_json::json!({
"source_type": src_type,
"target_type": tgt_type,
"has_identity": src_identity.is_some() || tgt_identity.is_some(),
});
// Insert chunk
let start_time = first_frame as f64 / fps;
let end_time = last_frame as f64 / fps;
sqlx::query(&format!(
"INSERT INTO {} (file_uuid, chunk_id, chunk_type, \
start_frame, end_frame, fps, start_time, end_time, \
text_content, content, metadata) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
ON CONFLICT (file_uuid, chunk_id) DO NOTHING",
chunk_table
))
.bind(file_uuid)
.bind(&chunk_id)
.bind("relationship")
.bind(first_frame)
.bind(last_frame)
.bind(fps)
.bind(start_time)
.bind(end_time)
.bind(&text_content)
.bind(&content)
.bind(&metadata)
.execute(&mut *tx)
.await?;
count += 1;
}
}
tx.commit().await?;
info!(
"Rule2: {} relationship chunks created for file_uuid={}",
count, file_uuid
);
Ok(count)
}
/// Generate natural language description for a relationship (template-based).
fn generate_description(context: &Value) -> String {
let edge_type = context.get("edge_type").and_then(|v| v.as_str()).unwrap_or("");
let src = context.get("source_node").unwrap();
let tgt = context.get("target_node").unwrap();
let props = context.get("properties").unwrap();
let src_identity = src.get("identity_name").and_then(|v| v.as_str());
let tgt_identity = tgt.get("identity_name").and_then(|v| v.as_str());
let src_ext_id = src.get("external_id").and_then(|v| v.as_str()).unwrap_or("");
let tgt_ext_id = tgt.get("external_id").and_then(|v| v.as_str()).unwrap_or("");
let first_frame = props.get("first_frame").and_then(|v| v.as_i64()).unwrap_or(0);
let last_frame = props.get("last_frame").and_then(|v| v.as_i64()).unwrap_or(first_frame);
let frame_count = props.get("frame_count").and_then(|v| v.as_i64()).unwrap_or(0);
let src_display = src_identity.unwrap_or(src_ext_id);
let tgt_display = tgt_identity.unwrap_or(tgt_ext_id);
match edge_type {
"SPEAKS_AS" => {
format!(
"SPEAKER {}{} 的身份說話,從 frame {} 到 frame {}",
src_ext_id, tgt_display, first_frame, last_frame
)
}
"MUTUAL_GAZE" => {
format!(
"{}{} 互相看對方 {} 幀,起始於 frame {}",
src_display, tgt_display, frame_count, first_frame
)
}
"CO_OCCURS_WITH" => {
// Check if both nodes are face_trace (face-face co-occurrence)
let src_type = src.get("node_type").and_then(|v| v.as_str()).unwrap_or("");
let tgt_type = tgt.get("node_type").and_then(|v| v.as_str()).unwrap_or("");
if src_type == "face_trace" && tgt_type == "face_trace" {
format!(
"{}{} 同框 {} 幀,從 frame {} 到 frame {}",
src_display, tgt_display, frame_count, first_frame, last_frame
)
} else {
format!(
"{}{} 在同一畫面出現",
src_display, tgt_display
)
}
}
"HAS_APPEARANCE" => {
let tgt_props = tgt.get("properties").unwrap();
let upper_color = tgt_props
.get("color_features")
.and_then(|c| c.get("dominant_colors"))
.and_then(|d| d.as_array())
.and_then(|arr| arr.first())
.and_then(|c| c.as_str());
format!(
"{} 穿著 {} 上衣",
src_display,
upper_color.unwrap_or("未知顏色")
)
}
"WEARS" => {
let tgt_props = tgt.get("properties").unwrap();
let accessory_type = tgt_props.get("type").and_then(|t| t.as_str());
let confidence = tgt_props.get("confidence").and_then(|c| c.as_f64());
format!(
"{}{},信心值 {:.2}",
src_display,
accessory_type.unwrap_or("配件"),
confidence.unwrap_or(0.0)
)
}
_ => {
format!(
"{}{}{} 關係frame {}-{}",
src_display, tgt_display, edge_type, first_frame, last_frame
)
}
}
}

View File

@@ -9,6 +9,7 @@ pub enum ChunkType {
Cut,
Trace,
Story,
Relationship,
}
impl ChunkType {
@@ -19,6 +20,7 @@ impl ChunkType {
ChunkType::Cut => "cut",
ChunkType::Trace => "trace",
ChunkType::Story => "story",
ChunkType::Relationship => "relationship",
}
}
}

View File

@@ -0,0 +1,488 @@
use anyhow::{Context, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub struct FaceEmbeddingDb {
client: Client,
base_url: String,
api_key: String,
collection_name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaceEmbeddingPayload {
pub file_uuid: String,
pub trace_id: i32,
pub frame: i64,
pub bbox_x: f64,
pub bbox_y: f64,
pub bbox_w: f64,
pub bbox_h: f64,
pub confidence: f64,
pub yaw: f64,
pub pitch: f64,
pub roll: f64,
}
#[derive(Debug, Clone, Deserialize)]
pub struct FaceEmbeddingPoint {
pub id: String,
pub vector: Vec<f32>,
pub payload: FaceEmbeddingPayload,
pub score: f64,
}
impl FaceEmbeddingDb {
pub fn new() -> Self {
let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string());
let collection_name = format!("{}_face_embeddings", schema);
let base_url =
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string());
let api_key = std::env::var("QDRANT_API_KEY")
.unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());
Self {
client: Client::new(),
base_url,
api_key,
collection_name,
}
}
pub async fn init_collection(&self) -> Result<()> {
let url = format!("{}/collections/{}", self.base_url, self.collection_name);
let response = self
.client
.get(&url)
.header("api-key", &self.api_key)
.send()
.await?;
if response.status().is_success() {
tracing::info!("[FaceEmbedding] Collection {} already exists", self.collection_name);
return Ok(());
}
let create_url = format!("{}/collections/{}", self.base_url, self.collection_name);
let body = serde_json::json!({
"vectors": {
"size": 512,
"distance": "Cosine"
}
});
self.client
.put(&create_url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to create face embeddings collection")?;
tracing::info!("[FaceEmbedding] Created collection {} (dim=512)", self.collection_name);
Ok(())
}
pub async fn upsert_embedding(
&self,
point_id: &str,
embedding: &[f32],
payload: &FaceEmbeddingPayload,
) -> Result<()> {
let url = format!(
"{}/collections/{}/points?wait=true",
self.base_url, self.collection_name
);
let body = serde_json::json!({
"points": [{
"id": point_id,
"vector": embedding,
"payload": payload
}]
});
let response = self
.client
.put(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to upsert face embedding")?;
if !response.status().is_success() {
let text = response.text().await.unwrap_or_default();
anyhow::bail!("Qdrant upsert failed: {}", text);
}
Ok(())
}
pub async fn batch_upsert(
&self,
points: Vec<(String, Vec<f32>, FaceEmbeddingPayload)>,
) -> Result<usize> {
if points.is_empty() {
return Ok(0);
}
let url = format!(
"{}/collections/{}/points?wait=true",
self.base_url, self.collection_name
);
let body = serde_json::json!({
"points": points.iter().map(|(id, vec, payload)| {
// Parse id as u64 for Qdrant (requires integer or UUID)
let id_num: u64 = id.parse().unwrap_or(0);
serde_json::json!({
"id": id_num,
"vector": vec,
"payload": payload
})
}).collect::<Vec<_>>()
});
let response = self
.client
.put(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to batch upsert face embeddings")?;
if !response.status().is_success() {
let text = response.text().await.unwrap_or_default();
anyhow::bail!("Qdrant batch upsert failed: {}", text);
}
Ok(points.len())
}
pub async fn search_similar(
&self,
query_embedding: &[f32],
file_uuid: Option<&str>,
limit: usize,
threshold: f64,
) -> Result<Vec<FaceEmbeddingPoint>> {
let url = format!(
"{}/collections/{}/points/search",
self.base_url, self.collection_name
);
let mut filter = serde_json::json!({});
if let Some(fu) = file_uuid {
filter = serde_json::json!({
"must": [{
"key": "file_uuid",
"match": { "value": fu }
}]
});
}
let body = serde_json::json!({
"vector": query_embedding,
"limit": limit,
"with_payload": true,
"with_vector": false,
"filter": filter
});
let response = self
.client
.post(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to search face embeddings")?;
let status = response.status();
let text = response.text().await.unwrap_or_default();
if !status.is_success() {
anyhow::bail!("Qdrant search failed: {} - {}", status, text);
}
#[derive(Deserialize)]
struct SearchResult {
result: Vec<PointResult>,
}
#[derive(Deserialize)]
struct PointResult {
id: serde_json::Value,
score: f64,
payload: HashMap<String, serde_json::Value>,
}
let parsed: SearchResult = serde_json::from_str(&text)
.context("Failed to parse Qdrant search response")?;
let results: Vec<FaceEmbeddingPoint> = parsed
.result
.into_iter()
.filter(|r| r.score >= threshold)
.map(|r| {
let id = match r.id {
serde_json::Value::String(s) => s,
serde_json::Value::Number(n) => n.to_string(),
_ => "unknown".to_string(),
};
let payload = FaceEmbeddingPayload {
file_uuid: r.payload.get("file_uuid")
.and_then(|v| v.as_str()).unwrap_or("").to_string(),
trace_id: r.payload.get("trace_id")
.and_then(|v| v.as_i64()).unwrap_or(0) as i32,
frame: r.payload.get("frame")
.and_then(|v| v.as_i64()).unwrap_or(0),
bbox_x: r.payload.get("bbox_x")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_y: r.payload.get("bbox_y")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_w: r.payload.get("bbox_w")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_h: r.payload.get("bbox_h")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
confidence: r.payload.get("confidence")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
yaw: r.payload.get("yaw")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
pitch: r.payload.get("pitch")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
roll: r.payload.get("roll")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
};
FaceEmbeddingPoint {
id,
vector: vec![], // Not returned with_vector=false
payload,
score: r.score,
}
})
.collect();
Ok(results)
}
pub async fn get_embeddings_by_trace(
&self,
file_uuid: &str,
trace_id: i32,
) -> Result<Vec<(String, Vec<f32>)>> {
let url = format!(
"{}/collections/{}/points/scroll",
self.base_url, self.collection_name
);
let body = serde_json::json!({
"limit": 1000,
"with_payload": true,
"with_vector": true,
"filter": {
"must": [
{"key": "file_uuid", "match": { "value": file_uuid }},
{"key": "trace_id", "match": { "value": trace_id }}
]
}
});
let response = self
.client
.post(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to scroll face embeddings")?;
let status = response.status();
let text = response.text().await.unwrap_or_default();
if !status.is_success() {
anyhow::bail!("Qdrant scroll failed: {} - {}", status, text);
}
#[derive(Deserialize)]
struct ScrollResult {
result: ScrollPoints,
}
#[derive(Deserialize)]
struct ScrollPoints {
points: Vec<PointResult>,
}
#[derive(Deserialize)]
struct PointResult {
id: serde_json::Value,
vector: Vec<f32>,
}
let parsed: ScrollResult = serde_json::from_str(&text)
.context("Failed to parse Qdrant scroll response")?;
let results: Vec<(String, Vec<f32>)> = parsed
.result
.points
.into_iter()
.map(|r| {
let id = match r.id {
serde_json::Value::String(s) => s,
serde_json::Value::Number(n) => n.to_string(),
_ => "unknown".to_string(),
};
(id, r.vector)
})
.collect();
Ok(results)
}
pub async fn get_all_embeddings_for_file(
&self,
file_uuid: &str,
) -> Result<Vec<(String, Vec<f32>, FaceEmbeddingPayload)>> {
let url = format!(
"{}/collections/{}/points/scroll",
self.base_url, self.collection_name
);
let body = serde_json::json!({
"limit": 10000,
"with_payload": true,
"with_vector": true,
"filter": {
"must": [
{"key": "file_uuid", "match": { "value": file_uuid }}
]
}
});
let response = self
.client
.post(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to scroll face embeddings")?;
let status = response.status();
let text = response.text().await.unwrap_or_default();
if !status.is_success() {
anyhow::bail!("Qdrant scroll failed: {} - {}", status, text);
}
#[derive(Deserialize)]
struct ScrollResult {
result: ScrollPoints,
}
#[derive(Deserialize)]
struct ScrollPoints {
points: Vec<PointResult>,
}
#[derive(Deserialize)]
struct PointResult {
id: serde_json::Value,
vector: Vec<f32>,
payload: HashMap<String, serde_json::Value>,
}
let parsed: ScrollResult = serde_json::from_str(&text)
.context("Failed to parse Qdrant scroll response")?;
let results: Vec<(String, Vec<f32>, FaceEmbeddingPayload)> = parsed
.result
.points
.into_iter()
.map(|r| {
let id = match r.id {
serde_json::Value::String(s) => s,
serde_json::Value::Number(n) => n.to_string(),
_ => "unknown".to_string(),
};
let payload = FaceEmbeddingPayload {
file_uuid: r.payload.get("file_uuid")
.and_then(|v| v.as_str()).unwrap_or("").to_string(),
trace_id: r.payload.get("trace_id")
.and_then(|v| v.as_i64()).unwrap_or(0) as i32,
frame: r.payload.get("frame")
.and_then(|v| v.as_i64()).unwrap_or(0),
bbox_x: r.payload.get("bbox_x")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_y: r.payload.get("bbox_y")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_w: r.payload.get("bbox_w")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
bbox_h: r.payload.get("bbox_h")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
confidence: r.payload.get("confidence")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
yaw: r.payload.get("yaw")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
pitch: r.payload.get("pitch")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
roll: r.payload.get("roll")
.and_then(|v| v.as_f64()).unwrap_or(0.0),
};
(id, r.vector, payload)
})
.collect();
Ok(results)
}
pub async fn delete_file_embeddings(&self, file_uuid: &str) -> Result<usize> {
let url = format!(
"{}/collections/{}/points/delete?wait=true",
self.base_url, self.collection_name
);
let body = serde_json::json!({
"filter": {
"must": [
{"key": "file_uuid", "match": { "value": file_uuid }}
]
}
});
let response = self
.client
.post(&url)
.header("api-key", &self.api_key)
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.context("Failed to delete face embeddings")?;
if !response.status().is_success() {
let text = response.text().await.unwrap_or_default();
anyhow::bail!("Qdrant delete failed: {}", text);
}
Ok(0)
}
}
impl Default for FaceEmbeddingDb {
fn default() -> Self {
Self::new()
}
}

View File

@@ -32,12 +32,14 @@ pub trait VectorStore: Send + Sync {
async fn search(&self, query_vector: &[f32], limit: usize) -> Result<Vec<SearchResult>>;
}
pub mod face_embedding_db;
pub mod identity_merge_history;
pub mod mongodb_db;
pub mod postgres_db;
pub mod qdrant_db;
pub mod redis_client;
pub mod redis_db;
pub use face_embedding_db::{FaceEmbeddingDb, FaceEmbeddingPayload, FaceEmbeddingPoint};
pub use identity_merge_history::{
AliasEntry, FacesTransferred, IdentityMergeHistory, IdentityMergeHistoryStore,
IdentitySnapshot, MergeHistoryEntry, MergeHistoryQuery, MergeParams, TargetIdentitySnapshot,
@@ -56,3 +58,10 @@ pub use redis_client::{
ProgressMessage, RedisClient,
};
pub use redis_db::RedisDb;
pub mod qdrant_workspace;
pub mod workspace_sqlite;
pub use qdrant_workspace::{QdrantWorkspace, ScrolledPoint, WorkspaceScrollResult};
pub use workspace_sqlite::{
workspace_path, ChunkRow, FaceDetectionBatchItem, FaceDetectionRow, PreChunkRow,
ProcessorResultRow, SpeakerDetectionBatchItem, SpeakerDetectionRow, WorkspaceDb,
};

File diff suppressed because it is too large Load Diff