fix: Qdrant collection name + PipelineProgress accumulation
- scan.rs: rule1 collection 'momentry_public_rule1_v2' → 'momentry_rule1' - progress.rs: publish_pipeline_progress now reads existing progress and merges stages
This commit is contained in:
@@ -801,7 +801,11 @@ async fn get_file_stats(
|
|||||||
|
|
||||||
// Text chunk stats (rule1 collection)
|
// Text chunk stats (rule1 collection)
|
||||||
let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string());
|
let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string());
|
||||||
let rule1_collection = format!("momentry_{}_rule1_v2", schema);
|
let rule1_collection = if schema == "public" {
|
||||||
|
"momentry_rule1".to_string()
|
||||||
|
} else {
|
||||||
|
format!("momentry_{}_rule1_v2", schema)
|
||||||
|
};
|
||||||
let text_filter = json!({
|
let text_filter = json!({
|
||||||
"must": [{"key": "file_uuid", "match": {"value": file_uuid}}]
|
"must": [{"key": "file_uuid", "match": {"value": file_uuid}}]
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -518,7 +518,7 @@ pub async fn get_progress(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Publish pipeline progress to Redis
|
/// Publish pipeline progress to Redis (accumulates with existing progress)
|
||||||
pub async fn publish_pipeline_progress(
|
pub async fn publish_pipeline_progress(
|
||||||
redis: &RedisClient,
|
redis: &RedisClient,
|
||||||
file_uuid: &str,
|
file_uuid: &str,
|
||||||
@@ -530,7 +530,32 @@ pub async fn publish_pipeline_progress(
|
|||||||
file_uuid
|
file_uuid
|
||||||
);
|
);
|
||||||
if let Ok(mut conn) = redis.get_conn().await {
|
if let Ok(mut conn) = redis.get_conn().await {
|
||||||
let json = serde_json::to_string(progress).unwrap_or_default();
|
// Try to read existing progress first
|
||||||
|
let existing: Option<PipelineProgress> = redis::cmd("GET")
|
||||||
|
.arg(&key)
|
||||||
|
.query_async(&mut conn)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.and_then(|s: String| serde_json::from_str(&s).ok());
|
||||||
|
|
||||||
|
let merged = if let Some(mut existing) = existing {
|
||||||
|
// Merge: update stages from new progress onto existing
|
||||||
|
for new_stage in &progress.stages {
|
||||||
|
if new_stage.status == "completed" || new_stage.progress > 0.0 {
|
||||||
|
if let Some(existing_stage) = existing.stages.iter_mut().find(|s| s.name == new_stage.name) {
|
||||||
|
existing_stage.status = new_stage.status.clone();
|
||||||
|
existing_stage.progress = new_stage.progress;
|
||||||
|
existing_stage.detail = new_stage.detail.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
existing.recalculate_overall();
|
||||||
|
existing
|
||||||
|
} else {
|
||||||
|
progress.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let json = serde_json::to_string(&merged).unwrap_or_default();
|
||||||
let _: Result<(), _> = redis::cmd("SET")
|
let _: Result<(), _> = redis::cmd("SET")
|
||||||
.arg(&[&key, &json])
|
.arg(&[&key, &json])
|
||||||
.query_async(&mut conn)
|
.query_async(&mut conn)
|
||||||
|
|||||||
Reference in New Issue
Block a user