feat: Phase 2-3 TKG-only architecture

Phase 2.1: build_face_trace_nodes_from_qdrant()
- Read trace_id, frame, bbox directly from Qdrant payload
- No dependency on face_detections table

Phase 2.3: Rule2 queries TKG nodes
- identity resolution from tkg_nodes.properties.identity_id
- TKG-only architecture (Phase 2.3)

Phase 3: Identity Agent updates TKG nodes
- match_faces_iterative() updates tkg_nodes.properties
- bind_identity_trace() syncs identity_id to TKG
- unbind_identity() removes identity_id from TKG

Test results:
- 23 face_trace nodes from Qdrant (Phase 2.1)
- 75 relationship chunks (Rule2)
- TKG rebuild: Phase0 → Phase1 → Phase2
This commit is contained in:
Accusys
2026-06-21 01:30:04 +08:00
parent 2f2ccc94f7
commit 23c440104b
5 changed files with 305 additions and 23 deletions

View File

@@ -0,0 +1,69 @@
---
title: TKG Phase 2-3 Progress Report
version: 1.0
date: 2026-06-21
author: OpenCode
status: In Progress
---
## Goal
- Complete TKG-only architecture migration: Phase 0-4 for Rule 2 relationship chunks and Identity Agent Qdrant integration
## Constraints & Preferences
- Rule 2 chunk_type: `"relationship"` (not `"visual"`)
- Rule 2 edge types match TKG storage: `SPEAKS_AS`, `MUTUAL_GAZE`, `CO_OCCURS_WITH`, `HAS_APPEARANCE`, `WEARS`
- Rule 2 each edge = one chunk (not aggregated)
- Qdrant face embeddings: dim=512, Cosine distance, collection `{schema}_face_embeddings`
- Phase approach: Phase 0 (populate), Phase 1 (Qdrant), Phase 2 (TKG-only), Phase 3 (Identity), Phase 4 (deprecate)
## Progress
### Done
- **Phase 0**: TKG builder populate face_detections from face.json via `store_traced_faces.py`
- **Phase 1.1**: Create `dev_face_embeddings` Qdrant collection (dim=512)
- **Phase 1.2**: `FaceEmbeddingDb` module with `init_collection`, `batch_upsert`, `search_similar`, `get_all_embeddings_for_file`
- **Phase 1.3**: TKG builder stores 1122 embeddings to Qdrant with pose metadata
- **Phase 1.4**: Identity Agent `match_faces_iterative()` queries Qdrant (fallback to PG)
- **Phase 2.1**: `build_face_trace_nodes_from_qdrant()` reads Qdrant payload (no face_detections dependency)
- **Phase 2.3**: Rule2 queries `tkg_nodes.properties.identity_id` (TKG-only)
- **Phase 3**: Identity Agent (Qdrant + PG) updates `tkg_nodes.properties` when binding
- **Rule 2**: 75 relationship chunks created + vectorized (tested)
- **Rule 2 API**: `POST /api/v1/file/:file_uuid/rule2` with auto-vectorize, triggers on TKG rebuild
- **Identity binding**: `bind_identity_trace()` and `unbind_identity()` update TKG nodes (Phase 2.3)
- **TKG builder**: `populate_face_detections_from_face_json()` and `populate_face_embeddings_to_qdrant()`
### Pending
- **Phase 4**: Deprecate face_detections table (await all Phase 2-3 verified in production)
## Key Decisions
- TKG builder fixed for `pose_angle` format (was expecting `pose` with `bbox` sub-object)
- Edge types corrected: TKG stores `CO_OCCURS_WITH` (not `co_occurs`), `SPEAKS_AS` (not `speaker_face`)
- Qdrant point IDs must be numeric or UUID (not string like `"file_uuid-frame"`)
- Identity Agent dual-source: Qdrant first, PostgreSQL fallback
- Phase 0 checks `trace_id IS NOT NULL` before calling `store_traced_faces.py`
- Phase 2.1: Qdrant payload contains `trace_id`, `frame`, `bbox_x/y/w/h`, `pose`, no PG query needed
- Phase 2.3: TKG nodes store `identity_id` and `identity_name` in properties JSON
- Phase 3: Identity Agent updates both `face_detections.identity_id` AND `tkg_nodes.properties`
## Test Results
- 1122 face embeddings in Qdrant (`dev_face_embeddings` collection)
- 75 relationship chunks from Rule2
- 23 face_trace_nodes built from Qdrant (Phase 2.1)
- Rule2 still works after TKG-only migration (Phase 2.3)
## Next Steps
- Phase 4: Verify all systems work without face_detections dependency
- Phase 4: Document face_detections deprecation plan
## Commits
- `2f2ccc94` (Phase 1.4)
- `3ad6f874` (Rule2 + Phase 0-1)
- (pending) Phase 2-3 changes
## Relevant Files
- `src/core/db/face_embedding_db.rs`: **New** — FaceEmbeddingDb, FaceEmbeddingPayload, FaceEmbeddingPoint
- `src/core/db/mod.rs`: updated — `pub mod face_embedding_db; pub use FaceEmbeddingDb;`
- `src/core/processor/tkg.rs`: updated — Phase 2.1 `build_face_trace_nodes_from_qdrant()`
- `src/core/chunk/rule2_ingest.rs`: updated — Phase 2.3 TKG-only identity query
- `src/api/identity_binding.rs`: updated — Phase 2.3 TKG node update on bind/unbind
- `src/api/identity_agent_api.rs`: updated — Phase 3 TKG node update on match
- `docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md`: Rule 2 design spec

