From 088aefdac72f2b920c788ab43e5986b0f6ec8ec6 Mon Sep 17 00:00:00 2001 From: Accusys Date: Mon, 18 May 2026 00:36:14 +0800 Subject: [PATCH] fix: pipeline timeline log, chunk lookup, face processor no fallback, Qdrant UUID script, delete safety rules --- AGENTS.md | 14 +- .../OPERATIONS/SYSTEM_AUDIT_2026-05-17.md | 71 +++++++ scripts/face_processor_v1.py | 39 ++-- scripts/sync_dev_to_public.sh | 148 +++++++++++++++ scripts/update_qdrant_uuid.py | 174 ++++++++++++++++++ src/api/server.rs | 34 +++- src/core/db/postgres_db.rs | 52 +++++- 7 files changed, 503 insertions(+), 29 deletions(-) create mode 100644 docs_v1.0/OPERATIONS/SYSTEM_AUDIT_2026-05-17.md create mode 100755 scripts/sync_dev_to_public.sh create mode 100644 scripts/update_qdrant_uuid.py diff --git a/AGENTS.md b/AGENTS.md index e54c1e1..179b760 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -11,6 +11,9 @@ Rust-based digital asset management system with video analysis and RAG capabilit - **絕對不可修改 n8n 工作流或設定** - **絕對不可修改 WordPress 或 n8n 的資料庫 table** - **除非是 release 作業,絕對不可動 port 3002 (production)** +- **🔴 DELETE / REMOVE / DROP / CLEAR 任何資料前必須先問使用者「要刪嗎?」獲得明確同意後才能執行** +- **🔴 Qdrant collection 刪除、DB truncate、檔案刪除、資料清空 — 一律要先問** +- **🔴 不確定是否該刪 → 先問,不要自己決定** ### 開發範圍界定 | 範圍 | 狀態 | 說明 | @@ -52,7 +55,7 @@ Rust-based digital asset management system with video analysis and RAG capabilit |------|------|----------| | `sed` API doc 編號 | `sed -i '' 's/.../.../g'` 改到所有行 | 先 `grep -c` 確認匹配,`git diff` 再提交 | | 亂加 `/api/v1/register` route | 不必要的 API 別名,汙染路由表 | 角色切換:路由設計不該由實作方決定 | -| `DESIGN/` vs `STANDARDS/` vs `REFERENCE/` | 文件放到錯誤分類 | 先確認文件性質(草稿/標準/參考/記錄) | +| `API_WORKSPACE/` vs `GUIDES/` vs `REFERENCE/` vs `DESIGN/` vs `OPERATIONS/` vs `INTEGRATIONS/` | 文件放到錯誤分類 | API 文件改在 API_WORKSPACE/modules/ 編輯,`make deploy` 生成到 GUIDES/ | | Build release binary in plan mode | 浪費時間,無意義 | 嚴格遵守 plan/build mode 規定 | ### ⛔ 嚴格測試隔離規則 (Strict Test Isolation) @@ -715,7 +718,8 @@ This applies to docs, code, API responses, and curl examples. Exceptions: intern ## Document Compliance Checklist -Before creating any `REFERENCE/*.md` file, verify all items below. +Before creating any file in `docs_v1.0/` (API_WORKSPACE, GUIDES, REFERENCE, DESIGN, OPERATIONS, INTEGRATIONS), verify all items below. +**IMPORTANT**: API functional documents are generated from `API_WORKSPACE/modules/`. Edit modules there, then run `make deploy` in `API_WORKSPACE/` to update `GUIDES/`. Never edit generated files in `GUIDES/` directly. See `DESIGN/Modular_Doc_System_V1.0.md` for the full system design. ### P0 — Mandatory (7 items) @@ -727,7 +731,7 @@ Before creating any `REFERENCE/*.md` file, verify all items below. | 4 | PascalCase filename | e.g. `DetectorRegistry.md`, not `detector_registry.md` | | 5 | `_` separator | Within filenames use `_`, never spaces or other chars | | 6 | English content | Entire file in English | -| 7 | REFERENCE/ path | File must reside under `docs_v1.0/REFERENCE/` | +| 7 | Correct directory | File must reside in appropriate directory: `API_WORKSPACE/modules/` (API endpoint modules), `GUIDES/` (user docs, generated), `REFERENCE/` (data models), `DESIGN/` (architecture), `OPERATIONS/` (infra/release), `INTEGRATIONS/` (n8n/tests) | ### P0b — UUID Naming @@ -740,7 +744,7 @@ Before creating any `REFERENCE/*.md` file, verify all items below. | # | Check | Note | |---|-------|------| -| 1 | Cross-references | Link to related REFERENCE docs | +| 1 | Cross-references | Link to related docs in API_WORKSPACE/, GUIDES/, REFERENCE/, DESIGN/, OPERATIONS/ | | 2 | Glossary terms | Define non-obvious terms inline or link glossary | | 3 | Diagrams | Include Mermaid/ASCII diagram for complex topics | @@ -754,4 +758,4 @@ Before creating any `REFERENCE/*.md` file, verify all items below. 完整交付程序(M4_workspace → M5 → Release → Deploy → Public)見: -`docs_v1.0/REFERENCE/DELIVERY_PROCEDURE.md` +`docs_v1.0/OPERATIONS/DELIVERY_PROCEDURE.md` diff --git a/docs_v1.0/OPERATIONS/SYSTEM_AUDIT_2026-05-17.md b/docs_v1.0/OPERATIONS/SYSTEM_AUDIT_2026-05-17.md new file mode 100644 index 0000000..eca8e02 --- /dev/null +++ b/docs_v1.0/OPERATIONS/SYSTEM_AUDIT_2026-05-17.md @@ -0,0 +1,71 @@ +# System Audit — 2026-05-17 + +## Current State + +### Embedding Storage (三重冗余,無主) + +| 資料類型 | PG pgvector | Qdrant | JSON 檔案 | +|---------|------------|--------|-----------| +| Sentence 向量 | `chunk.embedding` ✅ | `dev_v1` / `rule1_v2` / `sentence_*` ✅ | ❌ 無 | +| Story 向量 | `chunk.embedding` ✅ | `dev_v1` / `dev_stories` ✅ | `.story_llm.json` ✅ | +| Face 向量 | ❌ 已清除(依使用者指示) | `dev_faces` ✅ (97K) | `.face.json` ✅ | +| Voice 向量 | ❌ 無 | `dev_voice` ✅ (4K) | ❌ 無 | + +### Pipeline 問題 + +| 問題 | 影響 | +|------|------| +| `processor_results.duration_secs` 全為 0 | 無法查各步驟耗時 | +| `processor_results.started_at/completed_at` 全 NULL | 時間線遺失 | +| Redis timing 在 job 完成後被清掉 | 唯一 timing 來源消失 | +| `get_chunk_by_chunk_id_and_uuid` 原本是 stub(已修) | Smart search 找不到 PG chunk | +| `server.rs::search()` 未 mount 但仍編譯 | Dead code,混淆 Qdrant 用途 | +| Face embedding 只寫 Qdrant 不寫 PG | 已刪除則全失 | + +### Qdrant Collections 現況 + +| Collection | Points | 來源 | UUID | +|-----------|--------|------|------| +| `dev_v1` | 9,936 | PG rebuild | ✅ bd80fec... | +| `dev_faces` | 97,000 | face.json rebuild | ✅ bd80fec... | +| `dev_stories` | 560 | Snapshot | ✅ bd80fec... | +| `dev_voice` | 4,188 | Snapshot | ✅ bd80fec... | +| `dev_rule1_v2` | 3,417 | Snapshot | ✅ bd80fec... | +| `sentence_story` | 4,188 | Snapshot | ✅ bd80fec... | +| `sentence_summary` | 4,188 | Snapshot | ✅ bd80fec... | + +## Safeguards & Fixes + +### P0 — 必須修 + +| # | Fix | 做法 | +|---|-----|------| +| 1 | **Pipeline timing 寫入 DB** | `update_processor_result()` 加入 `started_at`、`completed_at`、`duration_secs` | +| 2 | **Qdrant 不當主要儲存** | Embedding 以 PG `chunk.embedding` 為 source of truth,Qdrant 唯讀 cache | +| 3 | **Smart search 只走 PG pgvector** | `search_parent_chunks_semantic` 已正確,無需 Qdrant | +| 4 | **移除 `server.rs::search()` dead code** | 或 mount 到正式 route 並確認可用 | + +### P1 — 建議修 + +| # | Fix | 做法 | +|---|-----|------| +| 5 | **刪除 Qdrant 前先 snapshot** | 自動 snapshot script | +| 6 | **清理多餘 Qdrant collections** | `dev_voice` / `dev_stories` / `dev_rule1_v2` / `sentence_*` 無 server reader,可移除 | +| 7 | **Face embedding 寫入 PG 或移除 dead code** | 目前 face Qdrant write 無人讀取,可移除 `sync_face_embeddings` | +| 8 | **UUID 一致性檢查** | 同一 content 不應產生不同 UUID | + +### P2 — 可選 + +| # | Fix | 做法 | +|---|-----|------| +| 9 | `chunk_selector.rs` (player binary)hardcode `momentry_rule1` | 改讀 env var 或 PG | +| 10 | AGENTS.md 已加入 delete 安全規則 | ✅ Done | + +## Data Recovery Path + +| 資料來源 | 可恢復到 | 方法 | +|---------|---------|------| +| `chunk.embedding` (PG) | Qdrant `dev_v1` | SQL → Qdrant upsert | +| `face.json` (磁碟) | Qdrant `dev_faces` | Python script | +| `story_llm.json` (磁碟) | Qdrant `dev_stories` | Python script | +| Qdrant snapshots (phase1) | Qdrant collections | Snapshot upload API | diff --git a/scripts/face_processor_v1.py b/scripts/face_processor_v1.py index ee7e502..f72b39f 100755 --- a/scripts/face_processor_v1.py +++ b/scripts/face_processor_v1.py @@ -64,6 +64,27 @@ def process_face( app = None coreml_embedder = None + + # 載入 CoreML FaceNet(必要,無 fallback) + try: + import coremltools as ct + coreml_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../models/facenet512.mlpackage" + ) + if not os.path.exists(coreml_path): + raise FileNotFoundError(f"CoreML model not found at {coreml_path}") + coreml_embedder = ct.models.MLModel(coreml_path) + framework.publish_info("COREML_FACENET_LOADED") + except Exception as e: + error_msg = f"CoreML FaceNet512 load failed: {e}" + print(f"[FACE] {error_msg}") + framework.publish_error(error_msg) + result = {"metadata": {"status": "error", "error": error_msg}, "frames": {}} + with open(output_path, "w") as f: + json.dump(result, f, indent=2) + return result + try: framework.publish_info("LOADING_INSIGHTFACE") app = insightface.app.FaceAnalysis( @@ -72,21 +93,6 @@ def process_face( app.prepare(ctx_id=0, det_size=(320, 320)) framework.publish_info("INSIGHTFACE_LOADED") - # 嘗試載入 CoreML FaceNet 模型(MIT license,可用 ANE) - try: - import coremltools as ct - coreml_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "../models/facenet512.mlpackage" - ) - if os.path.exists(coreml_path): - coreml_embedder = ct.models.MLModel(coreml_path) - framework.publish_info("COREML_FACENET_LOADED") - else: - print(f"[FACE] CoreML model not found at {coreml_path}, using InsightFace embedding") - except Exception as e: - print(f"[FACE] CoreML load failed: {e}, using InsightFace embedding") - except Exception as e: print(f"[FACE] InsightFace failed to load (REQUIRED): {e}") error_msg = f"InsightFace failed to load (REQUIRED): {e}" @@ -219,8 +225,7 @@ def process_face( embedding = coreml_out[emb_key].flatten().tolist() except Exception as e: print(f"[FACE] CoreML embedding error for face at ({x1},{y1}): {e}") - if embedding is None and hasattr(face, "embedding"): - embedding = face.embedding.tolist() + landmarks = None if hasattr(face, "kps"): diff --git a/scripts/sync_dev_to_public.sh b/scripts/sync_dev_to_public.sh new file mode 100755 index 0000000..16ebb14 --- /dev/null +++ b/scripts/sync_dev_to_public.sh @@ -0,0 +1,148 @@ +#!/bin/bash +# sync_dev_to_public.sh — 比對 dev/public schema,同步 pipeline 資料 +# Usage: ./sync_dev_to_public.sh [check|sync] [file_uuid] + +PSQL="/opt/homebrew/opt/libpq/bin/psql" + +set -euo pipefail + +SCHEMA="${MOMENTRY_DB_SCHEMA:-dev}" +DB_URL="${DATABASE_URL:-postgres://accusys@localhost:5432/momentry}" +MODE="${1:-check}" +FILE_UUID="${2:-}" + +TABLES=("videos" "chunk" "face_detections" "processor_results" "monitor_jobs" + "identities" "identity_bindings" "tkg_nodes" "tkg_edges") + +TARGET="public" + +if [ -z "$FILE_UUID" ]; then + echo "Usage: $0 [check|sync] " + echo "" + echo "Examples:" + echo " $0 check bd80fec92b0b6963d177a2c55bf713e2" + echo " $0 sync bd80fec92b0b6963d177a2c55bf713e2" + exit 1 +fi + +echo "=== Schema Sync: $SCHEMA → $TARGET ===" +echo "File UUID: $FILE_UUID" +echo "Mode: $MODE" +echo "" + +check_table() { + local table=$1 + local col=$2 + local src_count dev_count pub_count + + dev_count=$($PSQL -At "$DB_URL" -c "SELECT COUNT(*) FROM ${SCHEMA}.${table} WHERE ${col} = '${FILE_UUID}';" 2>/dev/null || echo "ERROR") + pub_count=$($PSQL -At "$DB_URL" -c "SELECT COUNT(*) FROM ${TARGET}.${table} WHERE ${col} = '${FILE_UUID}';" 2>/dev/null || echo "ERROR") + + if [ "$dev_count" = "ERROR" ] || [ "$pub_count" = "ERROR" ]; then + echo " ⚠️ $table — query error (table may not exist in $TARGET)" + return 1 + fi + + if [ "$dev_count" -eq "$pub_count" ]; then + echo " ✅ $table — $dev_count rows (match)" + return 0 + else + echo " ❌ $table — dev=$dev_count pub=$pub_count (MISMATCH)" + return 1 + fi +} + +sync_table() { + local table=$1 + local col=$2 + local src_count dev_count pub_count + + dev_count=$($PSQL -At "$DB_URL" -c "SELECT COUNT(*) FROM ${SCHEMA}.${table} WHERE ${col} = '${FILE_UUID}';" 2>/dev/null || echo "0") + pub_count=$($PSQL -At "$DB_URL" -c "SELECT COUNT(*) FROM ${TARGET}.${table} WHERE ${col} = '${FILE_UUID}';" 2>/dev/null || echo "0") + + if [ "$dev_count" = "0" ]; then + echo " ⏭️ $table — dev has 0 rows, skipping" + return + fi + + if [ "$dev_count" -eq "$pub_count" ]; then + echo " ✅ $table — already synced ($dev_count rows)" + return + fi + + echo " 🔄 Syncing $table: dev=$dev_count → pub=$pub_count ..." + + # Delete existing public rows, insert from dev + $PSQL "$DB_URL" -q -c "DELETE FROM ${TARGET}.${table} WHERE ${col} = '${FILE_UUID}';" 2>/dev/null || true + + # Get columns list (excluding id for SERIAL) + COLS=$($PSQL -At "$DB_URL" -c " + SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) + FROM information_schema.columns + WHERE table_schema='${SCHEMA}' AND table_name='${table}' + AND column_name != 'id' + AND is_updatable='YES'; + ") + + $PSQL "$DB_URL" -q -c " + INSERT INTO ${TARGET}.${table} (${COLS}) + SELECT ${COLS} + FROM ${SCHEMA}.${table} + WHERE ${col} = '${FILE_UUID}'; + " 2>/dev/null && echo " ✅ $table synced" || echo " ❌ $table sync FAILED" +} + +echo "=== Checking Tables ===" +echo "" +MISMATCH=0 +for table in "${TABLES[@]}"; do + # Determine the UUID column name for each table + case "$table" in + videos) col="file_uuid" ;; + chunk) col="file_uuid" ;; + face_detections) col="file_uuid" ;; + processor_results) col="file_uuid" ;; + monitor_jobs) col="uuid" ;; + identities) col="uuid" ;; # identities.uuid is UUID type + identity_bindings) col="uuid" ;; + tkg_nodes) col="file_uuid" ;; + tkg_edges) col="file_uuid" ;; + *) col="file_uuid" ;; + esac + + if ! check_table "$table" "$col"; then + MISMATCH=$((MISMATCH + 1)) + fi +done + +echo "" +if [ "$MISMATCH" -eq 0 ]; then + echo "✅ All tables in sync" + exit 0 +fi + +if [ "$MODE" != "sync" ]; then + echo "⚠️ $MISMATCH table(s) have mismatches. Run '$0 sync $FILE_UUID' to fix." + exit 1 +fi + +echo "=== Syncing Tables ===" +echo "" +for table in "${TABLES[@]}"; do + case "$table" in + videos) col="file_uuid" ;; + chunk) col="file_uuid" ;; + face_detections) col="file_uuid" ;; + processor_results) col="file_uuid" ;; + monitor_jobs) col="uuid" ;; + identities) col="uuid" ;; + identity_bindings) col="uuid" ;; + tkg_nodes) col="file_uuid" ;; + tkg_edges) col="file_uuid" ;; + *) col="file_uuid" ;; + esac + sync_table "$table" "$col" +done + +echo "" +echo "✅ Sync complete" diff --git a/scripts/update_qdrant_uuid.py b/scripts/update_qdrant_uuid.py new file mode 100644 index 0000000..0e72de7 --- /dev/null +++ b/scripts/update_qdrant_uuid.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""批量更新 Qdrant collection 中的 file_uuid (舊→新)""" + +import json +import subprocess +import sys + +QDRANT_URL = "http://localhost:6333" + +# UUID mapping: 舊 → 新 +UUID_MAP = { + "aeed71342a899fe4b4c57b7d41bcb692": [ + "bd80fec92b0b6963d177a2c55bf713e2", + ], +} + +# Collections to process +COLLECTIONS = [ + "momentry_dev_v1", + "momentry_dev_stories", + "momentry_dev_voice", + "momentry_dev_rule1_v2", + "momentry_dev_faces", + "sentence_story", + "sentence_summary", +] + + +def qdrant_get(path: str) -> dict: + res = subprocess.run( + ["curl", "-s", "-X", "GET", f"{QDRANT_URL}{path}"], + capture_output=True, text=True + ) + return json.loads(res.stdout) if res.stdout.strip() else {} + + +def qdrant_post(path: str, body: dict) -> dict: + tmp = "/tmp/qdrant_post.json" + with open(tmp, "w") as f: + json.dump(body, f) + res = subprocess.run( + ["curl", "-s", "-X", "POST", f"{QDRANT_URL}{path}", + "-H", "Content-Type: application/json", "-d", f"@{tmp}"], + capture_output=True, text=True + ) + return json.loads(res.stdout) if res.stdout.strip() else {} + + +def qdrant_put(path: str, body: dict) -> dict: + tmp = "/tmp/qdrant_update.json" + with open(tmp, "w") as f: + json.dump(body, f) + res = subprocess.run( + ["curl", "-s", "-X", "PUT", f"{QDRANT_URL}{path}", + "-H", "Content-Type: application/json", "-d", f"@{tmp}"], + capture_output=True, text=True + ) + return json.loads(res.stdout) if res.stdout.strip() else {} + + +def scroll_all(collection: str, filter_old: dict) -> list: + """Scroll all matching points from a collection""" + points = [] + offset = None + while True: + body = { + "limit": 1000, + "with_payload": True, + "with_vector": True, + "filter": filter_old, + } + if offset: + body["offset"] = offset + result = qdrant_post(f"/collections/{collection}/points/scroll", body) + batch = result.get("result", {}).get("points", []) + points.extend(batch) + next_offset = result.get("result", {}).get("next_page_offset") + if next_offset is None: + break + offset = next_offset + return points + + +def update_points(collection: str, points: list, old_uuid: str, new_uuid: str): + """Update file_uuid in payload for the given points""" + if not points: + return 0 + + updated = [] + for p in points: + pl = p.get("payload", {}) + # Check both 'uuid' and 'file_uuid' fields + changed = False + if pl.get("uuid") == old_uuid: + pl["uuid"] = new_uuid + changed = True + if pl.get("file_uuid") == old_uuid: + pl["file_uuid"] = new_uuid + changed = True + if changed: + updated.append({ + "id": p["id"], + "vector": p["vector"], + "payload": pl, + }) + + if not updated: + return 0 + + # Update in batches of 500 + total = len(updated) + for i in range(0, total, 500): + batch = updated[i:i+500] + result = qdrant_put( + f"/collections/{collection}/points?wait=true", + {"points": batch} + ) + if result.get("status") != "ok": + print(f" Error at {i}: {result}") + return i + return total + + +def main(): + for collection in COLLECTIONS: + # Check if collection exists + info = qdrant_get(f"/collections/{collection}") + if "result" not in info: + continue + + for old_uuid, new_uuids in UUID_MAP.items(): + for new_uuid in new_uuids: + # Scroll all points with this old UUID + filter_body = { + "must": [ + {"should": [ + {"key": "uuid", "match": {"value": old_uuid}}, + {"key": "file_uuid", "match": {"value": old_uuid}}, + ]} + ] + } + points = scroll_all(collection, filter_body) + if not points: + continue + + print(f"{collection}: {len(points)} points with UUID {old_uuid[:8]}...") + updated = update_points(collection, points, old_uuid, new_uuid) + print(f" → {updated} points updated to {new_uuid[:8]}...") + + # Verify + print("\n=== Verification ===") + for collection in COLLECTIONS: + for old_uuid, new_uuids in UUID_MAP.items(): + for what, uuid in [("old", old_uuid), ("new", new_uuids[0])]: + filter_body = { + "must": [ + {"should": [ + {"key": "uuid", "match": {"value": uuid}}, + {"key": "file_uuid", "match": {"value": uuid}}, + ]} + ] + } + result = qdrant_post( + f"/collections/{collection}/points/count", + {"filter": filter_body} + ) + cnt = result.get("result", {}).get("count", 0) + if cnt > 0: + print(f" {collection}: {cnt} points with {what} UUID") + print("✅ Done") + + +if __name__ == "__main__": + main() diff --git a/src/api/server.rs b/src/api/server.rs index aeafb74..a424e23 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -3765,10 +3765,18 @@ struct IngestionStep { detail: Option, } +#[derive(Debug, Serialize)] +struct IdentityRef { + uuid: String, + name: String, +} + #[derive(Debug, Serialize)] struct IngestionStatusResponse { file_uuid: String, steps: Vec, + related_identities: Vec, + strangers: i64, } async fn get_ingestion_status( @@ -3778,6 +3786,7 @@ async fn get_ingestion_status( let pool = state.db.pool(); let chunk = schema::table_name("chunk"); let fd = schema::table_name("face_detections"); + let identities = schema::table_name("identities"); let scene_meta_path = format!("{}/{}.scene_meta.json", crate::core::config::OUTPUT_DIR.as_str(), @@ -3796,13 +3805,30 @@ async fn get_ingestion_status( let sentence_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence'")); let sentence_embedded = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'sentence' AND embedding IS NOT NULL")); let scene_count = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut'")); + let face_total = count_sql!(&format!("SELECT COUNT(*) FROM {fd} WHERE file_uuid = '{file_uuid}'")); let trace_count = count_sql!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL")); let trace_chunks = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'trace'")); - let identities = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL")); + let identity_count = count_sql!(&format!("SELECT COUNT(DISTINCT identity_id) FROM {fd} WHERE file_uuid = '{file_uuid}' AND identity_id IS NOT NULL")); let tkg_nodes = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_nodes"))); let tkg_edges = count_sql!(&format!("SELECT COUNT(*) FROM {} WHERE file_uuid = '{file_uuid}'", schema::table_name("tkg_edges"))); let scene_5w1h = count_sql!(&format!("SELECT COUNT(*) FROM {chunk} WHERE file_uuid = '{file_uuid}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != ''")); + let related_identities: Vec = sqlx::query_as::<_, (String, String)>(&format!( + "SELECT DISTINCT i.uuid, i.name FROM {identities} i \ + JOIN {fd} fd ON fd.identity_id = i.id \ + WHERE fd.file_uuid = '{file_uuid}' AND fd.identity_id IS NOT NULL \ + ORDER BY i.name" + )).fetch_all(pool).await.unwrap_or_default().into_iter() + .map(|(uuid, name)| { + let uuid = uuid.replace('-', ""); + IdentityRef { uuid, name } + }).collect(); + + let strangers = count_sql!(&format!( + "SELECT COUNT(DISTINCT trace_id) FROM {fd} \ + WHERE file_uuid = '{file_uuid}' AND trace_id IS NOT NULL AND identity_id IS NULL" + )); + macro_rules! step { ($name:expr, $done:expr, $detail:expr) => { IngestionStep { @@ -3817,15 +3843,15 @@ async fn get_ingestion_status( step!("rule1_sentence", sentence_count > 0, Some(format!("{sentence_count} sentence chunks"))), step!("auto_vectorize", sentence_embedded > 0, Some(format!("{sentence_embedded} embedded"))), step!("rule3_scene", scene_count > 0, Some(format!("{scene_count} scene chunks"))), - step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces"))), + step!("face_trace", trace_count > 0, Some(format!("{trace_count} traces / {face_total} detections"))), step!("trace_chunks", trace_chunks > 0, Some(format!("{trace_chunks} trace chunks"))), step!("tkg", tkg_nodes > 0 || tkg_edges > 0, Some(format!("{tkg_nodes} nodes, {tkg_edges} edges"))), - step!("identity_match", identities > 0, Some(format!("{identities} identities matched"))), + step!("identity_match", identity_count > 0, Some(format!("{identity_count} identities matched"))), step!("scene_metadata", scene_meta_ok, None), step!("5w1h", scene_5w1h > 0, Some(format!("{scene_5w1h} scenes with 5W1H"))), ]; - Ok(Json(IngestionStatusResponse { file_uuid, steps })) + Ok(Json(IngestionStatusResponse { file_uuid, steps, related_identities, strangers })) } #[derive(Debug, Deserialize)] diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 3dc9120..71015ad 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -2236,9 +2236,36 @@ impl PostgresDb { Ok(()) } - pub async fn get_chunk_by_chunk_id_and_uuid(&self, chunk_id: &str, _uuid: &str) -> Result> { - // Returns a minimal stub. The full Chunk struct is complex to reconstruct from DB. - Ok(None) + pub async fn get_chunk_by_chunk_id_and_uuid(&self, chunk_id: &str, uuid: &str) -> Result> { + let table = schema::table_name("chunk"); + let row = sqlx::query_as::<_, (String, f64, f64, f64, String, Option, Option)>( + &format!("SELECT chunk_type, start_time, end_time, fps, content::text, text_content, metadata FROM {} WHERE file_uuid = $1 AND chunk_id = $2 LIMIT 1", table) + ) + .bind(uuid).bind(chunk_id) + .fetch_optional(&self.pool).await?; + + Ok(row.map(|(ct, st, et, fps, content_str, text_content, metadata)| { + let content: serde_json::Value = serde_json::from_str(&content_str).unwrap_or_default(); + let chunk_type = match ct.as_str() { + "time" => crate::core::chunk::types::ChunkType::TimeBased, + "sentence" => crate::core::chunk::types::ChunkType::Sentence, + "cut" => crate::core::chunk::types::ChunkType::Cut, + "trace" => crate::core::chunk::types::ChunkType::Trace, + "story" | "story_parent" | "story_child" => crate::core::chunk::types::ChunkType::Story, + "visual" => crate::core::chunk::types::ChunkType::Visual, + _ => crate::core::chunk::types::ChunkType::Story, + }; + let start_frame = (st * fps).round() as i64; + let end_frame = (et * fps).round() as i64; + let mut c = crate::core::chunk::types::Chunk::new( + 0, uuid.to_string(), chunk_id.to_string(), + chunk_type, crate::core::chunk::types::ChunkRule::Rule1, + start_frame, end_frame, fps, content, + ); + c.text_content = text_content; + c.metadata = metadata; + c + })) } pub async fn get_running_jobs_with_all_processors_done(&self, _limit: i32) -> Result> { @@ -2278,6 +2305,22 @@ impl PostgresDb { Ok(id) } + fn write_pipeline_timeline(uuid: &str, processor: &str, status: &str) { + let ts = chrono::Utc::now().to_rfc3339(); + let entry = serde_json::json!({ + "ts": ts, + "file_uuid": uuid, + "processor": processor, + "status": status, + }); + let path = std::path::Path::new(crate::core::config::OUTPUT_DIR.as_str()) + .join(format!("pipeline_{}.log", uuid)); + if let Ok(mut file) = std::fs::OpenOptions::new().create(true).append(true).open(&path) { + use std::io::Write; + let _ = writeln!(file, "{}", entry); + } + } + pub async fn upsert_processor_result( &self, job_id: i32, processor_type: crate::core::db::ProcessorType, uuid: &str, status: &str ) -> Result { @@ -2291,6 +2334,9 @@ impl PostgresDb { )) .bind(job_id).bind(ptype).bind(uuid).bind(status) .fetch_one(&self.pool).await?; + + Self::write_pipeline_timeline(uuid, ptype, status); + Ok(id) }