cleanup: remove dead code and duplicate docs

- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
This commit is contained in:
Warren
2026-05-04 01:31:21 +08:00
parent ee81e343ce
commit e75c4d6f07
3270 changed files with 35190 additions and 53367 deletions

View File

@@ -9,6 +9,7 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::db::{MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VideoStatus};
use crate::worker::config::WorkerConfig;
use crate::worker::processor::{ProcessorPool, ProcessorTask};
use crate::worker::resources::SystemResources;
pub struct JobWorker {
db: Arc<PostgresDb>,
@@ -51,15 +52,126 @@ impl JobWorker {
}
async fn poll_and_process(&self) -> Result<()> {
// Always check for completion of running jobs first
// This ensures jobs with all processors in terminal states are marked complete/failed
let running_jobs_done = self
.db
.get_running_jobs_with_all_processors_done(self.config.batch_size)
.await?;
for job in running_jobs_done {
if let Err(e) = self.check_and_complete_job(job.id, &job.uuid).await {
error!("Failed to complete job {}: {}", job.uuid, e);
// 檢查系統資源並寫入 Redis
let resources = SystemResources::check();
let dynamic_max = resources.safe_max_concurrent(self.config.max_concurrent);
let health_key = format!("{}health", crate::core::config::REDIS_KEY_PREFIX.as_str());
let now = chrono::Utc::now().to_rfc3339();
let running_count = self.processor_pool.get_running_count().await;
if let Ok(mut conn) = self.redis.get_conn().await {
let gpu_util_str = resources
.gpu_utilization
.map(|v| format!("{:.1}", v))
.unwrap_or_default();
let gpu_mem_str = resources
.gpu_memory_used_pct
.map(|v| format!("{:.1}", v))
.unwrap_or_default();
let gpu_avail_str = if resources.gpu_available {
"true"
} else {
"false"
};
let _: Option<String> = redis::cmd("HMSET")
.arg(&[
(&health_key as &str),
"status",
if dynamic_max > 0 {
"healthy"
} else {
"throttled"
},
"cpu_idle_pct",
&format!("{:.1}", resources.cpu_idle_percent),
"memory_available_mb",
&resources.memory_available_mb.to_string(),
"memory_total_mb",
&resources.memory_total_mb.to_string(),
"memory_used_pct",
&format!("{:.1}", resources.memory_used_percent),
"gpu_available",
gpu_avail_str,
"gpu_utilization_pct",
&gpu_util_str,
"gpu_memory_used_pct",
&gpu_mem_str,
"dynamic_concurrency",
&dynamic_max.to_string(),
"config_concurrency",
&self.config.max_concurrent.to_string(),
"running_processors",
&running_count.to_string(),
"updated_at",
&now,
])
.query_async(&mut conn)
.await
.ok();
}
let gpu_log = match (
resources.gpu_available,
resources.gpu_utilization,
resources.gpu_memory_used_pct,
) {
(true, Some(util), Some(mem)) => format!("GPU util={:.1}% mem={:.1}%", util, mem),
(true, Some(util), None) => format!("GPU util={:.1}%", util),
(true, None, _) => "GPU available".to_string(),
(false, _, _) => "No GPU".to_string(),
};
info!(
"System: CPU idle={:.1}%, Memory={}MB/{}MB ({:.1}%), {}. Dynamic concurrency: {} (config: {})",
resources.cpu_idle_percent,
resources.memory_available_mb,
resources.memory_total_mb,
resources.memory_used_percent,
gpu_log,
dynamic_max,
self.config.max_concurrent,
);
// Check for running jobs that may have pending dependencies satisfied
// First: jobs with all processors done (正常完成檢查)
{
let running_jobs_done = self
.db
.get_running_jobs_with_all_processors_done(self.config.batch_size)
.await?;
for job in running_jobs_done {
let should_retry = self.check_and_complete_job(job.id, &job.uuid).await.is_ok();
if should_retry && self.processor_pool.can_start().await {
if let Err(e) = self.process_job(job.clone()).await {
error!("Failed to reprocess job {}: {}", job.uuid, e);
}
}
}
}
// Second: check running jobs with pending/deferred processors
{
let running_jobs = self.db.get_all_running_jobs(self.config.batch_size).await?;
for job in running_jobs {
if !self.processor_pool.can_start().await {
break;
}
let results = self
.db
.get_processor_results_by_job(job.id)
.await
.unwrap_or_default();
// 若有任何 processor 是 pending/skipped（未真正啟動），重新處理 job
let has_unstarted = results.iter().any(|r| {
matches!(
r.status,
crate::core::db::ProcessorJobStatus::Pending
| crate::core::db::ProcessorJobStatus::Skipped
)
});
if has_unstarted {
if let Err(e) = self.process_job(job.clone()).await {
error!("Failed to reprocess job {}: {}", job.uuid, e);
}
}
}
}
@@ -88,18 +200,51 @@ impl JobWorker {
info!("Processing job: {} ({})", job.uuid, job.id);
// Determine which processors to run based on job.processors field
let processors_to_run: Vec<crate::core::db::ProcessorType> = if job.processors.is_empty() {
info!("No processors specified, running all processors");
crate::core::db::ProcessorType::all()
} else {
info!("Processors specified: {:?}", job.processors);
job.processors
.iter()
.filter_map(|p| crate::core::db::ProcessorType::from_db_str(p))
.collect()
};
let mut processors_to_run: Vec<crate::core::db::ProcessorType> =
if job.processors.is_empty() {
info!("No processors specified, running all processors");
crate::core::db::ProcessorType::all()
} else {
info!("Processors specified: {:?}", job.processors);
job.processors
.iter()
.filter_map(|p| crate::core::db::ProcessorType::from_db_str(p))
.collect()
};
let total_processors = processors_to_run.len() as i32;
// 長影片動態調整:若 CUT 場景過長Face 需在 ASR 之前執行
if let Ok(Some(video)) = self.db.get_video_by_uuid(&job.uuid).await {
// 條件：cut_done 且場景數 <= 3 且最長場景 > 600s（10分鐘）
if video.cut_done && video.cut_count <= 3 && video.cut_max_duration > 600.0 {
info!(
"[DYNAMIC] Long cut detected: {} scenes, max_dur={:.0}s for {}. Moving Face before ASR.",
video.cut_count, video.cut_max_duration, job.uuid
);
// 確保 Face 在 ASR 之前
if let Some(asr_pos) = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Asr))
{
if let Some(face_pos) = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Face))
{
if face_pos > asr_pos {
// 將 Face 移到 ASR 前面
let face = processors_to_run.remove(face_pos);
let insert_pos = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Asr))
.unwrap();
processors_to_run.insert(insert_pos, face);
info!("[DYNAMIC] Reordered processors: Face now ahead of ASR");
}
}
}
}
}
let total_processor_types = processors_to_run.len() as i32;
// Get video total_frames for progress tracking
let video = self.db.get_video_by_uuid(&job.uuid).await?;
@@ -116,7 +261,7 @@ impl JobWorker {
.await?;
self.redis
.update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processors)
.update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processor_types)
.await?;
// Get existing processor results for this job
@@ -126,7 +271,8 @@ impl JobWorker {
result_map.insert(result.processor_type, result);
}
for processor_type in processors_to_run {
let mut started_count = 0i32;
for processor_type in &processors_to_run {
// Update processor status to running
self.db
.update_processor_progress(
@@ -139,17 +285,19 @@ impl JobWorker {
.await?;
// Check if processor already in terminal state
if let Some(result) = result_map.get(&processor_type) {
if let Some(result) = result_map.get(processor_type) {
match result.status {
ProcessorJobStatus::Completed | ProcessorJobStatus::Skipped => {
ProcessorJobStatus::Completed => {
info!(
"Processor {} already completed, skipping",
processor_type.as_str()
);
started_count += 1;
continue;
}
ProcessorJobStatus::Failed => {
info!("Processor {} failed, skipping", processor_type.as_str());
started_count += 1;
continue;
}
ProcessorJobStatus::Running => {
@@ -157,41 +305,136 @@ impl JobWorker {
"Processor {} already running, skipping",
processor_type.as_str()
);
started_count += 1;
continue;
}
ProcessorJobStatus::Pending => {
// Skipped 不視為 terminal — 允許重新啟動
ProcessorJobStatus::Skipped | ProcessorJobStatus::Pending => {
// Continue to start processor
}
}
}
// Check dependencies: all dependent processors must be completed
let deps = processor_type.dependencies();
if !deps.is_empty() {
let mut any_dep_failed = false;
let all_deps_met =
deps.iter()
.all(|dep| match result_map.get(dep).map(|r| &r.status) {
Some(ProcessorJobStatus::Completed) => true,
Some(ProcessorJobStatus::Failed) => {
any_dep_failed = true;
false
}
_ => false,
});
if any_dep_failed {
info!(
"Processor {} dependency failed (need {:?}), skipping",
processor_type.as_str(),
deps.iter().map(|d| d.as_str()).collect::<Vec<_>>(),
);
// 創建 skipped 記錄讓 job 可以正確完成
if let Err(e) = self
.db
.create_processor_result(job.id, *processor_type, &job.uuid)
.await
{
error!("Failed to create skipped processor result: {}", e);
}
// 同時更新 Redis 狀態
let _ = self
.redis
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"skipped",
None,
0,
0,
0,
0,
0,
)
.await;
started_count += 1;
continue;
}
if !all_deps_met {
info!(
"Processor {} dependencies not met (need {:?}), deferring",
processor_type.as_str(),
deps.iter().map(|d| d.as_str()).collect::<Vec<_>>(),
);
continue;
}
}
// Check capacity before starting processor
if !self.processor_pool.can_start().await {
info!(
"Max concurrent processors reached, skipping remaining processors for job {}",
job.uuid
);
// 為所有未啟動的 processors 創建 Skipped 記錄
for skipped_type in processors_to_run.iter().skip(started_count as usize) {
if let Err(e) = self
.db
.create_processor_result(job.id, *skipped_type, &job.uuid)
.await
{
error!("Failed to create skipped processor result: {}", e);
}
let _ = self
.redis
.update_worker_processor_status(
&job.uuid,
skipped_type.as_str(),
"skipped",
None,
0,
0,
0,
0,
0,
)
.await;
}
break;
}
let processor_result_id = self
.db
.create_processor_result(job.id, processor_type, &job.uuid)
.create_processor_result(job.id, *processor_type, &job.uuid)
.await?;
self.redis
.update_worker_processor_status(&job.uuid, processor_type.as_str(), "pending", None)
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"pending",
None,
0,
0,
0,
0,
0,
)
.await?;
let task = ProcessorTask {
job: job.clone(),
processor_type,
processor_type: *processor_type,
processor_result_id,
};
self.processor_pool.start_processor(task).await?;
started_count += 1;
}
// 總是檢查是否可以完成 job（check_and_complete_job 內部會判斷）
// processor_results 不足時它會自動略過
self.check_and_complete_job(job.id, &job.uuid).await?;
Ok(())
@@ -200,14 +443,32 @@ impl JobWorker {
async fn check_and_complete_job(&self, job_id: i32, uuid: &str) -> Result<()> {
let results = self.db.get_processor_results_by_job(job_id).await?;
let all_completed = results.iter().all(|r| {
matches!(
r.status,
crate::core::db::ProcessorJobStatus::Completed
| crate::core::db::ProcessorJobStatus::Skipped
)
// 如果 processor_results 筆數少於總 processor 數,代表有 processor 尚未啟動(如依賴未滿足)
let all_processor_types = crate::core::db::ProcessorType::all().len();
if results.len() < all_processor_types {
info!(
"Job {} has {}/{} processor results, not all processors created yet. Skipping completion check.",
uuid,
results.len(),
all_processor_types
);
return Ok(());
}
// 定義必要 processor必須完成的才算 job 成功)
let essential_processors = ["cut", "asr", "yolo"];
let essential_completed = essential_processors.iter().all(|ep| {
results.iter().any(|r| {
r.processor_type.as_str() == *ep
&& matches!(r.status, crate::core::db::ProcessorJobStatus::Completed)
})
});
let all_completed = results
.iter()
.all(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Completed));
let any_failed = results
.iter()
.any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Failed));
@@ -242,7 +503,7 @@ impl JobWorker {
.map(|r| r.processor_type.as_str().to_string())
.collect();
// Check prerequisites for Rule 1 Chunking BEFORE moving arrays
// Check prerequisites for post-processing triggers
let has_asr = completed_processors.iter().any(|p| p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
@@ -251,33 +512,33 @@ impl JobWorker {
// Update processor arrays in job record
self.db
.update_job_processors_arrays(job_id, completed_processors, failed_processors)
.update_job_processors_arrays(job_id, completed_processors, failed_processors.clone())
.await?;
if all_completed && !any_failed {
// 🚀 P1 Trigger: Rule 1 Chunking
if has_asr && has_asrx {
info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
match db_clone.get_video_by_uuid(&uuid_clone).await {
Ok(Some(video)) => {
let fps = video.fps;
match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
Ok(count) => info!(
"✅ Rule 1 Ingestion completed: {} chunks inserted.",
count
),
Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e),
// 🚀 P1 Trigger: Rule 1 Chunking僅需 ASR + ASRX
if has_asr && has_asrx {
info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
match db_clone.get_video_by_uuid(&uuid_clone).await {
Ok(Some(video)) => {
let fps = video.fps;
match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
Ok(count) => {
info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count)
}
Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e),
}
Ok(None) => error!("Video not found for chunking: {}", uuid_clone),
Err(e) => error!("Failed to get video info for chunking: {}", e),
}
});
}
Ok(None) => error!("Video not found for chunking: {}", uuid_clone),
Err(e) => error!("Failed to get video info for chunking: {}", e),
}
});
}
// Rule 3 / Trace / Identity 需要 all_completed含非必要 processor 失敗也可)
if all_completed {
// 🚀 P1 Trigger: Rule 3 Scene Chunking
if has_cut && has_asr {
info!("📝 Prerequisites met for Rule 3 Scene Chunking. Starting ingestion...");
@@ -294,6 +555,35 @@ impl JobWorker {
});
}
// 🚀 P2 Trigger: Trace Face Aggregation (after Face)
if has_face {
info!("📝 Face completed, triggering trace_face aggregation...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
let executor = match crate::core::processor::PythonExecutor::new() {
Ok(ex) => ex,
Err(e) => {
error!("Failed to create PythonExecutor for trace_face: {}", e);
return;
}
};
match executor
.run(
"trace_face_aggregator.py",
&["--file-uuid", &uuid_clone],
Some(&uuid_clone),
"TRACE_FACE",
Some(std::time::Duration::from_secs(300)),
)
.await
{
Ok(()) => info!("✅ Trace Face aggregation completed for {}", uuid_clone),
Err(e) => error!("❌ Trace Face aggregation failed: {}", e),
}
});
}
// 🚀 P3 Trigger: Identity Agent (Face + ASRX)
if has_face && has_asrx {
info!("📝 Prerequisites met for Identity Agent. Starting analysis...");
@@ -336,6 +626,38 @@ impl JobWorker {
self.redis.delete_worker_job(uuid).await?;
info!("Job {} completed successfully", job_id);
} else if essential_completed && !all_completed {
// 必要 processor 完成但部分非必要失敗 → 仍算完成
info!(
"Job {} completed with non-essential failures. Essential: {:?}",
job_id, essential_processors
);
self.db
.update_job_status(job_id, MonitorJobStatus::Completed)
.await?;
self.db
.update_video_status(uuid, VideoStatus::Completed)
.await?;
let video = self.db.get_video_by_uuid(uuid).await?;
let total_frames = video.map(|v| v.total_frames).unwrap_or(0);
self.db
.update_processing_status_completed(uuid, total_frames)
.await?;
self.redis
.update_worker_job_status(uuid, job_id, "completed", None, completed_count, 7)
.await?;
self.redis.delete_worker_job(uuid).await?;
info!(
"Job {} completed with {} non-essential failures",
job_id,
failed_processors.len()
);
} else if any_failed {
self.db
.update_job_status(job_id, MonitorJobStatus::Failed)

View File

@@ -1,6 +1,7 @@
pub mod config;
pub mod job_worker;
pub mod processor;
pub mod resources;
pub use config::WorkerConfig;
pub use job_worker::JobWorker;

View File

@@ -6,7 +6,9 @@ use tokio::sync::{mpsc, RwLock};
use tracing::{error, info};
use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR};
use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, RedisClient};
use crate::core::db::{
MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient,
};
use crate::core::processor;
use crate::core::processor::asr::AsrResult;
use crate::core::processor::asrx::AsrxResult;
@@ -17,12 +19,16 @@ use crate::core::processor::pose::PoseResult;
use crate::core::processor::scene_classification::SceneClassificationResult;
use crate::core::processor::visual_chunk::VisualChunkResult;
use crate::core::processor::yolo::YoloResult;
use crate::worker::resources::SystemResources;
/// Result of a single processor run, as produced by `run_processor`.
///
/// The numeric fields are forwarded to `update_worker_processor_status`
/// when the processor finishes successfully.
#[derive(Debug)]
struct ProcessorOutput {
// JSON-serialised processor result (`serde_json::to_value(result)`), or
// `Value::Null` when the Scene processor is skipped because an `.err` marker exists.
data: serde_json::Value,
// Number of items the processor produced (e.g. `result.scenes.len()` for CUT/Scene).
chunks_produced: i32,
// Frames handled by this run; the visible success paths set it to `total_frames`,
// the skipped Scene path sets it to 0.
frames_processed: i32,
// Total frame count reported alongside `frames_processed`.
total_frames: i32,
// NOTE(review): every construction site visible here sets this to 0 — confirm
// whether retries are tracked elsewhere.
retry_count: i32,
// NOTE(review): every construction site visible here sets this to 0 — presumably
// the child process id; confirm against the consumer of the Redis status.
pid: i32,
}
#[derive(Debug, Clone)]
@@ -35,7 +41,7 @@ pub struct ProcessorTask {
pub struct ProcessorPool {
db: Arc<PostgresDb>,
redis: Arc<RedisClient>,
max_concurrent: usize,
config_max: usize,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
}
@@ -51,15 +57,22 @@ impl ProcessorPool {
Self {
db,
redis,
max_concurrent,
config_max: max_concurrent,
running: Arc::new(RwLock::new(HashMap::new())),
running_count: Arc::new(RwLock::new(0)),
}
}
/// Returns the concurrency limit that is currently safe for this pool.
///
/// Takes a fresh [`SystemResources`] snapshot, lets it scale down the
/// configured maximum, and never returns less than 1.
pub async fn current_max(&self) -> usize {
    let configured_limit = self.config_max;
    let dynamic_limit = SystemResources::check().safe_max_concurrent(configured_limit);
    std::cmp::max(dynamic_limit, 1)
}
pub async fn can_start(&self) -> bool {
let count = *self.running_count.read().await;
count < self.max_concurrent
let max = self.current_max().await;
count < max
}
pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> {
@@ -67,10 +80,14 @@ impl ProcessorPool {
let job_id = task.job.id;
let processor_type = task.processor_type;
let current_limit = self.current_max().await;
{
let mut count = self.running_count.write().await;
if *count >= self.max_concurrent {
anyhow::bail!("Max concurrent processors reached");
if *count >= current_limit {
anyhow::bail!(
"Max concurrent processors reached (dynamic limit: {})",
current_limit
);
}
*count += 1;
}
@@ -104,7 +121,17 @@ impl ProcessorPool {
.await;
let _ = redis
.update_worker_processor_status(&job.uuid, &processor_name, "running", None)
.update_worker_processor_status(
&job.uuid,
&processor_name,
"running",
None,
0,
0,
0,
0,
0,
)
.await;
let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await;
@@ -142,6 +169,11 @@ impl ProcessorPool {
&processor_name,
"completed",
None,
output.frames_processed,
output.chunks_produced,
output.total_frames,
output.retry_count,
output.pid,
)
.await
{
@@ -173,6 +205,11 @@ impl ProcessorPool {
&processor_name,
"failed",
Some(&e.to_string()),
0,
0,
0,
0,
0,
)
.await
{
@@ -196,12 +233,8 @@ impl ProcessorPool {
// Generate output path
let output_dir = PathBuf::from(OUTPUT_DIR.as_str());
let output_path = output_dir.join(format!(
"job_{}_{}_{}.json",
job.id,
processor_type.as_str(),
chrono::Utc::now().timestamp_millis()
));
let output_path =
output_dir.join(format!("{}.{}.json", job.uuid, processor_type.as_str(),));
// Ensure output directory exists
if let Some(parent) = output_path.parent() {
@@ -229,11 +262,22 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Cut => {
let result =
processor::process_cut(video_path, output_path.to_str().unwrap(), uuid).await?;
let cut_path =
std::path::Path::new(&output_dir).join(format!("{}.cut.json", job.uuid));
let result = if cut_path.exists() {
// CUT 在 register 階段已完成,直接載入
let content =
std::fs::read_to_string(&cut_path).context("Failed to read cut.json")?;
serde_json::from_str(&content).context("Failed to parse cut.json")?
} else {
processor::process_cut(video_path, output_path.to_str().unwrap(), uuid).await?
};
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"CUT completed, storing {} scenes for {}",
@@ -247,6 +291,9 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Yolo => {
@@ -266,6 +313,9 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Ocr => {
@@ -284,6 +334,9 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Face => {
@@ -299,10 +352,17 @@ impl ProcessorPool {
if let Err(e) = Self::store_face_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store FACE chunks for {}: {}", job.uuid, e);
}
// 將 face embedding 寫入 Qdrant
if let Err(e) = Self::store_face_embeddings_to_qdrant(&job.uuid, &result).await {
tracing::error!("Failed to store face embeddings to Qdrant: {}", e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Pose => {
@@ -322,6 +382,9 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Asrx => {
@@ -337,10 +400,17 @@ impl ProcessorPool {
if let Err(e) = Self::store_asrx_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store ASRX chunks for {}: {}", job.uuid, e);
}
// 將 voice embeddings 寫入 Qdrant
if let Err(e) = Self::store_voice_embeddings_to_qdrant(&job.uuid, &result).await {
tracing::error!("Failed to store voice embeddings to Qdrant: {}", e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::VisualChunk => {
@@ -363,15 +433,44 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Scene => {
let result = processor::process_scene_classification(
video_path,
output_path.to_str().unwrap(),
uuid,
)
.await?;
let scene_path =
std::path::Path::new(&output_dir).join(format!("{}.scene.json", job.uuid));
let scene_err =
std::path::Path::new(&output_dir).join(format!("{}.scene.err", job.uuid));
let scene_tmp =
std::path::Path::new(&output_dir).join(format!("{}.scene.tmp", job.uuid));
// 優先順序：.err（跳過）→ .json（載入）→ .tmp（等待或重新執行）
let result = if scene_err.exists() {
tracing::warn!("Scene previously failed for {}, skipping", job.uuid);
return Ok(ProcessorOutput {
data: serde_json::Value::Null,
chunks_produced: 0,
frames_processed: 0,
total_frames,
retry_count: 0,
pid: 0,
});
} else if scene_path.exists() {
tracing::info!("Scene JSON exists for {}, loading from file", job.uuid);
crate::core::processor::load_scene_from_file(scene_path.to_str().unwrap())?
} else if scene_tmp.exists() {
tracing::warn!("Scene tmp exists for {}, waiting for completion", job.uuid);
// 生產環境應等待,此處直接跳過避免卡住
crate::core::processor::SceneClassificationResult::default()
} else {
processor::process_scene_classification(
video_path,
output_path.to_str().unwrap(),
uuid,
)
.await?
};
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"Scene classification completed, storing {} scenes for {}",
@@ -385,186 +484,14 @@ impl ProcessorPool {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
}
}
#[allow(dead_code)]
async fn run_asr(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_ASR_SCRIPT")
.unwrap_or_else(|_| format!("{}/asr_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ASR script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_cut(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_CUT_SCRIPT")
.unwrap_or_else(|_| format!("{}/cut_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("CUT script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_yolo(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_YOLO_SCRIPT")
.unwrap_or_else(|_| format!("{}/yolo_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("YOLO script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_ocr(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_OCR_SCRIPT")
.unwrap_or_else(|_| format!("{}/ocr_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("OCR script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_face(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_FACE_SCRIPT")
.unwrap_or_else(|_| format!("{}/face_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Face script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_pose(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_POSE_SCRIPT")
.unwrap_or_else(|_| format!("{}/pose_processor.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Pose script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
#[allow(dead_code)]
async fn run_asrx(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_ASRX_SCRIPT")
.unwrap_or_else(|_| format!("{}/asrx_processor_custom.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ASRX script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
pub async fn store_asr_chunks(
db: &PostgresDb,
uuid: &str,
@@ -726,14 +653,7 @@ impl ProcessorPool {
"timestamp": frame.timestamp,
});
// We could potentially parse identity_id if it's already matched, but for raw ingestion it's None.
pre_chunks_to_store.push((
frame.frame as i64,
Some(frame.timestamp),
data,
None, // identity_id
None, // confidence
));
pre_chunks_to_store.push((frame.frame as i64, Some(frame.timestamp), data, None, None));
}
db.store_raw_pre_chunks_batch(uuid, "face", &pre_chunks_to_store)
@@ -741,6 +661,118 @@ impl ProcessorPool {
Ok(())
}
/// Writes the face embeddings contained in `face_result` to Qdrant.
///
/// The target collection is `REDIS_KEY_PREFIX` with any trailing `:` removed,
/// followed by `_face`. Only 512-dimensional embeddings are stored; faces
/// without an embedding, or with an embedding of another length, are skipped.
///
/// Point IDs are a deterministic 64-bit FNV-1a hash of
/// `(uuid, frame number, index of the face within the frame)`. Using the bare
/// frame number as the ID made several faces in one frame overwrite each
/// other, and made frames of different videos overwrite each other in the
/// shared collection. Because the ID is deterministic, re-running the same
/// video still upserts onto the same points.
///
/// # Errors
///
/// Individual upsert failures are logged and skipped, so this function
/// currently always returns `Ok(())`.
pub async fn store_face_embeddings_to_qdrant(
    uuid: &str,
    face_result: &FaceResult,
) -> Result<()> {
    /// 64-bit FNV-1a offset basis.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// 64-bit FNV-1a prime.
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Folds `bytes` into a running 64-bit FNV-1a hash.
    ///
    /// Hand-rolled on purpose: the algorithm behind `std`'s default hasher is
    /// unspecified, which makes it unsuitable for IDs that are persisted.
    fn fnv1a(state: u64, bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(state, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(FNV_PRIME))
    }

    let qdrant = QdrantDb::new();
    let collection = format!(
        "{}{}",
        crate::core::config::REDIS_KEY_PREFIX
            .as_str()
            .trim_end_matches(':'),
        "_face"
    );

    // Hash the video UUID once; frame and face index are mixed in below.
    let uuid_hash = fnv1a(FNV_OFFSET_BASIS, uuid.as_bytes());

    let mut count = 0;
    for frame in &face_result.frames {
        let frame_hash = fnv1a(uuid_hash, &(frame.frame as u64).to_le_bytes());

        for (face_index, face) in frame.faces.iter().enumerate() {
            // Skip faces without an embedding or with an unexpected dimension.
            let embedding = match &face.embedding {
                Some(e) if e.len() == 512 => e,
                _ => continue,
            };

            // Unique per (video, frame, face) and stable across runs.
            let point_id = fnv1a(frame_hash, &(face_index as u64).to_le_bytes());

            let payload = serde_json::json!({
                "file_uuid": uuid,
                "face_id": face.face_id,
                "frame": frame.frame,
                "timestamp": frame.timestamp,
                "x": face.x,
                "y": face.y,
                "width": face.width,
                "height": face.height,
                "confidence": face.confidence,
            });

            // Best effort: a failed upsert is logged and the remaining faces are still processed.
            if let Err(e) = qdrant
                .upsert_vector_to_collection(&collection, point_id, embedding, Some(payload))
                .await
            {
                tracing::error!("Failed to upsert face vector {}: {}", point_id, e);
            } else {
                count += 1;
            }
        }
    }

    if count > 0 {
        tracing::info!("Stored {} face embeddings to Qdrant for {}", count, uuid);
    }
    Ok(())
}
/// Writes the voice embeddings contained in `asrx_result` to Qdrant.
///
/// The target collection is `REDIS_KEY_PREFIX` with any trailing `:` removed,
/// followed by `_voice`. Embeddings are matched to segments by index; only
/// 192-dimensional embeddings are stored. Returns early when the result
/// carries no embeddings at all.
///
/// Point IDs are a deterministic 64-bit FNV-1a hash of
/// `(uuid, segment index)`. Using the bare segment index as the ID made every
/// video overwrite the segments of previously processed videos in the shared
/// collection. Because the ID is deterministic, re-running the same video
/// still upserts onto the same points.
///
/// # Errors
///
/// Individual upsert failures are logged and skipped, so this function
/// currently always returns `Ok(())`.
pub async fn store_voice_embeddings_to_qdrant(
    uuid: &str,
    asrx_result: &AsrxResult,
) -> Result<()> {
    /// 64-bit FNV-1a offset basis.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// 64-bit FNV-1a prime.
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Folds `bytes` into a running 64-bit FNV-1a hash.
    ///
    /// Hand-rolled on purpose: the algorithm behind `std`'s default hasher is
    /// unspecified, which makes it unsuitable for IDs that are persisted.
    fn fnv1a(state: u64, bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(state, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(FNV_PRIME))
    }

    let qdrant = QdrantDb::new();
    let collection = format!(
        "{}{}",
        crate::core::config::REDIS_KEY_PREFIX
            .as_str()
            .trim_end_matches(':'),
        "_voice"
    );

    let embeddings = match &asrx_result.embeddings {
        Some(e) => e,
        None => return Ok(()),
    };

    // Hash the video UUID once; the segment index is mixed in per segment.
    let uuid_hash = fnv1a(FNV_OFFSET_BASIS, uuid.as_bytes());

    let mut count = 0;
    for (i, segment) in asrx_result.segments.iter().enumerate() {
        // Skip segments without a matching embedding or with an unexpected dimension.
        let emb = match embeddings.get(i) {
            Some(e) if e.len() == 192 => e,
            _ => continue,
        };

        // Unique per (video, segment) and stable across runs.
        let point_id = fnv1a(uuid_hash, &(i as u64).to_le_bytes());

        let payload = serde_json::json!({
            "file_uuid": uuid,
            "speaker_id": segment.speaker_id,
            "segment_index": i,
            "start_frame": segment.start_frame,
            "end_frame": segment.end_frame,
            "start_time": segment.start_time,
            "end_time": segment.end_time,
        });

        // Best effort: a failed upsert is logged and the remaining segments are still processed.
        if let Err(e) = qdrant
            .upsert_vector_to_collection(&collection, point_id, emb, Some(payload))
            .await
        {
            tracing::error!("Failed to upsert voice vector {}: {}", i, e);
        } else {
            count += 1;
        }
    }

    if count > 0 {
        tracing::info!("Stored {} voice embeddings to Qdrant for {}", count, uuid);
    }
    Ok(())
}
pub async fn store_pose_chunks(
db: &PostgresDb,
uuid: &str,
@@ -787,12 +819,11 @@ impl ProcessorPool {
let data = serde_json::json!({
"text": segment.text,
"speaker_id": segment.speaker_id,
"timestamp": segment.start,
"timestamp": segment.start_time,
});
// ASRX is time-based, so we use segment index or start time as coordinate.
// Let's use index for simplicity in pre_chunks, or start time.
pre_chunks_to_store.push((i as i64, Some(segment.start), data, None, None));
pre_chunks_to_store.push((i as i64, Some(segment.start_time), data, None, None));
}
db.store_raw_pre_chunks_batch(uuid, "asrx", &pre_chunks_to_store)

279
src/worker/resources.rs Normal file
View File

@@ -0,0 +1,279 @@
use tracing::{info, warn};
/// Point-in-time snapshot of host CPU, memory and GPU load.
///
/// Built by `SystemResources::check()` and consumed by `safe_max_concurrent`
/// to scale the configured processor concurrency down under load.
pub struct SystemResources {
// Idle CPU share as returned by `get_cpu_idle()`; logged as a percentage.
pub cpu_idle_percent: f64,
// Derived in `check()` as `100.0 - cpu_idle_percent`.
pub cpu_used_percent: f64,
// Available memory from `get_memory_info()`; logged with an "MB" suffix.
pub memory_available_mb: u64,
// Total memory from `get_memory_info()`; 0 makes `memory_used_percent` fall back to 0.0.
pub memory_total_mb: u64,
// Derived in `check()`: (total - available) / total * 100, or 100.0 when
// available exceeds total, or 0.0 when total is 0.
pub memory_used_percent: f64,
// First element of the tuple returned by `get_gpu_info()`.
pub gpu_available: bool,
// GPU utilisation as returned by `get_gpu_info()`; `None` when not reported.
// `safe_max_concurrent` compares it against the thresholds 60.0 and 80.0.
pub gpu_utilization: Option<f64>,
// GPU memory usage as returned by `get_gpu_info()`; `None` when not reported.
pub gpu_memory_used_pct: Option<f64>,
}
impl SystemResources {
/// Samples CPU, memory and GPU state and assembles a snapshot.
///
/// `memory_used_percent` is `0.0` when the total is unknown (`0`), `100.0`
/// when the reported available amount exceeds the total, and otherwise the
/// unavailable share of the total.
pub fn check() -> Self {
    let cpu_idle_percent = Self::get_cpu_idle();
    let (memory_available_mb, memory_total_mb) = Self::get_memory_info();
    let (gpu_available, gpu_utilization, gpu_memory_used_pct) = Self::get_gpu_info();

    let memory_used_percent = match memory_total_mb {
        // Total unknown: no meaningful ratio can be computed.
        0 => 0.0,
        // Inconsistent probe data: cap at fully used instead of underflowing.
        total if memory_available_mb > total => 100.0,
        total => (total - memory_available_mb) as f64 / total as f64 * 100.0,
    };

    Self {
        cpu_idle_percent,
        cpu_used_percent: 100.0 - cpu_idle_percent,
        memory_available_mb,
        memory_total_mb,
        memory_used_percent,
        gpu_available,
        gpu_utilization,
        gpu_memory_used_pct,
    }
}
/// Derives a concurrency ceiling from this snapshot.
///
/// Starts from `config_max` and lowers it according to available memory,
/// CPU idle percentage, GPU utilization and GPU memory usage. Each rule can
/// only lower the limit, never raise it.
///
/// When both `memory_total_mb` and `memory_available_mb` are `0`, memory
/// information is treated as unavailable and the memory rule is skipped,
/// instead of being read as "0 MB free" and forcing the limit down to 1.
///
/// # Returns
/// The resulting limit, never less than 1 (even when `config_max` is 0).
pub fn safe_max_concurrent(&self, config_max: usize) -> usize {
    let mut limit = config_max;

    // Memory limit. A snapshot with neither a total nor an available figure
    // carries no memory information at all, so it must not throttle.
    let memory_unknown = self.memory_total_mb == 0 && self.memory_available_mb == 0;
    if memory_unknown {
        warn!("Memory information unavailable, skipping memory-based concurrency limit");
    } else if self.memory_available_mb < 1000 {
        limit = limit.min(1);
        warn!(
            "Low memory ({}MB available), reducing concurrency to 1",
            self.memory_available_mb
        );
    } else if self.memory_available_mb < 2000 {
        limit = limit.min(2);
        info!(
            "Moderate memory ({}MB available), limiting concurrency to 2",
            self.memory_available_mb
        );
    } else if self.memory_available_mb < 4000 {
        limit = limit.min(3);
        info!(
            "Adequate memory ({}MB available), limiting concurrency to 3",
            self.memory_available_mb
        );
    } else if self.memory_available_mb < 8000 {
        limit = limit.min(4);
        info!(
            "Good memory ({}MB available), limiting concurrency to 4",
            self.memory_available_mb
        );
    }

    // CPU limit.
    if self.cpu_idle_percent < 15.0 {
        limit = limit.min(1);
        warn!(
            "High CPU load (idle={:.1}%), reducing concurrency to 1",
            self.cpu_idle_percent
        );
    } else if self.cpu_idle_percent < 30.0 {
        limit = limit.min(2);
        info!(
            "Moderate CPU load (idle={:.1}%), limiting concurrency to 2",
            self.cpu_idle_percent
        );
    }

    // GPU limit: lower concurrency when utilization exceeds 80% (or 60%)
    // or when GPU memory usage exceeds 90%.
    if let Some(util) = self.gpu_utilization {
        if util > 80.0 {
            limit = limit.min(1);
            warn!(
                "High GPU utilization ({:.1}%), reducing concurrency to 1",
                util
            );
        } else if util > 60.0 {
            limit = limit.min(2);
            info!(
                "Moderate GPU utilization ({:.1}%), limiting concurrency to 2",
                util
            );
        }
    }
    if let Some(mem) = self.gpu_memory_used_pct {
        if mem > 90.0 {
            limit = limit.min(1);
            warn!(
                "High GPU memory usage ({:.1}%), reducing concurrency to 1",
                mem
            );
        }
    }

    // Always allow at least one job, even if `config_max` was 0.
    limit.max(1)
}
/// Returns the CPU idle percentage reported by `top -l 1 -n 1`.
///
/// `top` prints the figure as `... 86.20% idle`, i.e. the number and the
/// word `idle` are separate whitespace-delimited tokens. Both that layout
/// and a fused `86.20%idle` token are accepted.
///
/// Falls back to `50.0` when `top` cannot be run, when no idle figure is
/// found, or when the figure is outside `0..=100`.
fn get_cpu_idle() -> f64 {
    /// Neutral value used when the idle percentage cannot be determined.
    const FALLBACK_IDLE: f64 = 50.0;

    /// Extracts the first plausible idle percentage from `top` output.
    fn parse_idle(report: &str) -> Option<f64> {
        report.lines().find_map(|line| {
            let tokens: Vec<&str> = line.split_whitespace().collect();
            tokens.iter().enumerate().find_map(|(pos, raw)| {
                let token = raw.trim_end_matches(',');
                let digits = match token.strip_suffix("%idle") {
                    Some(fused) => fused,
                    // "86.20%" followed by "idle": the number is the previous
                    // token, and it must carry the percent sign.
                    None if token == "idle" => {
                        tokens.get(pos.checked_sub(1)?)?.strip_suffix('%')?
                    }
                    None => return None,
                };
                digits
                    .parse::<f64>()
                    .ok()
                    .filter(|pct| (0.0..=100.0).contains(pct))
            })
        })
    }

    match std::process::Command::new("top")
        .args(["-l", "1", "-n", "1"])
        .output()
    {
        Ok(out) => parse_idle(&String::from_utf8_lossy(&out.stdout)).unwrap_or(FALLBACK_IDLE),
        Err(_) => FALLBACK_IDLE,
    }
}
/// Returns `(available_mb, total_mb)` for the host.
///
/// Total memory comes from `sysctl hw.memsize`; when that yields nothing,
/// `/proc/meminfo` (`MemTotal`) is tried. Available memory is taken from the
/// first source that produces a value:
///
/// 1. `/proc/meminfo` `MemAvailable` (only consulted when `sysctl` failed),
/// 2. the "free percentage" line of `memory_pressure`, applied to the total,
/// 3. `vm_stat` (free + inactive + speculative pages), using the page size
///    printed in its header rather than a fixed 16 KiB,
/// 4. a quarter of the total.
///
/// A `total_mb` of `0` means the total could not be determined.
fn get_memory_info() -> (u64, u64) {
    const BYTES_PER_MB: u64 = 1024 * 1024;
    /// Page size assumed when the `vm_stat` header does not state one.
    const DEFAULT_PAGE_SIZE: u64 = 16384;

    /// Runs `program` and returns its stdout as text if it could be spawned.
    fn stdout_of(program: &str, args: &[&str]) -> Option<String> {
        std::process::Command::new(program)
            .args(args)
            .output()
            .ok()
            .map(|out| String::from_utf8_lossy(&out.stdout).into_owned())
    }

    /// Parses `hw.memsize: <bytes>` into megabytes.
    fn parse_sysctl_total_mb(text: &str) -> Option<u64> {
        text.split_whitespace()
            .nth(1)?
            .parse::<u64>()
            .ok()
            .map(|bytes| bytes / BYTES_PER_MB)
    }

    /// Parses a `<key> <n> kB` line of `/proc/meminfo` into megabytes.
    fn parse_meminfo_mb(text: &str, key: &str) -> Option<u64> {
        text.lines().find_map(|line| {
            line.strip_prefix(key)?
                .split_whitespace()
                .next()?
                .parse::<u64>()
                .ok()
                .map(|kb| kb / 1024)
        })
    }

    /// Parses "System-wide memory free percentage: XX%".
    fn parse_free_percentage(text: &str) -> Option<f64> {
        text.lines()
            .filter(|line| line.contains("free percentage"))
            .find_map(|line| {
                line.split(':')
                    .nth(1)?
                    .trim()
                    .trim_end_matches('%')
                    .parse::<f64>()
                    .ok()
            })
    }

    /// Sums free, inactive and speculative pages from `vm_stat` output and
    /// converts them to megabytes. Returns `None` when no counter was found.
    fn parse_vm_stat_mb(text: &str) -> Option<u64> {
        // Header looks like "... (page size of 16384 bytes)".
        let page_size = text
            .lines()
            .find_map(|line| {
                line.split("page size of")
                    .nth(1)?
                    .split_whitespace()
                    .next()?
                    .parse::<u64>()
                    .ok()
            })
            .unwrap_or(DEFAULT_PAGE_SIZE);

        let wanted = ["Pages free:", "Pages inactive:", "Pages speculative:"];
        let mut pages: u64 = 0;
        let mut found_any = false;
        for line in text.lines() {
            if !wanted.iter().any(|key| line.contains(*key)) {
                continue;
            }
            let count = line
                .split_whitespace()
                .last()
                .and_then(|raw| raw.trim_end_matches('.').parse::<u64>().ok());
            if let Some(count) = count {
                pages = pages.saturating_add(count);
                found_any = true;
            }
        }
        if found_any {
            Some(pages.saturating_mul(page_size) / BYTES_PER_MB)
        } else {
            None
        }
    }

    /// Available megabytes derived from `memory_pressure`, if usable.
    fn available_from_pressure(total_mb: u64) -> Option<u64> {
        // A percentage of an unknown total carries no information.
        if total_mb == 0 {
            return None;
        }
        let report = stdout_of("memory_pressure", &[])?;
        let pct = parse_free_percentage(&report)?;
        Some((total_mb as f64 * pct / 100.0) as u64)
    }

    // Total memory.
    let mut total = stdout_of("sysctl", &["hw.memsize"])
        .and_then(|text| parse_sysctl_total_mb(&text))
        .unwrap_or(0);

    // Only consult /proc/meminfo when sysctl gave nothing.
    let meminfo = if total == 0 {
        std::fs::read_to_string("/proc/meminfo").ok()
    } else {
        None
    };
    if let Some(info) = meminfo.as_deref() {
        total = parse_meminfo_mb(info, "MemTotal:").unwrap_or(0);
    }

    // Available memory: first source that yields a value wins.
    let available = meminfo
        .as_deref()
        .and_then(|info| parse_meminfo_mb(info, "MemAvailable:"))
        .or_else(|| available_from_pressure(total))
        .or_else(|| stdout_of("vm_stat", &[]).and_then(|text| parse_vm_stat_mb(&text)))
        .unwrap_or(total / 4);

    (available, total)
}
/// Probes GPU load and returns `(available, utilization_pct, memory_used_pct)`.
///
/// Two probes are tried in order:
///
/// 1. `ioreg -r -c AppleM2ScalerCSCDriver`: when its output mentions
///    `PerformanceStatistics`, the values stored under `GPU Utilization` and
///    `GPU Memory Utilization` are read. Each value is located relative to
///    its own key, so it is found whether `ioreg` prints one property per
///    line or the whole dictionary on a single line.
/// 2. `nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total`:
///    output is parsed row by row and the first row with at least three
///    fields is used, so multi-GPU output no longer corrupts the memory
///    figures.
///
/// Returns `(false, None, None)` when neither probe yields usable output.
fn get_gpu_info() -> (bool, Option<f64>, Option<f64>) {
    /// Finds `key` in `text` and parses the number assigned to it, accepting
    /// both `"key" = 42` and `"key"=42`.
    fn value_after_key(text: &str, key: &str) -> Option<f64> {
        text.match_indices(key).find_map(|(pos, _)| {
            let after_key = &text[pos + key.len()..];
            // Skip the closing quote / unit suffix / spaces before '='.
            let after_key = after_key
                .trim_start_matches(|c: char| c == '"' || c == '%' || c.is_whitespace());
            let value = after_key
                .strip_prefix('=')?
                .trim_start()
                .trim_start_matches('"');
            let end = value
                .find(|c: char| !(c.is_ascii_digit() || c == '.'))
                .unwrap_or(value.len());
            value[..end].parse::<f64>().ok()
        })
    }

    /// Parses the first usable `util, mem_used, mem_total` CSV row.
    fn parse_nvidia_csv(text: &str) -> Option<(Option<f64>, Option<f64>)> {
        text.lines().find_map(|row| {
            let fields: Vec<&str> = row.split(',').map(str::trim).collect();
            if fields.len() < 3 {
                return None;
            }
            let util = fields[0].parse::<f64>().ok();
            let mem_pct = match (fields[1].parse::<f64>(), fields[2].parse::<f64>()) {
                (Ok(used), Ok(total)) if total > 0.0 => Some(used / total * 100.0),
                _ => None,
            };
            Some((util, mem_pct))
        })
    }

    // Apple Silicon: read GPU statistics via ioreg.
    // NOTE(review): the queried class and key names are kept as they were;
    // confirm them against real `ioreg` output on the target hardware.
    let ioreg = std::process::Command::new("ioreg")
        .args(["-r", "-c", "AppleM2ScalerCSCDriver"])
        .output();
    if let Ok(out) = ioreg {
        let text = String::from_utf8_lossy(&out.stdout);
        if text.contains("PerformanceStatistics") {
            let util = value_after_key(&text, "GPU Utilization");
            let mem = value_after_key(&text, "GPU Memory Utilization");
            return (true, util, mem);
        }
    }

    // NVIDIA GPU via nvidia-smi.
    let nvidia = std::process::Command::new("nvidia-smi")
        .args([
            "--query-gpu=utilization.gpu,memory.used,memory.total",
            "--format=csv,noheader,nounits",
        ])
        .output();
    if let Ok(out) = nvidia {
        if out.status.success() {
            let text = String::from_utf8_lossy(&out.stdout);
            if let Some((util, mem_pct)) = parse_nvidia_csv(&text) {
                return (true, util, mem_pct);
            }
        }
    }

    (false, None, None)
}
}