View File

@@ -751,13 +751,26 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
round += 1; round += 1;
} }
// Update face_detections.identity_id // Update face_detections.identity_id AND tkg_nodes.properties (Phase 3)
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let nodes_table = schema::table_name("tkg_nodes");
let id_table = schema::table_name("identities");
let identities_map: HashMap<String, i32> = tmdb_seeds let identities_map: HashMap<String, i32> = tmdb_seeds
.iter() .iter()
.map(|(id, name, _)| (name.clone(), *id)) .map(|(id, name, _)| (name.clone(), *id))
.collect(); .collect();
// Batch query identity names
let identity_names: HashMap<i32, String> = sqlx::query_as::<_, (i32, String)>(&format!(
"SELECT id, name FROM {} WHERE id = ANY($1)",
id_table
))
.bind(identities_map.values().collect::<Vec<_>>())
.fetch_all(pool)
.await?
.into_iter()
.collect();
let mut updated = 0usize; let mut updated = 0usize;
for (tid, name) in &matched { for (tid, name) in &matched {
let identity_id = identities_map.get(name); let identity_id = identities_map.get(name);
@@ -773,6 +786,23 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
.await? .await?
.rows_affected(); .rows_affected();
updated += rows as usize; updated += rows as usize;
// Phase 3: Also update TKG node
let external_id = format!("trace_{}", tid);
let identity_name = identity_names.get(id);
let _ = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
'{{identity_name}}', $2::jsonb, false)\
WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4",
nodes_table
))
.bind(*id)
.bind(identity_name.as_deref())
.bind(file_uuid)
.bind(&external_id)
.execute(pool)
.await;
} }
} }
@@ -878,10 +908,11 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
matched.len() * 100 / total_traces matched.len() * 100 / total_traces
); );
// Step 5: 寫入 DB — Round 1 結果先存 // Step 5: 寫入 DB — Round 1 結果先存 (Phase 3: update both face_detections AND tkg_nodes)
let identities_table = schema::table_name("identities"); let identities_table = schema::table_name("identities");
let strangers_table = schema::table_name("strangers"); let strangers_table = schema::table_name("strangers");
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let nodes_table = schema::table_name("tkg_nodes");
let mut updated = 0usize; let mut updated = 0usize;
for (tid, name) in &matched { for (tid, name) in &matched {
let id_opt = sqlx::query_scalar::<_, Option<i32>>(&format!( let id_opt = sqlx::query_scalar::<_, Option<i32>>(&format!(
@@ -901,6 +932,23 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
.bind(tid) .bind(tid)
.execute(pool) .execute(pool)
.await; .await;
// Phase 3: Also update TKG node
let external_id = format!("trace_{}", tid);
let _ = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
'{{identity_name}}', $2::jsonb, false)\
WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4",
nodes_table
))
.bind(identity_id)
.bind(name.as_str())
.bind(file_uuid)
.bind(&external_id)
.execute(pool)
.await;
updated += 1; updated += 1;
} }
} }

View File

