use anyhow::{Context, Result}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tracing::{error, info}; use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType}; use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR}; use crate::core::db::RedisClient; use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType}; use crate::core::processor; use crate::core::processor::asr::AsrResult; use crate::core::processor::asrx::AsrxResult; use crate::core::processor::cut::CutResult; use crate::core::processor::face::FaceResult; use crate::core::processor::ocr::OcrResult; use crate::core::processor::pose::PoseResult; use crate::core::processor::visual_chunk::VisualChunkResult; use crate::core::processor::yolo::YoloResult; #[derive(Debug, Clone)] pub struct ProcessorTask { pub job: MonitorJob, pub processor_type: ProcessorType, pub processor_result_id: i32, } pub struct ProcessorPool { db: Arc, redis: Arc, max_concurrent: usize, running: Arc>>, running_count: Arc>, } struct ProcessorHandle { #[allow(dead_code)] processor_type: ProcessorType, cancel_tx: mpsc::Sender<()>, } impl ProcessorPool { pub fn new(db: Arc, redis: Arc, max_concurrent: usize) -> Self { Self { db, redis, max_concurrent, running: Arc::new(RwLock::new(HashMap::new())), running_count: Arc::new(RwLock::new(0)), } } pub async fn can_start(&self) -> bool { let count = *self.running_count.read().await; count < self.max_concurrent } pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> { let (cancel_tx, cancel_rx) = mpsc::channel(1); let job_id = task.job.id; let processor_type = task.processor_type; { let mut count = self.running_count.write().await; if *count >= self.max_concurrent { anyhow::bail!("Max concurrent processors reached"); } *count += 1; } let running = self.running.clone(); let running_count = self.running_count.clone(); running.write().await.insert( job_id, ProcessorHandle { processor_type, cancel_tx, }, ); let db = self.db.clone(); let redis = self.redis.clone(); let job = task.job.clone(); let processor_result_id = task.processor_result_id; let processor_name = processor_type.as_str().to_string(); tokio::spawn(async move { info!("Starting processor {} for job {}", processor_name, job.uuid); let _ = db .update_processor_result( processor_result_id, ProcessorJobStatus::Running, None, None, ) .await; let _ = redis .update_worker_processor_status(&job.uuid, &processor_name, "running", None) .await; let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await; { let mut running_guard = running.write().await; running_guard.remove(&job_id); let mut count_guard = running_count.write().await; *count_guard -= 1; } match result { Ok(output) => { info!( "Processor {} completed for job {}", processor_name, job.uuid ); if let Err(e) = db .update_processor_result( processor_result_id, ProcessorJobStatus::Completed, None, Some(&output), ) .await { error!("Failed to update processor result to completed: {}", e); } if let Err(e) = redis .update_worker_processor_status( &job.uuid, &processor_name, "completed", None, ) .await { error!("Failed to update Redis processor status: {}", e); } } Err(e) => { error!( "Processor {} failed for job {}: {}", processor_name, job.uuid, e ); if let Err(db_err) = db .update_processor_result( processor_result_id, ProcessorJobStatus::Failed, Some(&e.to_string()), None, ) .await { error!("Failed to update processor result to failed: {}", db_err); } if let Err(redis_err) = redis .update_worker_processor_status( &job.uuid, &processor_name, "failed", Some(&e.to_string()), ) .await { error!("Failed to update Redis processor status: {}", redis_err); } } } }); Ok(()) } async fn run_processor( db: &PostgresDb, _redis: &RedisClient, job: &MonitorJob, processor_type: ProcessorType, _cancel_rx: mpsc::Receiver<()>, ) -> Result { let video_path = job.video_path.as_ref().context("No video path in job")?; // Generate output path let output_dir = PathBuf::from(OUTPUT_DIR.as_str()); let output_path = output_dir.join(format!( "job_{}_{}_{}.json", job.id, processor_type.as_str(), chrono::Utc::now().timestamp_millis() )); // Ensure output directory exists if let Some(parent) = output_path.parent() { tokio::fs::create_dir_all(parent).await?; } let uuid = Some(job.uuid.as_str()); match processor_type { ProcessorType::Asr => { let result = processor::process_asr(video_path, output_path.to_str().unwrap(), uuid).await?; // Store ASR chunks in database tracing::info!( "ASR completed, storing {} segments for {}", result.segments.len(), job.uuid ); if let Err(e) = Self::store_asr_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store ASR chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Cut => { let result = processor::process_cut(video_path, output_path.to_str().unwrap(), uuid).await?; // Store CUT chunks in database tracing::info!( "CUT completed, storing {} scenes for {}", result.scenes.len(), job.uuid ); if let Err(e) = Self::store_cut_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store CUT chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Yolo => { let result = processor::process_yolo(video_path, output_path.to_str().unwrap(), uuid) .await?; // Store YOLO chunks in database tracing::info!( "YOLO completed, storing {} frames for {}", result.frames.len(), job.uuid ); if let Err(e) = Self::store_yolo_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store YOLO chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Ocr => { let result = processor::process_ocr(video_path, output_path.to_str().unwrap(), uuid).await?; // Store OCR chunks in database tracing::info!( "OCR completed, storing {} frames for {}", result.frames.len(), job.uuid ); if let Err(e) = Self::store_ocr_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store OCR chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Face => { let result = processor::process_face(video_path, output_path.to_str().unwrap(), uuid) .await?; // Store FACE chunks in database tracing::info!( "FACE completed, storing {} frames for {}", result.frames.len(), job.uuid ); if let Err(e) = Self::store_face_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store FACE chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Pose => { let result = processor::process_pose(video_path, output_path.to_str().unwrap(), uuid) .await?; // Store POSE chunks in database tracing::info!( "POSE completed, storing {} frames for {}", result.frames.len(), job.uuid ); if let Err(e) = Self::store_pose_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store POSE chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::Asrx => { let result = processor::process_asrx(video_path, output_path.to_str().unwrap(), uuid) .await?; // Store ASRX chunks in database tracing::info!( "ASRX completed, storing {} segments for {}", result.segments.len(), job.uuid ); if let Err(e) = Self::store_asrx_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store ASRX chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } ProcessorType::VisualChunk => { let result = processor::process_visual_chunk_advanced( video_path, output_path.to_str().unwrap(), uuid, ) .await?; // Store VisualChunk chunks in database tracing::info!( "VisualChunk completed, storing {} chunks for {}", result.chunk_count, job.uuid ); if let Err(e) = Self::store_visual_chunk_chunks(db, &job.uuid, &result).await { tracing::error!("Failed to store VisualChunk chunks for {}: {}", job.uuid, e); } Ok(serde_json::to_value(result)?) } } } #[allow(dead_code)] async fn run_asr( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_ASR_SCRIPT") .unwrap_or_else(|_| format!("{}/asr_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("ASR script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_cut( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_CUT_SCRIPT") .unwrap_or_else(|_| format!("{}/cut_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("CUT script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_yolo( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_YOLO_SCRIPT") .unwrap_or_else(|_| format!("{}/yolo_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("YOLO script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_ocr( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_OCR_SCRIPT") .unwrap_or_else(|_| format!("{}/ocr_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("OCR script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_face( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_FACE_SCRIPT") .unwrap_or_else(|_| format!("{}/face_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("Face script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_pose( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_POSE_SCRIPT") .unwrap_or_else(|_| format!("{}/pose_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("Pose script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } #[allow(dead_code)] async fn run_asrx( _db: &PostgresDb, _redis: &RedisClient, video_path: &str, _cancel_rx: &mut mpsc::Receiver<()>, ) -> Result { let script_path = std::env::var("MOMENTRY_ASRX_SCRIPT") .unwrap_or_else(|_| format!("{}/asrx_processor.py", SCRIPTS_DIR.as_str())); let output = tokio::process::Command::new(PYTHON_PATH.as_str()) .arg(&script_path) .arg(video_path) .output() .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); anyhow::bail!("ASRX script failed: {}", stderr); } let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; Ok(result) } pub async fn store_asr_chunks( db: &PostgresDb, uuid: &str, asr_result: &AsrResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = db .get_video_by_uuid(uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, segment) in asr_result.segments.iter().enumerate() { let chunk = Chunk::from_seconds( file_id as i32, uuid.to_string(), i as u32, ChunkType::Sentence, ChunkRule::Rule1, segment.start, segment.end, fps, serde_json::json!({ "text": segment.text, "text_normalized": segment.text.to_lowercase(), }), ) .with_metadata(serde_json::json!({ "language": asr_result.language, "language_probability": asr_result.language_probability, })); match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!("Stored ASR chunk {} for video {}", i, uuid); } Err(e) => { tracing::error!("Failed to store ASR chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_cut_chunks( db: &PostgresDb, uuid: &str, cut_result: &CutResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = db .get_video_by_uuid(uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, scene) in cut_result.scenes.iter().enumerate() { let chunk = Chunk::from_seconds( file_id as i32, uuid.to_string(), i as u32, ChunkType::Cut, ChunkRule::Rule1, scene.start_time, scene.end_time, fps, serde_json::json!({ "scene_number": scene.scene_number, "start_frame": scene.start_frame, "end_frame": scene.end_frame, }), ); match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!("Stored CUT chunk {} for video {}", i, uuid); } Err(e) => { tracing::error!("Failed to store CUT chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_yolo_chunks( db: &PostgresDb, uuid: &str, yolo_result: &YoloResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = match db.get_video_by_uuid(uuid).await { Ok(Some(video)) => video, Ok(None) => { tracing::error!("Video not found for uuid: {}", uuid); return Ok(()); } Err(e) => { tracing::error!("Failed to get video for uuid {}: {}", uuid, e); return Ok(()); } }; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, frame) in yolo_result.frames.iter().enumerate() { let mut chunk = Chunk::new( file_id as i32, uuid.to_string(), i as u32, ChunkType::Trace, ChunkRule::Rule1, frame.frame as i64, frame.frame as i64 + 1, fps, serde_json::json!({ "objects": frame.objects, "timestamp": frame.timestamp, }), ); // Override chunk_id to include processor prefix for uniqueness chunk.chunk_id = format!("trace_yolo_{:04}", i); // Populate text_content for BM25 search let object_names: Vec = frame.objects.iter().map(|o| o.class_name.clone()).collect(); if !object_names.is_empty() { chunk = chunk.with_text_content(object_names.join(" ")); } match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!( "Stored YOLO chunk {} (frame {}) for video {}", i, frame.frame, uuid ); } Err(e) => { tracing::error!("Failed to store YOLO chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_ocr_chunks( db: &PostgresDb, uuid: &str, ocr_result: &OcrResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = match db.get_video_by_uuid(uuid).await { Ok(Some(video)) => video, Ok(None) => { tracing::error!("Video not found for uuid: {}", uuid); return Ok(()); } Err(e) => { tracing::error!("Failed to get video for uuid {}: {}", uuid, e); return Ok(()); } }; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, frame) in ocr_result.frames.iter().enumerate() { let mut chunk = Chunk::new( file_id as i32, uuid.to_string(), i as u32, ChunkType::Trace, ChunkRule::Rule1, frame.frame as i64, frame.frame as i64 + 1, fps, serde_json::json!({ "texts": frame.texts, "timestamp": frame.timestamp, }), ); // Override chunk_id to include processor prefix for uniqueness chunk.chunk_id = format!("trace_ocr_{:04}", i); // Populate text_content for BM25 search let texts: Vec = frame.texts.iter().map(|t| t.text.clone()).collect(); if !texts.is_empty() { chunk = chunk.with_text_content(texts.join(" ")); } match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!( "Stored OCR chunk {} (frame {}) for video {}", i, frame.frame, uuid ); } Err(e) => { tracing::error!("Failed to store OCR chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_face_chunks( db: &PostgresDb, uuid: &str, face_result: &FaceResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = match db.get_video_by_uuid(uuid).await { Ok(Some(video)) => video, Ok(None) => { tracing::error!("Video not found for uuid: {}", uuid); return Ok(()); } Err(e) => { tracing::error!("Failed to get video for uuid {}: {}", uuid, e); return Ok(()); } }; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, frame) in face_result.frames.iter().enumerate() { let mut chunk = Chunk::new( file_id as i32, uuid.to_string(), i as u32, ChunkType::Trace, ChunkRule::Rule1, frame.frame as i64, frame.frame as i64 + 1, fps, serde_json::json!({ "faces": frame.faces, "timestamp": frame.timestamp, }), ); // Override chunk_id to include processor prefix for uniqueness chunk.chunk_id = format!("trace_face_{:04}", i); // Populate text_content for BM25 search (face IDs) let face_ids: Vec = frame .faces .iter() .filter_map(|f| f.face_id.clone()) .collect(); if !face_ids.is_empty() { chunk = chunk.with_text_content(face_ids.join(" ")); } match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!( "Stored FACE chunk {} (frame {}) for video {}", i, frame.frame, uuid ); } Err(e) => { tracing::error!("Failed to store FACE chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_pose_chunks( db: &PostgresDb, uuid: &str, pose_result: &PoseResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = match db.get_video_by_uuid(uuid).await { Ok(Some(video)) => video, Ok(None) => { tracing::error!("Video not found for uuid: {}", uuid); return Ok(()); } Err(e) => { tracing::error!("Failed to get video for uuid {}: {}", uuid, e); return Ok(()); } }; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, frame) in pose_result.frames.iter().enumerate() { let mut chunk = Chunk::new( file_id as i32, uuid.to_string(), i as u32, ChunkType::Trace, ChunkRule::Rule1, frame.frame as i64, frame.frame as i64 + 1, fps, serde_json::json!({ "persons": frame.persons, "timestamp": frame.timestamp, }), ); // Override chunk_id to include processor prefix for uniqueness chunk.chunk_id = format!("trace_pose_{:04}", i); // Populate text_content for BM25 search (person count indicator) let person_count = frame.persons.len(); if person_count > 0 { let text = format!("person person person") .repeat(person_count.min(10)) .trim() .to_string(); chunk = chunk.with_text_content(text); } match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!( "Stored POSE chunk {} (frame {}) for video {}", i, frame.frame, uuid ); } Err(e) => { tracing::error!("Failed to store POSE chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_asrx_chunks( db: &PostgresDb, uuid: &str, asrx_result: &AsrxResult, ) -> Result<()> { // Get video record to obtain file_id and fps let video = match db.get_video_by_uuid(uuid).await { Ok(Some(video)) => video, Ok(None) => { tracing::error!("Video not found for uuid: {}", uuid); return Ok(()); } Err(e) => { tracing::error!("Failed to get video for uuid {}: {}", uuid, e); return Ok(()); } }; let file_id = video.id; let fps = if video.fps > 0.0 { video.fps } else { 30.0 }; for (i, segment) in asrx_result.segments.iter().enumerate() { let mut chunk = Chunk::from_seconds( file_id as i32, uuid.to_string(), i as u32, ChunkType::Trace, ChunkRule::Rule1, segment.start, segment.end, fps, serde_json::json!({ "text": segment.text, "timestamp": segment.start, }), ); // Override chunk_id to include processor prefix for uniqueness chunk.chunk_id = format!("trace_asrx_{:04}", i); // Populate text_content for BM25 search (already has text) chunk = chunk.with_text_content(segment.text.clone()); // Also store speaker_id in content chunk.content = serde_json::json!({ "text": segment.text, "speaker_id": segment.speaker_id, "timestamp": segment.start, }); match db.store_chunk(&chunk).await { Ok(_) => { tracing::info!("Stored ASRX chunk {} for video {}", i, uuid); } Err(e) => { tracing::error!("Failed to store ASRX chunk {}: {}", i, e); } } } Ok(()) } pub async fn store_visual_chunk_chunks( db: &PostgresDb, uuid: &str, visual_chunk_result: &VisualChunkResult, ) -> Result<()> { for (i, chunk) in visual_chunk_result.chunks.iter().enumerate() { match db.store_chunk(chunk).await { Ok(_) => { tracing::info!("Stored VisualChunk chunk {} for video {}", i, uuid); } Err(e) => { tracing::error!("Failed to store VisualChunk chunk {}: {}", i, e); } } } Ok(()) } pub async fn get_running_count(&self) -> usize { *self.running_count.read().await } pub async fn cancel_all(&self) { let mut running = self.running.write().await; for (_, handle) in running.drain() { let _ = handle.cancel_tx.send(()).await; } let mut count = self.running_count.write().await; *count = 0; } }