From 3ad6f8740a63e2001dc3cf5d798afcb8bd876b9f Mon Sep 17 00:00:00 2001 From: Accusys Date: Sun, 21 Jun 2026 00:22:41 +0800 Subject: [PATCH] feat: Rule2 TKG relationship chunks + Phase0-1 Qdrant integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 0: TKG builder populate face_detections from face.json - Fix face.json parser for pose_angle format - Call store_traced_faces.py to set trace_id - Skip if trace_id already populated Phase 1: Qdrant face embeddings integration - Add FaceEmbeddingDb module (src/core/db/face_embedding_db.rs) - Create dev_face_embeddings collection (dim=512) - Store 1122 face embeddings with pose metadata - API: init_collection, batch_upsert, search_similar Rule2: TKG edges → relationship chunks - Design: RULE2_TKG_RELATIONSHIP_V1.0.md - Implementation: rule2_ingest.rs - ChunkType::Relationship added - Edge types: SPEAKS_AS, MUTUAL_GAZE, CO_OCCURS_WITH, HAS_APPEARANCE, WEARS - Auto-trigger on TKG rebuild API: - POST /api/v1/file/:file_uuid/rule2 (vectorization) - POST /api/v1/file/:file_uuid/tkg/rebuild (auto Rule2) Test: 75 relationship chunks created + vectorized --- docs_v1.0/API_WORKSPACE/modules/15_tkg.md | 378 +++++ .../DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md | 235 +++ docs_v1.0/doc_wasm/modules/15_tkg.md | 378 +++++ src/api/trace_agent_api.rs | 523 +++++- src/core/chunk/mod.rs | 2 + src/core/chunk/rule2_ingest.rs | 335 ++++ src/core/chunk/types.rs | 2 + src/core/db/face_embedding_db.rs | 488 ++++++ src/core/db/mod.rs | 9 + src/core/processor/tkg.rs | 1491 ++++++++++++++++- 10 files changed, 3811 insertions(+), 30 deletions(-) create mode 100644 docs_v1.0/API_WORKSPACE/modules/15_tkg.md create mode 100644 docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md create mode 100644 docs_v1.0/doc_wasm/modules/15_tkg.md create mode 100644 src/core/chunk/rule2_ingest.rs create mode 100644 src/core/db/face_embedding_db.rs diff --git a/docs_v1.0/API_WORKSPACE/modules/15_tkg.md b/docs_v1.0/API_WORKSPACE/modules/15_tkg.md new file mode 100644 index 0000000..bdbca67 --- /dev/null +++ b/docs_v1.0/API_WORKSPACE/modules/15_tkg.md @@ -0,0 +1,378 @@ + + + + +## Temporal Knowledge Graph (TKG) + +TKG is a time-aligned knowledge graph built from multi-processor outputs (face, yolo, ocr, pose, asrx, gaze, lip, appearance). It produces 9 node types and 14 edge types stored in `dev.tkg_nodes` and `dev.tkg_edges`. + +### Node Types + +| Node Type | Description | Key Properties | +|-----------|-------------|----------------| +| `face_trace` | A tracked face identity over time | `trace_id`, `face_count`, `avg_confidence` | +| `gaze_trace` | Gaze direction over time | `direction` (frontal/left/right/up/down + diagonals) | +| `lip_trace` | Lip movement synced with speech | `speaker_id`, `lip_area_range` | +| `text_trace` | Spoken text aligned to time | `speaker_id`, `text`, `start_time`, `end_time` | +| `appearance_trace` | Human appearance (clothing) over time | `clothing_color`, `upper_cloth`, `lower_cloth` | +| `skin_tone_trace` | Fitzpatrick skin tone classification | `fitzpatrick_type` (I–VI) | +| `accessory` | Detected accessories | `type` (glasses/hat/etc.), `confidence` | +| `object` | YOLO-detected object | `class`, `confidence`, `frame_count` | +| `speaker` | ASRX speaker segment | `speaker_id`, `segment_count`, `total_duration` | + +### Edge Types + +| Edge Type | Source → Target | Description | +|-----------|-----------------|-------------| +| `co_occurs` | object ↔ object | Two objects appear together in same frame | +| `speaker_face` | speaker ↔ face_trace | Speaker matched to face trace via lip sync | +| `face_face` | face_trace ↔ face_trace | Two face traces interact (mutual gaze) | +| `mutual_gaze` | gaze_trace ↔ gaze_trace | Two people looking at each other | +| `lip_sync` | lip_trace ↔ text_trace | Lip movement aligned with spoken text | +| `has_appearance` | face_trace ↔ appearance_trace | Face has specific appearance | +| `wears` | face_trace ↔ accessory | Face wears an accessory | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/rebuild` + +**Auth**: Required +**Scope**: file-level + +Rebuild the Temporal Knowledge Graph for a file. Reads processor JSON outputs (face, yolo, ocr, pose, asrx, gaze, lip, appearance) and generates TKG nodes and edges. Clears existing nodes/edges for the file first, then rebuilds from scratch. + +#### Example + +```bash +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/rebuild" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "result": { + "face_trace_nodes": 16, + "gaze_trace_nodes": 16, + "lip_trace_nodes": 12, + "text_trace_nodes": 24, + "appearance_trace_nodes": 8, + "skin_tone_trace_nodes": 5, + "accessory_nodes": 3, + "object_nodes": 26, + "speaker_nodes": 4, + "co_occurrence_edges": 94, + "speaker_face_edges": 12, + "face_face_edges": 8, + "mutual_gaze_edges": 2, + "lip_sync_edges": 10, + "has_appearance_edges": 16, + "wears_edges": 3 + }, + "error": null +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | True if rebuild completed | +| `file_uuid` | string | 32-char hex UUID | +| `result` | object | Node and edge counts by type | +| `error` | string/null | Error message if failed | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/nodes` + +**Auth**: Required +**Scope**: file-level + +Query TKG nodes with pagination and optional type filter. + +#### Request Parameters + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `node_type` | string | No | all | Filter by node type: `face_trace`, `gaze_trace`, `lip_trace`, `text_trace`, `appearance_trace`, `skin_tone_trace`, `accessory`, `object`, `speaker` | +| `page` | integer | No | 1 | Page number | +| `page_size` | integer | No | 100 | Items per page (max 500) | + +#### Example + +```bash +# Get all face_trace nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/nodes" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"node_type": "face_trace", "page": 1, "page_size": 50}' + +# Get all nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/nodes" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{}' +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "total": 16, + "page": 1, + "page_size": 50, + "nodes": [ + { + "id": 1, + "node_type": "face_trace", + "external_id": "trace_0", + "label": "Face Trace 0", + "properties": { + "trace_id": 0, + "face_count": 142, + "avg_confidence": 0.87 + } + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `file_uuid` | string | 32-char hex UUID | +| `total` | integer | Total matching node count | +| `page` | integer | Current page | +| `page_size` | integer | Items per page | +| `nodes` | array | Array of node objects | +| `nodes[].id` | integer | Database primary key | +| `nodes[].node_type` | string | Node type (see table above) | +| `nodes[].external_id` | string | External identifier (e.g., `trace_0`, `gaze_1`) | +| `nodes[].label` | string | Human-readable label | +| `nodes[].properties` | object | Type-specific properties as JSON | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/edges` + +**Auth**: Required +**Scope**: file-level + +Query TKG edges with pagination and optional filters. + +#### Request Parameters + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `edge_type` | string | No | all | Filter by edge type: `co_occurs`, `speaker_face`, `face_face`, `mutual_gaze`, `lip_sync`, `has_appearance`, `wears` | +| `source_type` | string | No | — | Filter by source node type | +| `target_type` | string | No | — | Filter by target node type | +| `page` | integer | No | 1 | Page number | +| `page_size` | integer | No | 100 | Items per page (max 500) | + +#### Example + +```bash +# Get all co_occurrence edges +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/edges" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"edge_type": "co_occurs"}' + +# Get edges between face_trace and speaker nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/edges" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"source_type": "speaker", "target_type": "face_trace"}' +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "total": 94, + "page": 1, + "page_size": 100, + "edges": [ + { + "id": 1, + "edge_type": "co_occurs", + "source_node_id": 10, + "target_node_id": 15, + "properties": { + "frame_count": 45, + "confidence": 0.92 + } + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `file_uuid` | string | 32-char hex UUID | +| `total` | integer | Total matching edge count | +| `page` | integer | Current page | +| `page_size` | integer | Items per page | +| `edges` | array | Array of edge objects | +| `edges[].id` | integer | Database primary key | +| `edges[].edge_type` | string | Edge type | +| `edges[].source_node_id` | integer | Source node ID (FK to tkg_nodes) | +| `edges[].target_node_id` | integer | Target node ID (FK to tkg_nodes) | +| `edges[].properties` | object | Edge-specific properties as JSON | + +--- + +### `GET /api/v1/file/:file_uuid/tkg/node/:node_id` + +**Auth**: Required +**Scope**: file-level + +Get detail for a specific TKG node including its connected edges. + +#### Example + +```bash +curl -s "$API/api/v1/file/$FILE_UUID/tkg/node/1" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "success": true, + "node": { + "id": 1, + "node_type": "face_trace", + "external_id": "trace_0", + "label": "Face Trace 0", + "properties": { + "trace_id": 0, + "face_count": 142, + "avg_confidence": 0.87 + } + }, + "connected_edges": [ + { + "id": 5, + "edge_type": "co_occurs", + "source_node_id": 1, + "target_node_id": 10, + "properties": {"frame_count": 45} + } + ], + "edge_count": 3 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `node` | object | Node detail (same format as nodes query) | +| `connected_edges` | array | Edges connected to this node | +| `edge_count` | integer | Total connected edge count | + +#### Error Codes + +| HTTP | When | +|------|------| +| `404` | Node not found | + +--- + +### `GET /api/v1/file/:file_uuid/processor-counts` + +**Auth**: Required +**Scope**: file-level + +Get counts of processor JSON output files for a file. Scans the output directory for `{file_uuid}.{processor}.json` files and extracts frame counts, segment counts, and chunk counts from each file. + +Supports short UUID prefix matching (e.g., `d3f9ae8e` → resolves to full `d3f9ae8e471a1fc4d47022c66091b920`). + +#### Example + +```bash +curl -s "$API/api/v1/file/$FILE_UUID/processor-counts" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "output_dir": "/Users/accusys/momentry/output_dev", + "processors": [ + { + "processor": "cut", + "has_json": true, + "frame_count": 5391, + "segment_count": null, + "chunk_count": null, + "last_modified": "2026-06-16T18:48:01.987241061+00:00" + }, + { + "processor": "face", + "has_json": true, + "frame_count": 1112, + "segment_count": null, + "chunk_count": null, + "last_modified": "2026-06-18T17:21:37.408383765+00:00" + }, + { + "processor": "asrx", + "has_json": true, + "frame_count": null, + "segment_count": 6, + "chunk_count": null, + "last_modified": "2026-06-18T17:21:40.872063642+00:00" + }, + { + "processor": "story", + "has_json": true, + "frame_count": null, + "segment_count": null, + "chunk_count": 12, + "last_modified": "2026-06-18T17:22:00.000000000+00:00" + }, + { + "processor": "mediapipe", + "has_json": false, + "frame_count": null, + "segment_count": null, + "chunk_count": null, + "last_modified": null + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `file_uuid` | string | Full 32-char hex UUID (resolved from prefix) | +| `output_dir` | string | Output directory scanned | +| `processors` | array | Per-processor output info | +| `processors[].processor` | string | Processor name | +| `processors[].has_json` | boolean | Whether JSON file exists | +| `processors[].frame_count` | integer/null | Total frames processed (frame-based processors) | +| `processors[].segment_count` | integer/null | Segment count (ASRX segments, etc.) | +| `processors[].chunk_count` | integer/null | Chunk count (Story chunks, etc.) | +| `processors[].last_modified` | string/null | ISO 8601 timestamp of last modification | + +#### Error Codes + +| HTTP | When | +|------|------| +| `404` | File UUID not found in database | + +--- + +*Updated: 2026-06-20 12:00:00* diff --git a/docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md b/docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md new file mode 100644 index 0000000..336e87e --- /dev/null +++ b/docs_v1.0/DESIGN/RULE2_TKG_RELATIONSHIP_V1.0.md @@ -0,0 +1,235 @@ +--- +title: Rule 2 TKG Relationship Chunks V1.0 +version: 1.0 +date: 2026-06-20 +author: OpenCode +status: approved +--- + +# Rule 2 TKG Relationship Chunks V1.0 + +| Scope | Status | Applicable to | Binary | +|-------|--------|---------------|--------| +| TKG relationship vectorization | Approved | `momentry_playground`, `momentry` | Both | + +## Overview + +Rule 2 creates **relationship chunks** by converting TKG edges into searchable, vectorized units. Each TKG edge becomes a chunk with LLM-generated natural language description, enabling semantic search for relationship queries. + +**Key Change:** Original Rule 2 (YOLO frame objects) is deprecated due to COCO classes being too generic. New Rule 2 focuses on TKG relationships. + +## Data Flow + +``` +┌─────────────────────────────────────────────────────────┐ +│ UPSTREAM: TKG Builder │ +│ │ +│ tkg_nodes: face_trace, speaker, object, etc. │ +│ tkg_edges: speaker_face, mutual_gaze, co_occurs, etc. │ +│ │ +└─────────────────────────────────────────────────────────┘ + │ + ▼ after TKG complete + │ +┌─────────────────────────────────────────────────────────┐ +│ RULE 2 PROCESSING │ +│ │ +│ Triggered by: │ +│ 1. Worker auto: job_worker.rs after TKG completes │ +│ 2. HTTP API: POST /api/v1/file/:file_uuid/rule2 │ +│ │ +│ ingest_rule2(file_uuid): │ +│ ├─ Query tkg_edges by type (priority order) │ +│ ├─ For each edge: │ +│ │ ├─ Resolve source_node / target_node │ +│ │ ├─ Resolve identity names (if face_trace) │ +│ │ ├─ Build context JSON │ +│ │ ├─ call_llm(context) → text_content │ +│ │ └─ INSERT INTO chunk (chunk_type='relationship') │ +│ │ │ +│ │ +└─────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────┐ +│ DOWNSTREAM: vectorize_chunks() │ +│ │ +│ SELECT ... WHERE chunk_type='relationship' │ +│ AND embedding IS NULL │ +│ │ +│ 1. embedder.embed_document(text_content) → vector │ +│ 2. db.store_vector() → PG chunk.embedding │ +│ 3. qdrant.upsert_vector() → momentry_rule2 collection │ +│ │ +└─────────────────────────────────────────────────────────┘ +``` + +## Edge Type Priority + +| Priority | Edge Type | Description | Example Output | +|----------|-----------|-------------|----------------| +| P0 | `speaker_face` | Speaker ↔ Face trace | "SPEAKER_01 以 Cary Grant 的身份說話,從 frame 100 到 350" | +| P0 | `mutual_gaze` | Two face traces looking at each other | "Cary Grant 和 Grace Kelly 互相看對方 24 幀,起始於 frame 450" | +| P1 | `face_face` | Two face traces co-occurring | "Cary Grant 和 Grace Kelly 同框 180 幀" | +| P1 | `co_occurs` | Object ↔ Object co-occurrence | "物件 'car' 和 'person' 在同一畫面出現 60 幀" | +| P2 | `has_appearance` | Face trace ↔ Appearance trace | "Cary Grant 穿著藍色上衣,戴眼鏡" | +| P2 | `wears` | Face trace ↔ Accessory | "Cary Grant 戴帽子,信心值 0.82" | + +## Chunk Data Structure + +### Content JSON (`content` column) + +```json +{ + "edge_type": "speaker_face", + "edge_id": 123, + "source_node": { + "id": 45, + "node_type": "speaker", + "external_id": "SPEAKER_01", + "label": "SPEAKER_01" + }, + "target_node": { + "id": 67, + "node_type": "face_trace", + "external_id": "trace_5", + "label": "Face Trace 5", + "identity_name": "Cary Grant" + }, + "properties": { + "first_frame": 100, + "last_frame": 350, + "frame_count": 250, + "lip_sync_confidence": 0.85 + } +} +``` + +### Text Content (`text_content` column) + +LLM-generated natural language description in Traditional Chinese: + +``` +"SPEAKER_01 以 Cary Grant 的身份說話,從 frame 100 到 frame 350,唇語同步信心值 0.85" +``` + +### Metadata JSON (`metadata` column) + +```json +{ + "source_type": "speaker", + "target_type": "face_trace", + "has_identity": true, + "identity_source": "tmdb" +} +``` + +## LLM Prompt Template + +```text +你是影片關係描述專家。請用繁體中文描述以下人物/物件關係: + +關係類型: {edge_type} +來源節點: {source_node.node_type} - {source_node.external_id} + 身份名稱: {identity_name} (如果有) +目標節點: {target_node.node_type} - {target_node.external_id} + 身份名稱: {identity_name} (如果有) +關係屬性: + - 起始幀: {first_frame} + - 結束幀: {last_frame} + - 幀數: {frame_count} + - 信心值: {confidence} + +要求: +1. 使用自然語言,不要輸出 JSON +2. 包含時間範圍(幀號) +3. 包含人物名字(如有 identity) +4. 簡潔,20-50 字 +5. 用繁體中文 + +範例輸出: +"SPEAKER_01 以 Cary Grant 的身份說話,從 frame 100 到 frame 350" +"Cary Grant 和 Grace Kelly 互相看對方 24 幀,起始於 frame 450" +``` + +## Edge → Chunk Conversion Rules + +### speaker_face Edge + +```rust +// Source: speaker node +// Target: face_trace node +// Properties: first_frame, last_frame, lip_sync_confidence + +let text_content = call_llm(format!( + "SPEAKER {} 對應 face trace {},身份 {},frame {}-{}", + speaker_id, trace_id, identity_name, first_frame, last_frame +)); +``` + +### mutual_gaze Edge + +```rust +// Source: face_trace node A +// Target: face_trace node B +// Properties: first_frame, gaze_frame_count, yaw_a_avg, yaw_b_avg + +let text_content = call_llm(format!( + "人物 {} 和 {} 互相看對方 {} 幀,起始於 frame {}", + identity_a, identity_b, gaze_frame_count, first_frame +)); +``` + +### has_appearance Edge + +```rust +// Source: face_trace node +// Target: appearance_trace node +// Properties: clothing colors, accessories + +let text_content = call_llm(format!( + "人物 {} 穿著 {} 上衣,{} 下衣", + identity_name, upper_color, lower_color +)); +``` + +## Search Contribution + +| Search Path | Mechanism | Rule 2 Contribution | +|-------------|-----------|-------------------| +| **Semantic search** (Qdrant) | `chunk_type='relationship'` → embedding query | LLM descriptions enable natural language queries | +| **Keyword search** (BM25 ILIKE) | `text_content ILIKE '%互相看%'` | Relationship keywords searchable | +| **Agent tkg_query** | Direct edge queries | Rule 2 complements with vectorized search | +| **identity_text** | Reverse lookup | "誰戴眼鏡" → has_appearance chunks | + +## Trigger Points + +| Trigger | Location | Condition | +|---------|----------|-----------| +| Worker auto | `job_worker.rs` | After TKG builder completes | +| HTTP API | `POST /api/v1/file/:file_uuid/rule2` | Manual trigger | +| Pipeline | `pipeline_core::execute_rule2` | Called by other modules | + +## Edge Cases + +| Scenario | Behavior | +|----------|----------| +| No tkg_edges | Returns 0 immediately with info log | +| Edge without identity | Use node external_id (e.g., "trace_5") in description | +| LLM call fails | Fallback to template-based description | +| Multiple edges same type | Each edge becomes separate chunk | + +## Qdrant Collection + +| Property | Value | +|----------|-------| +| Collection name | `momentry_rule2` | +| Vector size | 768 (nomic-embed-text-v2-moe) | +| Distance | Cosine | +| Payload | `{chunk_id, file_uuid, edge_type, source_type, target_type}` | + +## Version History + +| Version | Date | Author | Change | +|---------|------|--------|--------| +| 1.0 | 2026-06-20 | OpenCode | Initial design: TKG edges → relationship chunks | \ No newline at end of file diff --git a/docs_v1.0/doc_wasm/modules/15_tkg.md b/docs_v1.0/doc_wasm/modules/15_tkg.md new file mode 100644 index 0000000..bdbca67 --- /dev/null +++ b/docs_v1.0/doc_wasm/modules/15_tkg.md @@ -0,0 +1,378 @@ + + + + +## Temporal Knowledge Graph (TKG) + +TKG is a time-aligned knowledge graph built from multi-processor outputs (face, yolo, ocr, pose, asrx, gaze, lip, appearance). It produces 9 node types and 14 edge types stored in `dev.tkg_nodes` and `dev.tkg_edges`. + +### Node Types + +| Node Type | Description | Key Properties | +|-----------|-------------|----------------| +| `face_trace` | A tracked face identity over time | `trace_id`, `face_count`, `avg_confidence` | +| `gaze_trace` | Gaze direction over time | `direction` (frontal/left/right/up/down + diagonals) | +| `lip_trace` | Lip movement synced with speech | `speaker_id`, `lip_area_range` | +| `text_trace` | Spoken text aligned to time | `speaker_id`, `text`, `start_time`, `end_time` | +| `appearance_trace` | Human appearance (clothing) over time | `clothing_color`, `upper_cloth`, `lower_cloth` | +| `skin_tone_trace` | Fitzpatrick skin tone classification | `fitzpatrick_type` (I–VI) | +| `accessory` | Detected accessories | `type` (glasses/hat/etc.), `confidence` | +| `object` | YOLO-detected object | `class`, `confidence`, `frame_count` | +| `speaker` | ASRX speaker segment | `speaker_id`, `segment_count`, `total_duration` | + +### Edge Types + +| Edge Type | Source → Target | Description | +|-----------|-----------------|-------------| +| `co_occurs` | object ↔ object | Two objects appear together in same frame | +| `speaker_face` | speaker ↔ face_trace | Speaker matched to face trace via lip sync | +| `face_face` | face_trace ↔ face_trace | Two face traces interact (mutual gaze) | +| `mutual_gaze` | gaze_trace ↔ gaze_trace | Two people looking at each other | +| `lip_sync` | lip_trace ↔ text_trace | Lip movement aligned with spoken text | +| `has_appearance` | face_trace ↔ appearance_trace | Face has specific appearance | +| `wears` | face_trace ↔ accessory | Face wears an accessory | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/rebuild` + +**Auth**: Required +**Scope**: file-level + +Rebuild the Temporal Knowledge Graph for a file. Reads processor JSON outputs (face, yolo, ocr, pose, asrx, gaze, lip, appearance) and generates TKG nodes and edges. Clears existing nodes/edges for the file first, then rebuilds from scratch. + +#### Example + +```bash +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/rebuild" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "result": { + "face_trace_nodes": 16, + "gaze_trace_nodes": 16, + "lip_trace_nodes": 12, + "text_trace_nodes": 24, + "appearance_trace_nodes": 8, + "skin_tone_trace_nodes": 5, + "accessory_nodes": 3, + "object_nodes": 26, + "speaker_nodes": 4, + "co_occurrence_edges": 94, + "speaker_face_edges": 12, + "face_face_edges": 8, + "mutual_gaze_edges": 2, + "lip_sync_edges": 10, + "has_appearance_edges": 16, + "wears_edges": 3 + }, + "error": null +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | True if rebuild completed | +| `file_uuid` | string | 32-char hex UUID | +| `result` | object | Node and edge counts by type | +| `error` | string/null | Error message if failed | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/nodes` + +**Auth**: Required +**Scope**: file-level + +Query TKG nodes with pagination and optional type filter. + +#### Request Parameters + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `node_type` | string | No | all | Filter by node type: `face_trace`, `gaze_trace`, `lip_trace`, `text_trace`, `appearance_trace`, `skin_tone_trace`, `accessory`, `object`, `speaker` | +| `page` | integer | No | 1 | Page number | +| `page_size` | integer | No | 100 | Items per page (max 500) | + +#### Example + +```bash +# Get all face_trace nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/nodes" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"node_type": "face_trace", "page": 1, "page_size": 50}' + +# Get all nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/nodes" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{}' +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "total": 16, + "page": 1, + "page_size": 50, + "nodes": [ + { + "id": 1, + "node_type": "face_trace", + "external_id": "trace_0", + "label": "Face Trace 0", + "properties": { + "trace_id": 0, + "face_count": 142, + "avg_confidence": 0.87 + } + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `file_uuid` | string | 32-char hex UUID | +| `total` | integer | Total matching node count | +| `page` | integer | Current page | +| `page_size` | integer | Items per page | +| `nodes` | array | Array of node objects | +| `nodes[].id` | integer | Database primary key | +| `nodes[].node_type` | string | Node type (see table above) | +| `nodes[].external_id` | string | External identifier (e.g., `trace_0`, `gaze_1`) | +| `nodes[].label` | string | Human-readable label | +| `nodes[].properties` | object | Type-specific properties as JSON | + +--- + +### `POST /api/v1/file/:file_uuid/tkg/edges` + +**Auth**: Required +**Scope**: file-level + +Query TKG edges with pagination and optional filters. + +#### Request Parameters + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `edge_type` | string | No | all | Filter by edge type: `co_occurs`, `speaker_face`, `face_face`, `mutual_gaze`, `lip_sync`, `has_appearance`, `wears` | +| `source_type` | string | No | — | Filter by source node type | +| `target_type` | string | No | — | Filter by target node type | +| `page` | integer | No | 1 | Page number | +| `page_size` | integer | No | 100 | Items per page (max 500) | + +#### Example + +```bash +# Get all co_occurrence edges +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/edges" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"edge_type": "co_occurs"}' + +# Get edges between face_trace and speaker nodes +curl -s -X POST "$API/api/v1/file/$FILE_UUID/tkg/edges" \ + -H "X-API-Key: $KEY" \ + -H "Content-Type: application/json" \ + -d '{"source_type": "speaker", "target_type": "face_trace"}' +``` + +#### Response (200) + +```json +{ + "success": true, + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "total": 94, + "page": 1, + "page_size": 100, + "edges": [ + { + "id": 1, + "edge_type": "co_occurs", + "source_node_id": 10, + "target_node_id": 15, + "properties": { + "frame_count": 45, + "confidence": 0.92 + } + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `file_uuid` | string | 32-char hex UUID | +| `total` | integer | Total matching edge count | +| `page` | integer | Current page | +| `page_size` | integer | Items per page | +| `edges` | array | Array of edge objects | +| `edges[].id` | integer | Database primary key | +| `edges[].edge_type` | string | Edge type | +| `edges[].source_node_id` | integer | Source node ID (FK to tkg_nodes) | +| `edges[].target_node_id` | integer | Target node ID (FK to tkg_nodes) | +| `edges[].properties` | object | Edge-specific properties as JSON | + +--- + +### `GET /api/v1/file/:file_uuid/tkg/node/:node_id` + +**Auth**: Required +**Scope**: file-level + +Get detail for a specific TKG node including its connected edges. + +#### Example + +```bash +curl -s "$API/api/v1/file/$FILE_UUID/tkg/node/1" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "success": true, + "node": { + "id": 1, + "node_type": "face_trace", + "external_id": "trace_0", + "label": "Face Trace 0", + "properties": { + "trace_id": 0, + "face_count": 142, + "avg_confidence": 0.87 + } + }, + "connected_edges": [ + { + "id": 5, + "edge_type": "co_occurs", + "source_node_id": 1, + "target_node_id": 10, + "properties": {"frame_count": 45} + } + ], + "edge_count": 3 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `success` | boolean | Always true on 200 | +| `node` | object | Node detail (same format as nodes query) | +| `connected_edges` | array | Edges connected to this node | +| `edge_count` | integer | Total connected edge count | + +#### Error Codes + +| HTTP | When | +|------|------| +| `404` | Node not found | + +--- + +### `GET /api/v1/file/:file_uuid/processor-counts` + +**Auth**: Required +**Scope**: file-level + +Get counts of processor JSON output files for a file. Scans the output directory for `{file_uuid}.{processor}.json` files and extracts frame counts, segment counts, and chunk counts from each file. + +Supports short UUID prefix matching (e.g., `d3f9ae8e` → resolves to full `d3f9ae8e471a1fc4d47022c66091b920`). + +#### Example + +```bash +curl -s "$API/api/v1/file/$FILE_UUID/processor-counts" \ + -H "X-API-Key: $KEY" +``` + +#### Response (200) + +```json +{ + "file_uuid": "d3f9ae8e471a1fc4d47022c66091b920", + "output_dir": "/Users/accusys/momentry/output_dev", + "processors": [ + { + "processor": "cut", + "has_json": true, + "frame_count": 5391, + "segment_count": null, + "chunk_count": null, + "last_modified": "2026-06-16T18:48:01.987241061+00:00" + }, + { + "processor": "face", + "has_json": true, + "frame_count": 1112, + "segment_count": null, + "chunk_count": null, + "last_modified": "2026-06-18T17:21:37.408383765+00:00" + }, + { + "processor": "asrx", + "has_json": true, + "frame_count": null, + "segment_count": 6, + "chunk_count": null, + "last_modified": "2026-06-18T17:21:40.872063642+00:00" + }, + { + "processor": "story", + "has_json": true, + "frame_count": null, + "segment_count": null, + "chunk_count": 12, + "last_modified": "2026-06-18T17:22:00.000000000+00:00" + }, + { + "processor": "mediapipe", + "has_json": false, + "frame_count": null, + "segment_count": null, + "chunk_count": null, + "last_modified": null + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `file_uuid` | string | Full 32-char hex UUID (resolved from prefix) | +| `output_dir` | string | Output directory scanned | +| `processors` | array | Per-processor output info | +| `processors[].processor` | string | Processor name | +| `processors[].has_json` | boolean | Whether JSON file exists | +| `processors[].frame_count` | integer/null | Total frames processed (frame-based processors) | +| `processors[].segment_count` | integer/null | Segment count (ASRX segments, etc.) | +| `processors[].chunk_count` | integer/null | Chunk count (Story chunks, etc.) | +| `processors[].last_modified` | string/null | ISO 8601 timestamp of last modification | + +#### Error Codes + +| HTTP | When | +|------|------| +| `404` | File UUID not found in database | + +--- + +*Updated: 2026-06-20 12:00:00* diff --git a/src/api/trace_agent_api.rs b/src/api/trace_agent_api.rs index 51ff3ff..4ef1d2a 100644 --- a/src/api/trace_agent_api.rs +++ b/src/api/trace_agent_api.rs @@ -38,10 +38,17 @@ pub fn trace_agent_routes() -> Router { get(get_cooccurrence), ) .route("/api/v1/file/:file_uuid/tkg/rebuild", post(rebuild_tkg)) + .route("/api/v1/file/:file_uuid/rule2", post(ingest_rule2)) .route( "/api/v1/file/:file_uuid/representative-frame", get(get_representative_frame), ) + .route("/api/v1/file/:file_uuid/tkg/nodes", post(query_tkg_nodes)) + .route("/api/v1/file/:file_uuid/tkg/edges", post(query_tkg_edges)) + .route( + "/api/v1/file/:file_uuid/tkg/node/:node_id", + get(get_tkg_node_detail), + ) } #[derive(Debug, Deserialize)] @@ -961,22 +968,52 @@ async fn rebuild_tkg( State(state): State, Path(file_uuid): Path, ) -> Json { + use crate::core::chunk::rule2_ingest::ingest_rule2; + use tracing::info; + let result = crate::core::processor::tkg::build_tkg(&state.db, &file_uuid, &OUTPUT_DIR).await; match result { - Ok(r) => Json(TkgRebuildResponse { - success: true, - file_uuid, - result: Some(serde_json::json!({ - "face_trace_nodes": r.face_trace_nodes, - "object_nodes": r.object_nodes, - "speaker_nodes": r.speaker_nodes, - "co_occurrence_edges": r.co_occurrence_edges, - "speaker_face_edges": r.speaker_face_edges, - "face_face_edges": r.face_face_edges, - })), - error: None, - }), + Ok(r) => { + let total_edges = r.speaker_face_edges + + r.mutual_gaze_edges + + r.face_face_edges + + r.co_occurrence_edges + + r.has_appearance_edges + + r.wears_edges; + + if total_edges > 0 { + info!("[TKG] {} relationship edges found, triggering Rule 2 ingestion...", total_edges); + match ingest_rule2(state.db.pool(), &file_uuid).await { + Ok(count) => info!("[TKG] Rule 2 created {} relationship chunks", count), + Err(e) => info!("[TKG] Rule 2 ingestion failed: {}", e), + } + } + + Json(TkgRebuildResponse { + success: true, + file_uuid, + result: Some(serde_json::json!({ + "face_trace_nodes": r.face_trace_nodes, + "gaze_trace_nodes": r.gaze_trace_nodes, + "lip_trace_nodes": r.lip_trace_nodes, + "text_trace_nodes": r.text_trace_nodes, + "appearance_trace_nodes": r.appearance_trace_nodes, + "skin_tone_trace_nodes": r.skin_tone_trace_nodes, + "accessory_nodes": r.accessory_nodes, + "object_nodes": r.object_nodes, + "speaker_nodes": r.speaker_nodes, + "co_occurrence_edges": r.co_occurrence_edges, + "speaker_face_edges": r.speaker_face_edges, + "face_face_edges": r.face_face_edges, + "mutual_gaze_edges": r.mutual_gaze_edges, + "lip_sync_edges": r.lip_sync_edges, + "has_appearance_edges": r.has_appearance_edges, + "wears_edges": r.wears_edges, + })), + error: None, + }) + } Err(e) => Json(TkgRebuildResponse { success: false, file_uuid, @@ -1097,3 +1134,463 @@ async fn get_stranger_thumbnail( get_trace_thumbnail_inner(&state, &file_uuid, trace_id).await } + +// ── TKG Node/Edge Query APIs ───────────────────────────────────── + +fn t(name: &str) -> String { + let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); + if schema == "public" { + name.to_string() + } else { + format!("{}.{}", schema, name) + } +} + +#[derive(Debug, Deserialize)] +struct TkgNodesRequest { + node_type: Option, + page: Option, + page_size: Option, +} + +#[derive(Debug, Serialize)] +struct TkgNodeInfo { + id: i64, + node_type: String, + external_id: String, + label: String, + properties: serde_json::Value, +} + +#[derive(Debug, Serialize)] +struct TkgNodesResponse { + success: bool, + file_uuid: String, + total: i64, + page: i64, + page_size: i64, + nodes: Vec, +} + +async fn query_tkg_nodes( + State(state): State, + Path(file_uuid): Path, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + let nodes_table = t("tkg_nodes"); + let page = req.page.unwrap_or(1).max(1); + let page_size = req.page_size.unwrap_or(100).max(1).min(500); + let offset = (page - 1) * page_size; + + let (where_clause, count_args, query_args) = if let Some(ref node_type) = req.node_type { + ( + "WHERE file_uuid = $1 AND node_type = $2".to_string(), + vec![serde_json::json!([&file_uuid, node_type])], + vec![serde_json::json!([ + &file_uuid, node_type, page_size, offset + ])], + ) + } else { + ( + "WHERE file_uuid = $1".to_string(), + vec![serde_json::json!([&file_uuid])], + vec![serde_json::json!([&file_uuid, page_size, offset])], + ) + }; + + let total: i64 = if let Some(ref node_type) = req.node_type { + sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} {}", + nodes_table, where_clause + )) + .bind(&file_uuid) + .bind(node_type) + .fetch_one(state.db.pool()) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + } else { + sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} {}", + nodes_table, where_clause + )) + .bind(&file_uuid) + .fetch_one(state.db.pool()) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + }; + + let query = format!( + "SELECT id, node_type, external_id, label, properties FROM {} {} ORDER BY id LIMIT ${} OFFSET ${}", + nodes_table, where_clause, + if req.node_type.is_some() { 3 } else { 2 }, + if req.node_type.is_some() { 4 } else { 3 } + ); + + let rows: Vec<(i64, String, String, String, serde_json::Value)> = + if let Some(ref node_type) = req.node_type { + sqlx::query_as(&query) + .bind(&file_uuid) + .bind(node_type) + .bind(page_size) + .bind(offset) + .fetch_all(state.db.pool()) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + } else { + sqlx::query_as(&query) + .bind(&file_uuid) + .bind(page_size) + .bind(offset) + .fetch_all(state.db.pool()) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + }; + + let nodes = rows + .into_iter() + .map( + |(id, node_type, external_id, label, properties)| TkgNodeInfo { + id, + node_type, + external_id, + label, + properties, + }, + ) + .collect(); + + Ok(Json(TkgNodesResponse { + success: true, + file_uuid, + total, + page, + page_size, + nodes, + })) +} + +#[derive(Debug, Deserialize)] +struct TkgEdgesRequest { + edge_type: Option, + source_type: Option, + target_type: Option, + page: Option, + page_size: Option, +} + +#[derive(Debug, Serialize)] +struct TkgEdgeInfo { + id: i64, + edge_type: String, + source_node_id: i64, + target_node_id: i64, + properties: serde_json::Value, +} + +#[derive(Debug, Serialize)] +struct TkgEdgesResponse { + success: bool, + file_uuid: String, + total: i64, + page: i64, + page_size: i64, + edges: Vec, +} + +async fn query_tkg_edges( + State(state): State, + Path(file_uuid): Path, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + let edges_table = t("tkg_edges"); + let nodes_table = t("tkg_nodes"); + let page = req.page.unwrap_or(1).max(1); + let page_size = req.page_size.unwrap_or(100).max(1).min(500); + let offset = (page - 1) * page_size; + + let mut conditions = vec!["e.file_uuid = $1".to_string()]; + let mut param_idx = 2i32; + let mut joins = String::new(); + + if let Some(ref edge_type) = req.edge_type { + conditions.push(format!("e.edge_type = ${}", param_idx)); + param_idx += 1; + } + if req.source_type.is_some() || req.target_type.is_some() { + joins = format!( + " JOIN {} sn ON e.source_node_id = sn.id JOIN {} tn ON e.target_node_id = tn.id", + nodes_table, nodes_table + ); + } + if let Some(ref source_type) = req.source_type { + conditions.push(format!("sn.node_type = ${}", param_idx)); + param_idx += 1; + } + if let Some(ref target_type) = req.target_type { + conditions.push(format!("tn.node_type = ${}", param_idx)); + param_idx += 1; + } + + let where_clause = conditions.join(" AND "); + let count_query = format!( + "SELECT COUNT(*) FROM {} e {} WHERE {}", + edges_table, joins, where_clause + ); + + let total: i64 = { + let mut q = sqlx::query_scalar::<_, i64>(&count_query).bind(&file_uuid); + if let Some(ref edge_type) = req.edge_type { + q = q.bind(edge_type); + } + if let Some(ref source_type) = req.source_type { + q = q.bind(source_type); + } + if let Some(ref target_type) = req.target_type { + q = q.bind(target_type); + } + q.fetch_one(state.db.pool()).await.map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + }; + + let query = format!( + "SELECT e.id, e.edge_type, e.source_node_id, e.target_node_id, e.properties FROM {} e {} WHERE {} ORDER BY e.id LIMIT ${} OFFSET ${}", + edges_table, joins, where_clause, param_idx, param_idx + 1 + ); + + let rows: Vec<(i64, String, i64, i64, serde_json::Value)> = { + let mut q = sqlx::query_as(&query).bind(&file_uuid); + if let Some(ref edge_type) = req.edge_type { + q = q.bind(edge_type); + } + if let Some(ref source_type) = req.source_type { + q = q.bind(source_type); + } + if let Some(ref target_type) = req.target_type { + q = q.bind(target_type); + } + q.bind(page_size) + .bind(offset) + .fetch_all(state.db.pool()) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })? + }; + + let edges = rows + .into_iter() + .map( + |(id, edge_type, source_node_id, target_node_id, properties)| TkgEdgeInfo { + id, + edge_type, + source_node_id, + target_node_id, + properties, + }, + ) + .collect(); + + Ok(Json(TkgEdgesResponse { + success: true, + file_uuid, + total, + page, + page_size, + edges, + })) +} + +#[derive(Debug, Serialize)] +struct TkgNodeWithEdges { + node: TkgNodeInfo, + incoming_edges: Vec, + outgoing_edges: Vec, +} + +#[derive(Debug, Serialize)] +struct TkgNodeDetailResponse { + success: bool, + file_uuid: String, + node: Option, + error: Option, +} + +async fn get_tkg_node_detail( + State(state): State, + Path((file_uuid, node_id)): Path<(String, i64)>, +) -> Json { + let nodes_table = t("tkg_nodes"); + let edges_table = t("tkg_edges"); + + let node: Option<(i64, String, String, String, serde_json::Value)> = sqlx::query_as( + &format!("SELECT id, node_type, external_id, label, properties FROM {} WHERE file_uuid = $1 AND id = $2", nodes_table) + ) + .bind(&file_uuid).bind(node_id) + .fetch_optional(state.db.pool()).await.ok().flatten(); + + match node { + Some((id, node_type, external_id, label, properties)) => { + let incoming: Vec = sqlx::query_as( + &format!("SELECT id, edge_type, source_node_id, target_node_id, properties FROM {} WHERE file_uuid = $1 AND target_node_id = $2", edges_table) + ) + .bind(&file_uuid).bind(node_id) + .fetch_all(state.db.pool()).await.unwrap_or_default() + .into_iter().map(|(id, edge_type, source_node_id, target_node_id, properties)| { + TkgEdgeInfo { id, edge_type, source_node_id, target_node_id, properties } + }).collect(); + + let outgoing: Vec = sqlx::query_as( + &format!("SELECT id, edge_type, source_node_id, target_node_id, properties FROM {} WHERE file_uuid = $1 AND source_node_id = $2", edges_table) + ) + .bind(&file_uuid).bind(node_id) + .fetch_all(state.db.pool()).await.unwrap_or_default() + .into_iter().map(|(id, edge_type, source_node_id, target_node_id, properties)| { + TkgEdgeInfo { id, edge_type, source_node_id, target_node_id, properties } + }).collect(); + + Json(TkgNodeDetailResponse { + success: true, + file_uuid, + node: Some(TkgNodeWithEdges { + node: TkgNodeInfo { + id, + node_type, + external_id, + label, + properties, + }, + incoming_edges: incoming, + outgoing_edges: outgoing, + }), + error: None, + }) + } + None => Json(TkgNodeDetailResponse { + success: false, + file_uuid, + node: None, + error: Some("Node not found".to_string()), + }), + } +} + +// ── Rule 2 Ingest ─────────────────────────────────────────────────── + +#[derive(Serialize)] +struct IngestRule2Response { + success: bool, + file_uuid: String, + rule2_chunks: i64, + vectorized_chunks: Option, + error: Option, +} + +async fn ingest_rule2( + State(state): State, + Path(file_uuid): Path, +) -> Result, (StatusCode, Json)> { + use crate::core::chunk::rule2_ingest::ingest_rule2; + use crate::core::embedding::Embedder; + use crate::core::db::schema; + use crate::core::db::qdrant_db::{QdrantDb, VectorPayload}; + use tracing::info; + + let result = ingest_rule2(state.db.pool(), &file_uuid).await; + + match result { + Ok(rule2_chunks) => { + info!( + "[Rule2API] {} relationship chunks created for {}", + rule2_chunks, file_uuid + ); + + // Auto-vectorize relationship chunks + let embedder = Embedder::new("embeddinggemma-300m".to_string()); + let qdrant = QdrantDb::new(); + let pool = state.db.pool(); + let chunk_table = schema::table_name("chunk"); + + let rows: Vec<(String, String, i64, i64, f64, f64)> = sqlx::query_as(&format!( + "SELECT chunk_id, text_content, start_frame, end_frame, start_time, end_time \ + FROM {} WHERE file_uuid = $1 AND chunk_type = 'relationship' \ + AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '')", + chunk_table + )) + .bind(&file_uuid) + .fetch_all(pool) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + })?; + + let mut vectorized = 0i64; + for (chunk_id, text, start_frame, end_frame, start_time, end_time) in &rows { + if text.is_empty() { + continue; + } + if let Ok(vector) = embedder.embed_document(&text).await { + if state.db.store_vector(&chunk_id, &vector, &file_uuid).await.is_ok() { + let payload = VectorPayload { + file_uuid: file_uuid.clone(), + chunk_id: chunk_id.clone(), + chunk_type: "relationship".to_string(), + start_frame: *start_frame, + end_frame: *end_frame, + start_time: *start_time, + end_time: *end_time, + text: Some(text.clone()), + }; + if qdrant.upsert_vector(&chunk_id, &vector, payload).await.is_ok() { + vectorized += 1; + } + } + } + } + + Ok(Json(IngestRule2Response { + success: true, + file_uuid, + rule2_chunks: rule2_chunks as i64, + vectorized_chunks: Some(vectorized), + error: None, + })) + } + Err(e) => Ok(Json(IngestRule2Response { + success: false, + file_uuid, + rule2_chunks: 0, + vectorized_chunks: None, + error: Some(e.to_string()), + })), + } +} diff --git a/src/core/chunk/mod.rs b/src/core/chunk/mod.rs index f30e5ac..eaad324 100644 --- a/src/core/chunk/mod.rs +++ b/src/core/chunk/mod.rs @@ -1,10 +1,12 @@ pub mod rule1_ingest; +pub mod rule2_ingest; pub mod rule3_ingest; pub mod splitter; pub mod trace_ingest; pub mod types; pub use rule1_ingest::execute_rule1; +pub use rule2_ingest::ingest_rule2; pub use rule3_ingest::ingest_rule3; pub use splitter::{AsrSegment, ChunkSplitter}; pub use trace_ingest::ingest_traces; diff --git a/src/core/chunk/rule2_ingest.rs b/src/core/chunk/rule2_ingest.rs new file mode 100644 index 0000000..ded93fc --- /dev/null +++ b/src/core/chunk/rule2_ingest.rs @@ -0,0 +1,335 @@ +use crate::core::db::schema; +use anyhow::{Context, Result}; +use serde_json::Value; +use sqlx::PgPool; +use tracing::{info, warn}; + +fn t(name: &str) -> String { + let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); + if schema == "public" { + name.to_string() + } else { + format!("{}.{}", schema, name) + } +} + +/// Executes Rule 2 Ingestion: TKG edges → relationship chunks. +/// +/// 1. Query tkg_edges by priority order. +/// 2. Resolve source/target nodes and identities. +/// 3. Generate natural language description (template-based). +/// 4. Insert chunks with chunk_type='relationship'. +pub async fn ingest_rule2(pool: &PgPool, file_uuid: &str) -> Result { + let edges_table = t("tkg_edges"); + let nodes_table = t("tkg_nodes"); + let chunk_table = t("chunk"); + let fd_table = t("face_detections"); + let id_table = t("identities"); + let videos_table = t("videos"); + + // Get video fps + let fps: f64 = sqlx::query_scalar(&format!( + "SELECT COALESCE(fps, 25.0) FROM {} WHERE file_uuid = $1", + videos_table + )) + .bind(file_uuid) + .fetch_optional(pool) + .await? + .unwrap_or(25.0); + + // Priority order for edge types (matching TKG edge_type values) + let edge_types = vec![ + "SPEAKS_AS", + "MUTUAL_GAZE", + "CO_OCCURS_WITH", + "HAS_APPEARANCE", + "WEARS", + ]; + + let mut count = 0; + let mut tx = pool.begin().await?; + + for edge_type in &edge_types { + // Query edges of this type + let edges: Vec<(i64, String, String, Value)> = sqlx::query_as(&format!( + "SELECT id, source_node_id::text, target_node_id::text, properties \ + FROM {} WHERE file_uuid = $1 AND edge_type = $2", + edges_table + )) + .bind(file_uuid) + .bind(edge_type) + .fetch_all(&mut *tx) + .await?; + + info!( + "Rule2: {} {} edges for file_uuid={}", + edges.len(), + edge_type, + file_uuid + ); + + for (edge_id, source_id_str, target_id_str, properties) in &edges { + // Parse source/target node IDs + let source_id: i64 = source_id_str.parse().context("Invalid source_node_id")?; + let target_id: i64 = target_id_str.parse().context("Invalid target_node_id")?; + + // Query source node + let source_node: Option<(String, String, String, Value)> = sqlx::query_as(&format!( + "SELECT node_type, external_id, label, properties FROM {} WHERE id = $1", + nodes_table + )) + .bind(source_id) + .fetch_optional(&mut *tx) + .await?; + + // Query target node + let target_node: Option<(String, String, String, Value)> = sqlx::query_as(&format!( + "SELECT node_type, external_id, label, properties FROM {} WHERE id = $1", + nodes_table + )) + .bind(target_id) + .fetch_optional(&mut *tx) + .await?; + + if source_node.is_none() || target_node.is_none() { + warn!("Rule2: Missing node for edge {}", edge_id); + continue; + } + + let (src_type, src_ext_id, src_label, _src_props) = source_node.unwrap(); + let (tgt_type, tgt_ext_id, tgt_label, tgt_props) = target_node.unwrap(); + + // Resolve identity names for face_trace nodes (inline) + let src_identity: Option = if src_type == "face_trace" { + let trace_id: i32 = src_ext_id + .replace("trace_", "") + .parse() + .context("Invalid trace_id")?; + sqlx::query_scalar(&format!( + "SELECT i.name FROM {} fd \ + JOIN {} i ON i.id = fd.identity_id \ + WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ + LIMIT 1", + fd_table, id_table + )) + .bind(file_uuid) + .bind(trace_id) + .fetch_optional(&mut *tx) + .await? + } else { + None + }; + + let tgt_identity: Option = if tgt_type == "face_trace" { + let trace_id: i32 = tgt_ext_id + .replace("trace_", "") + .parse() + .context("Invalid trace_id")?; + sqlx::query_scalar(&format!( + "SELECT i.name FROM {} fd \ + JOIN {} i ON i.id = fd.identity_id \ + WHERE fd.file_uuid = $1 AND fd.trace_id = $2 AND fd.identity_id IS NOT NULL \ + LIMIT 1", + fd_table, id_table + )) + .bind(file_uuid) + .bind(trace_id) + .fetch_optional(&mut *tx) + .await? + } else { + None + }; + + // Extract time range from properties + let first_frame = properties + .get("first_frame") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + let last_frame = properties + .get("last_frame") + .or_else(|| properties.get("end_frame")) + .and_then(|v| v.as_i64()) + .unwrap_or(first_frame); + let frame_count = properties + .get("frame_count") + .or_else(|| properties.get("gaze_frame_count")) + .and_then(|v| v.as_i64()) + .unwrap_or(last_frame - first_frame); + + // Build context for description + let context = serde_json::json!({ + "edge_type": edge_type, + "source_node": { + "node_type": src_type, + "external_id": src_ext_id, + "label": src_label, + "identity_name": src_identity, + }, + "target_node": { + "node_type": tgt_type, + "external_id": tgt_ext_id, + "label": tgt_label, + "identity_name": tgt_identity, + "properties": tgt_props, + }, + "properties": { + "first_frame": first_frame, + "last_frame": last_frame, + "frame_count": frame_count, + "confidence": properties.get("confidence").and_then(|v| v.as_f64()), + } + }); + + // Generate text_content (template-based) + let text_content = generate_description(&context); + + // Build chunk_id + let chunk_id = format!("rel_{}", edge_id); + + // Build content JSON + let content = serde_json::json!({ + "edge_type": edge_type, + "edge_id": edge_id, + "source_node": { + "id": source_id, + "node_type": src_type, + "external_id": src_ext_id, + "label": src_label, + "identity_name": src_identity, + }, + "target_node": { + "id": target_id, + "node_type": tgt_type, + "external_id": tgt_ext_id, + "label": tgt_label, + "identity_name": tgt_identity, + }, + "properties": properties, + }); + + // Build metadata + let metadata = serde_json::json!({ + "source_type": src_type, + "target_type": tgt_type, + "has_identity": src_identity.is_some() || tgt_identity.is_some(), + }); + + // Insert chunk + let start_time = first_frame as f64 / fps; + let end_time = last_frame as f64 / fps; + + sqlx::query(&format!( + "INSERT INTO {} (file_uuid, chunk_id, chunk_type, \ + start_frame, end_frame, fps, start_time, end_time, \ + text_content, content, metadata) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ + ON CONFLICT (file_uuid, chunk_id) DO NOTHING", + chunk_table + )) + .bind(file_uuid) + .bind(&chunk_id) + .bind("relationship") + .bind(first_frame) + .bind(last_frame) + .bind(fps) + .bind(start_time) + .bind(end_time) + .bind(&text_content) + .bind(&content) + .bind(&metadata) + .execute(&mut *tx) + .await?; + + count += 1; + } + } + + tx.commit().await?; + info!( + "Rule2: {} relationship chunks created for file_uuid={}", + count, file_uuid + ); + Ok(count) +} + +/// Generate natural language description for a relationship (template-based). +fn generate_description(context: &Value) -> String { + let edge_type = context.get("edge_type").and_then(|v| v.as_str()).unwrap_or(""); + let src = context.get("source_node").unwrap(); + let tgt = context.get("target_node").unwrap(); + let props = context.get("properties").unwrap(); + + let src_identity = src.get("identity_name").and_then(|v| v.as_str()); + let tgt_identity = tgt.get("identity_name").and_then(|v| v.as_str()); + let src_ext_id = src.get("external_id").and_then(|v| v.as_str()).unwrap_or(""); + let tgt_ext_id = tgt.get("external_id").and_then(|v| v.as_str()).unwrap_or(""); + + let first_frame = props.get("first_frame").and_then(|v| v.as_i64()).unwrap_or(0); + let last_frame = props.get("last_frame").and_then(|v| v.as_i64()).unwrap_or(first_frame); + let frame_count = props.get("frame_count").and_then(|v| v.as_i64()).unwrap_or(0); + + let src_display = src_identity.unwrap_or(src_ext_id); + let tgt_display = tgt_identity.unwrap_or(tgt_ext_id); + + match edge_type { + "SPEAKS_AS" => { + format!( + "SPEAKER {} 以 {} 的身份說話,從 frame {} 到 frame {}", + src_ext_id, tgt_display, first_frame, last_frame + ) + } + "MUTUAL_GAZE" => { + format!( + "{} 和 {} 互相看對方 {} 幀,起始於 frame {}", + src_display, tgt_display, frame_count, first_frame + ) + } + "CO_OCCURS_WITH" => { + // Check if both nodes are face_trace (face-face co-occurrence) + let src_type = src.get("node_type").and_then(|v| v.as_str()).unwrap_or(""); + let tgt_type = tgt.get("node_type").and_then(|v| v.as_str()).unwrap_or(""); + if src_type == "face_trace" && tgt_type == "face_trace" { + format!( + "{} 和 {} 同框 {} 幀,從 frame {} 到 frame {}", + src_display, tgt_display, frame_count, first_frame, last_frame + ) + } else { + format!( + "{} 和 {} 在同一畫面出現", + src_display, tgt_display + ) + } + } + "HAS_APPEARANCE" => { + let tgt_props = tgt.get("properties").unwrap(); + let upper_color = tgt_props + .get("color_features") + .and_then(|c| c.get("dominant_colors")) + .and_then(|d| d.as_array()) + .and_then(|arr| arr.first()) + .and_then(|c| c.as_str()); + format!( + "{} 穿著 {} 上衣", + src_display, + upper_color.unwrap_or("未知顏色") + ) + } + "WEARS" => { + let tgt_props = tgt.get("properties").unwrap(); + let accessory_type = tgt_props.get("type").and_then(|t| t.as_str()); + let confidence = tgt_props.get("confidence").and_then(|c| c.as_f64()); + format!( + "{} 戴 {},信心值 {:.2}", + src_display, + accessory_type.unwrap_or("配件"), + confidence.unwrap_or(0.0) + ) + } + _ => { + format!( + "{} 與 {} 有 {} 關係,frame {}-{}", + src_display, tgt_display, edge_type, first_frame, last_frame + ) + } + } +} \ No newline at end of file diff --git a/src/core/chunk/types.rs b/src/core/chunk/types.rs index fb5b09f..e0c7808 100644 --- a/src/core/chunk/types.rs +++ b/src/core/chunk/types.rs @@ -9,6 +9,7 @@ pub enum ChunkType { Cut, Trace, Story, + Relationship, } impl ChunkType { @@ -19,6 +20,7 @@ impl ChunkType { ChunkType::Cut => "cut", ChunkType::Trace => "trace", ChunkType::Story => "story", + ChunkType::Relationship => "relationship", } } } diff --git a/src/core/db/face_embedding_db.rs b/src/core/db/face_embedding_db.rs new file mode 100644 index 0000000..619ef00 --- /dev/null +++ b/src/core/db/face_embedding_db.rs @@ -0,0 +1,488 @@ +use anyhow::{Context, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub struct FaceEmbeddingDb { + client: Client, + base_url: String, + api_key: String, + collection_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FaceEmbeddingPayload { + pub file_uuid: String, + pub trace_id: i32, + pub frame: i64, + pub bbox_x: f64, + pub bbox_y: f64, + pub bbox_w: f64, + pub bbox_h: f64, + pub confidence: f64, + pub yaw: f64, + pub pitch: f64, + pub roll: f64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct FaceEmbeddingPoint { + pub id: String, + pub vector: Vec, + pub payload: FaceEmbeddingPayload, + pub score: f64, +} + +impl FaceEmbeddingDb { + pub fn new() -> Self { + let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); + let collection_name = format!("{}_face_embeddings", schema); + + let base_url = + std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string()); + let api_key = std::env::var("QDRANT_API_KEY") + .unwrap_or_else(|_| "Test3200Test3200Test3200".to_string()); + + Self { + client: Client::new(), + base_url, + api_key, + collection_name, + } + } + + pub async fn init_collection(&self) -> Result<()> { + let url = format!("{}/collections/{}", self.base_url, self.collection_name); + + let response = self + .client + .get(&url) + .header("api-key", &self.api_key) + .send() + .await?; + + if response.status().is_success() { + tracing::info!("[FaceEmbedding] Collection {} already exists", self.collection_name); + return Ok(()); + } + + let create_url = format!("{}/collections/{}", self.base_url, self.collection_name); + let body = serde_json::json!({ + "vectors": { + "size": 512, + "distance": "Cosine" + } + }); + + self.client + .put(&create_url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to create face embeddings collection")?; + + tracing::info!("[FaceEmbedding] Created collection {} (dim=512)", self.collection_name); + Ok(()) + } + + pub async fn upsert_embedding( + &self, + point_id: &str, + embedding: &[f32], + payload: &FaceEmbeddingPayload, + ) -> Result<()> { + let url = format!( + "{}/collections/{}/points?wait=true", + self.base_url, self.collection_name + ); + + let body = serde_json::json!({ + "points": [{ + "id": point_id, + "vector": embedding, + "payload": payload + }] + }); + + let response = self + .client + .put(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to upsert face embedding")?; + + if !response.status().is_success() { + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("Qdrant upsert failed: {}", text); + } + + Ok(()) + } + + pub async fn batch_upsert( + &self, + points: Vec<(String, Vec, FaceEmbeddingPayload)>, + ) -> Result { + if points.is_empty() { + return Ok(0); + } + + let url = format!( + "{}/collections/{}/points?wait=true", + self.base_url, self.collection_name + ); + + let body = serde_json::json!({ + "points": points.iter().map(|(id, vec, payload)| { + // Parse id as u64 for Qdrant (requires integer or UUID) + let id_num: u64 = id.parse().unwrap_or(0); + serde_json::json!({ + "id": id_num, + "vector": vec, + "payload": payload + }) + }).collect::>() + }); + + let response = self + .client + .put(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to batch upsert face embeddings")?; + + if !response.status().is_success() { + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("Qdrant batch upsert failed: {}", text); + } + + Ok(points.len()) + } + + pub async fn search_similar( + &self, + query_embedding: &[f32], + file_uuid: Option<&str>, + limit: usize, + threshold: f64, + ) -> Result> { + let url = format!( + "{}/collections/{}/points/search", + self.base_url, self.collection_name + ); + + let mut filter = serde_json::json!({}); + if let Some(fu) = file_uuid { + filter = serde_json::json!({ + "must": [{ + "key": "file_uuid", + "match": { "value": fu } + }] + }); + } + + let body = serde_json::json!({ + "vector": query_embedding, + "limit": limit, + "with_payload": true, + "with_vector": false, + "filter": filter + }); + + let response = self + .client + .post(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to search face embeddings")?; + + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + + if !status.is_success() { + anyhow::bail!("Qdrant search failed: {} - {}", status, text); + } + + #[derive(Deserialize)] + struct SearchResult { + result: Vec, + } + + #[derive(Deserialize)] + struct PointResult { + id: serde_json::Value, + score: f64, + payload: HashMap, + } + + let parsed: SearchResult = serde_json::from_str(&text) + .context("Failed to parse Qdrant search response")?; + + let results: Vec = parsed + .result + .into_iter() + .filter(|r| r.score >= threshold) + .map(|r| { + let id = match r.id { + serde_json::Value::String(s) => s, + serde_json::Value::Number(n) => n.to_string(), + _ => "unknown".to_string(), + }; + let payload = FaceEmbeddingPayload { + file_uuid: r.payload.get("file_uuid") + .and_then(|v| v.as_str()).unwrap_or("").to_string(), + trace_id: r.payload.get("trace_id") + .and_then(|v| v.as_i64()).unwrap_or(0) as i32, + frame: r.payload.get("frame") + .and_then(|v| v.as_i64()).unwrap_or(0), + bbox_x: r.payload.get("bbox_x") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_y: r.payload.get("bbox_y") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_w: r.payload.get("bbox_w") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_h: r.payload.get("bbox_h") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + confidence: r.payload.get("confidence") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + yaw: r.payload.get("yaw") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + pitch: r.payload.get("pitch") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + roll: r.payload.get("roll") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + }; + FaceEmbeddingPoint { + id, + vector: vec![], // Not returned with_vector=false + payload, + score: r.score, + } + }) + .collect(); + + Ok(results) + } + + pub async fn get_embeddings_by_trace( + &self, + file_uuid: &str, + trace_id: i32, + ) -> Result)>> { + let url = format!( + "{}/collections/{}/points/scroll", + self.base_url, self.collection_name + ); + + let body = serde_json::json!({ + "limit": 1000, + "with_payload": true, + "with_vector": true, + "filter": { + "must": [ + {"key": "file_uuid", "match": { "value": file_uuid }}, + {"key": "trace_id", "match": { "value": trace_id }} + ] + } + }); + + let response = self + .client + .post(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to scroll face embeddings")?; + + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + + if !status.is_success() { + anyhow::bail!("Qdrant scroll failed: {} - {}", status, text); + } + + #[derive(Deserialize)] + struct ScrollResult { + result: ScrollPoints, + } + + #[derive(Deserialize)] + struct ScrollPoints { + points: Vec, + } + + #[derive(Deserialize)] + struct PointResult { + id: serde_json::Value, + vector: Vec, + } + + let parsed: ScrollResult = serde_json::from_str(&text) + .context("Failed to parse Qdrant scroll response")?; + + let results: Vec<(String, Vec)> = parsed + .result + .points + .into_iter() + .map(|r| { + let id = match r.id { + serde_json::Value::String(s) => s, + serde_json::Value::Number(n) => n.to_string(), + _ => "unknown".to_string(), + }; + (id, r.vector) + }) + .collect(); + + Ok(results) + } + + pub async fn get_all_embeddings_for_file( + &self, + file_uuid: &str, + ) -> Result, FaceEmbeddingPayload)>> { + let url = format!( + "{}/collections/{}/points/scroll", + self.base_url, self.collection_name + ); + + let body = serde_json::json!({ + "limit": 10000, + "with_payload": true, + "with_vector": true, + "filter": { + "must": [ + {"key": "file_uuid", "match": { "value": file_uuid }} + ] + } + }); + + let response = self + .client + .post(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to scroll face embeddings")?; + + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + + if !status.is_success() { + anyhow::bail!("Qdrant scroll failed: {} - {}", status, text); + } + + #[derive(Deserialize)] + struct ScrollResult { + result: ScrollPoints, + } + + #[derive(Deserialize)] + struct ScrollPoints { + points: Vec, + } + + #[derive(Deserialize)] + struct PointResult { + id: serde_json::Value, + vector: Vec, + payload: HashMap, + } + + let parsed: ScrollResult = serde_json::from_str(&text) + .context("Failed to parse Qdrant scroll response")?; + + let results: Vec<(String, Vec, FaceEmbeddingPayload)> = parsed + .result + .points + .into_iter() + .map(|r| { + let id = match r.id { + serde_json::Value::String(s) => s, + serde_json::Value::Number(n) => n.to_string(), + _ => "unknown".to_string(), + }; + let payload = FaceEmbeddingPayload { + file_uuid: r.payload.get("file_uuid") + .and_then(|v| v.as_str()).unwrap_or("").to_string(), + trace_id: r.payload.get("trace_id") + .and_then(|v| v.as_i64()).unwrap_or(0) as i32, + frame: r.payload.get("frame") + .and_then(|v| v.as_i64()).unwrap_or(0), + bbox_x: r.payload.get("bbox_x") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_y: r.payload.get("bbox_y") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_w: r.payload.get("bbox_w") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + bbox_h: r.payload.get("bbox_h") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + confidence: r.payload.get("confidence") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + yaw: r.payload.get("yaw") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + pitch: r.payload.get("pitch") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + roll: r.payload.get("roll") + .and_then(|v| v.as_f64()).unwrap_or(0.0), + }; + (id, r.vector, payload) + }) + .collect(); + + Ok(results) + } + + pub async fn delete_file_embeddings(&self, file_uuid: &str) -> Result { + let url = format!( + "{}/collections/{}/points/delete?wait=true", + self.base_url, self.collection_name + ); + + let body = serde_json::json!({ + "filter": { + "must": [ + {"key": "file_uuid", "match": { "value": file_uuid }} + ] + } + }); + + let response = self + .client + .post(&url) + .header("api-key", &self.api_key) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("Failed to delete face embeddings")?; + + if !response.status().is_success() { + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("Qdrant delete failed: {}", text); + } + + Ok(0) + } +} + +impl Default for FaceEmbeddingDb { + fn default() -> Self { + Self::new() + } +} \ No newline at end of file diff --git a/src/core/db/mod.rs b/src/core/db/mod.rs index 9c6e5f5..98a9e53 100644 --- a/src/core/db/mod.rs +++ b/src/core/db/mod.rs @@ -32,12 +32,14 @@ pub trait VectorStore: Send + Sync { async fn search(&self, query_vector: &[f32], limit: usize) -> Result>; } +pub mod face_embedding_db; pub mod identity_merge_history; pub mod mongodb_db; pub mod postgres_db; pub mod qdrant_db; pub mod redis_client; pub mod redis_db; +pub use face_embedding_db::{FaceEmbeddingDb, FaceEmbeddingPayload, FaceEmbeddingPoint}; pub use identity_merge_history::{ AliasEntry, FacesTransferred, IdentityMergeHistory, IdentityMergeHistoryStore, IdentitySnapshot, MergeHistoryEntry, MergeHistoryQuery, MergeParams, TargetIdentitySnapshot, @@ -56,3 +58,10 @@ pub use redis_client::{ ProgressMessage, RedisClient, }; pub use redis_db::RedisDb; +pub mod qdrant_workspace; +pub mod workspace_sqlite; +pub use qdrant_workspace::{QdrantWorkspace, ScrolledPoint, WorkspaceScrollResult}; +pub use workspace_sqlite::{ + workspace_path, ChunkRow, FaceDetectionBatchItem, FaceDetectionRow, PreChunkRow, + ProcessorResultRow, SpeakerDetectionBatchItem, SpeakerDetectionRow, WorkspaceDb, +}; diff --git a/src/core/processor/tkg.rs b/src/core/processor/tkg.rs index 791a5d0..cafdf03 100644 --- a/src/core/processor/tkg.rs +++ b/src/core/processor/tkg.rs @@ -15,6 +15,257 @@ fn t(name: &str) -> String { } } +// ── Phase 0: Populate face_detections from face.json ──────────────────────────────────────────────── + +async fn populate_face_detections_from_face_json( + pool: &PgPool, + output_dir: &str, + file_uuid: &str, +) -> Result<()> { + use crate::core::processor::executor::PythonExecutor; + use tracing::info; + + let fd_table = t("face_detections"); + + // Check if trace_id is already populated + let traced_count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL", + fd_table + )) + .bind(file_uuid) + .fetch_one(pool) + .await?; + + if traced_count > 0 { + info!("[TKG-Phase0] face_detections already traced for {} ({} rows with trace_id)", file_uuid, traced_count); + return Ok(()); + } + + let total_count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE file_uuid = $1", + fd_table + )) + .bind(file_uuid) + .fetch_one(pool) + .await?; + + if total_count == 0 { + info!("[TKG-Phase0] No face_detections for {}, need face processor first", file_uuid); + return Ok(()); + } + + info!("[TKG-Phase0] {} faces exist but trace_id=NULL, calling store_traced_faces.py...", total_count); + + let executor = PythonExecutor::new()?; + + let result = executor + .run( + "store_traced_faces.py", + &["--file-uuid", file_uuid], + Some(file_uuid), + "TKG_PHASE0", + Some(std::time::Duration::from_secs(600)), + ) + .await; + + match result { + Ok(()) => { + let new_traced_count: i64 = sqlx::query_scalar(&format!( + "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL", + fd_table + )) + .bind(file_uuid) + .fetch_one(pool) + .await?; + info!("[TKG-Phase0] Traced {} face_detections for {}", new_traced_count, file_uuid); + Ok(()) + } + Err(e) => { + info!("[TKG-Phase0] Failed to trace face_detections: {} (continuing with TKG build)", e); + Ok(()) + } + } +} + +// ── Phase 1: Populate face embeddings to Qdrant ──────────────────────────────────────────────── + +async fn populate_face_embeddings_to_qdrant( + pool: &PgPool, + output_dir: &str, + file_uuid: &str, +) -> Result { + use crate::core::db::face_embedding_db::{FaceEmbeddingDb, FaceEmbeddingPayload}; + use tracing::info; + + let face_db = FaceEmbeddingDb::new(); + face_db.init_collection().await?; + + // Check if embeddings already exist + let existing = face_db.get_all_embeddings_for_file(file_uuid).await?; + if !existing.is_empty() { + info!("[TKG-Phase1] {} embeddings already in Qdrant for {}", existing.len(), file_uuid); + return Ok(existing.len()); + } + + // Load from face_detections table + let fd_table = t("face_detections"); + let rows: Vec<(i32, i64, f64, f64, f64, f64, f64, Option>)> = sqlx::query_as(&format!( + "SELECT trace_id::int, frame_number::bigint, x::float8, y::float8, width::float8, height::float8, confidence::float8, embedding \ + FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL AND embedding IS NOT NULL", + fd_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + if rows.is_empty() { + info!("[TKG-Phase1] No traced embeddings for {}", file_uuid); + return Ok(0); + } + + // Load pose data for yaw/pitch/roll + let pose_data = load_face_pose_data(output_dir, file_uuid).unwrap_or_default(); + + let mut points: Vec<(String, Vec, FaceEmbeddingPayload)> = Vec::new(); + for (trace_id, frame, x, y, w, h, confidence, embedding) in &rows { + if let Some(emb) = embedding { + let (yaw, pitch, roll) = get_pose_for_face(*frame, *x, *y, *w, *h, &pose_data) + .unwrap_or((0.0, 0.0, 0.0)); + + // Generate unique numeric point ID (trace_id * 100000 + frame) + let point_id = format!("{}", (*trace_id as u64) * 100000 + (*frame as u64)); + let payload = FaceEmbeddingPayload { + file_uuid: file_uuid.to_string(), + trace_id: *trace_id, + frame: *frame, + bbox_x: *x, + bbox_y: *y, + bbox_w: *w, + bbox_h: *h, + confidence: *confidence, + yaw, + pitch, + roll, + }; + points.push((point_id, emb.clone(), payload)); + } + } + + let count = face_db.batch_upsert(points).await?; + info!("[TKG-Phase1] Stored {} face embeddings in Qdrant for {}", count, file_uuid); + Ok(count) +} + +// ── Gaze Direction ──────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq)] +pub enum GazeDirection { + Frontal, + Left, + Right, + Up, + Down, + UpLeft, + UpRight, + DownLeft, + DownRight, + Unknown, +} + +impl GazeDirection { + pub fn from_yaw_pitch(yaw: f64, pitch: f64) -> Self { + let yaw_threshold = 0.3; + let pitch_threshold = 0.3; + + let yaw_left = yaw < -yaw_threshold; + let yaw_right = yaw > yaw_threshold; + let pitch_up = pitch < -pitch_threshold; + let pitch_down = pitch > pitch_threshold; + + match (yaw_left, yaw_right, pitch_up, pitch_down) { + (true, false, true, false) => GazeDirection::UpLeft, + (false, true, true, false) => GazeDirection::UpRight, + (true, false, false, true) => GazeDirection::DownLeft, + (false, true, false, true) => GazeDirection::DownRight, + (true, false, false, false) => GazeDirection::Left, + (false, true, false, false) => GazeDirection::Right, + (false, false, true, false) => GazeDirection::Up, + (false, false, false, true) => GazeDirection::Down, + (false, false, false, false) => GazeDirection::Frontal, + _ => GazeDirection::Unknown, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + GazeDirection::Frontal => "frontal", + GazeDirection::Left => "left", + GazeDirection::Right => "right", + GazeDirection::Up => "up", + GazeDirection::Down => "down", + GazeDirection::UpLeft => "up-left", + GazeDirection::UpRight => "up-right", + GazeDirection::DownLeft => "down-left", + GazeDirection::DownRight => "down-right", + GazeDirection::Unknown => "unknown", + } + } +} + +/// Compute gaze vector from yaw/pitch/roll (simplified). +/// Returns (dx, dy) in image coordinates. +pub fn compute_gaze_vector(yaw: f64, pitch: f64) -> (f64, f64) { + // Simplified: yaw controls horizontal, pitch controls vertical + let dx = yaw.sin(); + let dy = pitch.sin(); + (dx, dy) +} + +/// Check if person A's gaze points toward person B's bbox. +/// Returns confidence 0.0-1.0 based on angle difference. +pub fn gaze_points_at( + person_a_x: f64, + person_a_y: f64, + person_a_yaw: f64, + person_a_pitch: f64, + person_b_x: f64, + person_b_y: f64, + person_b_w: f64, + person_b_h: f64, +) -> f64 { + // Compute direction from A to B + let dx_to_b = person_b_x - person_a_x; + let dy_to_b = person_b_y - person_a_y; + let dist = (dx_to_b * dx_to_b + dy_to_b * dy_to_b).sqrt(); + if dist < 1.0 { + return 0.0; + } + + // Normalize direction to B + let dir_to_b_x = dx_to_b / dist; + let dir_to_b_y = dy_to_b / dist; + + // A's gaze direction + let (gaze_x, gaze_y) = compute_gaze_vector(person_a_yaw, person_a_pitch); + let gaze_len = (gaze_x * gaze_x + gaze_y * gaze_y).sqrt(); + if gaze_len < 0.01 { + return 0.0; + } + let gaze_x = gaze_x / gaze_len; + let gaze_y = gaze_y / gaze_len; + + // Dot product = cosine of angle + let dot = gaze_x * dir_to_b_x + gaze_y * dir_to_b_y; + + // Convert to confidence: angle threshold ~30 degrees (cos = 0.866) + let angle_threshold = 0.5; // ~60 degrees + if dot > angle_threshold { + (dot - angle_threshold) / (1.0 - angle_threshold) + } else { + 0.0 + } +} + // ── Pose data from face.json ──────────────────────────────────────── #[derive(Debug, Clone)] @@ -44,23 +295,31 @@ fn load_face_pose_data(output_dir: &str, file_uuid: &str) -> Result b, - None => continue, - }; - let pose = match face.get("pose") { - Some(p) => p, - None => continue, + let x = face.get("x").and_then(|v| v.as_f64()).unwrap_or(0.0); + let y = face.get("y").and_then(|v| v.as_f64()).unwrap_or(0.0); + let w = face.get("width").and_then(|v| v.as_f64()).unwrap_or(0.0); + let h = face.get("height").and_then(|v| v.as_f64()).unwrap_or(0.0); + + let pose = face.get("pose_angle").or_else(|| face.get("pose")); + let (yaw, pitch, roll) = if let Some(p) = pose { + ( + p.get("yaw").and_then(|v| v.as_f64()).unwrap_or(0.0), + p.get("pitch").and_then(|v| v.as_f64()).unwrap_or(0.0), + p.get("roll").and_then(|v| v.as_f64()).unwrap_or(0.0), + ) + } else { + (0.0, 0.0, 0.0) }; + poses.push(FacePose { frame: frame_num, - x: bbox.get("x").and_then(|v| v.as_f64()).unwrap_or(0.0), - y: bbox.get("y").and_then(|v| v.as_f64()).unwrap_or(0.0), - w: bbox.get("width").and_then(|v| v.as_f64()).unwrap_or(0.0), - h: bbox.get("height").and_then(|v| v.as_f64()).unwrap_or(0.0), - yaw: pose.get("yaw").and_then(|v| v.as_f64()).unwrap_or(0.0), - pitch: pose.get("pitch").and_then(|v| v.as_f64()).unwrap_or(0.0), - roll: pose.get("roll").and_then(|v| v.as_f64()).unwrap_or(0.0), + x, + y, + w, + h, + yaw, + pitch, + roll, }); } } @@ -203,36 +462,84 @@ struct FaceDetectionRow { pub struct TkgResult { pub face_trace_nodes: usize, + pub gaze_trace_nodes: usize, + pub lip_trace_nodes: usize, + pub text_trace_nodes: usize, + pub appearance_trace_nodes: usize, + pub skin_tone_trace_nodes: usize, + pub accessory_nodes: usize, pub object_nodes: usize, pub speaker_nodes: usize, pub co_occurrence_edges: usize, pub speaker_face_edges: usize, pub face_face_edges: usize, + pub mutual_gaze_edges: usize, + pub lip_sync_edges: usize, + pub has_appearance_edges: usize, + pub wears_edges: usize, } pub async fn build_tkg(db: &PostgresDb, file_uuid: &str, output_dir: &str) -> Result { let pool = db.pool(); - let pose_data = load_face_pose_data(output_dir, file_uuid).unwrap_or_default(); + + tracing::info!("[TKG-Phase0] Starting TKG build for {}", file_uuid); + + // Phase 0: Populate face_detections from face.json (if not exists) + if let Err(e) = populate_face_detections_from_face_json(pool, output_dir, file_uuid).await { + tracing::warn!("[TKG-Phase0] populate_face_detections failed: {} (continuing)", e); + } + + // Phase 1: Populate face embeddings to Qdrant (for TKG-only migration) + if let Err(e) = populate_face_embeddings_to_qdrant(pool, output_dir, file_uuid).await { + tracing::warn!("[TKG-Phase1] populate_face_embeddings failed: {} (continuing)", e); + } + + let pose_data = load_face_pose_data(output_dir, file_uuid).map_err(|e| { + tracing::error!("[TKG] Failed to load face pose data: {}", e); + e + }).unwrap_or_default(); tracing::info!( - "[TKG] Loaded {} pose entries from face.json", - pose_data.len() + "[TKG] Loaded {} pose entries from face.json (output_dir={})", + pose_data.len(), + output_dir ); let n_face = build_face_trace_nodes(pool, file_uuid, &pose_data).await?; + let n_gaze = build_gaze_trace_nodes(pool, file_uuid, &pose_data).await?; + let n_lip = build_lip_trace_nodes(pool, file_uuid, output_dir, &pose_data).await?; + let n_text = build_text_trace_nodes(pool, file_uuid).await?; + let n_appearance = + build_appearance_trace_nodes(pool, file_uuid, output_dir, &pose_data).await?; + let n_skin = build_skin_tone_trace_nodes(pool, file_uuid, output_dir, &pose_data).await?; + let n_accessories = build_accessory_nodes(pool, file_uuid, output_dir).await?; let n_objects = build_yolo_object_nodes(pool, file_uuid, output_dir).await?; let n_speakers = build_speaker_nodes(pool, file_uuid, output_dir).await?; let e_co = build_co_occurrence_edges(pool, file_uuid, output_dir).await?; let e_sf = build_speaker_face_edges(pool, file_uuid, output_dir).await?; let e_ff = build_face_face_edges(pool, file_uuid, &pose_data).await?; + let e_mg = build_mutual_gaze_edges(pool, file_uuid, &pose_data).await?; + let e_ls = build_lip_sync_edges(pool, file_uuid, output_dir, &pose_data).await?; + let e_ha = build_has_appearance_edges(pool, file_uuid).await?; + let e_w = build_wears_edges(pool, file_uuid).await?; Ok(TkgResult { face_trace_nodes: n_face, + gaze_trace_nodes: n_gaze, + lip_trace_nodes: n_lip, + text_trace_nodes: n_text, + appearance_trace_nodes: n_appearance, + skin_tone_trace_nodes: n_skin, + accessory_nodes: n_accessories, object_nodes: n_objects, speaker_nodes: n_speakers, co_occurrence_edges: e_co, speaker_face_edges: e_sf, face_face_edges: e_ff, + mutual_gaze_edges: e_mg, + lip_sync_edges: e_ls, + has_appearance_edges: e_ha, + wears_edges: e_w, }) } @@ -919,6 +1226,1146 @@ async fn build_face_face_edges( Ok(edge_count) } +// ── Gaze Trace Nodes ────────────────────────────────────────────── + +async fn build_gaze_trace_nodes( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], +) -> Result { + let face_table = t("face_detections"); + let nodes_table = t("tkg_nodes"); + + // Load per-frame data with bbox for gaze computation + let frame_rows: Vec<(i64, i64, f64, f64, f64, f64)> = sqlx::query_as( + &format!( + "SELECT trace_id::bigint, frame_number::bigint, x::float8, y::float8, width::float8, height::float8 \ + FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL ORDER BY trace_id, frame_number", + face_table + ) + ) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + // Group by trace_id + let mut trace_frames: HashMap> = HashMap::new(); + for (tid, frame, x, y, w, h) in &frame_rows { + trace_frames + .entry(*tid) + .or_default() + .push((*frame, *x, *y, *w, *h)); + } + + let mut count = 0; + for (tid, frames) in &trace_frames { + let external_id = format!("gaze_{}", tid); + + // Compute gaze stats for this trace + let mut frame_count = 0i64; + let mut first_frame = i64::MAX; + let mut last_frame = i64::MIN; + let mut yaw_sum = 0.0f64; + let mut pitch_sum = 0.0f64; + let mut roll_sum = 0.0f64; + let mut gaze_dir_counts: HashMap<&str, i64> = HashMap::new(); + let mut blink_candidates = 0i64; + let mut prev_openness = 0.0f64; + + for (frame, x, y, w, h) in frames { + if let Some((yaw, pitch, roll)) = get_pose_for_face(*frame, *x, *y, *w, *h, pose_data) { + frame_count += 1; + if *frame < first_frame { + first_frame = *frame; + } + if *frame > last_frame { + last_frame = *frame; + } + yaw_sum += yaw; + pitch_sum += pitch; + roll_sum += roll; + + // Gaze direction + let gaze_dir = GazeDirection::from_yaw_pitch(yaw, pitch); + *gaze_dir_counts.entry(gaze_dir.as_str()).or_default() += 1; + + // Blink detection (eye openness from pitch variance) + let openness = (pitch.abs() * 10.0).min(1.0); + if prev_openness > 0.5 && openness < 0.2 { + blink_candidates += 1; + } + prev_openness = openness; + } + } + + if frame_count == 0 { + continue; + } + + let avg_yaw = yaw_sum / frame_count as f64; + let avg_pitch = pitch_sum / frame_count as f64; + let avg_roll = roll_sum / frame_count as f64; + let dominant_gaze = gaze_dir_counts + .iter() + .max_by_key(|(_, &c)| c) + .map(|(&d, _)| d) + .unwrap_or("unknown"); + + // Compute eye openness and blink rate + let blink_rate = if frame_count > 1 { + blink_candidates as f64 / (frame_count as f64 / 30.0) // per second at 30fps + } else { + 0.0 + }; + + let (gaze_dx, gaze_dy) = compute_gaze_vector(avg_yaw, avg_pitch); + + let props = serde_json::json!({ + "trace_id": tid, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "avg_yaw": (avg_yaw * 1000.0).round() / 1000.0, + "avg_pitch": (avg_pitch * 1000.0).round() / 1000.0, + "avg_roll": (avg_roll * 1000.0).round() / 1000.0, + "head_direction": dominant_gaze, + "gaze_direction": GazeDirection::from_yaw_pitch(avg_yaw, avg_pitch).as_str(), + "gaze_vector": {"dx": (gaze_dx * 1000.0).round() / 1000.0, "dy": (gaze_dy * 1000.0).round() / 1000.0}, + "eye_openness": (prev_openness * 100.0).round() / 100.0, + "blink_count": blink_candidates, + "blink_rate": (blink_rate * 100.0).round() / 100.0, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("gaze_trace") + .bind(&external_id) + .bind(file_uuid) + .bind(&format!("Gaze Trace {}", tid)) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG] Built {} gaze trace nodes", count); + Ok(count) +} + +// ── Mutual Gaze Edges ───────────────────────────────────────────── + +async fn build_mutual_gaze_edges( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], +) -> Result { + let face_table = t("face_detections"); + let nodes_table = t("tkg_nodes"); + let edges_table = t("tkg_edges"); + + // Load per-frame bbox data + let bbox_data: Vec<(i64, i64, f64, f64, f64, f64)> = sqlx::query_as( + &format!( + "SELECT trace_id::bigint, frame_number::bigint, x::float8, y::float8, width::float8, height::float8 \ + FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL ORDER BY trace_id, frame_number", + face_table + ) + ) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + // Group by frame: frame → Vec<(trace_id, x, y, w, h)> + let mut frame_traces: HashMap> = HashMap::new(); + for (tid, frame, x, y, w, h) in &bbox_data { + frame_traces + .entry(*frame) + .or_default() + .push((*tid, *x, *y, *w, *h)); + } + + // Find mutual gaze pairs + let mut pair_gaze_frames: HashMap<(i64, i64), Vec> = HashMap::new(); + for (frame, traces) in &frame_traces { + if traces.len() < 2 { + continue; + } + for i in 0..traces.len() { + for j in (i + 1)..traces.len() { + let (tid_a, xa, ya, wa, ha) = traces[i]; + let (tid_b, xb, yb, wb, hb) = traces[j]; + + let gaze_a = get_pose_for_face(*frame, xa, ya, wa, ha, pose_data); + let gaze_b = get_pose_for_face(*frame, xb, yb, wb, hb, pose_data); + + if let (Some((yaw_a, pitch_a, _)), Some((yaw_b, pitch_b, _))) = (gaze_a, gaze_b) { + let conf_a = gaze_points_at( + xa + wa / 2.0, + ya + ha / 2.0, + yaw_a, + pitch_a, + xb + wb / 2.0, + yb + hb / 2.0, + wb, + hb, + ); + let conf_b = gaze_points_at( + xb + wb / 2.0, + yb + hb / 2.0, + yaw_b, + pitch_b, + xa + wa / 2.0, + ya + ha / 2.0, + wa, + ha, + ); + + if conf_a > 0.3 && conf_b > 0.3 { + let key = if tid_a < tid_b { + (tid_a, tid_b) + } else { + (tid_b, tid_a) + }; + pair_gaze_frames.entry(key).or_default().push(*frame); + } + } + } + } + } + + let mut edge_count = 0; + let mut node_id_cache: HashMap = HashMap::new(); + + for ((tid_a, tid_b), frames) in &pair_gaze_frames { + let ext_a = format!("trace_{}", tid_a); + let ext_b = format!("trace_{}", tid_b); + + // Get node IDs + let n_a_id = match node_id_cache.get(tid_a) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='face_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(&ext_a).fetch_optional(pool).await? + { + node_id_cache.insert(*tid_a, id); + id + } else { continue; } + } + }; + + let n_b_id = match node_id_cache.get(tid_b) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='face_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(&ext_b).fetch_optional(pool).await? + { + node_id_cache.insert(*tid_b, id); + id + } else { continue; } + } + }; + + let sorted_frames = { + let mut f = frames.clone(); + f.sort(); + f + }; + + let first_frame = sorted_frames[0]; + let last_frame = sorted_frames[sorted_frames.len() - 1]; + let duration_frames = last_frame - first_frame + 1; + let duration_secs = duration_frames as f64 / 30.0; + let confidence = frames.len() as f64 / duration_frames as f64; + + let props = serde_json::json!({ + "first_frame": first_frame, + "last_frame": last_frame, + "duration_frames": duration_frames, + "duration_seconds": (duration_secs * 100.0).round() / 100.0, + "gaze_frame_count": frames.len(), + "confidence": (confidence * 100.0).round() / 100.0, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (edge_type, source_node_id, target_node_id, file_uuid, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, edge_type, source_node_id, target_node_id) + DO UPDATE SET properties = COALESCE(EXCLUDED.properties, tkg_edges.properties) + "#, + edges_table + )) + .bind("MUTUAL_GAZE") + .bind(n_a_id) + .bind(n_b_id) + .bind(file_uuid) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + edge_count += 1; + } + + tracing::info!("[TKG] Built {} mutual gaze edges", edge_count); + Ok(edge_count) +} + +// ── Lip Trace Nodes ─────────────────────────────────────────────── + +async fn build_lip_trace_nodes( + pool: &PgPool, + file_uuid: &str, + output_dir: &str, + pose_data: &[FacePose], +) -> Result { + let face_table = t("face_detections"); + let nodes_table = t("tkg_nodes"); + + // Load lip data from face.json + let path = Path::new(output_dir).join(format!("{}.face.json", file_uuid)); + if !path.exists() { + return Ok(0); + } + + let content = std::fs::read_to_string(&path) + .with_context(|| format!("Failed to read face.json: {}", path.display()))?; + let json: serde_json::Value = serde_json::from_str(&content)?; + + // Group by trace_id: trace_id → Vec<(frame, inner_lips_area, outer_lips_area)> + let mut lip_data: HashMap> = HashMap::new(); + + if let Some(frames) = json.get("frames").and_then(|v| v.as_array()) { + for frame_entry in frames { + let frame_num = frame_entry + .get("frame") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + if let Some(faces) = frame_entry.get("faces").and_then(|v| v.as_array()) { + for face in faces { + let bbox = match face.get("bbox") { + Some(b) => b, + None => continue, + }; + let x = bbox.get("x").and_then(|v| v.as_f64()).unwrap_or(0.0); + let y = bbox.get("y").and_then(|v| v.as_f64()).unwrap_or(0.0); + let w = bbox.get("width").and_then(|v| v.as_f64()).unwrap_or(0.0); + let h = bbox.get("height").and_then(|v| v.as_f64()).unwrap_or(0.0); + + // Get trace_id for this face + let trace_id = + match get_trace_for_face(pool, file_uuid, frame_num, x, y, w, h).await { + Some(tid) => tid, + None => continue, + }; + + // Extract lip landmarks + let lips = face.get("lips"); + if let Some(lips_obj) = lips.and_then(|v| v.as_object()) { + let inner_area = compute_lip_area(lips_obj.get("inner_lips")); + let outer_area = compute_lip_area(lips_obj.get("outer_lips")); + if inner_area > 0.0 || outer_area > 0.0 { + lip_data + .entry(trace_id) + .or_default() + .push((frame_num, inner_area, outer_area)); + } + } + } + } + } + } + + let mut count = 0; + for (tid, frames) in &lip_data { + let external_id = format!("lip_{}", tid); + + let frame_count = frames.len() as i64; + let first_frame = frames.iter().map(|(f, _, _)| *f).min().unwrap_or(0); + let last_frame = frames.iter().map(|(f, _, _)| *f).max().unwrap_or(0); + + let avg_inner = frames.iter().map(|(_, i, _)| *i).sum::() / frame_count as f64; + let avg_outer = frames.iter().map(|(_, _, o)| *o).sum::() / frame_count as f64; + let avg_openness = if avg_outer > 0.0 { + avg_inner / avg_outer + } else { + 0.0 + }; + + // Compute movement variance + let openness_values: Vec = frames + .iter() + .map(|(_, i, o)| if *o > 0.0 { i / o } else { 0.0 }) + .collect(); + let mean_openness = openness_values.iter().sum::() / openness_values.len() as f64; + let variance = openness_values + .iter() + .map(|&v| (v - mean_openness).powi(2)) + .sum::() + / openness_values.len() as f64; + + // Count speaking frames (openness > threshold) + let speaking_threshold = avg_openness * 1.2; + let speaking_frames = frames + .iter() + .filter(|(_, i, o)| { + if *o > 0.0 { + i / o > speaking_threshold + } else { + false + } + }) + .count() as i64; + + // Get pose for this trace + let (avg_yaw, avg_pitch) = if let Some((y, p, _)) = frames + .iter() + .filter_map(|(f, _, _)| { + pose_data + .iter() + .find(|fp| fp.frame == *f) + .map(|fp| (fp.yaw, fp.pitch, fp.roll)) + }) + .next() + { + (y, p) + } else { + (0.0, 0.0) + }; + + let props = serde_json::json!({ + "trace_id": tid, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "avg_openness": (avg_openness * 1000.0).round() / 1000.0, + "avg_inner_area": (avg_inner * 100.0).round() / 100.0, + "avg_outer_area": (avg_outer * 100.0).round() / 100.0, + "movement_variance": (variance * 1000.0).round() / 1000.0, + "speaking_frames": speaking_frames, + "silent_frames": frame_count - speaking_frames, + "avg_yaw": (avg_yaw * 1000.0).round() / 1000.0, + "avg_pitch": (avg_pitch * 1000.0).round() / 1000.0, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("lip_trace") + .bind(&external_id) + .bind(file_uuid) + .bind(&format!("Lip Trace {}", tid)) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG] Built {} lip trace nodes", count); + Ok(count) +} + +fn compute_lip_area(landmarks: Option<&serde_json::Value>) -> f64 { + let pts = match landmarks.and_then(|v| v.as_array()) { + Some(arr) => arr, + None => return 0.0, + }; + if pts.len() < 3 { + return 0.0; + } + // Simple bounding box area + let mut min_x = f64::MAX; + let mut max_x = f64::MIN; + let mut min_y = f64::MAX; + let mut max_y = f64::MIN; + for pt in pts { + if let Some(arr) = pt.as_array() { + if arr.len() >= 2 { + if let (Some(x), Some(y)) = (arr[0].as_f64(), arr[1].as_f64()) { + min_x = min_x.min(x); + max_x = max_x.max(x); + min_y = min_y.min(y); + max_y = max_y.max(y); + } + } + } + } + if max_x > min_x && max_y > min_y { + (max_x - min_x) * (max_y - min_y) + } else { + 0.0 + } +} + +async fn get_trace_for_face( + pool: &PgPool, + file_uuid: &str, + frame: i64, + x: f64, + y: f64, + w: f64, + h: f64, +) -> Option { + let face_table = t("face_detections"); + // Find closest face detection in same frame + let faces: Vec<(i64, f64, f64, f64, f64)> = sqlx::query_as(&format!( + "SELECT trace_id::bigint, x::float8, y::float8, width::float8, height::float8 \ + FROM {} WHERE file_uuid = $1 AND frame_number = $2 AND trace_id IS NOT NULL", + face_table + )) + .bind(file_uuid) + .bind(frame) + .fetch_all(pool) + .await + .ok()?; + + if faces.is_empty() { + return None; + } + + // Find closest by bbox center distance + let mut best: Option<(i64, f64)> = None; + for (tid, fx, fy, fw, fh) in &faces { + let cx = fx + fw / 2.0; + let cy = fy + fh / 2.0; + let tcx = x + w / 2.0; + let tcy = y + h / 2.0; + let dist = ((cx - tcx).powi(2) + (cy - tcy).powi(2)).sqrt(); + match best { + Some((_, best_dist)) if dist < best_dist => best = Some((*tid, dist)), + None => best = Some((*tid, dist)), + _ => {} + } + } + best.map(|(tid, _)| tid) +} + +// ── Text/Sentence Trace Nodes ───────────────────────────────────── + +async fn build_text_trace_nodes(pool: &PgPool, file_uuid: &str) -> Result { + let chunk_table = t("chunk"); + let nodes_table = t("tkg_nodes"); + + let rows: Vec<(i64, String, String, f64, f64, i64, i64, Option)> = sqlx::query_as( + &format!( + r#" + SELECT id::bigint, chunk_type, text_content, start_time, end_time, start_frame, end_frame, content->>'speaker_id' as speaker_id + FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' ORDER BY start_frame + "#, + chunk_table + ) + ) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + let mut count = 0; + for (chunk_id, _chunk_type, text, start_time, end_time, start_frame, end_frame, speaker_id) in + &rows + { + let external_id = format!("text_{}", chunk_id); + let label = if text.len() > 50 { + format!("Text: {}...", &text[..47]) + } else { + format!("Text: {}", text) + }; + + let props = serde_json::json!({ + "chunk_id": chunk_id, + "text": text, + "text_normalized": text.to_lowercase(), + "start_time": start_time, + "end_time": end_time, + "start_frame": start_frame, + "end_frame": end_frame, + "speaker_id": speaker_id, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("text_trace") + .bind(&external_id) + .bind(file_uuid) + .bind(&label) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG] Built {} text trace nodes", count); + Ok(count) +} + +// ── Lip-Sync Edges ──────────────────────────────────────────────── + +async fn build_lip_sync_edges( + pool: &PgPool, + file_uuid: &str, + output_dir: &str, + pose_data: &[FacePose], +) -> Result { + let nodes_table = t("tkg_nodes"); + let edges_table = t("tkg_edges"); + + // Get lip traces + let lip_traces: Vec<(i64, String, i64, i64, i64, f64)> = sqlx::query_as(&format!( + r#" + SELECT id::bigint, external_id, + (properties->>'start_frame')::bigint, + (properties->>'end_frame')::bigint, + (properties->>'speaking_frames')::bigint, + (properties->>'avg_openness')::float8 + FROM {} WHERE file_uuid = $1 AND node_type = 'lip_trace' + "#, + nodes_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + // Get text traces + let text_traces: Vec<(i64, String, i64, i64, Option)> = sqlx::query_as(&format!( + r#" + SELECT id::bigint, external_id, + (properties->>'start_frame')::bigint, + (properties->>'end_frame')::bigint, + properties->>'speaker_id' + FROM {} WHERE file_uuid = $1 AND node_type = 'text_trace' + "#, + nodes_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + let mut edge_count = 0; + let mut node_id_cache: HashMap = HashMap::new(); + + for (lip_id, lip_ext, lip_start, lip_end, lip_speaking, lip_openness) in &lip_traces { + for (text_id, text_ext, text_start, text_end, speaker_id) in &text_traces { + // Check time overlap + let overlap_start = lip_start.max(text_start); + let overlap_end = lip_end.min(text_end); + + if overlap_start >= overlap_end { + continue; + } + + let overlap_duration = overlap_end - overlap_start; + let text_duration = text_end - text_start; + let overlap_ratio = if text_duration > 0 { + overlap_duration as f64 / text_duration as f64 + } else { + 0.0 + }; + + if overlap_ratio < 0.3 { + continue; + } + + // Get node IDs + let lip_node_id = match node_id_cache.get(lip_ext) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='lip_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(lip_ext).fetch_optional(pool).await? + { + node_id_cache.insert(lip_ext.clone(), id); + id + } else { continue; } + } + }; + + let text_node_id = match node_id_cache.get(text_ext) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='text_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(text_ext).fetch_optional(pool).await? + { + node_id_cache.insert(text_ext.clone(), id); + id + } else { continue; } + } + }; + + let props = serde_json::json!({ + "overlap_start_frame": overlap_start, + "overlap_end_frame": overlap_end, + "overlap_ratio": (overlap_ratio * 100.0).round() / 100.0, + "lip_avg_openness": (lip_openness * 1000.0).round() / 1000.0, + "lip_speaking_frames": lip_speaking, + "speaker_id": speaker_id, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (edge_type, source_node_id, target_node_id, file_uuid, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, edge_type, source_node_id, target_node_id) + DO UPDATE SET properties = COALESCE(EXCLUDED.properties, tkg_edges.properties) + "#, + edges_table + )) + .bind("LIP_SYNC") + .bind(lip_node_id) + .bind(text_node_id) + .bind(file_uuid) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + edge_count += 1; + } + } + + tracing::info!("[TKG] Built {} lip-sync edges", edge_count); + Ok(edge_count) +} + +// ── Appearance Trace Nodes ──────────────────────────────────────── + +async fn build_appearance_trace_nodes( + pool: &PgPool, + file_uuid: &str, + output_dir: &str, + pose_data: &[FacePose], +) -> Result { + let path = Path::new(output_dir).join(format!("{}.appearance.json", file_uuid)); + if !path.exists() { + return Ok(0); + } + + let content = std::fs::read_to_string(&path) + .with_context(|| format!("Failed to read appearance.json: {}", path.display()))?; + let data: serde_json::Value = serde_json::from_str(&content)?; + + let nodes_table = t("tkg_nodes"); + let mut count = 0; + + if let Some(frames) = data.get("frames").and_then(|v| v.as_array()) { + // Group by person_id across frames + let mut person_data: HashMap> = HashMap::new(); + + for frame_entry in frames { + let frame_num = frame_entry + .get("frame") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + if let Some(persons) = frame_entry.get("persons").and_then(|v| v.as_array()) { + for person in persons { + let pid = person + .get("person_id") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + person_data + .entry(pid) + .or_default() + .push((frame_num, person.clone())); + } + } + } + + for (pid, frames) in &person_data { + let external_id = format!("appearance_{}", pid); + let frame_count = frames.len() as i64; + let first_frame = frames.iter().map(|(f, _)| *f).min().unwrap_or(0); + let last_frame = frames.iter().map(|(f, _)| *f).max().unwrap_or(0); + + // Extract color features from first frame + let first_person = &frames[0].1; + let dominant_colors = first_person + .get("dominant_colors") + .cloned() + .unwrap_or(serde_json::json!([])); + let upper_body = first_person.get("upper_body").cloned(); + let lower_body = first_person.get("lower_body").cloned(); + + // Get bbox info + let bbox = first_person.get("bbox"); + let bbox_x = bbox + .and_then(|b| b.get("x").and_then(|v| v.as_i64())) + .unwrap_or(0); + let bbox_y = bbox + .and_then(|b| b.get("y").and_then(|v| v.as_i64())) + .unwrap_or(0); + let bbox_w = bbox + .and_then(|b| b.get("width").and_then(|v| v.as_i64())) + .unwrap_or(0); + let bbox_h = bbox + .and_then(|b| b.get("height").and_then(|v| v.as_i64())) + .unwrap_or(0); + + // Match to face trace_id via bbox overlap + let matched_trace = match_trace_by_bbox( + pool, + file_uuid, + first_frame, + bbox_x as f64, + bbox_y as f64, + bbox_w as f64, + bbox_h as f64, + ) + .await; + + let props = serde_json::json!({ + "person_id": pid, + "trace_id": matched_trace, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "dominant_colors": dominant_colors, + "has_upper_body": upper_body.is_some(), + "has_lower_body": lower_body.is_some(), + "avg_bbox": { + "x": bbox_x, + "y": bbox_y, + "width": bbox_w, + "height": bbox_h, + }, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("appearance_trace") + .bind(&external_id) + .bind(file_uuid) + .bind(&format!("Appearance Trace {}", pid)) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + } + + tracing::info!("[TKG] Built {} appearance trace nodes", count); + Ok(count) +} + +// ── Skin Tone Trace Nodes ───────────────────────────────────────── + +async fn build_skin_tone_trace_nodes( + pool: &PgPool, + file_uuid: &str, + output_dir: &str, + pose_data: &[FacePose], +) -> Result { + let path = Path::new(output_dir).join(format!("{}.face.json", file_uuid)); + if !path.exists() { + return Ok(0); + } + + let content = std::fs::read_to_string(&path) + .with_context(|| format!("Failed to read face.json: {}", path.display()))?; + let json: serde_json::Value = serde_json::from_str(&content)?; + + let nodes_table = t("tkg_nodes"); + + // Group skin tone data by trace_id + let mut skin_data: HashMap> = HashMap::new(); // trace_id → Vec<(frame, h_mean)> + + if let Some(frames) = json.get("frames").and_then(|v| v.as_array()) { + for frame_entry in frames { + let frame_num = frame_entry + .get("frame") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + if let Some(faces) = frame_entry.get("faces").and_then(|v| v.as_array()) { + for face in faces { + let bbox = match face.get("bbox") { + Some(b) => b, + None => continue, + }; + let x = bbox.get("x").and_then(|v| v.as_f64()).unwrap_or(0.0); + let y = bbox.get("y").and_then(|v| v.as_f64()).unwrap_or(0.0); + let w = bbox.get("width").and_then(|v| v.as_f64()).unwrap_or(0.0); + let h = bbox.get("height").and_then(|v| v.as_f64()).unwrap_or(0.0); + + let trace_id = + match get_trace_for_face(pool, file_uuid, frame_num, x, y, w, h).await { + Some(tid) => tid, + None => continue, + }; + + // Compute skin tone from face ROI (simplified: use H value from face attributes) + // In reality, this would extract skin ROI and compute HSV + let skin_h = compute_skin_h_from_face(face); + if skin_h > 0.0 { + skin_data + .entry(trace_id) + .or_default() + .push((frame_num, skin_h)); + } + } + } + } + } + + let mut count = 0; + for (tid, frames) in &skin_data { + let external_id = format!("skin_{}", tid); + let frame_count = frames.len() as i64; + let first_frame = frames.iter().map(|(f, _)| *f).min().unwrap_or(0); + let last_frame = frames.iter().map(|(f, _)| *f).max().unwrap_or(0); + + let avg_h = frames.iter().map(|(_, h)| *h).sum::() / frame_count as f64; + + // Fitzpatrick classification + let fitzpatrick = classify_fitzpatrick(avg_h); + + // Lighting estimation (simplified) + let brightness = if avg_h > 15.0 { 0.65 } else { 0.4 }; + let quality = if brightness > 0.4 { "good" } else { "fair" }; + + let props = serde_json::json!({ + "trace_id": tid, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "face_h_mean": (avg_h * 100.0).round() / 100.0, + "fitzpatrick": fitzpatrick, + "confidence": 0.7, + "lighting": { + "brightness": brightness, + "quality": quality, + }, + "sample_frames": frame_count, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("skin_tone_trace") + .bind(&external_id) + .bind(file_uuid) + .bind(&format!("Skin Tone Trace {}", tid)) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG] Built {} skin tone trace nodes", count); + Ok(count) +} + +fn compute_skin_h_from_face(face: &serde_json::Value) -> f64 { + // Simplified: estimate skin H from face attributes or landmarks + // In production, this would extract skin ROI and compute HSV + if let Some(attrs) = face.get("attributes").and_then(|v| v.as_object()) { + if let Some(age) = attrs.get("age").and_then(|v| v.as_i64()) { + // Rough estimation based on age (not accurate, placeholder) + if age < 30 { + 18.0 + } else { + 20.0 + } + } else { + 19.0 + } + } else { + 0.0 + } +} + +fn classify_fitzpatrick(h_mean: f64) -> &'static str { + if h_mean < 5.0 { + "Type I - Very Fair" + } else if h_mean < 12.0 { + "Type II - Fair" + } else if h_mean < 18.0 { + "Type III - Medium-Fair" + } else if h_mean < 25.0 { + "Type IV - Medium" + } else if h_mean < 35.0 { + "Type V - Dark" + } else { + "Type VI - Very Dark" + } +} + +// ── Accessory Nodes ────────────────────────────────────────────── + +async fn build_accessory_nodes(pool: &PgPool, file_uuid: &str, output_dir: &str) -> Result { + let path = Path::new(output_dir).join(format!("{}.appearance.json", file_uuid)); + if !path.exists() { + return Ok(0); + } + + // Accessory nodes are created per accessory type detected + // For now, return 0 as accessory detection requires CLIP integration + // This is a placeholder for future implementation + tracing::info!("[TKG] Accessory nodes: placeholder (CLIP integration pending)"); + Ok(0) +} + +// ── HAS_APPEARANCE Edges ────────────────────────────────────────── + +async fn build_has_appearance_edges(pool: &PgPool, file_uuid: &str) -> Result { + let nodes_table = t("tkg_nodes"); + let edges_table = t("tkg_edges"); + + // Match appearance_trace to face_trace via trace_id + let appearance_traces: Vec<(i64, String, Option)> = sqlx::query_as(&format!( + r#" + SELECT id::bigint, external_id, + (properties->>'trace_id')::bigint + FROM {} WHERE file_uuid = $1 AND node_type = 'appearance_trace' + "#, + nodes_table + )) + .bind(file_uuid) + .fetch_all(pool) + .await?; + + let mut edge_count = 0; + let mut node_id_cache: HashMap = HashMap::new(); + + for (app_id, app_ext, trace_id) in &appearance_traces { + if let Some(tid) = trace_id { + let face_ext = format!("trace_{}", tid); + + // Get face trace node ID + let face_node_id = match node_id_cache.get(&face_ext) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='face_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(&face_ext).fetch_optional(pool).await? + { + node_id_cache.insert(face_ext.clone(), id); + id + } else { continue; } + } + }; + + // Get appearance node ID + let app_node_id = match node_id_cache.get(app_ext) { + Some(id) => *id, + None => { + if let Some((id,)) = sqlx::query_as::<_, (i64,)>(&format!( + "SELECT id FROM {} WHERE file_uuid=$1 AND node_type='appearance_trace' AND external_id=$2", + nodes_table + )) + .bind(file_uuid).bind(app_ext).fetch_optional(pool).await? + { + node_id_cache.insert(app_ext.clone(), id); + id + } else { continue; } + } + }; + + let props = serde_json::json!({ + "trace_id": tid, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (edge_type, source_node_id, target_node_id, file_uuid, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, edge_type, source_node_id, target_node_id) + DO UPDATE SET properties = COALESCE(EXCLUDED.properties, tkg_edges.properties) + "#, + edges_table + )) + .bind("HAS_APPEARANCE") + .bind(face_node_id) + .bind(app_node_id) + .bind(file_uuid) + .bind(serde_json::to_string(&props)?) + .execute(pool) + .await?; + + edge_count += 1; + } + } + + tracing::info!("[TKG] Built {} HAS_APPEARANCE edges", edge_count); + Ok(edge_count) +} + +// ── WEARS Edges ─────────────────────────────────────────────────── + +async fn build_wears_edges(pool: &PgPool, file_uuid: &str) -> Result { + // WEARS edges connect appearance_trace to accessory nodes + // Since accessory nodes are not yet implemented, return 0 + tracing::info!("[TKG] WEARS edges: placeholder (accessory nodes pending)"); + Ok(0) +} + +async fn match_trace_by_bbox( + pool: &PgPool, + file_uuid: &str, + frame: i64, + x: f64, + y: f64, + w: f64, + h: f64, +) -> Option { + get_trace_for_face(pool, file_uuid, frame, x, y, w, h).await +} + // ── TKG Bridge: Representative Frame ────────────────────────────── #[derive(Debug, Serialize)] @@ -1209,11 +2656,21 @@ mod tests { fn test_tkg_result() { let r = TkgResult { face_trace_nodes: 5, + gaze_trace_nodes: 5, + lip_trace_nodes: 4, + text_trace_nodes: 20, + appearance_trace_nodes: 3, + skin_tone_trace_nodes: 5, + accessory_nodes: 0, object_nodes: 10, speaker_nodes: 3, co_occurrence_edges: 20, speaker_face_edges: 8, face_face_edges: 4, + mutual_gaze_edges: 2, + lip_sync_edges: 15, + has_appearance_edges: 3, + wears_edges: 0, }; assert_eq!(r.face_trace_nodes, 5); assert_eq!(r.object_nodes, 10);