@@ -225,6 +225,32 @@ pub async fn unbind_identity(
) )
})?; })?;
// Phase 2.3: Also update TKG node (find trace_id first)
let trace_id_opt: Option<i32> = sqlx::query_scalar(&format!(
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
table
))
.bind(&req.file_uuid)
.bind(&req.face_id)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
if let Some(trace_id) = trace_id_opt {
let nodes_table = crate::core::db::schema::table_name("tkg_nodes");
let external_id = format!("trace_{}", trace_id);
let _ = sqlx::query(&format!(
"UPDATE {} SET properties = properties - 'identity_id' - 'identity_name' \
WHERE file_uuid = $1 AND node_type = 'face_trace' AND external_id = $2",
nodes_table
))
.bind(&req.file_uuid)
.bind(&external_id)
.execute(state.db.pool())
.await;
}
// Record history if there was a binding // Record history if there was a binding
if let Some(identity_id) = old_identity_id { if let Some(identity_id) = old_identity_id {
// Clear bind redo stack // Clear bind redo stack
@@ -794,6 +820,33 @@ pub async fn bind_identity_trace(
) )
})?; })?;
// Phase 2.3: Also update TKG node properties
let nodes_table = crate::core::db::schema::table_name("tkg_nodes");
let external_id = format!("trace_{}", req.trace_id);
let identity_name: Option<String> = sqlx::query_scalar(&format!(
"SELECT name FROM {} WHERE id = $1",
id_table
))
.bind(identity_id)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
let _ = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
'{{identity_name}}', $2::jsonb, false)\
WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4",
nodes_table
))
.bind(identity_id)
.bind(identity_name.as_deref())
.bind(&req.file_uuid)
.bind(&external_id)
.execute(state.db.pool())
.await;
// Clear bind redo stack // Clear bind redo stack
let _ = sqlx::query(&format!( let _ = sqlx::query(&format!(
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')", "DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",

View File

@@ -99,21 +99,16 @@ pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result<usize> {
let (src_type, src_ext_id, src_label, _src_props) = source_node.unwrap(); let (src_type, src_ext_id, src_label, _src_props) = source_node.unwrap();
let (tgt_type, tgt_ext_id, tgt_label, tgt_props) = target_node.unwrap(); let (tgt_type, tgt_ext_id, tgt_label, tgt_props) = target_node.unwrap();
// Resolve identity names for face_trace nodes (inline) // Resolve identity names for face_trace nodes (Phase 2.3: TKG-only)
let src_identity: Option<String> = if src_type == "face_trace" { let src_identity: Option<String> = if src_type == "face_trace" {
let trace_id: i32 = src_ext_id
.replace("trace_", "")
.parse()
.context("Invalid trace_id")?;
sqlx::query_scalar(&format!( sqlx::query_scalar(&format!(
"SELECT i.name FROM {} fd \ "SELECT i.name FROM {} n \
JOIN {} i ON i.id = fd.identity_id \ JOIN {} i ON i.id = (n.properties->>'identity_id')::bigint \
WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ WHERE n.file_uuid = $1 AND n.node_type = 'face_trace' AND n.external_id = $2 AND n.properties->>'identity_id' IS NOT NULL",
LIMIT 1", nodes_table, id_table
fd_table, id_table
)) ))
.bind(file_uuid) .bind(file_uuid)
.bind(trace_id) .bind(&src_ext_id)
.fetch_optional(&mut *tx) .fetch_optional(&mut *tx)
.await? .await?
} else { } else {
@@ -121,19 +116,14 @@ pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result<usize> {
}; };
let tgt_identity: Option<String> = if tgt_type == "face_trace" { let tgt_identity: Option<String> = if tgt_type == "face_trace" {
let trace_id: i32 = tgt_ext_id
.replace("trace_", "")
.parse()
.context("Invalid trace_id")?;
sqlx::query_scalar(&format!( sqlx::query_scalar(&format!(
"SELECT i.name FROM {} fd \ "SELECT i.name FROM {} n \
JOIN {} i ON i.id = fd.identity_id \ JOIN {} i ON i.id = (n.properties->>'identity_id')::bigint \
WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ WHERE n.file_uuid = $1 AND n.node_type = 'face_trace' AND n.external_id = $2 AND n.properties->>'identity_id' IS NOT NULL",
LIMIT 1", nodes_table, id_table
fd_table, id_table
)) ))
.bind(file_uuid) .bind(file_uuid)
.bind(trace_id) .bind(&tgt_ext_id)
.fetch_optional(&mut *tx) .fetch_optional(&mut *tx)
.await? .await?
} else { } else {

View File

@@ -549,6 +549,128 @@ async fn build_face_trace_nodes(
pool: &PgPool, pool: &PgPool,
file_uuid: &str, file_uuid: &str,
pose_data: &[FacePose], pose_data: &[FacePose],
) -> Result<usize> {
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
// Try Qdrant first (Phase 2: TKG-only)
let face_db = FaceEmbeddingDb::new();
let qdrant_embeddings = face_db.get_all_embeddings_for_file(file_uuid).await?;
if !qdrant_embeddings.is_empty() {
tracing::info!("[TKG-Phase2] Building face_trace nodes from Qdrant ({} embeddings)", qdrant_embeddings.len());
return build_face_trace_nodes_from_qdrant(pool, file_uuid, pose_data, qdrant_embeddings).await;
}
// Fallback to PostgreSQL
tracing::info!("[TKG-Phase2] No Qdrant embeddings, falling back to PostgreSQL");
build_face_trace_nodes_from_pg(pool, file_uuid, pose_data).await
}
async fn build_face_trace_nodes_from_qdrant(
pool: &PgPool,
file_uuid: &str,
pose_data: &[FacePose],
qdrant_embeddings: Vec<(String, Vec<f32>, crate::core::db::face_embedding_db::FaceEmbeddingPayload)>,
) -> Result<usize> {
use crate::core::db::face_embedding_db::FaceEmbeddingPayload;
let nodes_table = t("tkg_nodes");
// Group by trace_id
let mut trace_frames: HashMap<i64, Vec<(i64, f64, f64, f64, f64)>> = HashMap::new();
for (_, _, payload) in &qdrant_embeddings {
trace_frames
.entry(payload.trace_id as i64)
.or_default()
.push((
payload.frame,
payload.bbox_x,
payload.bbox_y,
payload.bbox_w,
payload.bbox_h,
));
}
if trace_frames.is_empty() {
tracing::warn!("[TKG-Phase2] No trace data in Qdrant");
return Ok(0);
}
// Build aggregates
let mut count = 0;
for (tid, frames) in &trace_frames {
let external_id = format!("trace_{}", tid);
let label = format!("Face Trace {}", tid);
let frame_count = frames.len() as i64;
let start_f = frames.iter().map(|(f, _, _, _, _)| *f).min().unwrap_or(0);
let end_f = frames.iter().map(|(f, _, _, _, _)| *f).max().unwrap_or(0);
let avg_x = frames.iter().map(|(_, x, _, _, _)| *x).sum::<f64>() / frame_count as f64;
let avg_y = frames.iter().map(|(_, _, y, _, _)| *y).sum::<f64>() / frame_count as f64;
let avg_w = frames.iter().map(|(_, _, _, w, _)| *w).sum::<f64>() / frame_count as f64;
let avg_h = frames.iter().map(|(_, _, _, _, h)| *h).sum::<f64>() / frame_count as f64;
// Compute average pose
let mut yaw_sum = 0.0f64;
let mut pitch_sum = 0.0f64;
let mut roll_sum = 0.0f64;
let mut pose_count = 0i64;
for (frame, x, y, w, h) in frames {
if let Some((yaw, pitch, roll)) = get_pose_for_face(*frame, *x, *y, *w, *h, pose_data) {
yaw_sum += yaw;
pitch_sum += pitch;
roll_sum += roll;
pose_count += 1;
}
}
let (avg_yaw, avg_pitch, avg_roll) = if pose_count > 0 {
(yaw_sum / pose_count as f64, pitch_sum / pose_count as f64, roll_sum / pose_count as f64)
} else {
(0.0, 0.0, 0.0)
};
let props = serde_json::json!({
"trace_id": tid,
"frame_count": frame_count,
"first_frame": start_f,
"last_frame": end_f,
"avg_x": avg_x,
"avg_y": avg_y,
"avg_width": avg_w,
"avg_height": avg_h,
"pose_count": pose_count,
"avg_yaw": avg_yaw,
"avg_pitch": avg_pitch,
"avg_roll": avg_roll,
});
sqlx::query(&format!(
"INSERT INTO {} (file_uuid, node_type, external_id, label, properties) \
VALUES ($1, $2, $3, $4, $5::jsonb) \
ON CONFLICT (file_uuid, node_type, external_id) \
DO UPDATE SET properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties)",
nodes_table
))
.bind(file_uuid)
.bind("face_trace")
.bind(&external_id)
.bind(&label)
.bind(serde_json::to_string(&props)?)
.execute(pool)
.await?;
count += 1;
}
tracing::info!("[TKG-Phase2] Built {} face_trace nodes from Qdrant", count);
Ok(count)
}
async fn build_face_trace_nodes_from_pg(
pool: &PgPool,
file_uuid: &str,
pose_data: &[FacePose],
) -> Result<usize> { ) -> Result<usize> {
let face_table = t("face_detections"); let face_table = t("face_detections");
let nodes_table = t("tkg_nodes"); let nodes_table = t("tkg_nodes");