feat: update core API, database layer, and worker modules

- Remove unused imports (n8n_search, universal_search, Client, Arc, etc.)
- Update API endpoints for identity, face recognition, search
- Fix postgres_db.rs search_videos parent_uuid column
- Add snapshot API and identity agent API
- Clean up backup files (.bak, .bak2)
This commit is contained in:
Warren
2026-04-30 15:07:02 +08:00
parent 8f2208dd63
commit 2b23d1cfbd
148 changed files with 8553 additions and 48637 deletions

View File

@@ -6,9 +6,7 @@ use tokio::time::sleep;
use tracing::{error, info, warn};
use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::db::{
MonitorJobStatus, PostgresDb, ProcessorJobStatus, ProcessorType, RedisClient, VideoStatus,
};
use crate::core::db::{MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VideoStatus};
use crate::worker::config::WorkerConfig;
use crate::worker::processor::{ProcessorPool, ProcessorTask};
@@ -89,7 +87,29 @@ impl JobWorker {
async fn process_job(&self, job: crate::core::db::MonitorJob) -> Result<()> {
info!("Processing job: {} ({})", job.uuid, job.id);
let total_processors = ProcessorType::all().len() as i32;
// Determine which processors to run based on job.processors field
let processors_to_run: Vec<crate::core::db::ProcessorType> = if job.processors.is_empty() {
info!("No processors specified, running all processors");
crate::core::db::ProcessorType::all()
} else {
info!("Processors specified: {:?}", job.processors);
job.processors
.iter()
.filter_map(|p| crate::core::db::ProcessorType::from_db_str(p))
.collect()
};
let total_processors = processors_to_run.len() as i32;
// Get video total_frames for progress tracking
let video = self.db.get_video_by_uuid(&job.uuid).await?;
let total_frames = video.map(|v| v.total_frames).unwrap_or(0);
// Initialize processing_status with all processors
let processor_names: Vec<&str> = processors_to_run.iter().map(|p| p.as_str()).collect();
self.db
.init_processing_status(&job.uuid, processor_names.clone(), total_frames)
.await?;
self.db
.update_job_status(job.id, MonitorJobStatus::Running)
@@ -106,7 +126,18 @@ impl JobWorker {
result_map.insert(result.processor_type, result);
}
for processor_type in ProcessorType::all() {
for processor_type in processors_to_run {
// Update processor status to running
self.db
.update_processor_progress(
&job.uuid,
processor_type.as_str(),
0,
total_frames,
"running",
)
.await?;
// Check if processor already in terminal state
if let Some(result) = result_map.get(&processor_type) {
match result.status {
@@ -145,7 +176,7 @@ impl JobWorker {
let processor_result_id = self
.db
.create_processor_result(job.id, processor_type)
.create_processor_result(job.id, processor_type, &job.uuid)
.await?;
self.redis
@@ -215,6 +246,8 @@ impl JobWorker {
let has_asr = completed_processors.iter().any(|p| p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
let has_face = completed_processors.iter().any(|p| p == "face");
let has_yolo = completed_processors.iter().any(|p| p == "yolo");
// Update processor arrays in job record
self.db
@@ -231,9 +264,7 @@ impl JobWorker {
match db_clone.get_video_by_uuid(&uuid_clone).await {
Ok(Some(video)) => {
let fps = video.fps;
match rule1_ingest::ingest_rule1(db_clone.pool(), &uuid_clone, fps)
.await
{
match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
Ok(count) => info!(
"✅ Rule 1 Ingestion completed: {} chunks inserted.",
count
@@ -263,6 +294,25 @@ impl JobWorker {
});
}
// 🚀 P3 Trigger: Identity Agent (Face + ASRX)
if has_face && has_asrx {
info!("📝 Prerequisites met for Identity Agent. Starting analysis...");
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
tracing::info!("Identity Agent started for video {}", uuid_clone);
});
}
// 🚀 P4 Trigger: 5W1H Agent (after Rule 3 completion)
if has_cut && has_asr {
info!("📝 Prerequisites met for 5W1H Agent. Will trigger after Rule 3...");
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
tracing::info!("5W1H Agent started for video {}", uuid_clone);
});
}
self.db
.update_job_status(job_id, MonitorJobStatus::Completed)
.await?;
@@ -271,6 +321,14 @@ impl JobWorker {
.update_video_status(uuid, VideoStatus::Completed)
.await?;
// Get total_frames for completion status
let video = self.db.get_video_by_uuid(uuid).await?;
let total_frames = video.map(|v| v.total_frames).unwrap_or(0);
self.db
.update_processing_status_completed(uuid, total_frames)
.await?;
self.redis
.update_worker_job_status(uuid, job_id, "completed", None, completed_count, 7)
.await?;

View File

@@ -5,10 +5,8 @@ use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info};
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
use crate::core::config::{OUTPUT_DIR, PYTHON_PATH, SCRIPTS_DIR};
use crate::core::db::RedisClient;
use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType};
use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType, RedisClient};
use crate::core::processor;
use crate::core::processor::asr::AsrResult;
use crate::core::processor::asrx::AsrxResult;
@@ -16,9 +14,17 @@ use crate::core::processor::cut::CutResult;
use crate::core::processor::face::FaceResult;
use crate::core::processor::ocr::OcrResult;
use crate::core::processor::pose::PoseResult;
use crate::core::processor::scene_classification::SceneClassificationResult;
use crate::core::processor::visual_chunk::VisualChunkResult;
use crate::core::processor::yolo::YoloResult;
#[derive(Debug)]
struct ProcessorOutput {
data: serde_json::Value,
chunks_produced: i32,
frames_processed: i32,
}
#[derive(Debug, Clone)]
pub struct ProcessorTask {
pub job: MonitorJob,
@@ -113,15 +119,17 @@ impl ProcessorPool {
match result {
Ok(output) => {
info!(
"Processor {} completed for job {}",
processor_name, job.uuid
"Processor {} completed for job {} ({} chunks, {} frames)",
processor_name, job.uuid, output.chunks_produced, output.frames_processed
);
if let Err(e) = db
.update_processor_result(
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Completed,
None,
Some(&output),
Some(&output.data),
output.chunks_produced,
output.frames_processed,
)
.await
{
@@ -146,11 +154,13 @@ impl ProcessorPool {
processor_name, job.uuid, e
);
if let Err(db_err) = db
.update_processor_result(
.update_processor_result_with_stats(
processor_result_id,
ProcessorJobStatus::Failed,
Some(&e.to_string()),
None,
0,
0,
)
.await
{
@@ -181,7 +191,7 @@ impl ProcessorPool {
job: &MonitorJob,
processor_type: ProcessorType,
_cancel_rx: mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
) -> Result<ProcessorOutput> {
let video_path = job.video_path.as_ref().context("No video path in job")?;
// Generate output path
@@ -199,109 +209,139 @@ impl ProcessorPool {
}
let uuid = Some(job.uuid.as_str());
let video = db.get_video_by_uuid(&job.uuid).await?;
let total_frames = video.as_ref().map(|v| v.total_frames as i32).unwrap_or(0);
match processor_type {
ProcessorType::Asr => {
let result =
processor::process_asr(video_path, output_path.to_str().unwrap(), uuid).await?;
// Store ASR chunks in database
let chunks_produced = result.segments.len() as i32;
tracing::info!(
"ASR completed, storing {} segments for {}",
result.segments.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_asr_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store ASR chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Cut => {
let result =
processor::process_cut(video_path, output_path.to_str().unwrap(), uuid).await?;
// Store CUT chunks in database
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"CUT completed, storing {} scenes for {}",
result.scenes.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_cut_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store CUT chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Yolo => {
let result =
processor::process_yolo(video_path, output_path.to_str().unwrap(), uuid)
.await?;
// Store YOLO chunks in database
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"YOLO completed, storing {} frames for {}",
result.frames.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_yolo_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store YOLO chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Ocr => {
let result =
processor::process_ocr(video_path, output_path.to_str().unwrap(), uuid).await?;
// Store OCR chunks in database
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"OCR completed, storing {} frames for {}",
result.frames.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_ocr_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store OCR chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Face => {
let result =
processor::process_face(video_path, output_path.to_str().unwrap(), uuid)
.await?;
// Store FACE chunks in database
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"FACE completed, storing {} frames for {}",
result.frames.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_face_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store FACE chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Pose => {
let result =
processor::process_pose(video_path, output_path.to_str().unwrap(), uuid)
.await?;
// Store POSE chunks in database
let chunks_produced = result.frames.len() as i32;
tracing::info!(
"POSE completed, storing {} frames for {}",
result.frames.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_pose_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store POSE chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Asrx => {
let result =
processor::process_asrx(video_path, output_path.to_str().unwrap(), uuid)
.await?;
// Store ASRX chunks in database
let chunks_produced = result.segments.len() as i32;
tracing::info!(
"ASRX completed, storing {} segments for {}",
result.segments.len(),
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_asrx_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store ASRX chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::VisualChunk => {
let result = processor::process_visual_chunk_advanced(
@@ -310,16 +350,42 @@ impl ProcessorPool {
uuid,
)
.await?;
// Store VisualChunk chunks in database
let chunks_produced = result.chunk_count as i32;
tracing::info!(
"VisualChunk completed, storing {} chunks for {}",
result.chunk_count,
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_visual_chunk_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store VisualChunk chunks for {}: {}", job.uuid, e);
}
Ok(serde_json::to_value(result)?)
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
ProcessorType::Scene => {
let result = processor::process_scene_classification(
video_path,
output_path.to_str().unwrap(),
uuid,
)
.await?;
let chunks_produced = result.scenes.len() as i32;
tracing::info!(
"Scene classification completed, storing {} scenes for {}",
chunks_produced,
job.uuid
);
if let Err(e) = Self::store_scene_chunks(db, &job.uuid, &result).await {
tracing::error!("Failed to store Scene chunks for {}: {}", job.uuid, e);
}
Ok(ProcessorOutput {
data: serde_json::to_value(result)?,
chunks_produced,
frames_processed: total_frames,
})
}
}
}
@@ -482,7 +548,7 @@ impl ProcessorPool {
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_ASRX_SCRIPT")
.unwrap_or_else(|_| format!("{}/asrx_processor.py", SCRIPTS_DIR.as_str()));
.unwrap_or_else(|_| format!("{}/asrx_processor_custom.py", SCRIPTS_DIR.as_str()));
let output = tokio::process::Command::new(PYTHON_PATH.as_str())
.arg(&script_path)
@@ -504,43 +570,44 @@ impl ProcessorPool {
uuid: &str,
asr_result: &AsrResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = db
.get_video_by_uuid(uuid)
.await?
.ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?;
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
for (i, segment) in asr_result.segments.iter().enumerate() {
let chunk = Chunk::from_seconds(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Sentence,
ChunkRule::Rule1,
segment.start,
segment.end,
fps,
serde_json::json!({
let segments: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = asr_result
.segments
.iter()
.enumerate()
.map(|(i, segment)| {
let start_frame = (segment.start * fps).round() as i64;
let end_frame = (segment.end * fps).round() as i64;
let data = serde_json::json!({
"text": segment.text,
"text_normalized": segment.text.to_lowercase(),
}),
)
.with_metadata(serde_json::json!({
"language": asr_result.language,
"language_probability": asr_result.language_probability,
}));
"language": asr_result.language,
"language_probability": asr_result.language_probability,
});
(
i as i64,
start_frame,
end_frame,
segment.start,
segment.end,
data,
)
})
.collect();
db.store_asr_pre_chunks_batch(uuid, &segments).await?;
tracing::info!(
"Stored {} ASR pre-chunks for video {}",
segments.len(),
uuid
);
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!("Stored ASR chunk {} for video {}", i, uuid);
}
Err(e) => {
tracing::error!("Failed to store ASR chunk {}: {}", i, e);
}
}
}
Ok(())
}
@@ -549,40 +616,35 @@ impl ProcessorPool {
uuid: &str,
cut_result: &CutResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = db
.get_video_by_uuid(uuid)
.await?
.ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?;
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
for (i, scene) in cut_result.scenes.iter().enumerate() {
let chunk = Chunk::from_seconds(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Cut,
ChunkRule::Rule1,
scene.start_time,
scene.end_time,
fps,
serde_json::json!({
let scenes: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = cut_result
.scenes
.iter()
.enumerate()
.map(|(i, scene)| {
let data = serde_json::json!({
"scene_number": scene.scene_number,
"start_frame": scene.start_frame,
"end_frame": scene.end_frame,
}),
);
});
(
i as i64,
scene.start_frame as i64,
scene.end_frame as i64,
scene.start_time,
scene.end_time,
data,
)
})
.collect();
db.store_cut_pre_chunks_batch(uuid, &scenes).await?;
tracing::info!("Stored {} CUT pre-chunks for video {}", scenes.len(), uuid);
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!("Stored CUT chunk {} for video {}", i, uuid);
}
Err(e) => {
tracing::error!("Failed to store CUT chunk {}: {}", i, e);
}
}
}
Ok(())
}
@@ -591,60 +653,32 @@ impl ProcessorPool {
uuid: &str,
yolo_result: &YoloResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = match db.get_video_by_uuid(uuid).await {
Ok(Some(video)) => video,
Ok(None) => {
tracing::error!("Video not found for uuid: {}", uuid);
return Ok(());
}
Err(e) => {
tracing::error!("Failed to get video for uuid {}: {}", uuid, e);
return Ok(());
}
};
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let frames_count = yolo_result.frames.len();
tracing::info!(
"Storing {} YOLO pre-chunks for video {}",
frames_count,
uuid
);
for (i, frame) in yolo_result.frames.iter().enumerate() {
let mut chunk = Chunk::new(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Trace,
ChunkRule::Rule1,
frame.frame as i64,
frame.frame as i64 + 1,
fps,
serde_json::json!({
"objects": frame.objects,
"timestamp": frame.timestamp,
}),
);
// Override chunk_id to include processor prefix for uniqueness
chunk.chunk_id = format!("trace_yolo_{:04}", i);
let mut pre_chunks_to_store = Vec::new();
// Populate text_content for BM25 search
let object_names: Vec<String> =
frame.objects.iter().map(|o| o.class_name.clone()).collect();
if !object_names.is_empty() {
chunk = chunk.with_text_content(object_names.join(" "));
}
for frame in yolo_result.frames.iter() {
let data = serde_json::json!({
"objects": frame.objects,
"timestamp": frame.timestamp,
});
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!(
"Stored YOLO chunk {} (frame {}) for video {}",
i,
frame.frame,
uuid
);
}
Err(e) => {
tracing::error!("Failed to store YOLO chunk {}: {}", i, e);
}
}
pre_chunks_to_store.push((
frame.frame as i64, // coordinate_index
Some(frame.timestamp), // timestamp
data,
None, // identity_id
None, // confidence
));
}
db.store_raw_pre_chunks_batch(uuid, "yolo", &pre_chunks_to_store)
.await?;
Ok(())
}
@@ -653,59 +687,22 @@ impl ProcessorPool {
uuid: &str,
ocr_result: &OcrResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = match db.get_video_by_uuid(uuid).await {
Ok(Some(video)) => video,
Ok(None) => {
tracing::error!("Video not found for uuid: {}", uuid);
return Ok(());
}
Err(e) => {
tracing::error!("Failed to get video for uuid {}: {}", uuid, e);
return Ok(());
}
};
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let frames_count = ocr_result.frames.len();
tracing::info!("Storing {} OCR pre-chunks for video {}", frames_count, uuid);
for (i, frame) in ocr_result.frames.iter().enumerate() {
let mut chunk = Chunk::new(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Trace,
ChunkRule::Rule1,
frame.frame as i64,
frame.frame as i64 + 1,
fps,
serde_json::json!({
"texts": frame.texts,
"timestamp": frame.timestamp,
}),
);
// Override chunk_id to include processor prefix for uniqueness
chunk.chunk_id = format!("trace_ocr_{:04}", i);
let mut pre_chunks_to_store = Vec::new();
// Populate text_content for BM25 search
let texts: Vec<String> = frame.texts.iter().map(|t| t.text.clone()).collect();
if !texts.is_empty() {
chunk = chunk.with_text_content(texts.join(" "));
}
for frame in ocr_result.frames.iter() {
let data = serde_json::json!({
"texts": frame.texts,
"timestamp": frame.timestamp,
});
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!(
"Stored OCR chunk {} (frame {}) for video {}",
i,
frame.frame,
uuid
);
}
Err(e) => {
tracing::error!("Failed to store OCR chunk {}: {}", i, e);
}
}
pre_chunks_to_store.push((frame.frame as i64, Some(frame.timestamp), data, None, None));
}
db.store_raw_pre_chunks_batch(uuid, "ocr", &pre_chunks_to_store)
.await?;
Ok(())
}
@@ -714,63 +711,33 @@ impl ProcessorPool {
uuid: &str,
face_result: &FaceResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = match db.get_video_by_uuid(uuid).await {
Ok(Some(video)) => video,
Ok(None) => {
tracing::error!("Video not found for uuid: {}", uuid);
return Ok(());
}
Err(e) => {
tracing::error!("Failed to get video for uuid {}: {}", uuid, e);
return Ok(());
}
};
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let frames_count = face_result.frames.len();
tracing::info!(
"Storing {} Face pre-chunks for video {}",
frames_count,
uuid
);
for (i, frame) in face_result.frames.iter().enumerate() {
let mut chunk = Chunk::new(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Trace,
ChunkRule::Rule1,
let mut pre_chunks_to_store = Vec::new();
for frame in face_result.frames.iter() {
let data = serde_json::json!({
"faces": frame.faces,
"timestamp": frame.timestamp,
});
// We could potentially parse identity_id if it's already matched, but for raw ingestion it's None.
pre_chunks_to_store.push((
frame.frame as i64,
frame.frame as i64 + 1,
fps,
serde_json::json!({
"faces": frame.faces,
"timestamp": frame.timestamp,
}),
);
// Override chunk_id to include processor prefix for uniqueness
chunk.chunk_id = format!("trace_face_{:04}", i);
// Populate text_content for BM25 search (face IDs)
let face_ids: Vec<String> = frame
.faces
.iter()
.filter_map(|f| f.face_id.clone())
.collect();
if !face_ids.is_empty() {
chunk = chunk.with_text_content(face_ids.join(" "));
}
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!(
"Stored FACE chunk {} (frame {}) for video {}",
i,
frame.frame,
uuid
);
}
Err(e) => {
tracing::error!("Failed to store FACE chunk {}: {}", i, e);
}
}
Some(frame.timestamp),
data,
None, // identity_id
None, // confidence
));
}
db.store_raw_pre_chunks_batch(uuid, "face", &pre_chunks_to_store)
.await?;
Ok(())
}
@@ -779,63 +746,26 @@ impl ProcessorPool {
uuid: &str,
pose_result: &PoseResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = match db.get_video_by_uuid(uuid).await {
Ok(Some(video)) => video,
Ok(None) => {
tracing::error!("Video not found for uuid: {}", uuid);
return Ok(());
}
Err(e) => {
tracing::error!("Failed to get video for uuid {}: {}", uuid, e);
return Ok(());
}
};
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let frames_count = pose_result.frames.len();
tracing::info!(
"Storing {} Pose pre-chunks for video {}",
frames_count,
uuid
);
for (i, frame) in pose_result.frames.iter().enumerate() {
let mut chunk = Chunk::new(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Trace,
ChunkRule::Rule1,
frame.frame as i64,
frame.frame as i64 + 1,
fps,
serde_json::json!({
"persons": frame.persons,
"timestamp": frame.timestamp,
}),
);
// Override chunk_id to include processor prefix for uniqueness
chunk.chunk_id = format!("trace_pose_{:04}", i);
let mut pre_chunks_to_store = Vec::new();
// Populate text_content for BM25 search (person count indicator)
let person_count = frame.persons.len();
if person_count > 0 {
let text = format!("person person person")
.repeat(person_count.min(10))
.trim()
.to_string();
chunk = chunk.with_text_content(text);
}
for frame in pose_result.frames.iter() {
let data = serde_json::json!({
"persons": frame.persons,
"timestamp": frame.timestamp,
});
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!(
"Stored POSE chunk {} (frame {}) for video {}",
i,
frame.frame,
uuid
);
}
Err(e) => {
tracing::error!("Failed to store POSE chunk {}: {}", i, e);
}
}
pre_chunks_to_store.push((frame.frame as i64, Some(frame.timestamp), data, None, None));
}
db.store_raw_pre_chunks_batch(uuid, "pose", &pre_chunks_to_store)
.await?;
Ok(())
}
@@ -844,58 +774,29 @@ impl ProcessorPool {
uuid: &str,
asrx_result: &AsrxResult,
) -> Result<()> {
// Get video record to obtain file_id and fps
let video = match db.get_video_by_uuid(uuid).await {
Ok(Some(video)) => video,
Ok(None) => {
tracing::error!("Video not found for uuid: {}", uuid);
return Ok(());
}
Err(e) => {
tracing::error!("Failed to get video for uuid {}: {}", uuid, e);
return Ok(());
}
};
let file_id = video.id;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let segments_count = asrx_result.segments.len();
tracing::info!(
"Storing {} ASRX pre-chunks for video {}",
segments_count,
uuid
);
let mut pre_chunks_to_store = Vec::new();
for (i, segment) in asrx_result.segments.iter().enumerate() {
let mut chunk = Chunk::from_seconds(
file_id as i32,
uuid.to_string(),
i as u32,
ChunkType::Trace,
ChunkRule::Rule1,
segment.start,
segment.end,
fps,
serde_json::json!({
"text": segment.text,
"timestamp": segment.start,
}),
);
// Override chunk_id to include processor prefix for uniqueness
chunk.chunk_id = format!("trace_asrx_{:04}", i);
// Populate text_content for BM25 search (already has text)
chunk = chunk.with_text_content(segment.text.clone());
// Also store speaker_id in content
chunk.content = serde_json::json!({
let data = serde_json::json!({
"text": segment.text,
"speaker_id": segment.speaker_id,
"timestamp": segment.start,
});
match db.store_chunk(&chunk).await {
Ok(_) => {
tracing::info!("Stored ASRX chunk {} for video {}", i, uuid);
}
Err(e) => {
tracing::error!("Failed to store ASRX chunk {}: {}", i, e);
}
}
// ASRX is time-based, so we use segment index or start time as coordinate.
// Let's use index for simplicity in pre_chunks, or start time.
pre_chunks_to_store.push((i as i64, Some(segment.start), data, None, None));
}
db.store_raw_pre_chunks_batch(uuid, "asrx", &pre_chunks_to_store)
.await?;
Ok(())
}
@@ -917,6 +818,52 @@ impl ProcessorPool {
Ok(())
}
pub async fn store_scene_chunks(
db: &PostgresDb,
uuid: &str,
scene_result: &SceneClassificationResult,
) -> Result<()> {
let video = db
.get_video_by_uuid(uuid)
.await?
.ok_or_else(|| anyhow::anyhow!("Video not found for uuid: {}", uuid))?;
let fps = if video.fps > 0.0 { video.fps } else { 30.0 };
let scenes: Vec<(i64, i64, i64, f64, f64, serde_json::Value)> = scene_result
.scenes
.iter()
.enumerate()
.map(|(i, scene)| {
let start_frame = (scene.start_time * fps).round() as i64;
let end_frame = (scene.end_time * fps).round() as i64;
let data = serde_json::json!({
"scene_type": scene.scene_type,
"scene_type_zh": scene.scene_type_zh,
"confidence": scene.confidence,
"top_5": scene.top_5,
});
(
i as i64,
start_frame,
end_frame,
scene.start_time,
scene.end_time,
data,
)
})
.collect();
db.store_scene_pre_chunks_batch(uuid, &scenes).await?;
tracing::info!(
"Stored {} Scene pre-chunks for video {}",
scenes.len(),
uuid
);
Ok(())
}
pub async fn get_running_count(&self) -> usize {
*self.running_count.read().await
}