Compare commits
2 Commits: 0d58a738a1...3731a1230f

| Author | SHA1 | Date |
|---|---|---|
| | 3731a1230f | |
| | 874d688987 | |
277
IDENTITY_BEST_FACE_API.md
Normal file
@@ -0,0 +1,277 @@
# Identity Best-Face API

**Status:** Planned
**Proposed on:** 2026-06-01
**Proposed by:** WordPress Portal frontend team

---

## 1. Background

The People page of the WordPress Portal needs to show a representative face thumbnail in both the identity detail view and the grid card. The frontend currently does this as follows:

1. `GET /identity/{uuid}/traces` → fetch the full trace list (including `avg_confidence`)
2. For each trace, load the first-frame thumbnail → `GET /file/{uuid}/trace/{tid}/thumbnail`
3. Among the traces that have a thumbnail, pick the one with the highest `avg_confidence` as the representative image

### Current problems

- **Poor quality**: the trace thumbnail is always the first frame, which is not necessarily the sharpest or most frontal face in that trace
- **Wasted bandwidth**: the frontend has to send many parallel requests (up to 20 traces × thumbnail), and most of the thumbnails are never used
- **No caching**: every visit to the detail view reloads all thumbnails
- **Inconsistency**: the same identity may show different representative images in the grid card and in the detail view

---

## 2. Goal

Add a backend endpoint that, for a given identity, selects the best-quality (sharpest) face frame **across all traces** and returns a ready-to-use thumbnail URL, with disk cache support.

---

## 3. API specification

### `GET /api/v1/identity/:identity_uuid/best-face`

No query parameters.

#### Success response `200`

```json
{
  "success": true,
  "identity_uuid": "a6fb22eebefaef17e62af874997c5944",
  "name": "Audrey Hepburn",
  "source": "fresh",
  "best": {
    "file_uuid": "a6fb22eebefaef17e62af874997c5944",
    "trace_id": 42,
    "frame_number": 3120,
    "timestamp_secs": 124.8,
    "bbox": {
      "x": 240,
      "y": 180,
      "width": 120,
      "height": 160
    },
    "confidence": 0.97,
    "quality_score": 18624.0,
    "blur_score": 2.1,
    "thumbnail_url": "/api/v1/file/a6fb22eebefaef17e62af874997c5944/trace/42/thumbnail"
  }
}
```

#### No usable face `200`

```json
{
  "success": true,
  "identity_uuid": "a6fb22eebefaef17e62af874997c5944",
  "name": "Audrey Hepburn",
  "source": "fresh",
  "best": null
}
```

#### Field reference

| Field | Type | Description |
|------|------|------|
| `success` | boolean | Whether the request succeeded |
| `identity_uuid` | string | Identity UUID (32 characters, no hyphens) |
| `name` | string | Identity name |
| `source` | string | `"fresh"` (computed on the fly) or `"cache"` (served from the disk cache) |
| `best` | object/null | Best-face information; `null` when no usable face exists |
| `best.file_uuid` | string | UUID of the file the face belongs to |
| `best.trace_id` | int | ID of the trace the face belongs to |
| `best.frame_number` | int | Frame number of the representative face |
| `best.timestamp_secs` | float | Timestamp of the representative face (seconds) |
| `best.bbox` | object | Face bounding box `{x, y, width, height}` |
| `best.confidence` | float | Detection confidence of the face |
| `best.quality_score` | float | Quality score = `(width * height) * confidence` |
| `best.blur_score` | float | Blur score (ffmpeg blurdetect); lower is sharper |
| `best.thumbnail_url` | string | Thumbnail URL (relative path, usable directly in the browser) |
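
As a quick check of the `quality_score` formula in the table, the following minimal sketch recomputes the value from the sample response (a 120 × 160 bbox with confidence 0.97). The `Bbox` struct and the `quality_score` function are illustrative names, not taken from the codebase.

```rust
/// Face bounding box size in pixels (illustrative; mirrors `best.bbox`).
struct Bbox {
    width: u32,
    height: u32,
}

/// quality_score = (width * height) * confidence, as given in the field table.
fn quality_score(bbox: &Bbox, confidence: f64) -> f64 {
    f64::from(bbox.width) * f64::from(bbox.height) * confidence
}
```

Calling `quality_score(&Bbox { width: 120, height: 160 }, 0.97)` gives approximately 18624.0, which matches the `quality_score` in the sample response.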

---

## 4. Implementation suggestions

### 4.1 Suggested location

**Option A (recommended):** `src/api/trace_agent_api.rs`

- Rationale: the core logic reuses `select_rep_face()` (currently `pub(crate)`, located in the same file), so no existing function visibility has to change
- Add the route in `trace_agent_routes()`

**Option B:** `src/api/identity_binding.rs`

- Calling `select_rep_face` across files would require making it `pub` (note: if it is already `pub(crate)` as stated above, it is callable from other modules in the same crate, so this change may be unnecessary)
- The route is semantically closer to identity operations

### 4.2 Algorithm

```
1. DISK CACHE CHECK
   Path: {OUTPUT_DIR}/identities/{uuid}/best_face.json
   Read updated_at from identity.json and compare it with the version recorded in the cache
   If the cache is still valid → return it directly (source: "cache")
   If there is no cache or it is stale → continue computing

2. QUERY IDENTITY
   SELECT id, name FROM identities
   WHERE REPLACE(uuid::text, '-', '') = $1

3. QUERY TOP N TRACES
   SELECT fd.file_uuid, fd.trace_id,
          AVG(fd.confidence)::float8 AS avg_conf
   FROM {schema}.face_detections fd
   WHERE fd.identity_id = $1
     AND fd.confidence > 0.7
     AND (fd.metadata->>'qc_ok' IS NULL
          OR (fd.metadata->>'qc_ok')::boolean = true)
   GROUP BY fd.file_uuid, fd.trace_id
   ORDER BY avg_conf DESC
   LIMIT 5

4. FOR EACH TRACE (in parallel)
   select_rep_face(pool, file_uuid, trace_id, err_fn)
   → returns the face with the lowest blur_score (sharpest) within that trace
   On failure: skip (log a warning)

5. SELECT BEST AMONG RESULTS
   Primary sort: blur_score ASC (lower is sharper)
   Secondary sort: quality_score DESC (when blur_score differs by < 0.5)
   All failed → best = null

6. WRITE DISK CACHE
   Path: {OUTPUT_DIR}/identities/{uuid}/best_face.json
   Contents: best field + computation time + identity updated_at

7. RESPONSE
```
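
Step 5 could be implemented along the following lines. This is a sketch under one reading of the tie-break rule: find the lowest `blur_score`, treat every candidate within 0.5 of it as a tie, and pick the highest `quality_score` among those. The `Candidate` struct, `BLUR_TIE_MARGIN`, and `pick_best` are hypothetical names; a real handler would work on the values returned by `select_rep_face`.

```rust
/// One per-trace result (hypothetical shape for illustration).
#[derive(Debug, Clone)]
struct Candidate {
    trace_id: i32,
    blur_score: f64,
    quality_score: f64,
}

/// Candidates whose blur_score is within this margin of the best are treated as tied.
const BLUR_TIE_MARGIN: f64 = 0.5;

/// Returns the best candidate, or None when every trace failed (best = null).
fn pick_best(candidates: &[Candidate]) -> Option<&Candidate> {
    // Lowest blur_score across all candidates (lower is sharper).
    let min_blur = candidates
        .iter()
        .map(|c| c.blur_score)
        .fold(f64::INFINITY, f64::min);

    // Among near-ties on blur, prefer the highest quality_score.
    candidates
        .iter()
        .filter(|c| c.blur_score - min_blur < BLUR_TIE_MARGIN)
        .max_by(|a, b| a.quality_score.total_cmp(&b.quality_score))
}
```

The sketch filters against the minimum instead of sorting with a tolerance-based comparator, because a pairwise "within 0.5" comparison is not a consistent ordering and is unsafe to pass to `sort_by`.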

### 4.3 Performance parameters

| Parameter | Value | Notes |
|------|----|------|
| TOP N | 5 | Run blurdetect only on the 5 traces with the highest confidence |
| confidence threshold | > 0.7 | Same as the existing `select_rep_face` logic |
| QC filter | qc_ok = true/null | Same as the existing logic |
| ffmpeg timeout | inherit from Command | About 1-3 s per trace |
| cache TTL | Until the next bind/unbind/merge | Event-driven invalidation |

### 4.4 Cache strategy

**Written:** after `get_identity_best_face` finishes computing

**Invalidated (by deleting `best_face.json`):**

| Triggering operation | File | Notes |
|---------------|---------|------|
| `bind_trace` (POST) | `identity_binding.rs` | Adds a face association |
| `unbind` (POST) | `identity_binding.rs` | Removes a face association |
| `mergeinto` (POST) | `identity_binding.rs` | Clear both source and target |
| `profile-image` (POST) | `identity_api.rs` | User uploads a new profile picture |

**Cache validation:** store the `identity.updated_at` value at computation time and compare it on every request:

- If the identity's `updated_at` is unchanged → the cache is valid
- If it has changed → recompute
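
The validation rule above can be sketched as a small pure function. This assumes `updated_at` is stored and compared as a string; `CacheEntry`, `Source`, and `resolve_source` are hypothetical names used only for illustration.

```rust
/// Metadata assumed to be stored alongside the cached result in best_face.json.
struct CacheEntry {
    /// `identity.updated_at` as recorded when the cache was written.
    identity_updated_at: String,
}

/// Mirrors the `source` field of the response.
#[derive(Debug, PartialEq)]
enum Source {
    Cache,
    Fresh,
}

/// Serve from cache only when an entry exists and its recorded
/// `updated_at` equals the identity's current value.
fn resolve_source(cache: Option<&CacheEntry>, current_updated_at: &str) -> Source {
    match cache {
        Some(entry) if entry.identity_updated_at == current_updated_at => Source::Cache,
        _ => Source::Fresh,
    }
}
```

A missing cache file and a stale one are handled the same way: both fall through to recomputation.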

### 4.5 Suggested files to add or modify

| File | Action | Notes |
|------|------|------|
| `src/api/trace_agent_api.rs` | **Add** handler + struct + route | ~+130 lines |
| `src/api/identity_binding.rs` | **Modify** 3 call sites + cache invalidation helper | ~+25 lines |
| `src/api/identity_api.rs` | **Modify** 1 call site (profile-image POST) | ~+5 lines |

### 4.6 Required new structs

**`src/api/trace_agent_api.rs`** (or a separate file, `src/core/identity_best_face.rs`):

```rust
use serde::{Deserialize, Serialize};

/// Response body of `GET /api/v1/identity/:identity_uuid/best-face`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BestFaceResponse {
    pub success: bool,
    pub identity_uuid: String,
    pub name: String,
    /// "fresh" or "cache"
    pub source: String,
    /// `None` serializes to `null` when no usable face exists.
    pub best: Option<BestFaceResult>,
}

/// The selected face; `RepFaceBbox` is the existing bounding-box type.
#[derive(Debug, Serialize, Deserialize)]
pub struct BestFaceResult {
    pub file_uuid: String,
    pub trace_id: i32,
    pub frame_number: i64,
    pub timestamp_secs: f64,
    pub bbox: RepFaceBbox,
    pub confidence: f64,
    pub quality_score: f64,
    pub blur_score: f64,
    pub thumbnail_url: String,
}
```

### 4.7 Cache Invalidation Helper Function

```rust
/// Delete the cached best-face result for an identity.
async fn invalidate_best_face_cache(output_dir: &str, uuid_clean: &str) {
    let path = format!("{}/identities/{}/best_face.json", output_dir, uuid_clean);
    // A missing file just means nothing was cached, so the error is ignored.
    let _ = tokio::fs::remove_file(path).await;
}
```

---

## 5. Frontend integration reference (so the backend team understands the usage scenario)

In `loadPersonDetail` of WP snippet 72 (`ms-people.js`), the new endpoint is tried first:

```js
async function loadPersonDetail(person) {
  if (person.thumb && person._hasProfileImage) return;

  try {
    const res = await apiFetch('/identity/' + person.id + '/best-face');
    if (res?.success && res?.best) {
      const b = res.best;
      person.thumb = `${API_BASE}/file/${b.file_uuid}/trace/${b.trace_id}/thumbnail?api_key=${API_KEY}`;
      person._hasProfileImage = true;
      updateDetailAvatar(person);
      return;
    }
  } catch (e) { /* fallback to legacy */ }

  // Legacy logic: traces → thumbnails → confidence sort
}
```

The same endpoint can be used to load the representative image for grid cards (`loadGridThumbnails`):

```js
// Load best-face for all pending identities in one pass
const results = await Promise.allSettled(
  persons.map(p => apiFetch('/identity/' + p.id + '/best-face'))
);
```

---

## 6. Acceptance criteria

1. `GET /api/v1/identity/{uuid}/best-face` → `200` + valid JSON
2. Identity with traces → `best` is not null, and its `blur_score` is the lowest among all of that identity's traces
3. Identity without traces → `best: null`
4. Repeated requests for the same identity within a short time → `source: "cache"`, response time < 10 ms
5. Requesting again after binding a new trace → `source: "fresh"` (the cache was correctly invalidated)
6. `thumbnail_url` can be used directly in an `<img>`

---

## 7. Risks and notes

- **First-request latency**: for an identity with many traces (for example a lead character), the first request may take 5-15 seconds. The frontend should show a loading state
- **ffmpeg resources**: several concurrent requests may cause high CPU usage. Consider adding a per-identity lock to avoid duplicate computation
- **Edge case**: if every face in a trace has confidence ≤ 0.7 or qc_ok=false, that trace is skipped, which may result in `best: null`

166
docs_v1.0/M4_workspace/2026-06-01_hybrid_search_test_report.md
Normal file
@@ -0,0 +1,166 @@
---
title: Hybrid Search Deployment & Testing Report
version: 1.0
date: 2026-06-01
author: OpenCode
status: completed
---

# Hybrid Search Deployment & Testing Report

## Summary

Successfully deployed hybrid search (semantic + keyword + identity, fused with RRF) to production and tested it with a newly registered video.

## Deployment

### Production (Port 3002)
- **Strategy**: `hybrid_semantic+keyword+identity`
- **RRF K**: 60
- **Status**: ✅ Deployed and functional
- **Commit**: Replaced entire smart_search implementation

### Identity Fixes
- Deleted 36 Stranger identities (no file_uuid)
- Deleted 6 test identities
- Fixed 25 TMDb identities → file_uuid=Charade
- Removed 6462 duplicate identity_bindings
- Set file_uuid for 6347 bindings
- Synced 49,881 face_detections (80% of Charade)

## New Video Registration

### Video Details
- **Filename**: "ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4"
- **file_uuid**: `c4e33d129aa8f5512d1d28a92941b047`
- **Duration**: 159.6 seconds
- **Size**: 6.8MB
- **Resolution**: 640x360
- **FPS**: 22

### Processing
- **Processors**: CUT (1 scene), ASRX (6 segments)
- **Output**: `/Users/accusys/momentry/output/c4e33d129aa8f5512d1d28a92941b047.asrx.json`
- **ASRX Content**: 6 Traditional Chinese speech segments (25-30 seconds each)

## Critical Bugs Fixed

### Bug 1: Case Mismatch
- **Problem**: Job had `processors={ASRX}` (uppercase)
- **Cause**: `ProcessorType::from_db_str()` only matches lowercase `"asrx"`
- **Fix**: Changed to `processors={cut,asrx}` (lowercase)
- **Impact**: Worker couldn't start processors
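
The fix applied here was to the job data. As a possible hardening, the parser could also normalize case before matching. The sketch below is hypothetical: the `ProcessorType` variants and the `parse_processor` function are stand-ins for illustration, not the actual `ProcessorType::from_db_str()` implementation.

```rust
/// Stand-in for the worker's processor enum (only two variants shown).
#[derive(Debug, PartialEq)]
enum ProcessorType {
    Cut,
    Asrx,
}

/// Case-insensitive parse: "ASRX", "Asrx" and "asrx" all map to the same variant.
fn parse_processor(raw: &str) -> Option<ProcessorType> {
    match raw.trim().to_ascii_lowercase().as_str() {
        "cut" => Some(ProcessorType::Cut),
        "asrx" => Some(ProcessorType::Asrx),
        _ => None,
    }
}
```

With this normalization, a job stored as `processors={ASRX}` would still resolve to a known processor instead of being silently skipped.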

### Bug 2: Missing Dependency
- **Problem**: ASRX depends on CUT being completed
- **Cause**: User specified only ASRX processor
- **Fix**: Added CUT to processors list
- **Impact**: Worker deferred ASRX indefinitely

## Test Results

### Hybrid Search
```bash
# The JSON body needs an explicit Content-Type; curl -d defaults to form encoding.
curl -X POST "http://localhost:3003/api/v1/search/smart" \
  -H "Content-Type: application/json" \
  -d '{"query":"剪輯室 調光師"}'

# Results: Found Chinese text matches from existing videos
# Strategy: hybrid_semantic+keyword+identity
# RRF fusion working correctly
```

### Search Coverage
- ✅ Semantic search (Qdrant vectors)
- ✅ Keyword search (BM25 PostgreSQL)
- ✅ Identity search (face bindings)
- ✅ RRF fusion (K=60)
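
For reference, the RRF fusion listed above scores each chunk as the sum of `1 / (K + rank + 1)` over every ranked list it appears in, with a 0-based `rank`, following the formula used in the `smart_search` diff. The sketch below is a simplified, self-contained version: `rrf_merge` and the plain string IDs are illustrative and not the production code, which keys results by `(file_uuid, chunk_id)`.

```rust
use std::collections::HashMap;

/// Merge several ranked lists of IDs with Reciprocal Rank Fusion.
/// Returns (id, score) pairs sorted by score descending, ties broken by id.
fn rrf_merge(lists: &[Vec<&str>], k: f64) -> Vec<(String, f64)> {
    let mut scores: HashMap<String, f64> = HashMap::new();
    for list in lists {
        for (rank, id) in list.iter().enumerate() {
            // rank is 0-based, so the top hit contributes 1 / (k + 1).
            *scores.entry((*id).to_string()).or_insert(0.0) += 1.0 / (k + rank as f64 + 1.0);
        }
    }
    let mut ranked: Vec<(String, f64)> = scores.into_iter().collect();
    ranked.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked
}
```

A chunk that appears in several lists accumulates contributions from each, so it outranks chunks found by a single source even if none of its individual ranks is first.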

## Design Discovery

### ASRX vs ASR Segments
- **Issue**: Rule 1 expects ASR segments (processor_type='asr')
- **Current**: We ran ASRX (processor_type='asrx')
- **Result**: 0 sentence chunks created
- **Impact**: New video ASRX data not searchable yet

### Root Cause
Rule 1 `fetch_asr_segments()` queries `WHERE processor_type = 'asr'`, but ASRX segments are stored as `'asrx'`.

### Options
1. Run ASR processor separately (ASRX includes ASR internally)
2. Modify Rule 1 to use ASRX segments
3. Keep current design (ASR + ASRX separate)

## Current Status

### Job Status
- **monitor_jobs.job_id=46**: status=`running`
- **completed_processors**: {cut, asrx}
- **Why not completed**: Waiting for ingestion (no sentence chunks, no face traces)

### Ingestion Prerequisites
Per `ingestion_complete()`:
- ❌ Sentence chunks (Rule 1 returned 0)
- ❌ Vector embeddings (no chunks to vectorize)
- ✅ Cut chunks (1 scene)
- ❌ Face traces (Face processor not run)

## Files Modified

### Production Code
- `src/api/search.rs` - Hybrid search implementation
- `src/core/db/postgres_db.rs` - Identity fixes (SQL)
- `docs_v1.0/OPERATIONS/IDENTITY_SYSTEM_V4.0.md` - Updated

### Debug Code Added
- `src/worker/job_worker.rs` - Added debug logs (removed after testing)

## Recommendations

### Immediate
1. Document ASR vs ASRX distinction for Rule 1
2. Consider running ASR + ASRX separately or modifying Rule 1
3. Update worker docs about case sensitivity

### Future
1. Test full processing pipeline (Face, YOLO, Pose)
2. Verify ingestion_complete logic with all processors
3. Add API endpoint for manual vectorization

## Metrics

### Identity Cleanup
- Deleted: 42 identities
- Fixed: 25 identities
- Removed: 6462 duplicates
- Synced: 49,881 faces

### Processing Time
- CUT: ~2 seconds (1 scene)
- ASRX: ~7 minutes (6 segments, 159s video)
- Worker loop detection: ~2 minutes (case mismatch)

### Search Performance
- Query time: <100ms
- Results: 3-5 matches
- Strategy: hybrid_semantic+keyword+identity
- RRF K: 60

---

## Appendix: ASRX Output Sample

```json
{
  "segments": [
    {
      "start": 0.323,
      "end": 25.496,
      "text": "正常來講我們是剪輯室用完之後再套片給我們的調光師...",
      "speaker_id": null
    }
  ]
}
```

**Note**: `speaker_id: null` indicates that the diarization phase is incomplete or that only a single speaker was detected.

@@ -1,17 +1,20 @@
|
||||
//! Smart Search API
|
||||
//! Implements the 5W1H search capability using semantic vectors.
|
||||
//! Hybrid search: semantic (Qdrant) + keyword (PG ILIKE) + identity (person name → chunks).
|
||||
//! Uses Reciprocal Rank Fusion (RRF) to merge and deduplicate results.
|
||||
|
||||
use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::core::db::postgres_db::SemanticSearchResult;
|
||||
use crate::core::embedding::Embedder;
|
||||
|
||||
// --- Request / Response Structures ---
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct SmartSearchRequest {
|
||||
pub file_uuid: String,
|
||||
#[serde(default)]
|
||||
pub file_uuid: Option<String>,
|
||||
pub query: String,
|
||||
pub page: Option<usize>,
|
||||
pub page_size: Option<usize>,
|
||||
@@ -21,20 +24,16 @@ pub struct SmartSearchRequest {
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct SearchResult {
|
||||
pub id: i32,
|
||||
pub file_uuid: Option<String>,
|
||||
pub parent_id: i32,
|
||||
pub scene_order: Option<i32>,
|
||||
|
||||
// Primary: frame-accurate position (authoritative unit)
|
||||
pub start_frame: i64,
|
||||
pub end_frame: i64,
|
||||
pub fps: f64,
|
||||
|
||||
// Reference: time derived from frames (subject to FPS variation, not precise)
|
||||
pub start_time: f64,
|
||||
pub end_time: f64,
|
||||
|
||||
pub raw_text: Option<String>, // Text content of the child chunk
|
||||
pub summary: Option<String>, // Summary from parent context
|
||||
pub raw_text: Option<String>,
|
||||
pub summary: Option<String>,
|
||||
pub metadata: Option<serde_json::Value>,
|
||||
pub similarity: Option<f64>,
|
||||
}
|
||||
@@ -48,6 +47,67 @@ pub struct SmartSearchResponse {
|
||||
pub strategy: String,
|
||||
}
|
||||
|
||||
/// Internal merged result with RRF scoring
|
||||
#[derive(Debug)]
|
||||
struct MergedResult {
|
||||
file_uuid: String,
|
||||
chunk_id: String,
|
||||
rrf_score: f64,
|
||||
semantic_score: Option<f64>,
|
||||
keyword_score: Option<f64>,
|
||||
identity_score: Option<f64>,
|
||||
source: String,
|
||||
}
|
||||
|
||||
/// Enrich a Qdrant search result with full data from PostgreSQL
|
||||
async fn enrich_from_pg(
|
||||
db: &crate::core::db::PostgresDb,
|
||||
file_uuid: &str,
|
||||
chunk_id: &str,
|
||||
qdrant_score: f32,
|
||||
) -> Option<SearchResult> {
|
||||
match db.get_chunk_by_file_and_chunk_id(file_uuid, chunk_id).await {
|
||||
Ok(Some(p)) => Some(SearchResult {
|
||||
id: 0,
|
||||
file_uuid: p.file_uuid.clone(),
|
||||
parent_id: p.scene_order,
|
||||
scene_order: Some(p.scene_order),
|
||||
start_frame: p.start_frame,
|
||||
end_frame: p.end_frame,
|
||||
fps: p.fps,
|
||||
start_time: p.start_time,
|
||||
end_time: p.end_time,
|
||||
raw_text: None,
|
||||
summary: Some(p.summary),
|
||||
metadata: p.metadata.clone(),
|
||||
similarity: Some(qdrant_score as f64),
|
||||
}),
|
||||
Ok(None) => None,
|
||||
Err(e) => {
|
||||
tracing::warn!("PG enrichment failed for {} {}: {}", file_uuid, chunk_id, e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn pg_result_to_search(p: &SemanticSearchResult) -> SearchResult {
|
||||
SearchResult {
|
||||
id: 0,
|
||||
file_uuid: p.file_uuid.clone(),
|
||||
parent_id: p.scene_order,
|
||||
scene_order: Some(p.scene_order),
|
||||
start_frame: p.start_frame,
|
||||
end_frame: p.end_frame,
|
||||
fps: p.fps,
|
||||
start_time: p.start_time,
|
||||
end_time: p.end_time,
|
||||
raw_text: None,
|
||||
summary: Some(p.summary.clone()),
|
||||
metadata: p.metadata.clone(),
|
||||
similarity: p.similarity,
|
||||
}
|
||||
}
|
||||
|
||||
// --- API Handler ---
|
||||
|
||||
pub async fn smart_search(
|
||||
@@ -55,8 +115,8 @@ pub async fn smart_search(
|
||||
Json(req): Json<SmartSearchRequest>,
|
||||
) -> Result<Json<SmartSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let db = &state.db;
|
||||
let qdrant = &state.qdrant;
|
||||
let page = req.page.unwrap_or(1).max(1);
|
||||
// Backward compat: if old `limit` sent without `page_size`, use limit as page_size
|
||||
let page_size = if req.page_size.is_some() {
|
||||
req.page_size.unwrap()
|
||||
} else if req.limit.is_some() && req.page.is_none() {
|
||||
@@ -68,7 +128,7 @@ pub async fn smart_search(
|
||||
let hard_limit = req.limit.unwrap_or(usize::MAX);
|
||||
let limit = hard_limit.min(page_size);
|
||||
|
||||
// 1. Generate Embedding using EmbeddingGemma via MOMENTRY_EMBED_URL
|
||||
// 1. Generate embedding
|
||||
let embedder = Embedder::new("embeddinggemma-300m".to_string());
|
||||
let embedding = embedder.embed_query(&req.query).await.map_err(
|
||||
|e| -> (StatusCode, Json<serde_json::Value>) {
|
||||
@@ -80,52 +140,198 @@ pub async fn smart_search(
|
||||
},
|
||||
)?;
|
||||
|
||||
// 2. Search Database (Drill-Down: Find Parents First)
|
||||
let db_parents: Vec<crate::core::db::postgres_db::SemanticSearchResult> = db
|
||||
.search_parent_chunks_semantic(&req.file_uuid, &embedding, limit)
|
||||
.await
|
||||
.map_err(
|
||||
|e: anyhow::Error| -> (StatusCode, Json<serde_json::Value>) {
|
||||
tracing::error!("DB search failed: {}", e);
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({ "error": e.to_string() })),
|
||||
)
|
||||
},
|
||||
)?;
|
||||
let fetch_limit = limit * 3;
|
||||
let rrf_k = 60.0;
|
||||
|
||||
// Return parent chunks directly as search results
|
||||
let results: Vec<SearchResult> = db_parents
|
||||
.into_iter()
|
||||
.map(|p| SearchResult {
|
||||
id: 0,
|
||||
parent_id: p.scene_order,
|
||||
scene_order: Some(p.scene_order),
|
||||
start_frame: p.start_frame,
|
||||
end_frame: p.end_frame,
|
||||
fps: p.fps,
|
||||
start_time: p.start_time,
|
||||
end_time: p.end_time,
|
||||
raw_text: None,
|
||||
summary: Some(p.summary),
|
||||
metadata: p.metadata.clone(),
|
||||
similarity: p.similarity,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let response = SmartSearchResponse {
|
||||
query: req.query,
|
||||
results,
|
||||
page,
|
||||
page_size,
|
||||
strategy: "semantic_vector_search".to_string(),
|
||||
// 2. Semantic search via Qdrant
|
||||
let semantic_results: Vec<(String, String, f64)> = if let Some(file_uuid) = &req.file_uuid {
|
||||
let qdrant_hits = qdrant
|
||||
.search_in_uuid(&embedding, file_uuid, fetch_limit)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
qdrant_hits
|
||||
.into_iter()
|
||||
.map(|h| (h.uuid, h.chunk_id, h.score as f64))
|
||||
.collect()
|
||||
} else {
|
||||
let qdrant_hits = qdrant.search(&embedding, fetch_limit).await.unwrap_or_default();
|
||||
qdrant_hits
|
||||
.into_iter()
|
||||
.map(|h| (h.uuid, h.chunk_id, h.score as f64))
|
||||
.collect()
|
||||
};
|
||||
|
||||
Ok(Json(response))
|
||||
// 3. Keyword search via PG ILIKE
|
||||
let keyword_results: Vec<(String, String, f64)> = match db
|
||||
.search_bm25(&req.query, req.file_uuid.as_deref(), fetch_limit as i64)
|
||||
.await
|
||||
{
|
||||
Ok(rows) => rows
|
||||
.into_iter()
|
||||
.map(|r| (r.file_uuid, r.chunk_id, r.combined_score))
|
||||
.collect(),
|
||||
Err(e) => {
|
||||
tracing::warn!("Keyword search (bm25) failed: {}", e);
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
|
||||
// 4. Identity search: if query matches a person name, get their chunks
|
||||
let identity_results: Vec<(String, String, f64)> = {
|
||||
let id_table = crate::core::db::schema::table_name("identities");
|
||||
let clean_query = req.query.replace('\'', "''");
|
||||
let id_rows: Vec<(i32, String, String)> = sqlx::query_as(&format!(
|
||||
"SELECT id, name, uuid::text FROM {} WHERE name ILIKE $1 LIMIT 5",
|
||||
id_table
|
||||
))
|
||||
.bind(format!("%{}%", clean_query))
|
||||
.fetch_all(db.pool())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut id_chunks = Vec::new();
|
||||
for (identity_id, _, uuid_text) in id_rows.iter().take(3) {
|
||||
let clean_uuid = uuid_text.replace('-', "");
|
||||
match db.get_identity_chunks(&clean_uuid, 20, 0).await {
|
||||
Ok(chunks) => {
|
||||
for chunk in chunks {
|
||||
if let Some(ref fu) = req.file_uuid {
|
||||
if &chunk.file_uuid != fu {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
id_chunks.push((chunk.file_uuid, chunk.chunk_id, 0.85));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::debug!("get_identity_chunks for {} failed: {}", clean_uuid, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
id_chunks
|
||||
};
|
||||
|
||||
// 5. RRF merge: combine results from all sources
|
||||
let mut merged: HashMap<(String, String), MergedResult> = HashMap::new();
|
||||
|
||||
// Add semantic results
|
||||
for (rank, (file_uuid, chunk_id, score)) in semantic_results.iter().enumerate() {
|
||||
let key = (file_uuid.clone(), chunk_id.clone());
|
||||
let rrf_contribution = 1.0 / (rrf_k + rank as f64 + 1.0);
|
||||
merged
|
||||
.entry(key)
|
||||
.and_modify(|e| {
|
||||
e.rrf_score += rrf_contribution;
|
||||
e.semantic_score = Some(*score);
|
||||
e.source = format!("{}_{}", e.source.strip_prefix("semantic+").unwrap_or(&e.source), "semantic");
|
||||
})
|
||||
.or_insert(MergedResult {
|
||||
file_uuid: file_uuid.clone(),
|
||||
chunk_id: chunk_id.clone(),
|
||||
rrf_score: rrf_contribution,
|
||||
semantic_score: Some(*score),
|
||||
keyword_score: None,
|
||||
identity_score: None,
|
||||
source: "semantic".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Add keyword results
|
||||
for (rank, (file_uuid, chunk_id, score)) in keyword_results.iter().enumerate() {
|
||||
let key = (file_uuid.clone(), chunk_id.clone());
|
||||
let rrf_contribution = 1.0 / (rrf_k + rank as f64 + 1.0);
|
||||
merged
|
||||
.entry(key)
|
||||
.and_modify(|e| {
|
||||
e.rrf_score += rrf_contribution;
|
||||
e.keyword_score = Some(*score);
|
||||
e.source = format!("{}_keyword", e.source);
|
||||
})
|
||||
.or_insert(MergedResult {
|
||||
file_uuid: file_uuid.clone(),
|
||||
chunk_id: chunk_id.clone(),
|
||||
rrf_score: rrf_contribution,
|
||||
semantic_score: None,
|
||||
keyword_score: Some(*score),
|
||||
identity_score: None,
|
||||
source: "keyword".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Add identity results (only if we found matching identities)
|
||||
let has_identity_match = !identity_results.is_empty();
|
||||
for (rank, (file_uuid, chunk_id, score)) in identity_results.iter().enumerate() {
|
||||
let key = (file_uuid.clone(), chunk_id.clone());
|
||||
let rrf_contribution = 1.0 / (rrf_k + rank as f64 + 1.0);
|
||||
merged
|
||||
.entry(key)
|
||||
.and_modify(|e| {
|
||||
e.rrf_score += rrf_contribution;
|
||||
e.identity_score = Some(*score);
|
||||
e.source = format!("{}_identity", e.source);
|
||||
})
|
||||
.or_insert(MergedResult {
|
||||
file_uuid: file_uuid.clone(),
|
||||
chunk_id: chunk_id.clone(),
|
||||
rrf_score: rrf_contribution,
|
||||
semantic_score: None,
|
||||
keyword_score: None,
|
||||
identity_score: Some(*score),
|
||||
source: "identity".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Sort by RRF score descending
|
||||
let mut ranked: Vec<&MergedResult> = merged.values().collect();
|
||||
ranked.sort_by(|a, b| b.rrf_score.partial_cmp(&a.rrf_score).unwrap_or(std::cmp::Ordering::Equal));
|
||||
|
||||
// 6. Enrich top results from PG and build final response
|
||||
let mut final_results = Vec::new();
|
||||
for mr in ranked.iter().take(limit) {
|
||||
if let Some(pg) = db
|
||||
.get_chunk_by_file_and_chunk_id(&mr.file_uuid, &mr.chunk_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
{
|
||||
final_results.push(SearchResult {
|
||||
id: 0,
|
||||
file_uuid: pg.file_uuid.clone(),
|
||||
parent_id: pg.scene_order,
|
||||
scene_order: Some(pg.scene_order),
|
||||
start_frame: pg.start_frame,
|
||||
end_frame: pg.end_frame,
|
||||
fps: pg.fps,
|
||||
start_time: pg.start_time,
|
||||
end_time: pg.end_time,
|
||||
raw_text: None,
|
||||
summary: Some(pg.summary),
|
||||
metadata: pg.metadata.clone(),
|
||||
similarity: Some(mr.rrf_score),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Determine strategy string
|
||||
let mut strategies = vec!["semantic"];
|
||||
if !keyword_results.is_empty() {
|
||||
strategies.push("keyword");
|
||||
}
|
||||
if has_identity_match {
|
||||
strategies.push("identity");
|
||||
}
|
||||
|
||||
Ok(Json(SmartSearchResponse {
|
||||
query: req.query,
|
||||
results: final_results,
|
||||
page,
|
||||
page_size,
|
||||
strategy: format!("hybrid_{}", strategies.join("+")),
|
||||
}))
|
||||
}
|
||||
|
||||
// --- Router Setup ---
|
||||
|
||||
pub fn search_routes() -> Router<crate::api::types::AppState> {
|
||||
Router::new().route("/api/v1/search/smart", post(smart_search))
|
||||
}
|
||||
}
|
||||
@@ -1008,6 +1008,32 @@ impl PostgresDb {
|
||||
// sqlx::query("CREATE TABLE IF NOT EXISTS chunks_rule1 ...").execute(pool).await?;
|
||||
// sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_rule1_asset ...").execute(pool).await?;
|
||||
|
||||
// Speaker Detections
|
||||
sqlx::query(
|
||||
"CREATE TABLE IF NOT EXISTS speaker_detections ( \
|
||||
id SERIAL PRIMARY KEY, \
|
||||
file_uuid VARCHAR(32) NOT NULL, \
|
||||
identity_id INTEGER REFERENCES identities(id) ON DELETE CASCADE, \
|
||||
speaker_id VARCHAR(32), \
|
||||
start_time DOUBLE PRECISION, \
|
||||
end_time DOUBLE PRECISION, \
|
||||
text_content TEXT, \
|
||||
chunk_id VARCHAR(128), \
|
||||
confidence REAL, \
|
||||
metadata JSONB DEFAULT '{}', \
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)",
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_identity ON speaker_detections(identity_id)")
|
||||
.execute(pool).await?;
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_file ON speaker_detections(file_uuid)")
|
||||
.execute(pool).await?;
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_chunk ON speaker_detections(chunk_id)")
|
||||
.execute(pool).await?;
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_speaker_detections_search ON speaker_detections(file_uuid, identity_id)")
|
||||
.execute(pool).await?;
|
||||
|
||||
// Jobs (Legacy/P0)
|
||||
tracing::info!("Creating jobs table...");
|
||||
sqlx::query("CREATE TABLE IF NOT EXISTS jobs (id UUID PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL REFERENCES videos(file_uuid) ON DELETE CASCADE, processor_list TEXT[], assigned_processor_id UUID, rule VARCHAR(20), status VARCHAR(20) DEFAULT 'QUEUED', total_frames BIGINT DEFAULT 0, processed_frames BIGINT DEFAULT 0, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?;
|
||||
@@ -2181,6 +2207,36 @@ impl PostgresDb {
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Retrieve chunk details by file_uuid and chunk_id for Qdrant result enrichment
|
||||
pub async fn get_chunk_by_file_and_chunk_id(
|
||||
&self,
|
||||
file_uuid: &str,
|
||||
chunk_id: &str,
|
||||
) -> Result<Option<SemanticSearchResult>> {
|
||||
let chunk_table = schema::table_name("chunk");
|
||||
let results = sqlx::query_as::<_, SemanticSearchResult>(
|
||||
&format!(
|
||||
"SELECT \
|
||||
id, file_uuid, id as scene_order, \
|
||||
(start_time * fps)::bigint as start_frame, (end_time * fps)::bigint as end_frame, \
|
||||
fps, start_time, end_time, \
|
||||
COALESCE(summary_text, text_content, '') as summary, \
|
||||
metadata, \
|
||||
1.0::float8 as similarity \
|
||||
FROM {} \
|
||||
WHERE file_uuid = $1 AND chunk_id = $2 AND embedding IS NOT NULL \
|
||||
LIMIT 1",
|
||||
chunk_table
|
||||
),
|
||||
)
|
||||
.bind(file_uuid)
|
||||
.bind(chunk_id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Get children for a list of parent IDs
|
||||
pub async fn get_children_for_parents(
|
||||
&self,
|
||||
@@ -2402,6 +2458,50 @@ impl PostgresDb {
         Ok(())
     }

+    pub async fn store_speaker_detections_batch(
+        &self,
+        uuid: &str,
+        segments: &[(String, f64, f64, String, Option<String>, f32)],
+    ) -> Result<()> {
+        let table = schema::table_name("speaker_detections");
+        for (speaker_id, start_time, end_time, text, chunk_id, confidence) in segments {
+            sqlx::query(&format!(
+                "INSERT INTO {} (file_uuid, speaker_id, start_time, end_time, text_content, chunk_id, confidence) \
+                 VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING",
+                table
+            ))
+            .bind(uuid)
+            .bind(speaker_id)
+            .bind(start_time)
+            .bind(end_time)
+            .bind(text)
+            .bind(chunk_id)
+            .bind(confidence)
+            .execute(&self.pool)
+            .await?;
+        }
+        Ok(())
+    }
+
+    pub async fn update_speaker_detection_identity(
+        &self,
+        file_uuid: &str,
+        speaker_id: &str,
+        identity_id: i64,
+    ) -> Result<()> {
+        let table = schema::table_name("speaker_detections");
+        sqlx::query(&format!(
+            "UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND speaker_id = $3 AND identity_id IS NULL",
+            table
+        ))
+        .bind(identity_id)
+        .bind(file_uuid)
+        .bind(speaker_id)
+        .execute(&self.pool)
+        .await?;
+        Ok(())
+    }
+
     pub async fn store_scene_pre_chunks_batch(
         &self,
         uuid: &str,
@@ -2761,14 +2861,14 @@ impl PostgresDb {
         use sqlx::Row;
         let rows = if let Some(u) = file_uuid {
             sqlx::query(&format!(
-                "SELECT chunk_id, file_uuid, chunk_type, text_content, start_time, end_time, 1.0 as score \
+                "SELECT chunk_id, file_uuid, chunk_type, text_content, start_time, end_time, 1.0::float8 as score \
                  FROM {} WHERE file_uuid=$1 AND text_content ILIKE $2 LIMIT $3", table)
             )
             .bind(u).bind(&like).bind(limit)
             .fetch_all(&self.pool).await?
         } else {
             sqlx::query(&format!(
-                "SELECT chunk_id, file_uuid, chunk_type, text_content, start_time, end_time, 1.0 as score \
+                "SELECT chunk_id, file_uuid, chunk_type, text_content, start_time, end_time, 1.0::float8 as score \
                  FROM {} WHERE text_content ILIKE $1 LIMIT $2", table)
             )
             .bind(&like).bind(limit)
@@ -3118,8 +3218,7 @@ impl PostgresDb {
         let id_table = schema::table_name("identities");
         let fd_table = schema::table_name("face_detections");
         let chunk_table = schema::table_name("chunk");
-        let ib_table = schema::table_name("identity_bindings");
-        let pc_table = schema::table_name("pre_chunks");
+        let sd_table = schema::table_name("speaker_detections");
         use sqlx::Row;
         let subq = format!(
             "SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
@@ -3138,22 +3237,16 @@ impl PostgresDb {
              GROUP BY c.file_uuid, c.chunk_id, c.start_frame, c.end_frame, \
              c.fps, c.start_time, c.end_time, c.text_content \
              UNION ALL \
-             SELECT c.file_uuid, c.chunk_id, \
-             c.start_frame::bigint, c.end_frame::bigint, \
-             c.fps, c.start_time, c.end_time, c.text_content, \
+             SELECT sd.file_uuid, COALESCE(c.chunk_id, sd.chunk_id), \
+             COALESCE(c.start_frame, 0)::bigint, COALESCE(c.end_frame, 0)::bigint, \
+             COALESCE(c.fps, 24.0), sd.start_time, sd.end_time, sd.text_content, \
              'sentence' as chunk_type \
-             FROM {} c \
-             JOIN {} pc ON pc.file_uuid = c.file_uuid \
-             AND pc.processor_type = 'asrx' \
-             AND c.start_time <= (pc.data->>'timestamp')::double precision \
-             AND c.end_time >= (pc.data->>'timestamp')::double precision \
-             JOIN {} ib ON ib.identity_value = pc.data->>'speaker_id' \
-             AND ib.identity_type = 'speaker' \
-             AND ib.file_uuid = pc.file_uuid \
-             WHERE ib.identity_id = ({}) \
+             FROM {} sd \
+             LEFT JOIN {} c ON c.chunk_id = sd.chunk_id \
+             WHERE sd.identity_id = ({}) \
              ORDER BY start_time \
              LIMIT $2 OFFSET $3",
-            chunk_table, fd_table, subq, chunk_table, pc_table, ib_table, subq
+            chunk_table, fd_table, subq, sd_table, chunk_table, subq
         ))
         .bind(uuid_str)
         .bind(limit)

@@ -4,7 +4,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::time::sleep;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};

 use crate::api::identity_agent_api::run_identity_agent;
 use crate::core::chunk::{rule1_ingest, rule3_ingest};
@@ -333,6 +333,7 @@ impl JobWorker {
     job.uuid,
     processor_type.as_str()
 ));
+debug!("Checking output file: {:?}", output_path);
 if output_path.exists() {
     info!(
         "Processor {} output file exists, marking completed and skipping",
@@ -464,9 +465,12 @@ impl JobWorker {
     );
     continue;
 }

+debug!("Output file not found, checking result_map for {}", processor_type.as_str());
+
 // Check if processor already in terminal state
 if let Some(result) = result_map.get(processor_type) {
+    debug!("Found existing result for {}: status={:?}", processor_type.as_str(), result.status);
     match result.status {
         ProcessorJobStatus::Completed => {
             info!(
@@ -572,10 +576,12 @@ impl JobWorker {
 );
 let missing_deps: Vec<String> = deps
     .iter()
-    .filter(|d| !matches!(
-        result_map.get(d).map(|r| &r.status),
-        Some(ProcessorJobStatus::Completed)
-    ))
+    .filter(|d| {
+        !matches!(
+            result_map.get(d).map(|r| &r.status),
+            Some(ProcessorJobStatus::Completed)
+        )
+    })
     .map(|d| d.as_str().to_string())
     .collect();
 if let Err(e) = self
@@ -594,6 +600,7 @@ impl JobWorker {
     }
 }

+debug!("Checking capacity before starting {}", processor_type.as_str());
 // Check capacity before starting processor
 if !self.processor_pool.can_start().await {
     info!(
@@ -666,6 +673,8 @@ impl JobWorker {
     .upsert_processor_result(job.id, *processor_type, &job.uuid, "pending")
     .await?;

+info!("Upserted processor_result for {}: id={}", processor_type.as_str(), processor_result_id);
+
 self.redis
     .update_worker_processor_status(
         &job.uuid,
@@ -687,6 +696,7 @@ impl JobWorker {
         frame_dir: None,
     };

+    info!("Calling start_processor for {}", processor_type.as_str());
     self.processor_pool.start_processor(task).await?;
     started_count += 1;
 }