Release v1.0.0 candidate

This commit is contained in:
Accusys
2026-05-08 00:48:15 +08:00
parent 26d9c33419
commit 573714788f
17 changed files with 5040 additions and 895 deletions

View File

@@ -6,7 +6,13 @@ use tokio::time::sleep;
use tracing::{error, info, warn};
use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::db::{MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VideoStatus};
use crate::core::db::qdrant_db::QdrantDb;
use crate::api::five_w1h_agent_api::run_5w1h_agent;
use crate::api::identity_agent_api::run_identity_agent;
use crate::core::db::{
MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, VideoStatus,
};
use crate::core::embedding::Embedder;
use crate::worker::config::WorkerConfig;
use crate::worker::processor::{ProcessorPool, ProcessorTask};
use crate::worker::resources::SystemResources;
@@ -140,7 +146,12 @@ impl JobWorker {
.get_running_jobs_with_all_processors_done(self.config.batch_size)
.await?;
for job in running_jobs_done {
let should_retry = self.check_and_complete_job(job.id, &job.uuid).await.is_ok();
let expected_count = if job.processors.is_empty() {
crate::core::db::ProcessorType::all().len()
} else {
job.processors.len()
};
let should_retry = self.check_and_complete_job(job.id, &job.uuid, expected_count).await.is_ok();
if should_retry && self.processor_pool.can_start().await {
if let Err(e) = self.process_job(job.clone()).await {
error!("Failed to reprocess job {}: {}", job.uuid, e);
@@ -453,22 +464,33 @@ impl JobWorker {
// 總是檢查是否可以完成 jobcheck_and_complete_job 內部會判斷)
// processor_results 不足時它會自動略過
self.check_and_complete_job(job.id, &job.uuid).await?;
let expected_count = if job.processors.is_empty() {
crate::core::db::ProcessorType::all().len()
} else {
job.processors.len()
};
self.check_and_complete_job(job.id, &job.uuid, expected_count).await?;
Ok(())
}
async fn check_and_complete_job(&self, job_id: i32, uuid: &str) -> Result<()> {
async fn check_and_complete_job(&self, job_id: i32, uuid: &str, expected_count: usize) -> Result<()> {
let results = self.db.get_processor_results_by_job(job_id).await?;
// 如果 processor_results 筆數少於總 processor 數,代表有 processor 尚未啟動(如依賴未滿足)
let all_processor_types = crate::core::db::ProcessorType::all().len();
if results.len() < all_processor_types {
info!(
"check_and_complete_job: {} results={}, expected={}",
uuid,
results.len(),
expected_count
);
// 如果 processor_results 筆數少於期望的 processor 數,代表有 processor 尚未啟動(如依賴未滿足)
if results.len() < expected_count {
info!(
"Job {} has {}/{} processor results, not all processors created yet. Skipping completion check.",
uuid,
results.len(),
all_processor_types
expected_count
);
return Ok(());
}
@@ -491,6 +513,14 @@ impl JobWorker {
.iter()
.any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Failed));
let any_pending = results
.iter()
.any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Pending));
let any_skipped = results
.iter()
.any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Skipped));
let completed_count = results
.iter()
.filter(|r| {
@@ -544,7 +574,14 @@ impl JobWorker {
let fps = video.fps;
match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
Ok(count) => {
info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count)
info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count);
// Automatically vectorize new sentence chunks
if count > 0 {
info!("📝 Starting automatic vectorize for {} chunks...", count);
if let Err(e) = Self::vectorize_chunks(&db_clone, &uuid_clone).await {
error!("❌ Auto-vectorize failed for {}: {}", uuid_clone, e);
}
}
}
Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e),
}
@@ -640,19 +677,27 @@ impl JobWorker {
// 🚀 P3 Trigger: Identity Agent (Face + ASRX)
if has_face && has_asrx {
info!("📝 Prerequisites met for Identity Agent. Starting analysis...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
tracing::info!("Identity Agent started for video {}", uuid_clone);
match run_identity_agent(&db_clone, &uuid_clone).await {
Ok(()) => info!("✅ Identity Agent completed for {}", uuid_clone),
Err(e) => error!("❌ Identity Agent failed for {}: {}", uuid_clone, e),
}
});
}
// 🚀 P4 Trigger: 5W1H Agent (after Rule 3 completion)
if has_cut && has_asr {
info!("📝 Prerequisites met for 5W1H Agent. Will trigger after Rule 3...");
info!("📝 Prerequisites met for 5W1H Agent. Starting...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
tracing::info!("5W1H Agent started for video {}", uuid_clone);
match run_5w1h_agent(&db_clone, &uuid_clone).await {
Ok(()) => info!("✅ 5W1H Agent completed for {}", uuid_clone),
Err(e) => error!("❌ 5W1H Agent failed for {}: {}", uuid_clone, e),
}
});
}
@@ -679,8 +724,8 @@ impl JobWorker {
self.redis.delete_worker_job(uuid).await?;
info!("Job {} completed successfully", job_id);
} else if essential_completed && !all_completed {
// 必要 processor 完成但部分非必要失敗 → 仍算完成
} else if essential_completed && !all_completed && !any_pending && !any_skipped {
// 必要 processor 完成但部分非必要失敗 → 仍算完成(但無 pending 者才觸發)
info!(
"Job {} completed with non-essential failures. Essential: {:?}",
job_id, essential_processors
@@ -738,6 +783,66 @@ impl JobWorker {
info!("Shutting down worker...");
self.processor_pool.cancel_all().await;
}
/// 自動對 sentence chunks 產生 vector embedding 並存入 PG + Qdrant
async fn vectorize_chunks(db: &PostgresDb, uuid: &str) -> anyhow::Result<()> {
let embedder = Embedder::new("mxbai-embed-large:latest".to_string());
let qdrant = QdrantDb::new();
let pool = db.pool();
let rows = sqlx::query_as::<_, (String, String, String, f64, f64, String)>(
"SELECT chunk_id, chunk_type, text_content, start_time, end_time, content::text FROM dev.chunks WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '') ORDER BY chunk_index",
)
.bind(uuid)
.fetch_all(pool)
.await?;
if rows.is_empty() {
info!("[Vectorize] No sentence chunks to vectorize for {}", uuid);
return Ok(());
}
let total = rows.len();
info!("[Vectorize] Starting vectorize of {} chunks for {}", total, uuid);
let mut stored = 0usize;
for (chunk_id, _chunk_type, text, start_time, end_time, _content_str) in &rows {
if text.is_empty() {
continue;
}
match embedder.embed_document(text).await {
Ok(vector) => {
if let Err(e) = db.store_vector(chunk_id, &vector, uuid).await {
error!("[Vectorize] PG store failed for {}: {}", chunk_id, e);
continue;
}
let payload = VectorPayload {
uuid: uuid.to_string(),
chunk_id: chunk_id.clone(),
chunk_type: "sentence".to_string(),
start_time: *start_time,
end_time: *end_time,
text: Some(text.clone()),
};
if let Err(e) = qdrant.upsert_vector(chunk_id, &vector, payload).await {
error!("[Vectorize] Qdrant upsert failed for {}: {}", chunk_id, e);
continue;
}
stored += 1;
if stored % 50 == 0 {
info!("[Vectorize] {}/{} vectors stored for {}", stored, total, uuid);
}
}
Err(e) => {
error!("[Vectorize] Embedding failed for {}: {}", chunk_id, e);
}
}
}
info!("[Vectorize] Completed: {}/{} vectors stored for {}", stored, total, uuid);
Ok(())
}
}
#[cfg(test)]

View File

@@ -1,10 +1,37 @@
use anyhow::{Context, Result};
use libc;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
/// Guard that ensures processor pool cleanup runs even if the task panics.
struct ProcessorCleanupGuard {
job_id: i32,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
}
impl Drop for ProcessorCleanupGuard {
fn drop(&mut self) {
use tokio::sync::TryLockError;
// 嘗試同步清理;若 lock 被佔用則跳過(避免 deadlock
if let Ok(mut guard) = self.running.try_write() {
guard.remove(&self.job_id);
} else {
warn!("[ProcessorCleanupGuard] running lock contended, skipping cleanup");
}
if let Ok(mut guard) = self.running_count.try_write() {
if *guard > 0 {
*guard -= 1;
}
} else {
warn!("[ProcessorCleanupGuard] running_count lock contended, skipping cleanup");
}
}
}
use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR};
use crate::core::db::{
MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient,
@@ -93,7 +120,7 @@ impl ProcessorPool {
if handle_count == 0 && count == 0 {
if let Err(e) = self
.db
.reset_stale_processor_results(ProcessorJobStatus::Failed, "Worker restarted")
.reset_stale_processor_results(ProcessorJobStatus::Pending, "Worker restarted")
.await
{
error!("Failed to reset stale processor results: {}", e);
@@ -101,7 +128,29 @@ impl ProcessorPool {
}
}
async fn kill_existing_processor(redis: &RedisClient, uuid: &str, processor: &str) {
let prefix = crate::core::config::REDIS_KEY_PREFIX.as_str();
let key = format!("{}worker:job:{}:processor:{}", prefix, uuid, processor);
if let Ok(mut conn) = redis.get_conn().await {
let old_pid: Option<i32> = redis::cmd("HGET")
.arg(&key)
.arg("pid")
.query_async(&mut conn)
.await
.ok()
.flatten();
if let Some(pid) = old_pid {
if pid > 0 {
warn!("[PID] Killing existing process {} for {}/{}", pid, uuid, processor);
unsafe { libc::kill(pid, libc::SIGKILL); }
}
}
}
}
pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> {
Self::kill_existing_processor(&*self.redis, &task.job.uuid, task.processor_type.as_str()).await;
let (cancel_tx, cancel_rx) = mpsc::channel(1);
let job_id = task.job.id;
let processor_type = task.processor_type;
@@ -144,6 +193,13 @@ impl ProcessorPool {
}
tokio::spawn(async move {
// Guard 的 Drop 確保 panic 時也清理 pool state
let _guard = ProcessorCleanupGuard {
job_id,
running: running.clone(),
running_count: running_count.clone(),
};
info!("Starting processor {} for job {}", processor_name, job.uuid);
let _ = db
@@ -171,19 +227,6 @@ impl ProcessorPool {
let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await;
// Store child PID for stability
{
let mut pid_lock = child_pid.write().await;
*pid_lock = Some(0);
}
{
let mut running_guard = running.write().await;
running_guard.remove(&job_id);
let mut count_guard = running_count.write().await;
*count_guard -= 1;
}
match result {
Ok(output) => {
info!(
@@ -747,6 +790,12 @@ impl ProcessorPool {
"_face"
);
// 確保 collection 存在dim=512 for FaceNet
if let Err(e) = qdrant.ensure_collection(&collection, 512).await {
tracing::error!("Failed to ensure Qdrant face collection: {}", e);
return Ok(());
}
let mut count = 0;
for frame in &face_result.frames {
for face in &frame.faces {
@@ -807,6 +856,12 @@ impl ProcessorPool {
"_voice"
);
// 確保 collection 存在dim=192 for ASRX voice
if let Err(e) = qdrant.ensure_collection(&collection, 192).await {
tracing::error!("Failed to ensure Qdrant voice collection: {}", e);
return Ok(());
}
let embeddings = match &asrx_result.embeddings {
Some(e) => e,
None => return Ok(()),
@@ -958,6 +1013,24 @@ impl ProcessorPool {
db.store_scene_pre_chunks_batch(uuid, &scenes).await?;
for (i, scene) in scene_result.scenes.iter().enumerate() {
let chk_id = format!("scene_{}", i + 1);
let meta = serde_json::json!({
"scene_type": scene.scene_type,
"scene_type_zh": scene.scene_type_zh,
"confidence": scene.confidence,
"top_5": scene.top_5,
});
let _ = sqlx::query(
"UPDATE dev.chunks SET metadata = metadata || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3"
)
.bind(&meta)
.bind(uuid)
.bind(&chk_id)
.execute(db.pool())
.await;
}
tracing::info!(
"Stored {} Scene pre-chunks for video {}",
scenes.len(),