update: pipeline, search, clip, embedding fixes

This commit is contained in:
Accusys
2026-05-17 19:46:35 +08:00
parent eec2eea880
commit 3164a65554
36 changed files with 4313 additions and 4061 deletions

View File

@@ -448,7 +448,7 @@ impl JobWorker {
// Create a "skipped" record so that the job can complete correctly
if let Err(e) = self
.db
.create_processor_result(job.id, *processor_type, &job.uuid)
.upsert_processor_result(job.id, *processor_type, &job.uuid, "skipped")
.await
{
error!("Failed to create skipped processor result: {}", e);
@@ -491,7 +491,7 @@ impl JobWorker {
for skipped_type in processors_to_run.iter().skip(started_count as usize) {
if let Err(e) = self
.db
.create_processor_result(job.id, *skipped_type, &job.uuid)
.upsert_processor_result(job.id, *skipped_type, &job.uuid, "skipped")
.await
{
error!("Failed to create skipped processor result: {}", e);
@@ -550,7 +550,7 @@ impl JobWorker {
let processor_result_id = self
.db
.create_processor_result(job.id, *processor_type, &job.uuid)
.upsert_processor_result(job.id, *processor_type, &job.uuid, "pending")
.await?;
self.redis
@@ -855,10 +855,31 @@ impl JobWorker {
)
.await
{
Ok(count) => info!(
"✅ TMDb face matching: {} bindings created for {}",
count, uuid_clone
),
Ok(count) => {
info!(
"✅ TMDb face matching: {} bindings created for {}",
count, uuid_clone
);
// Save identity files for affected identities
let ids = sqlx::query_scalar::<_, uuid::Uuid>(
"SELECT DISTINCT i.uuid FROM identities i \
JOIN face_detections fd ON fd.identity_id = i.id \
WHERE fd.file_uuid = $1 AND fd.identity_id IS NOT NULL"
)
.bind(&uuid_clone)
.fetch_all(db_clone.pool())
.await
.unwrap_or_default();
for id_uuid in &ids {
let us = id_uuid.to_string().replace('-', "");
if let Err(e) = crate::core::identity::storage::save_identity_file(
&db_clone, &us
).await {
warn!("[P2.5] Failed to save identity file {}: {}", us, e);
}
}
info!("[P2.5] {} identity files saved for {}", ids.len(), uuid_clone);
}
Err(e) => error!("❌ TMDb face matching failed for {}: {}", uuid_clone, e),
}
});

View File

@@ -131,7 +131,7 @@ impl ProcessorPool {
async fn kill_existing_processor(redis: &RedisClient, uuid: &str, processor: &str) {
let prefix = crate::core::config::REDIS_KEY_PREFIX.as_str();
let key = format!("{}worker:job:{}:processor:{}", prefix, uuid, processor);
let key = format!("{}job:{}:processor:{}", prefix, uuid, processor);
if let Ok(mut conn) = redis.get_conn().await {
let old_pid: Option<i32> = redis::cmd("HGET")
.arg(&key)
@@ -231,8 +231,59 @@ impl ProcessorPool {
0,
)
.await;
// Set started_at once (subscriber's update_worker_processor_status won't touch it)
if let Ok(mut conn) = redis.get_conn().await {
let prefix = crate::core::config::REDIS_KEY_PREFIX.as_str();
let key = format!("{}job:{}:processor:{}", prefix, &job.uuid, &processor_name);
let now = chrono::Utc::now().to_rfc3339();
let _: Option<String> = redis::cmd("HSET")
.arg(&key).arg("started_at").arg(&now)
.query_async(&mut conn).await.ok();
let _: Option<String> = redis::cmd("HSET")
.arg(&key).arg("embedding_started_at").arg(&now)
.query_async(&mut conn).await.ok();
}
// Subscribe to Redis progress pub/sub and update processor hash in real-time
let sub_redis = redis.clone();
let sub_uuid = job.uuid.clone();
let sub_processor = processor_name.clone();
let progress_handle = tokio::spawn(async move {
let cb_redis = sub_redis.clone();
let cb_uuid = sub_uuid.clone();
let cb_processor = sub_processor.clone();
if let Err(e) = sub_redis
.subscribe_and_callback(&sub_uuid, move |msg| {
tracing::info!("[Subscriber] Got msg for={} cur={} tot={}",
msg.processor,
msg.data.current.unwrap_or(0),
msg.data.total.unwrap_or(0));
if msg.processor == cb_processor {
let cur = msg.data.current.unwrap_or(0);
let tot = msg.data.total.unwrap_or(0);
let oc = msg.data.output_count.unwrap_or(0);
let r = cb_redis.clone();
let u = cb_uuid.clone();
let p = cb_processor.clone();
tokio::spawn(async move {
match r.update_worker_processor_status(
&u, &p, "running", None,
cur, oc, tot, 0, 0,
).await {
Ok(_) => tracing::info!("[Subscriber] Updated {}: cur={} tot={}", p, cur, tot),
Err(e) => tracing::error!("[Subscriber] FAILED {}: {}", p, e),
}
});
}
})
.await
{
tracing::warn!("[ProgressSub] Subscriber ended: {}", e);
}
});
let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await;
progress_handle.abort();
match result {
Ok(output) => {
@@ -375,8 +426,11 @@ impl ProcessorPool {
// Generate output path
let output_dir = PathBuf::from(OUTPUT_DIR.as_str());
let output_path =
output_dir.join(format!("{}.{}.json", job.uuid, processor_type.as_str(),));
let suffix = match processor_type {
ProcessorType::Story => format!("{}.story_story", job.uuid),
_ => format!("{}.{}", job.uuid, processor_type.as_str()),
};
let output_path = output_dir.join(format!("{}.json", suffix));
// Ensure output directory exists
if let Some(parent) = output_path.parent() {
@@ -636,7 +690,7 @@ impl ProcessorPool {
let _ = executor
.run(
"parent_chunk_5w1h.py",
&["--file-uuid", &job.uuid, "--max-scenes", "300"],
&["--file-uuid", &job.uuid, "--embed"],
uuid,
"STORY",
Some(std::time::Duration::from_secs(300)),
@@ -662,6 +716,26 @@ impl ProcessorPool {
pid: 0,
})
}
ProcessorType::FiveW1H => {
let executor = crate::core::processor::PythonExecutor::new()?;
let _ = executor
.run(
"parent_chunk_5w1h.py",
&["--file-uuid", &job.uuid, "--embed", "--mode", "llm"],
uuid,
"5W1H",
Some(std::time::Duration::from_secs(300)),
)
.await;
Ok(ProcessorOutput {
data: serde_json::Value::Null,
chunks_produced: 0,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
}
}