feat: fix Chinese text search and duplicate chunk_id bug
- Add helper functions to extract text from nested content structure
- Update SearchResult to include uuid field
- Add PostgreSQL function get_chunk_by_chunk_id_and_uuid to handle duplicate chunk_ids
- Update Qdrant search functions to extract uuid from payload
- Change embedding model to nomic-embed-text-v2-moe:latest
- Update Qdrant collection name to momentry_rule1
- Fix MongoDB authentication and disable cache for development
- Improve error handling in processor.rs
- Update documentation with new embedding model
This commit is contained in:
@@ -5,6 +5,7 @@ use crate::core::chunk::Chunk;
|
||||
|
||||
/// A single hit returned by a vector similarity search.
///
/// A hit is identified by the pair (`uuid`, `chunk_id`); `chunk_id` on its own
/// is not assumed to be unique.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Identifier stored alongside the matched chunk; used together with
    /// `chunk_id` to look the chunk up.
    pub uuid: String,
    /// Identifier of the matched chunk.
    pub chunk_id: String,
    /// Similarity score reported by the search backend for this hit.
    pub score: f32,
}
|
||||
|
||||
@@ -711,6 +711,22 @@ impl PostgresDb {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_monitor_job_video_id(&self, job_id: i32, video_id: i64) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE monitor_jobs
|
||||
SET video_id = $1, updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = $2
|
||||
"#,
|
||||
)
|
||||
.bind(video_id)
|
||||
.bind(job_id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_monitor_job_by_uuid(&self, uuid: &str) -> Result<Option<MonitorJob>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
@@ -1937,6 +1953,81 @@ impl PostgresDb {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_chunk_by_chunk_id_and_uuid(
|
||||
&self,
|
||||
chunk_id: &str,
|
||||
uuid: &str,
|
||||
) -> Result<Option<Chunk>> {
|
||||
let row = sqlx::query(
|
||||
"SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = $1 AND uuid = $2"
|
||||
)
|
||||
.bind(chunk_id)
|
||||
.bind(uuid)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(r) = row {
|
||||
let chunk_type_str: String = r.get(4);
|
||||
let chunk_index: i32 = r.get(3);
|
||||
let chunk_type = match chunk_type_str.as_str() {
|
||||
"time" => ChunkType::TimeBased,
|
||||
"sentence" => ChunkType::Sentence,
|
||||
"cut" => ChunkType::Cut,
|
||||
"trace" => ChunkType::Trace,
|
||||
"story" => ChunkType::Story,
|
||||
_ => ChunkType::TimeBased,
|
||||
};
|
||||
|
||||
let content: serde_json::Value = r.get(9);
|
||||
let metadata: Option<serde_json::Value> = r.get(10);
|
||||
|
||||
let pre_chunk_ids: Vec<i32> = r.try_get(13).unwrap_or_default();
|
||||
let parent_chunk_id: Option<String> = r.try_get(14).ok().flatten();
|
||||
let child_chunk_ids: Vec<String> = r.try_get(15).unwrap_or_default();
|
||||
|
||||
let (rule, content_data) = if content.get("rule").is_some() {
|
||||
let rule_str = content
|
||||
.get("rule")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("rule_1");
|
||||
let rule = if rule_str == "rule_2" {
|
||||
ChunkRule::Rule2
|
||||
} else {
|
||||
ChunkRule::Rule1
|
||||
};
|
||||
let data = content.get("data").cloned().unwrap_or(content);
|
||||
(rule, data)
|
||||
} else {
|
||||
(ChunkRule::Rule1, content)
|
||||
};
|
||||
|
||||
let file_id: i32 = sqlx::Row::get(&r, "file_id");
|
||||
let frame_count: i32 = sqlx::Row::get(&r, "frame_count");
|
||||
|
||||
Ok(Some(Chunk {
|
||||
file_id,
|
||||
uuid: r.get("uuid"),
|
||||
chunk_id: r.get("chunk_id"),
|
||||
chunk_index: chunk_index as u32,
|
||||
chunk_type,
|
||||
rule,
|
||||
fps: r.get("fps"),
|
||||
start_frame: r.get("start_frame"),
|
||||
end_frame: r.get("end_frame"),
|
||||
text_content: r.get("text_content"),
|
||||
content: content_data,
|
||||
metadata,
|
||||
vector_id: r.get("vector_id"),
|
||||
frame_count,
|
||||
pre_chunk_ids,
|
||||
parent_chunk_id,
|
||||
child_chunk_ids,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result<i64> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
@@ -2225,7 +2316,7 @@ impl PostgresDb {
|
||||
r#"
|
||||
INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding)
|
||||
VALUES ($1, $2, 'sentence', $3::jsonb)
|
||||
ON CONFLICT (chunk_id) DO UPDATE SET
|
||||
ON CONFLICT (chunk_id, uuid) DO UPDATE SET
|
||||
embedding = EXCLUDED.embedding
|
||||
"#,
|
||||
)
|
||||
@@ -2433,8 +2524,7 @@ impl PostgresDb {
|
||||
|
||||
let qdrant = QdrantDb::init().await?;
|
||||
let vector_results = if let Some(uuid) = uuid {
|
||||
let query_f64: Vec<f64> = query_vector.iter().map(|&x| x as f64).collect();
|
||||
qdrant.search_in_uuid(&query_f64, uuid, limit * 2).await?
|
||||
qdrant.search_in_uuid(query_vector, uuid, limit * 2).await?
|
||||
} else {
|
||||
qdrant.search(query_vector, limit * 2).await?
|
||||
};
|
||||
|
||||
@@ -30,7 +30,13 @@ impl QdrantDb {
|
||||
let api_key = std::env::var("QDRANT_API_KEY")
|
||||
.unwrap_or_else(|_| "Test3200Test3200Test3200".to_string());
|
||||
let collection_name =
|
||||
std::env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "chunks_v3".to_string());
|
||||
std::env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "momentry_rule1".to_string());
|
||||
|
||||
tracing::debug!(
|
||||
"QdrantDb::new() - base_url: {}, collection_name: {}",
|
||||
base_url,
|
||||
collection_name
|
||||
);
|
||||
|
||||
Self {
|
||||
client: Client::new(),
|
||||
@@ -84,15 +90,21 @@ impl QdrantDb {
|
||||
|
||||
pub async fn upsert_vector(
|
||||
&self,
|
||||
_chunk_id: &str,
|
||||
chunk_id: &str,
|
||||
vector: &[f32],
|
||||
payload: VectorPayload,
|
||||
) -> Result<()> {
|
||||
let url = format!(
|
||||
"{}/collections/{}/points",
|
||||
"{}/collections/{}/points?wait=true",
|
||||
self.base_url, self.collection_name
|
||||
);
|
||||
|
||||
tracing::debug!(
|
||||
"Qdrant upsert URL: {}, collection_name: {}",
|
||||
url,
|
||||
self.collection_name
|
||||
);
|
||||
|
||||
let mut payload_map = HashMap::new();
|
||||
payload_map.insert("uuid".to_string(), serde_json::json!(payload.uuid));
|
||||
payload_map.insert("chunk_id".to_string(), serde_json::json!(payload.chunk_id));
|
||||
@@ -109,7 +121,14 @@ impl QdrantDb {
|
||||
payload_map.insert("text".to_string(), serde_json::json!(text));
|
||||
}
|
||||
|
||||
let point_id = uuid::Uuid::new_v4().to_string();
|
||||
// Generate consistent point ID from uuid and chunk_id
|
||||
// Qdrant requires integer or UUID point IDs. We'll use a simple integer hash.
|
||||
let point_id_str = format!("{}_{}", payload.uuid, chunk_id);
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
let mut hasher = DefaultHasher::new();
|
||||
point_id_str.hash(&mut hasher);
|
||||
let point_id = hasher.finish() as u64;
|
||||
|
||||
let body = serde_json::json!({
|
||||
"points": [{
|
||||
@@ -119,15 +138,41 @@ impl QdrantDb {
|
||||
}]
|
||||
});
|
||||
|
||||
self.client
|
||||
tracing::debug!(
|
||||
"Upserting vector to Qdrant: chunk_id={}, uuid={}, vector_len={}",
|
||||
chunk_id,
|
||||
payload.uuid,
|
||||
vector.len()
|
||||
);
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.put(&url)
|
||||
.header("api-key", &self.api_key)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.context("Failed to upsert vector in Qdrant")?;
|
||||
.context("Failed to send upsert request to Qdrant")?;
|
||||
|
||||
// Check response status
|
||||
let status = response.status();
|
||||
let response_text = response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Failed to read response".to_string());
|
||||
|
||||
if !status.is_success() {
|
||||
tracing::error!("Qdrant upsert failed: {} - {}", status, response_text);
|
||||
return Err(anyhow::anyhow!(
|
||||
"Qdrant upsert failed with status {}: {}",
|
||||
status,
|
||||
response_text
|
||||
));
|
||||
}
|
||||
|
||||
tracing::debug!("Qdrant response: {}", response_text);
|
||||
tracing::info!("Successfully upserted vector for chunk: {}", chunk_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -153,6 +198,22 @@ impl QdrantDb {
|
||||
.await
|
||||
.context("Failed to search Qdrant")?;
|
||||
|
||||
// Check response status
|
||||
let status = response.status();
|
||||
let response_text = response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Failed to read response".to_string());
|
||||
|
||||
if !status.is_success() {
|
||||
tracing::error!("Qdrant search failed: {} - {}", status, response_text);
|
||||
return Err(anyhow::anyhow!(
|
||||
"Qdrant search failed with status {}: {}",
|
||||
status,
|
||||
response_text
|
||||
));
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct QdrantSearchResult {
|
||||
result: Vec<QdrantPoint>,
|
||||
@@ -166,12 +227,19 @@ impl QdrantDb {
|
||||
payload: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
let result: QdrantSearchResult = response.json().await?;
|
||||
let result: QdrantSearchResult = serde_json::from_str(&response_text)
|
||||
.context("Failed to parse Qdrant search response")?;
|
||||
|
||||
let search_results: Vec<SearchResult> = result
|
||||
.result
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
let uuid = r
|
||||
.payload
|
||||
.get("uuid")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
let chunk_id = r
|
||||
.payload
|
||||
.get("chunk_id")
|
||||
@@ -179,6 +247,7 @@ impl QdrantDb {
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
SearchResult {
|
||||
uuid,
|
||||
chunk_id,
|
||||
score: r.score as f32,
|
||||
}
|
||||
@@ -190,7 +259,7 @@ impl QdrantDb {
|
||||
|
||||
pub async fn search_in_uuid(
|
||||
&self,
|
||||
query_vector: &[f64],
|
||||
query_vector: &[f32],
|
||||
uuid: &str,
|
||||
limit: usize,
|
||||
) -> Result<Vec<SearchResult>> {
|
||||
@@ -225,6 +294,26 @@ impl QdrantDb {
|
||||
.await
|
||||
.context("Failed to search Qdrant")?;
|
||||
|
||||
// Check response status
|
||||
let status = response.status();
|
||||
let response_text = response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Failed to read response".to_string());
|
||||
|
||||
if !status.is_success() {
|
||||
tracing::error!(
|
||||
"Qdrant search_in_uuid failed: {} - {}",
|
||||
status,
|
||||
response_text
|
||||
);
|
||||
return Err(anyhow::anyhow!(
|
||||
"Qdrant search_in_uuid failed with status {}: {}",
|
||||
status,
|
||||
response_text
|
||||
));
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct QdrantSearchResult {
|
||||
result: Vec<QdrantPoint>,
|
||||
@@ -238,12 +327,19 @@ impl QdrantDb {
|
||||
payload: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
let result: QdrantSearchResult = response.json().await?;
|
||||
let result: QdrantSearchResult = serde_json::from_str(&response_text)
|
||||
.context("Failed to parse Qdrant search_in_uuid response")?;
|
||||
|
||||
let search_results: Vec<SearchResult> = result
|
||||
.result
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
let uuid = r
|
||||
.payload
|
||||
.get("uuid")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
let chunk_id = r
|
||||
.payload
|
||||
.get("chunk_id")
|
||||
@@ -251,6 +347,7 @@ impl QdrantDb {
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
SearchResult {
|
||||
uuid,
|
||||
chunk_id,
|
||||
score: r.score as f32,
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ impl SyncDb {
|
||||
let response = client
|
||||
.post("http://localhost:11434/api/embeddings")
|
||||
.json(&json!({
|
||||
"model": "nomic-embed-text",
|
||||
"model": "nomic-embed-text-v2-moe:latest",
|
||||
"prompt": text
|
||||
}))
|
||||
.send()
|
||||
@@ -137,7 +137,8 @@ impl SyncDb {
|
||||
for chunk in chunks {
|
||||
let text = chunk
|
||||
.content
|
||||
.get("text")
|
||||
.get("data")
|
||||
.and_then(|data| data.get("text"))
|
||||
.and_then(|t| t.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
Reference in New Issue
Block a user