From 2393d81a3f242cfcf39c1bc4948b388b032b8703 Mon Sep 17 00:00:00 2001 From: Warren Date: Sun, 29 Mar 2026 04:44:28 +0800 Subject: [PATCH] feat: fix Chinese text search and duplicate chunk_id bug - Add helper functions to extract text from nested content structure - Update SearchResult to include uuid field - Add PostgreSQL function get_chunk_by_chunk_id_and_uuid to handle duplicate chunk_ids - Update Qdrant search functions to extract uuid from payload - Change embedding model to nomic-embed-text-v2-moe:latest - Update Qdrant collection name to momentry_rule1 - Fix MongoDB authentication and disable cache for development - Improve error handling in processor.rs - Update documentation with new embedding model --- .env | 4 +- .env.development | 6 +- .env.example | 2 +- docs/PROCESSING_PIPELINE.md | 33 ++++++---- src/api/server.rs | 90 +++++++++++++++++++-------- src/core/db/mod.rs | 1 + src/core/db/postgres_db.rs | 96 ++++++++++++++++++++++++++++- src/core/db/qdrant_db.rs | 115 ++++++++++++++++++++++++++++++++--- src/core/db/sync_db.rs | 5 +- src/main.rs | 68 ++++++++++++++------- src/player/chunk_selector.rs | 6 +- src/playground.rs | 5 +- src/worker/processor.rs | 30 +++------ 13 files changed, 355 insertions(+), 106 deletions(-) diff --git a/.env b/.env index 991bd11..7f32cc0 100644 --- a/.env +++ b/.env @@ -1,3 +1,5 @@ DB_MAX_CONNECTIONS=50 DB_ACQUIRE_TIMEOUT=30 -QDRANT_URL=http://127.0.0.1:6333 \ No newline at end of file +QDRANT_URL=http://127.0.0.1:6333 +QDRANT_API_KEY=Test3200Test3200Test3200 +QDRANT_COLLECTION=momentry_rule1 \ No newline at end of file diff --git a/.env.development b/.env.development index 5ee1c98..1709823 100644 --- a/.env.development +++ b/.env.development @@ -18,7 +18,7 @@ MOMENTRY_WORKER_BATCH_SIZE=5 DATABASE_URL=postgres://accusys@localhost:5432/momentry # MongoDB -MONGODB_URL=mongodb://accusys:Test3200Test3200@localhost:27017/admin +MONGODB_URL=mongodb://localhost:27017 MONGODB_DATABASE=momentry # Redis @@ -28,7 +28,7 @@ REDIS_PASSWORD=accusys # Qdrant Vector Database (same as production) QDRANT_URL=http://localhost:6333 QDRANT_API_KEY=Test3200Test3200Test3200 -QDRANT_COLLECTION=chunks_v3 +QDRANT_COLLECTION=momentry_rule1 # Paths MOMENTRY_OUTPUT_DIR=/Users/accusys/momentry/output_dev @@ -51,7 +51,7 @@ MOMENTRY_CUT_TIMEOUT=3600 MOMENTRY_DEFAULT_TIMEOUT=7200 # Cache Settings -MONGODB_CACHE_ENABLED=true +MONGODB_CACHE_ENABLED=false MONGODB_CACHE_TTL_VIDEOS=300 MONGODB_CACHE_TTL_SEARCH=300 MONGODB_CACHE_TTL_HYBRID_SEARCH=600 diff --git a/.env.example b/.env.example index fc59b95..9ff9a71 100644 --- a/.env.example +++ b/.env.example @@ -24,7 +24,7 @@ MONGODB_DATABASE=momentry # =========================================== QDRANT_URL=http://localhost:6333 QDRANT_API_KEY=your_qdrant_api_key -QDRANT_COLLECTION=chunks_v3 +QDRANT_COLLECTION=momentry_rule1 # =========================================== # API Server Configuration diff --git a/docs/PROCESSING_PIPELINE.md b/docs/PROCESSING_PIPELINE.md index 2c42463..3ebd733 100644 --- a/docs/PROCESSING_PIPELINE.md +++ b/docs/PROCESSING_PIPELINE.md @@ -119,11 +119,11 @@ cargo run --bin momentry -- chunk ### Stage 4: 向量化 ```bash -# 向量化 chunks +# 向量化 chunks(使用預設模型 nomic-embed-text-v2-moe:latest) cargo run --bin momentry -- vectorize -# 指定模型 -cargo run --bin momentry -- vectorize --model sentence-transformers/all-MiniLM-L6-v2 +# 明確指定模型 +cargo run --bin momentry -- vectorize --model nomic-embed-text-v2-moe:latest ``` --- @@ -187,18 +187,27 @@ YOLO: ✓ Already complete, skipping ## 向量化模型選擇 +### 統一嵌入模型 +Momentry Core 統一使用 **`nomic-embed-text-v2-moe:latest`** 作為所有規則的嵌入模型: + ```bash -# 預設模型 ---model sentence-transformers/all-MiniLM-L6-v2 +# 統一模型(所有 Rule 1/2/3 使用) +--model nomic-embed-text-v2-moe:latest +``` -# 高精度模型 ---model sentence-transformers/all-mpnet-base-v2 +### 模型特性 +| 特性 | 說明 | +|------|------| +| **模型名稱** | `nomic-embed-text-v2-moe:latest` | +| **向量維度** | 768 維 | +| **多語言支持** | ✅ 完整支持(英語、中文、日語、韓語等) | +| **模型架構** | Mixture of Experts (MoE) | +| **推理速度** | 快速,適合實時應用 | -# 多語言模型 ---model sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2 - -# 中文模型 ---model sentence-transformers/paraphrase-multilingual-mpnet-base-v2 +### 使用方式 +```bash +# 向量化命令 +cargo run --bin momentry -- vectorize --model nomic-embed-text-v2-moe:latest ``` --- diff --git a/src/api/server.rs b/src/api/server.rs index 789320c..258066a 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -237,6 +237,26 @@ struct HybridSearchResponse { query: String, } +fn extract_text_from_content(content: &serde_json::Value) -> String { + content + .get("data") + .and_then(|data| data.get("text")) + .and_then(|v| v.as_str()) + .or_else(|| content.get("text").and_then(|v| v.as_str())) + .unwrap_or("") + .to_string() +} + +fn extract_title_from_content(content: &serde_json::Value) -> String { + content + .get("data") + .and_then(|data| data.get("title")) + .and_then(|v| v.as_str()) + .or_else(|| content.get("title").and_then(|v| v.as_str())) + .unwrap_or("") + .to_string() +} + #[derive(Debug, Deserialize)] struct LookupQuery { path: Option, @@ -537,10 +557,22 @@ async fn register( let mut width = 0u32; let mut height = 0u32; + let mut fps = 0.0; for stream in &probe_result.streams { if stream.codec_type.as_deref() == Some("video") { width = stream.width.unwrap_or(0); height = stream.height.unwrap_or(0); + + // Parse FPS from r_frame_rate (e.g., "60000/1001") + if let Some(frame_rate_str) = &stream.r_frame_rate { + if let Some((num_str, den_str)) = frame_rate_str.split_once('/') { + if let (Ok(num), Ok(den)) = (num_str.parse::(), den_str.parse::()) { + if den != 0.0 { + fps = num / den; + } + } + } + } } } @@ -566,7 +598,7 @@ async fn register( duration, width, height, - fps: 0.0, + fps, probe_json: Some(json_str), storage: Default::default(), status: VideoStatus::Pending, @@ -599,6 +631,17 @@ async fn register( StatusCode::INTERNAL_SERVER_ERROR })?; + db.update_monitor_job_video_id(job.id, video_id) + .await + .map_err(|e| { + tracing::error!( + "Failed to update monitor job video_id for job {}: {}", + job.id, + e + ); + StatusCode::INTERNAL_SERVER_ERROR + })?; + let _ = state.mongo_cache.invalidate_videos_list().await; Ok(Json(RegisterResponse { @@ -771,21 +814,20 @@ async fn search( .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; let search_results = if let Some(ref uuid) = req.uuid { - let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); - qdrant.search_in_uuid(&query_f64, uuid, limit).await? + qdrant.search_in_uuid(&query_vector, uuid, limit).await? } else { qdrant.search(&query_vector, limit).await? }; let mut results = Vec::new(); for r in search_results { - if let Some(chunk) = pg.get_chunk_by_chunk_id(&r.chunk_id).await.ok().flatten() { - let text = chunk - .content - .get("text") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); + if let Some(chunk) = pg + .get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid) + .await + .ok() + .flatten() + { + let text = extract_text_from_content(&chunk.content); results.push(SearchResult { uuid: chunk.uuid.clone(), @@ -834,8 +876,7 @@ async fn n8n_search( .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; let search_results = if let Some(ref uuid) = req.uuid { - let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); - qdrant.search_in_uuid(&query_f64, uuid, limit).await? + qdrant.search_in_uuid(&query_vector, uuid, limit).await? } else { qdrant.search(&query_vector, limit).await? }; @@ -843,20 +884,15 @@ async fn n8n_search( let mut hits = Vec::new(); for r in search_results { - if let Some(chunk) = pg.get_chunk_by_chunk_id(&r.chunk_id).await.ok().flatten() { - let text = chunk - .content - .get("text") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); + if let Some(chunk) = pg + .get_chunk_by_chunk_id_and_uuid(&r.chunk_id, &r.uuid) + .await + .ok() + .flatten() + { + let text = extract_text_from_content(&chunk.content); - let title = chunk - .content - .get("title") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); + let title = extract_title_from_content(&chunk.content); let file_path = if chunk.uuid.is_empty() { None @@ -1376,7 +1412,7 @@ async fn unregister( pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { let _ = SERVER_START.set(Instant::now()); - let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text:v1.5".to_string())); + let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string())); let mongo_cache = MongoCache::init().await?; let redis_cache = RedisCache::new()?; let db = PostgresDb::init().await?; @@ -1384,7 +1420,7 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { let state = AppState { embedder, - embedder_model: "nomic-embed-text:v1.5".to_string(), + embedder_model: "nomic-embed-text-v2-moe:latest".to_string(), mongo_cache, redis_cache, api_state, diff --git a/src/core/db/mod.rs b/src/core/db/mod.rs index b08fdbd..1aba44e 100644 --- a/src/core/db/mod.rs +++ b/src/core/db/mod.rs @@ -5,6 +5,7 @@ use crate::core::chunk::Chunk; #[derive(Debug, Clone)] pub struct SearchResult { + pub uuid: String, pub chunk_id: String, pub score: f32, } diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 137f7c3..2b2eae4 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -711,6 +711,22 @@ impl PostgresDb { Ok(()) } + pub async fn update_monitor_job_video_id(&self, job_id: i32, video_id: i64) -> Result<()> { + sqlx::query( + r#" + UPDATE monitor_jobs + SET video_id = $1, updated_at = CURRENT_TIMESTAMP + WHERE id = $2 + "#, + ) + .bind(video_id) + .bind(job_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + pub async fn get_monitor_job_by_uuid(&self, uuid: &str) -> Result> { let row = sqlx::query( r#" @@ -1937,6 +1953,81 @@ impl PostgresDb { } } + pub async fn get_chunk_by_chunk_id_and_uuid( + &self, + chunk_id: &str, + uuid: &str, + ) -> Result> { + let row = sqlx::query( + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = $1 AND uuid = $2" + ) + .bind(chunk_id) + .bind(uuid) + .fetch_optional(&self.pool) + .await?; + + if let Some(r) = row { + let chunk_type_str: String = r.get(4); + let chunk_index: i32 = r.get(3); + let chunk_type = match chunk_type_str.as_str() { + "time" => ChunkType::TimeBased, + "sentence" => ChunkType::Sentence, + "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + "story" => ChunkType::Story, + _ => ChunkType::TimeBased, + }; + + let content: serde_json::Value = r.get(9); + let metadata: Option = r.get(10); + + let pre_chunk_ids: Vec = r.try_get(13).unwrap_or_default(); + let parent_chunk_id: Option = r.try_get(14).ok().flatten(); + let child_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + + let (rule, content_data) = if content.get("rule").is_some() { + let rule_str = content + .get("rule") + .and_then(|v| v.as_str()) + .unwrap_or("rule_1"); + let rule = if rule_str == "rule_2" { + ChunkRule::Rule2 + } else { + ChunkRule::Rule1 + }; + let data = content.get("data").cloned().unwrap_or(content); + (rule, data) + } else { + (ChunkRule::Rule1, content) + }; + + let file_id: i32 = sqlx::Row::get(&r, "file_id"); + let frame_count: i32 = sqlx::Row::get(&r, "frame_count"); + + Ok(Some(Chunk { + file_id, + uuid: r.get("uuid"), + chunk_id: r.get("chunk_id"), + chunk_index: chunk_index as u32, + chunk_type, + rule, + fps: r.get("fps"), + start_frame: r.get("start_frame"), + end_frame: r.get("end_frame"), + text_content: r.get("text_content"), + content: content_data, + metadata, + vector_id: r.get("vector_id"), + frame_count, + pre_chunk_ids, + parent_chunk_id, + child_chunk_ids, + })) + } else { + Ok(None) + } + } + pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result { let row = sqlx::query( r#" @@ -2225,7 +2316,7 @@ impl PostgresDb { r#" INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) VALUES ($1, $2, 'sentence', $3::jsonb) - ON CONFLICT (chunk_id) DO UPDATE SET + ON CONFLICT (chunk_id, uuid) DO UPDATE SET embedding = EXCLUDED.embedding "#, ) @@ -2433,8 +2524,7 @@ impl PostgresDb { let qdrant = QdrantDb::init().await?; let vector_results = if let Some(uuid) = uuid { - let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); - qdrant.search_in_uuid(&query_f64, uuid, limit * 2).await? + qdrant.search_in_uuid(query_vector, uuid, limit * 2).await? } else { qdrant.search(query_vector, limit * 2).await? }; diff --git a/src/core/db/qdrant_db.rs b/src/core/db/qdrant_db.rs index b87fb7c..da6681e 100644 --- a/src/core/db/qdrant_db.rs +++ b/src/core/db/qdrant_db.rs @@ -30,7 +30,13 @@ impl QdrantDb { let api_key = std::env::var("QDRANT_API_KEY") .unwrap_or_else(|_| "Test3200Test3200Test3200".to_string()); let collection_name = - std::env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "chunks_v3".to_string()); + std::env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "momentry_rule1".to_string()); + + tracing::debug!( + "QdrantDb::new() - base_url: {}, collection_name: {}", + base_url, + collection_name + ); Self { client: Client::new(), @@ -84,15 +90,21 @@ impl QdrantDb { pub async fn upsert_vector( &self, - _chunk_id: &str, + chunk_id: &str, vector: &[f32], payload: VectorPayload, ) -> Result<()> { let url = format!( - "{}/collections/{}/points", + "{}/collections/{}/points?wait=true", self.base_url, self.collection_name ); + tracing::debug!( + "Qdrant upsert URL: {}, collection_name: {}", + url, + self.collection_name + ); + let mut payload_map = HashMap::new(); payload_map.insert("uuid".to_string(), serde_json::json!(payload.uuid)); payload_map.insert("chunk_id".to_string(), serde_json::json!(payload.chunk_id)); @@ -109,7 +121,14 @@ impl QdrantDb { payload_map.insert("text".to_string(), serde_json::json!(text)); } - let point_id = uuid::Uuid::new_v4().to_string(); + // Generate consistent point ID from uuid and chunk_id + // Qdrant requires integer or UUID point IDs. We'll use a simple integer hash. + let point_id_str = format!("{}_{}", payload.uuid, chunk_id); + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + point_id_str.hash(&mut hasher); + let point_id = hasher.finish() as u64; let body = serde_json::json!({ "points": [{ @@ -119,15 +138,41 @@ impl QdrantDb { }] }); - self.client + tracing::debug!( + "Upserting vector to Qdrant: chunk_id={}, uuid={}, vector_len={}", + chunk_id, + payload.uuid, + vector.len() + ); + + let response = self + .client .put(&url) .header("api-key", &self.api_key) .header("Content-Type", "application/json") .json(&body) .send() .await - .context("Failed to upsert vector in Qdrant")?; + .context("Failed to send upsert request to Qdrant")?; + // Check response status + let status = response.status(); + let response_text = response + .text() + .await + .unwrap_or_else(|_| "Failed to read response".to_string()); + + if !status.is_success() { + tracing::error!("Qdrant upsert failed: {} - {}", status, response_text); + return Err(anyhow::anyhow!( + "Qdrant upsert failed with status {}: {}", + status, + response_text + )); + } + + tracing::debug!("Qdrant response: {}", response_text); + tracing::info!("Successfully upserted vector for chunk: {}", chunk_id); Ok(()) } @@ -153,6 +198,22 @@ impl QdrantDb { .await .context("Failed to search Qdrant")?; + // Check response status + let status = response.status(); + let response_text = response + .text() + .await + .unwrap_or_else(|_| "Failed to read response".to_string()); + + if !status.is_success() { + tracing::error!("Qdrant search failed: {} - {}", status, response_text); + return Err(anyhow::anyhow!( + "Qdrant search failed with status {}: {}", + status, + response_text + )); + } + #[derive(Deserialize)] struct QdrantSearchResult { result: Vec, @@ -166,12 +227,19 @@ impl QdrantDb { payload: HashMap, } - let result: QdrantSearchResult = response.json().await?; + let result: QdrantSearchResult = serde_json::from_str(&response_text) + .context("Failed to parse Qdrant search response")?; let search_results: Vec = result .result .into_iter() .map(|r| { + let uuid = r + .payload + .get("uuid") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); let chunk_id = r .payload .get("chunk_id") @@ -179,6 +247,7 @@ impl QdrantDb { .unwrap_or("unknown") .to_string(); SearchResult { + uuid, chunk_id, score: r.score as f32, } @@ -190,7 +259,7 @@ impl QdrantDb { pub async fn search_in_uuid( &self, - query_vector: &[f64], + query_vector: &[f32], uuid: &str, limit: usize, ) -> Result> { @@ -225,6 +294,26 @@ impl QdrantDb { .await .context("Failed to search Qdrant")?; + // Check response status + let status = response.status(); + let response_text = response + .text() + .await + .unwrap_or_else(|_| "Failed to read response".to_string()); + + if !status.is_success() { + tracing::error!( + "Qdrant search_in_uuid failed: {} - {}", + status, + response_text + ); + return Err(anyhow::anyhow!( + "Qdrant search_in_uuid failed with status {}: {}", + status, + response_text + )); + } + #[derive(Deserialize)] struct QdrantSearchResult { result: Vec, @@ -238,12 +327,19 @@ impl QdrantDb { payload: HashMap, } - let result: QdrantSearchResult = response.json().await?; + let result: QdrantSearchResult = serde_json::from_str(&response_text) + .context("Failed to parse Qdrant search_in_uuid response")?; let search_results: Vec = result .result .into_iter() .map(|r| { + let uuid = r + .payload + .get("uuid") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); let chunk_id = r .payload .get("chunk_id") @@ -251,6 +347,7 @@ impl QdrantDb { .unwrap_or("unknown") .to_string(); SearchResult { + uuid, chunk_id, score: r.score as f32, } diff --git a/src/core/db/sync_db.rs b/src/core/db/sync_db.rs index c349b98..340fa25 100644 --- a/src/core/db/sync_db.rs +++ b/src/core/db/sync_db.rs @@ -78,7 +78,7 @@ impl SyncDb { let response = client .post("http://localhost:11434/api/embeddings") .json(&json!({ - "model": "nomic-embed-text", + "model": "nomic-embed-text-v2-moe:latest", "prompt": text })) .send() @@ -137,7 +137,8 @@ impl SyncDb { for chunk in chunks { let text = chunk .content - .get("text") + .get("data") + .and_then(|data| data.get("text")) .and_then(|t| t.as_str()) .unwrap_or("") .to_string(); diff --git a/src/main.rs b/src/main.rs index 942ffdf..1b3c2a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; use futures_util::StreamExt; +use std::io::Write; use std::path::Path; use std::str; use std::sync::{Arc, Mutex}; @@ -2226,18 +2227,24 @@ async fn main() -> Result<()> { .await .context("Failed to init PostgreSQL")?; let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?; - let embedder = Embedder::new("nomic-embed-text:v1.5".to_string()); - - let target_uuid = if uuid == "all" { - None - } else { - Some(uuid.as_str()) - }; + let embedder = Embedder::new("nomic-embed-text-v2-moe:latest".to_string()); let mut stored_count = 0usize; - if let Some(target) = target_uuid { - let chunks = pg.get_chunks_by_uuid(target).await?; + // Get list of videos to process + let videos_to_process = if uuid == "all" { + // Get all videos + let videos = pg.list_videos().await?; + videos.into_iter().map(|v| v.uuid).collect::>() + } else { + // Process single video + vec![uuid.clone()] + }; + + for target in &videos_to_process { + println!("\n=== Processing video: {} ===", target); + + let chunks = pg.get_chunks_by_uuid(target.as_str()).await?; let sentence_chunks: Vec<_> = chunks .into_iter() .filter(|c| c.chunk_type == ChunkType::Sentence) @@ -2249,21 +2256,32 @@ async fn main() -> Result<()> { target ); + let mut video_stored_count = 0usize; + for chunk in sentence_chunks { + // Try to extract text from different possible locations let text = chunk .content - .get("text") + .get("data") // Try data->text structure first + .and_then(|data| data.get("text")) .and_then(|v| v.as_str()) + .or_else(|| chunk.content.get("text").and_then(|v| v.as_str())) // Try root text structure .unwrap_or(""); if text.is_empty() { + eprintln!( + "Empty text for chunk {}, content: {:?}", + chunk.chunk_id, chunk.content + ); continue; } print!("Embedding chunk {}... ", chunk.chunk_id); + std::io::stdout().flush().unwrap(); match embedder.embed_document(text).await { Ok(vector) => { + println!("embedding success ({} dims)", vector.len()); let vector_id = format!("{}_{}", chunk.uuid, chunk.chunk_id); if let Err(e) = @@ -2295,32 +2313,40 @@ async fn main() -> Result<()> { } stored_count += 1; - println!("done ({} dims)", vector.len()); + video_stored_count += 1; + println!( + "stored (video: {}, total: {})", + video_stored_count, stored_count + ); } Err(e) => { - println!("failed: {}", e); + println!("embedding failed: {}", e); } } } - // Only update storage status if vectors were actually stored - if stored_count > 0 { - pg.update_storage_status(target, "pvector_chunk", true) + // Only update storage status if vectors were actually stored for this video + if video_stored_count > 0 { + pg.update_storage_status(target.as_str(), "pvector_chunk", true) .await?; - pg.update_storage_status(target, "qvector_chunk", true) + pg.update_storage_status(target.as_str(), "qvector_chunk", true) .await?; println!( - "\n✓ Vectorize stage completed for {}! ({} vectors stored)", - target, stored_count + "✓ Vectorize stage completed for {}! ({} vectors stored)", + target, video_stored_count ); } else { println!( - "\n✗ Vectorize stage failed for {}! (0 vectors stored)", + "✗ Vectorize stage failed for {}! (0 vectors stored)", target ); } - } else { - println!("\n✓ Vectorize stage completed for all videos!"); + } + + println!("\n=== Vectorization Summary ==="); + println!("Total vectors stored: {}", stored_count); + if uuid == "all" { + println!("✓ Vectorize stage completed for all videos!"); } Ok(()) } diff --git a/src/player/chunk_selector.rs b/src/player/chunk_selector.rs index 376e673..bd87f4e 100644 --- a/src/player/chunk_selector.rs +++ b/src/player/chunk_selector.rs @@ -17,7 +17,7 @@ const QDRANT_API_KEY: &str = "Test3200Test3200Test3200"; #[allow(dead_code)] const OLLAMA_URL: &str = "http://localhost:11434"; #[allow(dead_code)] -const MODEL: &str = "nomic-embed-text-v2-moe"; +const MODEL: &str = "nomic-embed-text-v2-moe:latest"; #[derive(Debug, Clone)] #[allow(dead_code)] @@ -112,8 +112,8 @@ impl ChunkSelector { return Ok(Vec::new()); } - // Search Qdrant - try both collections (chunks_v3 for multilingual, AccusysDB for others) - let collections = ["chunks_v3", "AccusysDB"]; + // Search Qdrant - use momentry_rule1 collection (Rule1 specification) + let collections = ["momentry_rule1"]; for collection in collections { let vector_str = serde_json::to_string(&embedding) diff --git a/src/playground.rs b/src/playground.rs index 91d7597..285f2ef 100644 --- a/src/playground.rs +++ b/src/playground.rs @@ -2235,7 +2235,7 @@ async fn main() -> Result<()> { .await .context("Failed to init PostgreSQL")?; let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?; - let embedder = Embedder::new("nomic-embed-text:v1.5".to_string()); + let embedder = Embedder::new("nomic-embed-text-v2-moe:latest".to_string()); let target_uuid = if uuid == "all" { None @@ -2261,7 +2261,8 @@ async fn main() -> Result<()> { for chunk in sentence_chunks { let text = chunk .content - .get("text") + .get("data") + .and_then(|data| data.get("text")) .and_then(|v| v.as_str()) .unwrap_or(""); diff --git a/src/worker/processor.rs b/src/worker/processor.rs index a21efd9..e8411ef 100644 --- a/src/worker/processor.rs +++ b/src/worker/processor.rs @@ -486,17 +486,10 @@ impl ProcessorPool { asr_result: &AsrResult, ) -> Result<()> { // Get video record to obtain file_id and fps - let video = match db.get_video_by_uuid(uuid).await { - Ok(Some(video)) => video, - Ok(None) => { - tracing::error!("Video not found for uuid: {}", uuid); - return Ok(()); - } - Err(e) => { - tracing::error!("Failed to get video for uuid {}: {}", uuid, e); - return Ok(()); - } - }; + let video = db + .get_video_by_uuid(uuid) + .await? + .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; @@ -538,17 +531,10 @@ impl ProcessorPool { cut_result: &CutResult, ) -> Result<()> { // Get video record to obtain file_id and fps - let video = match db.get_video_by_uuid(uuid).await { - Ok(Some(video)) => video, - Ok(None) => { - tracing::error!("Video not found for uuid: {}", uuid); - return Ok(()); - } - Err(e) => { - tracing::error!("Failed to get video for uuid {}: {}", uuid, e); - return Ok(()); - } - }; + let video = db + .get_video_by_uuid(uuid) + .await? + .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 };