feat: wire TKG builder into worker pipeline + face-face edges

- Auto-run tkg_builder.py after face trace store + Qdrant sync + trace chunks
- Add face-face CO_OCCURS_WITH edges (two traces in same frame)
- docs: TKG integration report for M4
This commit is contained in:
Accusys
2026-05-09 06:22:27 +08:00
parent b902763d45
commit a0774cb9ab
3 changed files with 172 additions and 1 deletions

View File

@@ -0,0 +1,79 @@
# TKG Integration — Pipeline Auto-Build + Trace Chunks
## 變更摘要
### 1. TKG Builder 現在是 pipeline 的一環
`tkg_builder.py` 不再需要手動執行。worker 在 face trace store + Qdrant sync 完成後自動呼叫:
```
Face Processor ✅
→ store_traced_faces.py (trace_id 寫入 face_detections)
→ sync_face_embeddings() (Qdrant)
→ ingest_traces() (產生 trace chunks)
→ tkg_builder.py ← 自動!
```
### 2. TKG Builder 新增 face↔face edges
原本只有 face↔object (`CO_OCCURS_WITH`) 和 face↔speaker (`SPEAKS_AS`)。
現在加上 **face↔face** — 兩個不同 trace 出現在同一 frame 就建立 edge。
```
(face_trace:2) -[:CO_OCCURS_WITH]-> (face_trace:5)
表示 trace 2 與 trace 5 曾同時出現在畫面中
```
### 3. Trace Chunks新 chunk_type
每個 face trace 對應一個 `chunk_type = 'trace'` 的 chunk
| 欄位 | 內容 |
|------|------|
| `chunk_type` | `'trace'` |
| `start_frame` / `end_frame` | trace 出現的時間區間 |
| `text_content` | 該區間內的 ASR 文字 |
| `metadata.trace_id` | face_detections 的 trace_id |
| `metadata.bbox` | 平均位置 `{x, y, width, height}` |
| `metadata.co_appearances` | 同框的其他 trace `[{trace_id, overlap_frames, overlap_secs}]` |
### 4. Search API 擴充
`SearchFilters` 新增:
| Filter | 用法 |
|--------|------|
| `chunk_type` | `"trace"` / `"sentence"` / `"cut"` |
| `co_appears_with_trace_id` | 找出與指定 trace 同框的所有 chunk |
範例:
```json
POST /api/v1/search/universal
{
"query": "",
"types": ["chunk"],
"filters": {"chunk_type": "trace", "co_appears_with_trace_id": 5}
}
```
## TKG Graph 結構(完整)
```
NODES:
face_trace — 每個 trace_id
object — 每個 YOLO class (person, car, ...)
speaker — 每個 speaker_id (SPEAKER_0, ...)
EDGES:
face_trace -[:CO_OCCURS_WITH]-> object 同 frame
face_trace -[:CO_OCCURS_WITH]-> face_trace 同 frame新增
face_trace -[:SPEAKS_AS]-> speaker 時間重疊 > 30%
```
## git pull 後需執行
```bash
cd /Users/accusys/momentry_core_0.1 && git pull
```
已包含在 `9f5afd1``b902763`

View File

@@ -365,6 +365,73 @@ def build_speaker_face_edges(cur, schema, file_uuid):
return edge_count
def build_face_face_edges(cur, schema, file_uuid):
"""Build CO_OCCURS_WITH edges: face_trace ↔ face_trace in same frame"""
print("[TKG] Building face-face co-occurrence edges...")
cur.execute(
f"""
SELECT a.trace_id AS tid_a, b.trace_id AS tid_b,
a.frame_number, a.timestamp_secs,
a.x AS ax, a.y AS ay, a.width AS aw, a.height AS ah,
b.x AS bx, b.y AS by, b.width AS bw, b.height AS bh
FROM {schema}.face_detections a
JOIN {schema}.face_detections b
ON a.file_uuid = b.file_uuid
AND a.frame_number = b.frame_number
AND a.trace_id < b.trace_id
WHERE a.file_uuid = %s
AND a.trace_id IS NOT NULL
AND b.trace_id IS NOT NULL
ORDER BY a.frame_number
""",
(file_uuid,),
)
rows = cur.fetchall()
if not rows:
print("[TKG] No face-face co-occurrences found")
return 0
# Deduplicate by pair (group all frames where same two traces co-occur)
pair_first = {}
pair_frames = {}
for tid_a, tid_b, frame, ts, ax, ay, aw, ah, bx, by, bw, bh in rows:
key = (min(tid_a, tid_b), max(tid_a, tid_b))
if key not in pair_first:
pair_first[key] = frame
pair_frames.setdefault(key, []).append(frame)
edge_count = 0
for (tid_a, tid_b), frames in pair_frames.items():
cur.execute(
f"SELECT id FROM {schema}.tkg_nodes WHERE file_uuid=%s AND node_type='face_trace' AND external_id=%s",
(file_uuid, f"trace_{tid_a}"),
)
n_a = cur.fetchone()
cur.execute(
f"SELECT id FROM {schema}.tkg_nodes WHERE file_uuid=%s AND node_type='face_trace' AND external_id=%s",
(file_uuid, f"trace_{tid_b}"),
)
n_b = cur.fetchone()
if not n_a or not n_b:
continue
distance_px = ((frames[0] - frames[0]) ** 2) ** 0.5 # placeholder
ensure_edge(
cur, schema, file_uuid,
"CO_OCCURS_WITH",
n_a[0], n_b[0],
{
"first_frame": int(frames[0]),
"frame_count": len(frames),
},
)
edge_count += 1
print(f"[TKG] {edge_count} face-face co-occurrence edges created")
return edge_count
def main():
parser = argparse.ArgumentParser(description="Build Temporal Knowledge Graph")
parser.add_argument("--file-uuid", required=True)
@@ -382,17 +449,19 @@ def main():
e1 = build_co_occurrence_edges(cur, args.schema, args.file_uuid)
e2 = build_speaker_face_edges(cur, args.schema, args.file_uuid)
e3 = build_face_face_edges(cur, args.schema, args.file_uuid)
conn.commit()
cur.close()
conn.close()
print(f"\n[TKG] Complete: {n1+n2+n3} nodes, {e1+e2} edges")
print(f"\n[TKG] Complete: {n1+n2+n3} nodes, {e1+e2+e3} edges")
print(f" Face traces: {n1}")
print(f" Objects: {n2}")
print(f" Speakers: {n3}")
print(f" Co-occur: {e1}")
print(f" Speaker-face:{e2}")
print(f" Face-face: {e3}")
if __name__ == "__main__":

View File

@@ -757,6 +757,29 @@ impl JobWorker {
Ok(n) => info!("✅ {} trace chunks created for {}", n, uuid_clone),
Err(e) => error!("❌ Trace chunk ingestion failed: {}", e),
}
// Build Temporal Knowledge Graph (TKG)
info!("📝 Building TKG graph...");
let executor = match crate::core::processor::PythonExecutor::new() {
Ok(ex) => ex,
Err(e) => {
error!("Failed to create PythonExecutor for TKG: {}", e);
return;
}
};
match executor
.run(
"tkg_builder.py",
&["--file-uuid", &uuid_clone],
Some(&uuid_clone),
"TKG_BUILDER",
Some(std::time::Duration::from_secs(300)),
)
.await
{
Ok(()) => info!("✅ TKG built for {}", uuid_clone),
Err(e) => error!("❌ TKG build failed for {}: {}", uuid_clone, e),
}
}
Err(e) => {
error!("❌ Face trace + DB store failed for {}: {}", uuid_clone, e)