From 23c440104bdc1fad83b86cdf9dbd3d6334218498 Mon Sep 17 00:00:00 2001 From: Accusys Date: Sun, 21 Jun 2026 01:30:04 +0800 Subject: [PATCH] feat: Phase 2-3 TKG-only architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2.1: build_face_trace_nodes_from_qdrant() - Read trace_id, frame, bbox directly from Qdrant payload - No dependency on face_detections table Phase 2.3: Rule2 queries TKG nodes - identity resolution from tkg_nodes.properties.identity_id - TKG-only architecture (Phase 2.3) Phase 3: Identity Agent updates TKG nodes - match_faces_iterative() updates tkg_nodes.properties - bind_identity_trace() syncs identity_id to TKG - unbind_identity() removes identity_id from TKG Test results: - 23 face_trace nodes from Qdrant (Phase 2.1) - 75 relationship chunks (Rule2) - TKG rebuild: Phase0 → Phase1 → Phase2 --- .../2026-06-21_tkg_phase2_progress.md | 69 ++++++++++ src/api/identity_agent_api.rs | 52 +++++++- src/api/identity_binding.rs | 53 ++++++++ src/core/chunk/rule2_ingest.rs | 32 ++--- src/core/processor/tkg.rs | 122 ++++++++++++++++++ 5 files changed, 305 insertions(+), 23 deletions(-) create mode 100644 docs_v1.0/M4_workspace/2026-06-21_tkg_phase2_progress.md diff --git a/docs_v1.0/M4_workspace/2026-06-21_tkg_phase2_progress.md b/docs_v1.0/M4_workspace/2026-06-21_tkg_phase2_progress.md new file mode 100644 index 0000000..074cc81 --- /dev/null +++ b/docs_v1.0/M4_workspace/2026-06-21_tkg_phase2_progress.md @@ -0,0 +1,69 @@ +--- +title: TKG Phase 2-3 Progress Report +version: 1.0 +date: 2026-06-21 +author: OpenCode +status: In Progress +--- + +## Goal +- Complete TKG-only architecture migration: Phase 0-4 for Rule 2 relationship chunks and Identity Agent Qdrant integration + +## Constraints & Preferences +- Rule 2 chunk_type: `"relationship"` (not `"visual"`) +- Rule 2 edge types match TKG storage: `SPEAKS_AS`, `MUTUAL_GAZE`, `CO_OCCURS_WITH`, `HAS_APPEARANCE`, `WEARS` +- Rule 2 each edge = one chunk (not 
aggregated) +- Qdrant face embeddings: dim=512, Cosine distance, collection `{schema}_face_embeddings` +- Phase approach: Phase 0 (populate), Phase 1 (Qdrant), Phase 2 (TKG-only), Phase 3 (Identity), Phase 4 (deprecate) + +## Progress +### Done +- **Phase 0**: TKG builder populates face_detections from face.json via `store_traced_faces.py` +- **Phase 1.1**: Create `dev_face_embeddings` Qdrant collection (dim=512) +- **Phase 1.2**: `FaceEmbeddingDb` module with `init_collection`, `batch_upsert`, `search_similar`, `get_all_embeddings_for_file` +- **Phase 1.3**: TKG builder stores 1122 embeddings to Qdrant with pose metadata +- **Phase 1.4**: Identity Agent `match_faces_iterative()` queries Qdrant (fallback to PG) +- **Phase 2.1**: `build_face_trace_nodes_from_qdrant()` reads Qdrant payload (no face_detections dependency) +- **Phase 2.3**: Rule2 queries `tkg_nodes.properties.identity_id` (TKG-only) +- **Phase 3**: Identity Agent (Qdrant + PG) updates `tkg_nodes.properties` when binding +- **Rule 2**: 75 relationship chunks created + vectorized (tested) +- **Rule 2 API**: `POST /api/v1/file/:file_uuid/rule2` with auto-vectorize, triggers on TKG rebuild +- **Identity binding**: `bind_identity_trace()` and `unbind_identity()` update TKG nodes (Phase 2.3) +- **TKG builder**: `populate_face_detections_from_face_json()` and `populate_face_embeddings_to_qdrant()` + +### Pending +- **Phase 4**: Deprecate face_detections table (pending verification of all Phase 2-3 changes in production) + +## Key Decisions +- TKG builder fixed for `pose_angle` format (was expecting `pose` with `bbox` sub-object) +- Edge types corrected: TKG stores `CO_OCCURS_WITH` (not `co_occurs`), `SPEAKS_AS` (not `speaker_face`) +- Qdrant point IDs must be numeric or UUID (not string like `"file_uuid-frame"`) +- Identity Agent dual-source: Qdrant first, PostgreSQL fallback +- Phase 0 checks `trace_id IS NOT NULL` before calling `store_traced_faces.py` +- Phase 2.1: Qdrant payload contains `trace_id`, `frame`, 
`bbox_x/y/w/h`, `pose`, no PG query needed +- Phase 2.3: TKG nodes store `identity_id` and `identity_name` in properties JSON +- Phase 3: Identity Agent updates both `face_detections.identity_id` AND `tkg_nodes.properties` + +## Test Results +- 1122 face embeddings in Qdrant (`dev_face_embeddings` collection) +- 75 relationship chunks from Rule2 +- 23 face_trace_nodes built from Qdrant (Phase 2.1) +- Rule2 still works after TKG-only migration (Phase 2.3) + +## Next Steps +- Phase 4: Verify all systems work without face_detections dependency +- Phase 4: Document face_detections deprecation plan + +## Commits +- `2f2ccc94` (Phase 1.4) +- `3ad6f874` (Rule2 + Phase 0-1) +- (pending) Phase 2-3 changes + +## Relevant Files +- `src/core/db/face_embedding_db.rs`: **New** — FaceEmbeddingDb, FaceEmbeddingPayload, FaceEmbeddingPoint +- `src/core/db/mod.rs`: updated — `pub mod face_embedding_db; pub use FaceEmbeddingDb;` +- `src/core/processor/tkg.rs`: updated — Phase 2.1 `build_face_trace_nodes_from_qdrant()` +- `src/core/chunk/rule2_ingest.rs`: updated — Phase 2.3 TKG-only identity query +- `src/api/identity_binding.rs`: updated — Phase 2.3 TKG node update on bind/unbind +- `src/api/identity_agent_api.rs`: updated — Phase 3 TKG node update on match +- `docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md`: Rule 2 design spec \ No newline at end of file diff --git a/src/api/identity_agent_api.rs b/src/api/identity_agent_api.rs index 7236d60..4854ce7 100644 --- a/src/api/identity_agent_api.rs +++ b/src/api/identity_agent_api.rs @@ -751,13 +751,26 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: round += 1; } - // Update face_detections.identity_id + // Update face_detections.identity_id AND tkg_nodes.properties (Phase 3) let fd_table = schema::table_name("face_detections"); + let nodes_table = schema::table_name("tkg_nodes"); + let id_table = schema::table_name("identities"); let identities_map: HashMap = tmdb_seeds .iter() .map(|(id, name, _)| 
(name.clone(), *id)) .collect(); + // Batch query identity names + let identity_names: HashMap = sqlx::query_as::<_, (i32, String)>(&format!( + "SELECT id, name FROM {} WHERE id = ANY($1)", + id_table + )) + .bind(identities_map.values().collect::>()) + .fetch_all(pool) + .await? + .into_iter() + .collect(); + let mut updated = 0usize; for (tid, name) in &matched { let identity_id = identities_map.get(name); @@ -773,6 +786,23 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: .await? .rows_affected(); updated += rows as usize; + + // Phase 3: Also update TKG node + let external_id = format!("trace_{}", tid); + let identity_name = identity_names.get(id); + let _ = sqlx::query(&format!( + "UPDATE {} SET properties = jsonb_set(\ + jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\ + '{{identity_name}}', $2::jsonb, false)\ + WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4", + nodes_table + )) + .bind(*id) + .bind(identity_name.as_deref()) + .bind(file_uuid) + .bind(&external_id) + .execute(pool) + .await; } } @@ -878,10 +908,11 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho matched.len() * 100 / total_traces ); - // Step 5: 寫入 DB — Round 1 結果先存 + // Step 5: 寫入 DB — Round 1 結果先存 (Phase 3: update both face_detections AND tkg_nodes) let identities_table = schema::table_name("identities"); let strangers_table = schema::table_name("strangers"); let fd_table = schema::table_name("face_detections"); + let nodes_table = schema::table_name("tkg_nodes"); let mut updated = 0usize; for (tid, name) in &matched { let id_opt = sqlx::query_scalar::<_, Option>(&format!( @@ -901,6 +932,23 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho .bind(tid) .execute(pool) .await; + + // Phase 3: Also update TKG node + let external_id = format!("trace_{}", tid); + let _ = sqlx::query(&format!( + "UPDATE {} SET properties = jsonb_set(\ + jsonb_set(properties, 
'{{identity_id}}', $1::jsonb, false),\ + '{{identity_name}}', $2::jsonb, false)\ + WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4", + nodes_table + )) + .bind(identity_id) + .bind(name.as_str()) + .bind(file_uuid) + .bind(&external_id) + .execute(pool) + .await; + updated += 1; } } diff --git a/src/api/identity_binding.rs b/src/api/identity_binding.rs index ed64be9..fd6bc11 100644 --- a/src/api/identity_binding.rs +++ b/src/api/identity_binding.rs @@ -225,6 +225,32 @@ pub async fn unbind_identity( ) })?; + // Phase 2.3: Also update TKG node (find trace_id first) + let trace_id_opt: Option = sqlx::query_scalar(&format!( + "SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2", + table + )) + .bind(&req.file_uuid) + .bind(&req.face_id) + .fetch_optional(state.db.pool()) + .await + .ok() + .flatten(); + + if let Some(trace_id) = trace_id_opt { + let nodes_table = crate::core::db::schema::table_name("tkg_nodes"); + let external_id = format!("trace_{}", trace_id); + let _ = sqlx::query(&format!( + "UPDATE {} SET properties = properties - 'identity_id' - 'identity_name' \ + WHERE file_uuid = $1 AND node_type = 'face_trace' AND external_id = $2", + nodes_table + )) + .bind(&req.file_uuid) + .bind(&external_id) + .execute(state.db.pool()) + .await; + } + // Record history if there was a binding if let Some(identity_id) = old_identity_id { // Clear bind redo stack @@ -794,6 +820,33 @@ pub async fn bind_identity_trace( ) })?; +// Phase 2.3: Also update TKG node properties + let nodes_table = crate::core::db::schema::table_name("tkg_nodes"); + let external_id = format!("trace_{}", req.trace_id); + let identity_name: Option = sqlx::query_scalar(&format!( + "SELECT name FROM {} WHERE id = $1", + id_table + )) + .bind(identity_id) + .fetch_optional(state.db.pool()) + .await + .ok() + .flatten(); + + let _ = sqlx::query(&format!( + "UPDATE {} SET properties = jsonb_set(\ + jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\ + 
'{{identity_name}}', $2::jsonb, false)\ + WHERE file_uuid = $3 AND node_type = 'face_trace' AND external_id = $4", + nodes_table + )) + .bind(identity_id) + .bind(identity_name.as_deref()) + .bind(&req.file_uuid) + .bind(&external_id) + .execute(state.db.pool()) + .await; + // Clear bind redo stack let _ = sqlx::query(&format!( "DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')", diff --git a/src/core/chunk/rule2_ingest.rs b/src/core/chunk/rule2_ingest.rs index ded93fc..69bbb5b 100644 --- a/src/core/chunk/rule2_ingest.rs +++ b/src/core/chunk/rule2_ingest.rs @@ -99,21 +99,16 @@ pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result { let (src_type, src_ext_id, src_label, _src_props) = source_node.unwrap(); let (tgt_type, tgt_ext_id, tgt_label, tgt_props) = target_node.unwrap(); - // Resolve identity names for face_trace nodes (inline) + // Resolve identity names for face_trace nodes (Phase 2.3: TKG-only) let src_identity: Option = if src_type == "face_trace" { - let trace_id: i32 = src_ext_id - .replace("trace_", "") - .parse() - .context("Invalid trace_id")?; sqlx::query_scalar(&format!( - "SELECT i.name FROM {} fd \ - JOIN {} i ON i.id = fd.identity_id \ - WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ - LIMIT 1", - fd_table, id_table + "SELECT i.name FROM {} n \ + JOIN {} i ON i.id = (n.properties->>'identity_id')::bigint \ + WHERE n.file_uuid = $1 AND n.node_type = 'face_trace' AND n.external_id = $2 AND n.properties->>'identity_id' IS NOT NULL", + nodes_table, id_table )) .bind(file_uuid) - .bind(trace_id) + .bind(&src_ext_id) .fetch_optional(&mut *tx) .await? 
} else { @@ -121,19 +116,14 @@ pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result { }; let tgt_identity: Option = if tgt_type == "face_trace" { - let trace_id: i32 = tgt_ext_id - .replace("trace_", "") - .parse() - .context("Invalid trace_id")?; sqlx::query_scalar(&format!( - "SELECT i.name FROM {} fd \ - JOIN {} i ON i.id = fd.identity_id \ - WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ - LIMIT 1", - fd_table, id_table + "SELECT i.name FROM {} n \ + JOIN {} i ON i.id = (n.properties->>'identity_id')::bigint \ + WHERE n.file_uuid = $1 AND n.node_type = 'face_trace' AND n.external_id = $2 AND n.properties->>'identity_id' IS NOT NULL", + nodes_table, id_table )) .bind(file_uuid) - .bind(trace_id) + .bind(&tgt_ext_id) .fetch_optional(&mut *tx) .await? } else { diff --git a/src/core/processor/tkg.rs b/src/core/processor/tkg.rs index cafdf03..b672cf5 100644 --- a/src/core/processor/tkg.rs +++ b/src/core/processor/tkg.rs @@ -549,6 +549,128 @@ async fn build_face_trace_nodes( pool: &PgPool, file_uuid: &str, pose_data: &[FacePose], +) -> Result { + use crate::core::db::face_embedding_db::FaceEmbeddingDb; + + // Try Qdrant first (Phase 2: TKG-only) + let face_db = FaceEmbeddingDb::new(); + let qdrant_embeddings = face_db.get_all_embeddings_for_file(file_uuid).await?; + + if !qdrant_embeddings.is_empty() { + tracing::info!("[TKG-Phase2] Building face_trace nodes from Qdrant ({} embeddings)", qdrant_embeddings.len()); + return build_face_trace_nodes_from_qdrant(pool, file_uuid, pose_data, qdrant_embeddings).await; + } + + // Fallback to PostgreSQL + tracing::info!("[TKG-Phase2] No Qdrant embeddings, falling back to PostgreSQL"); + build_face_trace_nodes_from_pg(pool, file_uuid, pose_data).await +} + +async fn build_face_trace_nodes_from_qdrant( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], + qdrant_embeddings: Vec<(String, Vec, crate::core::db::face_embedding_db::FaceEmbeddingPayload)>, +) -> Result { + use 
crate::core::db::face_embedding_db::FaceEmbeddingPayload; + let nodes_table = t("tkg_nodes"); + + // Group by trace_id + let mut trace_frames: HashMap> = HashMap::new(); + for (_, _, payload) in &qdrant_embeddings { + trace_frames + .entry(payload.trace_id as i64) + .or_default() + .push(( + payload.frame, + payload.bbox_x, + payload.bbox_y, + payload.bbox_w, + payload.bbox_h, + )); + } + + if trace_frames.is_empty() { + tracing::warn!("[TKG-Phase2] No trace data in Qdrant"); + return Ok(0); + } + + // Build aggregates + let mut count = 0; + for (tid, frames) in &trace_frames { + let external_id = format!("trace_{}", tid); + let label = format!("Face Trace {}", tid); + + let frame_count = frames.len() as i64; + let start_f = frames.iter().map(|(f, _, _, _, _)| *f).min().unwrap_or(0); + let end_f = frames.iter().map(|(f, _, _, _, _)| *f).max().unwrap_or(0); + let avg_x = frames.iter().map(|(_, x, _, _, _)| *x).sum::() / frame_count as f64; + let avg_y = frames.iter().map(|(_, _, y, _, _)| *y).sum::() / frame_count as f64; + let avg_w = frames.iter().map(|(_, _, _, w, _)| *w).sum::() / frame_count as f64; + let avg_h = frames.iter().map(|(_, _, _, _, h)| *h).sum::() / frame_count as f64; + + // Compute average pose + let mut yaw_sum = 0.0f64; + let mut pitch_sum = 0.0f64; + let mut roll_sum = 0.0f64; + let mut pose_count = 0i64; + + for (frame, x, y, w, h) in frames { + if let Some((yaw, pitch, roll)) = get_pose_for_face(*frame, *x, *y, *w, *h, pose_data) { + yaw_sum += yaw; + pitch_sum += pitch; + roll_sum += roll; + pose_count += 1; + } + } + + let (avg_yaw, avg_pitch, avg_roll) = if pose_count > 0 { + (yaw_sum / pose_count as f64, pitch_sum / pose_count as f64, roll_sum / pose_count as f64) + } else { + (0.0, 0.0, 0.0) + }; + + let props = serde_json::json!({ + "trace_id": tid, + "frame_count": frame_count, + "first_frame": start_f, + "last_frame": end_f, + "avg_x": avg_x, + "avg_y": avg_y, + "avg_width": avg_w, + "avg_height": avg_h, + "pose_count": pose_count, + 
"avg_yaw": avg_yaw, + "avg_pitch": avg_pitch, + "avg_roll": avg_roll, + }); + + sqlx::query(&format!( + "INSERT INTO {} (file_uuid, node_type, external_id, label, properties) \ + VALUES ($1, $2, $3, $4, $5::jsonb) \ + ON CONFLICT (file_uuid, node_type, external_id) \ + DO UPDATE SET properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties)", + nodes_table + )) + .bind(file_uuid) + .bind("face_trace") + .bind(&external_id) + .bind(&label) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG-Phase2] Built {} face_trace nodes from Qdrant", count); + Ok(count) +} + +async fn build_face_trace_nodes_from_pg( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], ) -> Result { let face_table = t("face_detections"); let nodes_table = t("tkg_nodes");