// File: momentry_core/src/worker/processor.rs (Rust; 1232 lines, 45 KiB, as reported by the hosting page).
// Note from the hosting page: this file contains Unicode characters that might be confused
// with other characters; if that is intentional, the warning can be ignored.
use anyhow::{Context, Result};
use libc;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
/// Guard that ensures processor pool cleanup runs even if the task panics.
struct ProcessorCleanupGuard {
job_id: i32,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
}
impl Drop for ProcessorCleanupGuard {
fn drop(&mut self) {
use tokio::sync::TryLockError;
// 嘗試同步清理;若 lock 被佔用則跳過(避免 deadlock
if let Ok(mut guard) = self.running.try_write() {
guard.remove(&self.job_id);
} else {
warn!("[ProcessorCleanupGuard] running lock contended, skipping cleanup");
}
if let Ok(mut guard) = self.running_count.try_write() {
if *guard > 0 {
*guard -= 1;
}
} else {
warn!("[ProcessorCleanupGuard] running_count lock contended, skipping cleanup");
}
}
}
use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR};
use crate::core::db::{
MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, QdrantDb, RedisClient,
};
use crate::core::processor;
use crate::core::processor::asr::AsrResult;
use crate::core::processor::asrx::AsrxResult;
use crate::core::processor::cut::CutResult;
use crate::core::processor::face::FaceResult;
use crate::core::processor::ocr::OcrResult;
use crate::core::processor::pose::PoseResult;
use crate::core::processor::scene_classification::SceneClassificationResult;
use crate::core::processor::visual_chunk::VisualChunkResult;
use crate::core::processor::yolo::YoloResult;
use crate::worker::resources::SystemResources;
/// Result of one processor run, as returned by `run_processor` and recorded
/// by `start_processor` into Postgres and Redis.
#[derive(Debug)]
struct ProcessorOutput {
/// JSON form of the processor's result (`serde_json::to_value(result)`), or
/// `Null` for processors that produce no in-memory result (Story, 5W1H, skipped Scene).
data: serde_json::Value,
/// Number of items produced (segments, scenes, frames, chunks or narratives).
chunks_produced: i32,
/// Frames reported as processed; `run_processor` sets this to the video's total
/// frame count, or 0 when a Scene run is skipped.
frames_processed: i32,
/// The video's `total_frames` (0 if the video row is missing).
total_frames: i32,
/// Retry count reported to Redis; always 0 in this file.
retry_count: i32,
/// Child process id reported to Redis; always 0 in this file.
pid: i32,
}
/// A request to run one processor type against one monitored job.
#[derive(Debug, Clone)]
pub struct ProcessorTask {
/// The job (video) being processed; `job.id` keys the pool's running map.
pub job: MonitorJob,
/// Which processor to run.
pub processor_type: ProcessorType,
/// Id of the processor-result row that is updated with the run's status.
pub processor_result_id: i32,
/// Optional shared frame directory; exported as `MOMENTRY_FRAME_DIR`
/// (the variable is removed when this is `None`).
pub frame_dir: Option<String>,
}
/// Bounded pool that runs processors as background Tokio tasks.
pub struct ProcessorPool {
/// Database used to record processor results and store pre-chunks.
db: Arc<PostgresDb>,
/// Redis client used for per-processor status hashes and progress pub/sub.
redis: Arc<RedisClient>,
/// Configured concurrency ceiling, passed to `SystemResources::safe_max_concurrent`.
config_max: usize,
/// Handles of running processors, keyed by job id.
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
/// Number of reserved slots; reconciled against `running.len()` by `sweep_stale`.
running_count: Arc<RwLock<usize>>,
}
/// Bookkeeping kept by the pool for one running processor.
struct ProcessorHandle {
// Stored for diagnostics; not read anywhere in this file.
#[allow(dead_code)]
processor_type: ProcessorType,
/// `cancel_all` sends `()` here. NOTE(review): the matching receiver is
/// passed to `run_processor`, which never reads it.
cancel_tx: mpsc::Sender<()>,
/// NOTE(review): created as `None` in `start_processor` and not read or
/// written anywhere else in the visible code.
child_pid: Arc<RwLock<Option<i32>>>,
}
impl ProcessorPool {
/// Create an empty pool.
///
/// `max_concurrent` is the configured ceiling; the effective limit is
/// recomputed from system resources by `current_max`.
pub fn new(db: Arc<PostgresDb>, redis: Arc<RedisClient>, max_concurrent: usize) -> Self {
    let running = Arc::new(RwLock::new(HashMap::new()));
    let running_count = Arc::new(RwLock::new(0));
    Self {
        config_max: max_concurrent,
        db,
        redis,
        running,
        running_count,
    }
}
/// Compute the current safe concurrency limit from system resources.
///
/// This is whatever `safe_max_concurrent` returns for the configured maximum,
/// clamped so it is never below 1.
pub async fn current_max(&self) -> usize {
    let snapshot = SystemResources::check();
    let safe_limit = snapshot.safe_max_concurrent(self.config_max);
    std::cmp::max(safe_limit, 1)
}
/// Whether another processor could be started right now, i.e. the number of
/// reserved slots is below the dynamic limit from `current_max`.
///
/// This is only a hint; `start_processor` re-checks the limit itself.
pub async fn can_start(&self) -> bool {
    let reserved = { *self.running_count.read().await };
    reserved < self.current_max().await
}
/// Reconcile `running_count` with the number of tracked handles, and reset
/// stale processor results in the database when the pool is idle.
///
/// Both locks are held together, taken in the same `running` -> `running_count`
/// order as `cancel_all`, so the comparison and the correction form one
/// atomic step. A processor that registers concurrently can no longer slip
/// between the two reads and have its slot erased.
///
/// The database reset (`reset_stale_processor_results` with
/// `ProcessorJobStatus::Pending` and reason "Worker restarted") runs only when
/// there were no handles and the counter was already zero before this sweep.
/// A failed reset is logged, not propagated.
pub async fn sweep_stale(&self) {
    let (handle_count, previous_count) = {
        let running = self.running.read().await;
        let mut count = self.running_count.write().await;
        let handle_count = running.len();
        let previous_count = *count;
        if handle_count != previous_count {
            warn!(
                "[ProcessorPool] Stale count detected: handles={}, count={}, fixing",
                handle_count, previous_count
            );
            *count = handle_count;
        }
        (handle_count, previous_count)
    };
    if handle_count == 0 && previous_count == 0 {
        if let Err(e) = self
            .db
            .reset_stale_processor_results(ProcessorJobStatus::Pending, "Worker restarted")
            .await
        {
            error!("Failed to reset stale processor results: {}", e);
        }
    }
}
/// Best-effort SIGKILL of a previous run of `processor` for `uuid`.
///
/// The pid is read from field `pid` of the Redis hash
/// `{REDIS_KEY_PREFIX}job:{uuid}:processor:{processor}`. Connection or query
/// errors and missing pids are ignored. The pid is only signalled when it is
/// positive, so it targets a single process rather than a process group, and
/// when it is not this worker's own pid.
///
/// NOTE(review): a stale pid may have been reused by an unrelated process;
/// nothing here verifies that the target really is the old processor.
async fn kill_existing_processor(redis: &RedisClient, uuid: &str, processor: &str) {
    let prefix = crate::core::config::REDIS_KEY_PREFIX.as_str();
    let key = format!("{}job:{}:processor:{}", prefix, uuid, processor);
    let mut conn = match redis.get_conn().await {
        Ok(conn) => conn,
        Err(_) => return,
    };
    let old_pid: Option<i32> = redis::cmd("HGET")
        .arg(&key)
        .arg("pid")
        .query_async(&mut conn)
        .await
        .ok()
        .flatten();
    let pid = match old_pid {
        Some(pid) if pid > 0 => pid,
        _ => return,
    };
    // `pid > 0` was checked above, so this cast cannot wrap.
    if pid as u32 == std::process::id() {
        warn!(
            "[PID] Refusing to kill own process {} for {}/{}",
            pid, uuid, processor
        );
        return;
    }
    warn!(
        "[PID] Killing existing process {} for {}/{}",
        pid, uuid, processor
    );
    // SAFETY: `kill(2)` has no memory-safety preconditions; it only takes plain
    // integers. `pid > 0` restricts it to one specific process.
    let rc = unsafe { libc::kill(pid, libc::SIGKILL) };
    if rc != 0 {
        warn!(
            "[PID] kill({}) failed: {}",
            pid,
            std::io::Error::last_os_error()
        );
    }
}
pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> {
Self::kill_existing_processor(&*self.redis, &task.job.uuid, task.processor_type.as_str())
.await;
let (cancel_tx, cancel_rx) = mpsc::channel(1);
let job_id = task.job.id;
let processor_type = task.processor_type;
let current_limit = self.current_max().await;
{
let mut count = self.running_count.write().await;
if *count >= current_limit {
anyhow::bail!(
"Max concurrent processors reached (dynamic limit: {})",
current_limit
);
}
*count += 1;
}
let running = self.running.clone();
let running_count = self.running_count.clone();
let child_pid: Arc<RwLock<Option<i32>>> = Arc::new(RwLock::new(None));
running.write().await.insert(
job_id,
ProcessorHandle {
processor_type,
cancel_tx,
child_pid: child_pid.clone(),
},
);
let db = self.db.clone();
let redis = self.redis.clone();
let job = task.job.clone();
let processor_result_id = task.processor_result_id;
let processor_name = processor_type.as_str().to_string();
// 設置共享 frame 目錄環境變數(若有)
if let Some(ref fd) = task.frame_dir {
std::env::set_var("MOMENTRY_FRAME_DIR", fd);
} else {
std::env::remove_var("MOMENTRY_FRAME_DIR");
}
tokio::spawn(async move {
// Guard 的 Drop 確保 panic 時也清理 pool state
let _guard = ProcessorCleanupGuard {
job_id,
running: running.clone(),
running_count: running_count.clone(),
};
info!("Starting processor {} for job {}", processor_name, job.uuid);
let _ = db
.update_processor_result(
processor_result_id,
ProcessorJobStatus::Running,
None,
None,
)
.await;
let _ = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"running",
None,
0,
0,
0,
0,
0,
)
.await;
// Set started_at once (subscriber's update_worker_processor_status won't touch it)
if let Ok(mut conn) = redis.get_conn().await {
let prefix = crate::core::config::REDIS_KEY_PREFIX.as_str();
let key = format!("{}job:{}:processor:{}", prefix, &job.uuid, &processor_name);
let now = chrono::Utc::now().to_rfc3339();
let _: Option<String> = redis::cmd("HSET")
.arg(&key)
.arg("started_at")
.arg(&now)
.query_async(&mut conn)
.await
.ok();
let _: Option<String> = redis::cmd("HSET")
.arg(&key)
.arg("embedding_started_at")
.arg(&now)
.query_async(&mut conn)
.await
.ok();
}
// Subscribe to Redis progress pub/sub and update processor hash in real-time
let sub_redis = redis.clone();
let sub_uuid = job.uuid.clone();
let sub_processor = processor_name.clone();
let progress_handle = tokio::spawn(async move {
let cb_redis = sub_redis.clone();
let cb_uuid = sub_uuid.clone();
let cb_processor = sub_processor.clone();
if let Err(e) = sub_redis
.subscribe_and_callback(&sub_uuid, move |msg| {
tracing::info!(
"[Subscriber] Got msg for={} cur={} tot={}",
msg.processor,
msg.data.current.unwrap_or(0),
msg.data.total.unwrap_or(0)
);
if msg.processor == cb_processor {
let cur = msg.data.current.unwrap_or(0);
let tot = msg.data.total.unwrap_or(0);
let oc = msg.data.output_count.unwrap_or(0);
let r = cb_redis.clone();
let u = cb_uuid.clone();
let p = cb_processor.clone();
tokio::spawn(async move {
match r
.update_worker_processor_status(
&u, &p, "running", None, cur, oc, tot, 0, 0,
)
.await
{
Ok(_) => tracing::info!(
"[Subscriber] Updated {}: cur={} tot={}",
p,
cur,
tot
),
Err(e) => tracing::error!("[Subscriber] FAILED {}: {}", p, e),
}
});
}
})
.await
{
tracing::warn!("[ProgressSub] Subscriber ended: {}", e);
}
});
let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await;
progress_handle.abort();
match result {
Ok(output) => {
// 驗收 agent 檢查產出內容
let verification =
crate::verification::verifier::verify_output(&processor_type, &job.uuid);
if verification.passed {
info!(
"Processor {} completed and verified for job {} ({} chunks, {} frames)",
processor_name,
job.uuid,
output.chunks_produced,
output.frames_processed
);
// 清理暫存備份
crate::verification::verifier::cleanup_temp_files(
&processor_type,
&job.uuid,
);
if let Err(e) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Completed,
None,
Some(&output.data),
output.chunks_produced,
output.frames_processed,
)
.await
{
error!("Failed to update processor result to completed: {}", e);
}
if let Err(e) = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"completed",
None,
output.frames_processed,
output.chunks_produced,
output.total_frames,
output.retry_count,
output.pid,
)
.await
{
error!("Failed to update Redis processor status: {}", e);
}
} else {
error!(
"Processor {} output failed verification for job {}: {:?}",
processor_name, job.uuid, verification.details
);
if let Err(db_err) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Failed,
Some(&format!("verification failed: {:?}", verification.details)),
None,
0,
0,
)
.await
{
error!("Failed to update processor result to failed: {}", db_err);
}
if let Err(redis_err) = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"failed",
Some(&format!("verification failed: {:?}", verification.details)),
0,
0,
0,
0,
0,
)
.await
{
error!("Failed to update Redis processor status: {}", redis_err);
}
}
}
Err(e) => {
error!(
"Processor {} failed for job {}: {}",
processor_name, job.uuid, e
);
if let Err(db_err) = db
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Failed,
Some(&e.to_string()),
None,
0,
0,
)
.await
{
error!("Failed to update processor result to failed: {}", db_err);
}
if let Err(redis_err) = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"failed",
Some(&e.to_string()),
0,
0,
0,
0,
0,
)
.await
{
error!("Failed to update Redis processor status: {}", redis_err);
}
}
}
});
Ok(())
}
async fn run_processor(
db: &PostgresDb,
_redis: &RedisClient,
job: &MonitorJob,
processor_type: ProcessorType,
_cancel_rx: mpsc::Receiver<()>,
) -> Result<ProcessorOutput> {
let video_path = job.video_path.as_ref().context("No video path in job")?;
// Generate output path
let output_dir = PathBuf::from(OUTPUT_DIR.as_str());
let suffix = match processor_type {
ProcessorType::Story => format!("{}.story_story", job.uuid),
_ => format!("{}.{}", job.uuid, processor_type.as_str()),
};
let output_path = output_dir.join(format!("{}.json", suffix));
// Ensure output directory exists
if let Some(parent) = output_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let uuid = Some(job.uuid.as_str());
let video = db.get_video_by_uuid(&job.uuid).await?;
let total_frames = video.as_ref().map(|v| v.total_frames as i32).unwrap_or(0);
match processor_type {
ProcessorType::Asr => {
let result =
processor::process_asr(video_path, output_path.to_str().unwrap(), uuid).await?;
let chunks_produced = result.segments.len() as i32;
tracing::info!(
"ASR completed, storing {} segments for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_asr_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store ASR chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Cut => {
let cut_path =
std::path::Path::new(&output_dir).join(format!("{}.cut.json", job.uuid));
let result = if cut_path.exists() {
// CUT 在 register 階段已完成,直接載入
let content =
std::fs::read_to_string(&cut_path).context("Failed to read cut.json")?;
serde_json::from_str(&content).context("Failed to parse cut.json")?
} else {
processor::process_cut(video_path, output_path.to_str().unwrap(), uuid).await?
};
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"CUT completed, storing {} scenes for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_cut_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store CUT chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Yolo => {
let result =
processor::process_yolo(video_path, output_path.to_str().unwrap(), uuid)
.await?;
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"YOLO completed, storing {} frames for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_yolo_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store YOLO chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Ocr => {
let result =
processor::process_ocr(video_path, output_path.to_str().unwrap(), uuid).await?;
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"OCR completed, storing {} frames for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_ocr_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store OCR chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Face => {
let result =
processor::process_face(video_path, output_path.to_str().unwrap(), uuid)
.await?;
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"FACE completed, storing {} frames for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_face_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store FACE chunks for {}: {}", job.uuid, e);
}
// 將 face embedding 寫入 Qdrant
if let Err(e) = Self::store_face_embeddings_to_qdrant(&job.uuid, &result).await {
tracing::error!("Failed to store face embeddings to Qdrant: {}", e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Pose => {
let result =
processor::process_pose(video_path, output_path.to_str().unwrap(), uuid)
.await?;
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"POSE completed, storing {} frames for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_pose_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store POSE chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Asrx => {
let result =
processor::process_asrx(video_path, output_path.to_str().unwrap(), uuid)
.await?;
let chunks_produced = result.segments.len() as i32;
tracing::info!(
"ASRX completed, storing {} segments for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_asrx_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store ASRX chunks for {}: {}", job.uuid, e);
}
// 將 voice embeddings 寫入 Qdrant
if let Err(e) = Self::store_voice_embeddings_to_qdrant(&job.uuid, &result).await {
tracing::error!("Failed to store voice embeddings to Qdrant: {}", e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::VisualChunk => {
let result = processor::process_visual_chunk_advanced(
video_path,
output_path.to_str().unwrap(),
uuid,
)
.await?;
let chunks_produced = result.chunk_count as i32;
tracing::info!(
"VisualChunk completed, storing {} chunks for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_visual_chunk_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store VisualChunk chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Scene => {
let scene_path =
std::path::Path::new(&output_dir).join(format!("{}.scene.json", job.uuid));
let scene_err =
std::path::Path::new(&output_dir).join(format!("{}.scene.err", job.uuid));
let scene_tmp =
std::path::Path::new(&output_dir).join(format!("{}.scene.tmp", job.uuid));
// 優先順序:.err跳過→ .json載入→ .tmp等待或重新執行
let result = if scene_err.exists() {
tracing::warn!("Scene previously failed for {}, skipping", job.uuid);
return Ok(ProcessorOutput {
data: serde_json::Value::Null,
chunks_produced: 0,
frames_processed: 0,
total_frames,
retry_count: 0,
pid: 0,
});
} else if scene_path.exists() {
tracing::info!("Scene JSON exists for {}, loading from file", job.uuid);
crate::core::processor::load_scene_from_file(scene_path.to_str().unwrap())?
} else if scene_tmp.exists() {
tracing::warn!("Scene tmp exists for {}, waiting for completion", job.uuid);
// 生產環境應等待,此處直接跳過避免卡住
crate::core::processor::SceneClassificationResult::default()
} else {
processor::process_scene_classification(
video_path,
output_path.to_str().unwrap(),
uuid,
)
.await?
};
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"Scene classification completed, storing {} scenes for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_scene_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store Scene chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::Story => {
let executor = crate::core::processor::PythonExecutor::new()?;
let _ = executor
.run(
"parent_chunk_5w1h.py",
&["--file-uuid", &job.uuid, "--embed"],
uuid,
"STORY",
Some(std::time::Duration::from_secs(300)),
)
.await;
let narratives_path = output_dir.join(format!("{}.narratives.json", job.uuid));
let chunks_produced = if narratives_path.exists() {
let content = std::fs::read_to_string(&narratives_path).unwrap_or_default();
let count: i32 = serde_json::from_str::<Vec<String>>(&content)
.map(|v| v.len() as i32)
.unwrap_or(0);
tracing::info!("Story generated {} narratives for {}", count, job.uuid);
count
} else {
0
};
Ok(ProcessorOutput {
data: serde_json::Value::Null,
chunks_produced,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
ProcessorType::FiveW1H => {
let executor = crate::core::processor::PythonExecutor::new()?;
let _ = executor
.run(
"parent_chunk_5w1h.py",
&["--file-uuid", &job.uuid, "--embed", "--mode", "llm"],
uuid,
"5W1H",
Some(std::time::Duration::from_secs(300)),
)
.await;
Ok(ProcessorOutput {
data: serde_json::Value::Null,
chunks_produced: 0,
frames_processed: total_frames,
total_frames,
retry_count: 0,
pid: 0,
})
}
}
}
/// Persist ASR segments as ASR pre-chunks for the video `uuid`.
///
/// Each segment becomes one row keyed by its index. Frame bounds come from the
/// ASR output when present; otherwise they are derived from the segment times
/// and the video's fps, which falls back to 30.0 when the stored fps is not positive.
///
/// # Errors
/// Fails if the video lookup fails, the video does not exist, or the batch insert fails.
pub async fn store_asr_chunks(
    db: &PostgresDb,
    uuid: &str,
    asr_result: &AsrResult,
) -> Result<()> {
    let video = match db.get_video_by_uuid(uuid).await? {
        Some(found) => found,
        None => return Err(anyhow::anyhow!("Video not found for uuid: {}", uuid)),
    };
    let fps = Some(video.fps).filter(|rate| *rate > 0.0).unwrap_or(30.0);
    // Prefer the frame index reported by ASR; otherwise convert seconds to frames.
    let to_frame = |explicit: Option<i64>, seconds: f64| {
        explicit.unwrap_or_else(|| (seconds * fps).round() as i64)
    };
    let rows: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = asr_result
        .segments
        .iter()
        .zip(0_i64..)
        .map(|(seg, idx)| {
            let payload = serde_json::json!({
                "text": seg.text,
                "text_normalized": seg.text.to_lowercase(),
                "language": asr_result.language,
                "language_probability": asr_result.language_probability,
            });
            (
                idx,
                to_frame(seg.start_frame, seg.start_time),
                to_frame(seg.end_frame, seg.end_time),
                seg.start_time,
                seg.end_time,
                payload,
            )
        })
        .collect();
    db.store_asr_pre_chunks_batch(uuid, &rows).await?;
    tracing::info!("Stored {} ASR pre-chunks for video {}", rows.len(), uuid);
    Ok(())
}
/// Persist CUT scenes as CUT pre-chunks for the video `uuid`.
///
/// Each scene becomes one row keyed by its index, using the scene's own
/// frame and time bounds plus `{"scene_number": ...}` as data.
///
/// # Errors
/// Fails if the video lookup fails, the video does not exist, or the batch insert fails.
pub async fn store_cut_chunks(
    db: &PostgresDb,
    uuid: &str,
    cut_result: &CutResult,
) -> Result<()> {
    // The lookup only validates that the video exists. CUT scenes already
    // carry frame indices, so no fps-based conversion is needed (the former
    // `fps` local was computed but never used).
    db.get_video_by_uuid(uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?;
    let scenes: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = cut_result
        .scenes
        .iter()
        .enumerate()
        .map(|(i, scene)| {
            let data = serde_json::json!({
                "scene_number": scene.scene_number,
            });
            (
                i as i64,
                scene.start_frame as i64,
                scene.end_frame as i64,
                scene.start_time,
                scene.end_time,
                data,
            )
        })
        .collect();
    db.store_cut_pre_chunks_batch(uuid, &scenes).await?;
    tracing::info!("Stored {} CUT pre-chunks for video {}", scenes.len(), uuid);
    Ok(())
}
/// Persist YOLO detections as `yolo` raw pre-chunks for video `uuid`.
///
/// Each frame becomes one row shaped
/// `(coordinate_index = frame, timestamp, {"objects", "timestamp"}, identity_id = None, confidence = None)`.
///
/// # Errors
/// Propagates the batch-insert error.
pub async fn store_yolo_chunks(
    db: &PostgresDb,
    uuid: &str,
    yolo_result: &YoloResult,
) -> Result<()> {
    let frames = &yolo_result.frames;
    tracing::info!("Storing {} YOLO pre-chunks for video {}", frames.len(), uuid);
    let rows: Vec<_> = frames
        .iter()
        .map(|det| {
            let payload = serde_json::json!({
                "objects": det.objects,
                "timestamp": det.timestamp,
            });
            (det.frame as i64, Some(det.timestamp), payload, None, None)
        })
        .collect();
    db.store_raw_pre_chunks_batch(uuid, "yolo", &rows).await?;
    Ok(())
}
/// Persist OCR results as `ocr` raw pre-chunks for video `uuid`.
///
/// Each frame becomes one row: `(frame, Some(timestamp), {"texts", "timestamp"}, None, None)`.
///
/// # Errors
/// Propagates the batch-insert error.
pub async fn store_ocr_chunks(
    db: &PostgresDb,
    uuid: &str,
    ocr_result: &OcrResult,
) -> Result<()> {
    let frames = &ocr_result.frames;
    tracing::info!("Storing {} OCR pre-chunks for video {}", frames.len(), uuid);
    let rows: Vec<_> = frames
        .iter()
        .map(|hit| {
            let payload = serde_json::json!({
                "texts": hit.texts,
                "timestamp": hit.timestamp,
            });
            (hit.frame as i64, Some(hit.timestamp), payload, None, None)
        })
        .collect();
    db.store_raw_pre_chunks_batch(uuid, "ocr", &rows).await?;
    Ok(())
}
/// Persist face-detection output for video `uuid`.
///
/// Writes one `face` raw pre-chunk per frame, with data `{"faces", "timestamp"}`,
/// and one face-detection row per detected face:
/// `(frame, timestamp, x, y, width, height, confidence)`.
///
/// # Errors
/// Propagates the first failing batch insert; the pre-chunks are written first.
pub async fn store_face_chunks(
    db: &PostgresDb,
    uuid: &str,
    face_result: &FaceResult,
) -> Result<()> {
    let frames = &face_result.frames;
    let detection_total: usize = frames.iter().map(|f| f.faces.len()).sum();
    tracing::info!(
        "Storing {} Face pre-chunks + {} detections for video {}",
        frames.len(),
        detection_total,
        uuid
    );
    let pre_chunks: Vec<_> = frames
        .iter()
        .map(|ff| {
            let payload = serde_json::json!({
                "faces": ff.faces,
                "timestamp": ff.timestamp,
            });
            (ff.frame as i64, Some(ff.timestamp), payload, None, None)
        })
        .collect();
    let detections: Vec<_> = frames
        .iter()
        .flat_map(|ff| {
            ff.faces.iter().map(move |face| {
                (
                    ff.frame as i64,
                    ff.timestamp,
                    face.x,
                    face.y,
                    face.width,
                    face.height,
                    face.confidence,
                )
            })
        })
        .collect();
    db.store_raw_pre_chunks_batch(uuid, "face", &pre_chunks)
        .await?;
    db.store_face_detections_batch(uuid, &detections).await?;
    Ok(())
}
/// Write face embeddings to the Qdrant face collection
/// (`REDIS_KEY_PREFIX` without its trailing ':' followed by `_face`).
///
/// Only 512-dimensional embeddings are stored. Each point ID is a stable
/// FNV-1a hash of ("face", uuid, frame number, face index within the frame).
/// Previously the bare frame number was used, so faces in the same frame, or
/// different videos with the same frame numbers, overwrote each other.
/// Re-processing a video overwrites its own points. Collection setup and
/// per-point failures are logged; the function always returns `Ok(())`.
///
/// NOTE(review): points written earlier under frame-number IDs are not removed.
pub async fn store_face_embeddings_to_qdrant(
    uuid: &str,
    face_result: &FaceResult,
) -> Result<()> {
    // Stable (version-independent) 64-bit FNV-1a hash used as a Qdrant point ID.
    fn stable_point_id(kind: &str, uuid: &str, major: u64, minor: u64) -> u64 {
        const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        kind.bytes()
            .chain([0u8])
            .chain(uuid.bytes())
            .chain([0u8])
            .chain(major.to_le_bytes())
            .chain(minor.to_le_bytes())
            .fold(FNV_OFFSET, |hash, byte| {
                (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
            })
    }
    let qdrant = QdrantDb::new();
    let collection = format!(
        "{}{}",
        crate::core::config::REDIS_KEY_PREFIX
            .as_str()
            .trim_end_matches(':'),
        "_face"
    );
    // Ensure the collection exists (dim = 512 for FaceNet).
    if let Err(e) = qdrant.ensure_collection(&collection, 512).await {
        tracing::error!("Failed to ensure Qdrant face collection: {}", e);
        return Ok(());
    }
    let mut count = 0;
    for frame in &face_result.frames {
        for (face_index, face) in frame.faces.iter().enumerate() {
            let embedding = match &face.embedding {
                Some(embedding) if embedding.len() == 512 => embedding,
                _ => continue,
            };
            let point_id = stable_point_id("face", uuid, frame.frame as u64, face_index as u64);
            let payload = serde_json::json!({
                "file_uuid": uuid,
                "face_id": face.face_id,
                "frame": frame.frame,
                "timestamp": frame.timestamp,
                "x": face.x,
                "y": face.y,
                "width": face.width,
                "height": face.height,
                "confidence": face.confidence,
            });
            if let Err(e) = qdrant
                .upsert_vector_to_collection(&collection, point_id, embedding, Some(payload))
                .await
            {
                tracing::error!("Failed to upsert face vector {}: {}", point_id, e);
            } else {
                count += 1;
            }
        }
    }
    if count > 0 {
        tracing::info!("Stored {} face embeddings to Qdrant for {}", count, uuid);
    }
    Ok(())
}
/// Write ASRX voice embeddings to the Qdrant voice collection
/// (`REDIS_KEY_PREFIX` without its trailing ':' followed by `_voice`).
///
/// The embedding at index `i` is paired with segment `i`. Only 192-dimensional
/// embeddings are stored, and nothing is written when there are no embeddings.
/// Each point ID is a stable FNV-1a hash of ("voice", uuid, segment index).
/// Previously the bare segment index was used, so every video overwrote the
/// points of every other video in the shared collection. Collection setup and
/// per-point failures are logged; the function always returns `Ok(())`.
///
/// NOTE(review): points written earlier under index-based IDs are not removed.
pub async fn store_voice_embeddings_to_qdrant(
    uuid: &str,
    asrx_result: &AsrxResult,
) -> Result<()> {
    // Stable (version-independent) 64-bit FNV-1a hash used as a Qdrant point ID.
    fn stable_point_id(kind: &str, uuid: &str, major: u64, minor: u64) -> u64 {
        const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        kind.bytes()
            .chain([0u8])
            .chain(uuid.bytes())
            .chain([0u8])
            .chain(major.to_le_bytes())
            .chain(minor.to_le_bytes())
            .fold(FNV_OFFSET, |hash, byte| {
                (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
            })
    }
    // Nothing to store: skip the Qdrant round trip entirely.
    let embeddings = match &asrx_result.embeddings {
        Some(e) => e,
        None => return Ok(()),
    };
    let qdrant = QdrantDb::new();
    let collection = format!(
        "{}{}",
        crate::core::config::REDIS_KEY_PREFIX
            .as_str()
            .trim_end_matches(':'),
        "_voice"
    );
    // Ensure the collection exists (dim = 192 for ASRX voice).
    if let Err(e) = qdrant.ensure_collection(&collection, 192).await {
        tracing::error!("Failed to ensure Qdrant voice collection: {}", e);
        return Ok(());
    }
    let mut count = 0;
    for (i, segment) in asrx_result.segments.iter().enumerate() {
        let emb = match embeddings.get(i) {
            Some(emb) if emb.len() == 192 => emb,
            _ => continue,
        };
        let payload = serde_json::json!({
            "file_uuid": uuid,
            "speaker_id": segment.speaker_id,
            "segment_index": i,
            "start_frame": segment.start_frame,
            "end_frame": segment.end_frame,
            "start_time": segment.start_time,
            "end_time": segment.end_time,
        });
        let point_id = stable_point_id("voice", uuid, i as u64, 0);
        if let Err(e) = qdrant
            .upsert_vector_to_collection(&collection, point_id, emb, Some(payload))
            .await
        {
            tracing::error!("Failed to upsert voice vector {}: {}", i, e);
        } else {
            count += 1;
        }
    }
    if count > 0 {
        tracing::info!("Stored {} voice embeddings to Qdrant for {}", count, uuid);
    }
    Ok(())
}
/// Persist pose-estimation frames as `pose` raw pre-chunks for video `uuid`.
///
/// Each frame becomes one row: `(frame, Some(timestamp), {"persons", "timestamp"}, None, None)`.
///
/// # Errors
/// Propagates the batch-insert error.
pub async fn store_pose_chunks(
    db: &PostgresDb,
    uuid: &str,
    pose_result: &PoseResult,
) -> Result<()> {
    let frames = &pose_result.frames;
    tracing::info!("Storing {} Pose pre-chunks for video {}", frames.len(), uuid);
    let rows: Vec<_> = frames
        .iter()
        .map(|pf| {
            let payload = serde_json::json!({
                "persons": pf.persons,
                "timestamp": pf.timestamp,
            });
            (pf.frame as i64, Some(pf.timestamp), payload, None, None)
        })
        .collect();
    db.store_raw_pre_chunks_batch(uuid, "pose", &rows).await?;
    Ok(())
}
/// Persist ASRX (speaker-attributed) segments as `asrx` raw pre-chunks.
///
/// ASRX is time-based, so the coordinate is the segment index and the
/// timestamp is the segment start time. Data is `{"text", "speaker_id", "timestamp"}`.
///
/// # Errors
/// Propagates the batch-insert error.
pub async fn store_asrx_chunks(
    db: &PostgresDb,
    uuid: &str,
    asrx_result: &AsrxResult,
) -> Result<()> {
    let segments = &asrx_result.segments;
    tracing::info!("Storing {} ASRX pre-chunks for video {}", segments.len(), uuid);
    let rows: Vec<_> = segments
        .iter()
        .zip(0_i64..)
        .map(|(seg, idx)| {
            let payload = serde_json::json!({
                "text": seg.text,
                "speaker_id": seg.speaker_id,
                "timestamp": seg.start_time,
            });
            (idx, Some(seg.start_time), payload, None, None)
        })
        .collect();
    db.store_raw_pre_chunks_batch(uuid, "asrx", &rows).await?;
    Ok(())
}
pub async fn store_visual_chunk_chunks(
db: &PostgresDb,
uuid: &str,
visual_chunk_result: &VisualChunkResult,
) -> Result<()> {
for (i, chunk) in visual_chunk_result.chunks.iter().enumerate() {
match db.store_chunk(chunk).await {
Ok(_) => {
tracing::info!("Stored VisualChunk chunk {} for video {}", i, uuid);
}
Err(e) => {
tracing::error!("Failed to store VisualChunk chunk {}: {}", i, e);
}
}
}
Ok(())
}
/// Persist scene-classification results for video `uuid`.
///
/// Writes one scene pre-chunk per scene, with frames derived from the scene
/// times and the video's fps (30.0 when the stored fps is not positive). It
/// then merges the same scene metadata into the chunk rows with
/// `chunk_id = "scene_<n>"` (1-based). Metadata-merge failures are logged per
/// chunk and do not fail the call.
///
/// # Errors
/// Fails if the video lookup fails, the video does not exist, or the pre-chunk
/// batch insert fails.
pub async fn store_scene_chunks(
    db: &PostgresDb,
    uuid: &str,
    scene_result: &SceneClassificationResult,
) -> Result<()> {
    let video = db
        .get_video_by_uuid(uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?;
    let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
    let scenes: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = scene_result
        .scenes
        .iter()
        .enumerate()
        .map(|(i, scene)| {
            let start_frame = (scene.start_time * fps).round() as i64;
            let end_frame = (scene.end_time * fps).round() as i64;
            let data = serde_json::json!({
                "scene_type": scene.scene_type,
                "scene_type_zh": scene.scene_type_zh,
                "confidence": scene.confidence,
                "top_5": scene.top_5,
            });
            (
                i as i64,
                start_frame,
                end_frame,
                scene.start_time,
                scene.end_time,
                data,
            )
        })
        .collect();
    db.store_scene_pre_chunks_batch(uuid, &scenes).await?;
    // Built once, outside the loop. COALESCE makes the merge work when
    // `metadata` is NULL: `NULL || jsonb` is NULL, so without it the merge
    // would silently leave the column NULL.
    let chunk_table = crate::core::db::schema::table_name("chunk");
    let update_sql = format!(
        "UPDATE {} SET metadata = COALESCE(metadata, '{{}}'::jsonb) || $1::jsonb WHERE file_uuid=$2 AND chunk_id=$3",
        chunk_table
    );
    // The pre-chunk data already holds exactly the metadata to merge.
    for (i, (_, _, _, _, _, meta)) in scenes.iter().enumerate() {
        let chk_id = format!("scene_{}", i + 1);
        if let Err(e) = sqlx::query(&update_sql)
            .bind(meta)
            .bind(uuid)
            .bind(&chk_id)
            .execute(db.pool())
            .await
        {
            tracing::warn!(
                "Failed to merge scene metadata into chunk {} for {}: {}",
                chk_id,
                uuid,
                e
            );
        }
    }
    tracing::info!(
        "Stored {} Scene pre-chunks for video {}",
        scenes.len(),
        uuid
    );
    Ok(())
}
/// Number of pool slots currently reserved.
pub async fn get_running_count(&self) -> usize {
    let reserved = self.running_count.read().await;
    *reserved
}
/// Send a cancel signal to every tracked processor and reset the pool to empty.
///
/// Locks are taken in `running` -> `running_count` order. `try_send` is used
/// so nothing is awaited while the `running` write lock is held. Each handle's
/// channel has capacity 1 and is only sent to here, after its handle has been
/// drained, so a full channel is not expected.
///
/// NOTE(review): `run_processor` never reads its cancel receiver, so tasks
/// that are already spawned keep running after this call even though their
/// slots are freed.
pub async fn cancel_all(&self) {
    let mut running = self.running.write().await;
    for (_, handle) in running.drain() {
        // An error means the task already finished (receiver dropped): nothing to cancel.
        let _ = handle.cancel_tx.try_send(());
    }
    let mut count = self.running_count.write().await;
    *count = 0;
}
}