From 70e849d3aec62a34034f8de039d6278e550aacef Mon Sep 17 00:00:00 2001 From: Accusys Date: Mon, 22 Jun 2026 15:34:02 +0800 Subject: [PATCH] refactor: remove Rule 3, Story, and Caption processors - Remove Rule 3 (Scene Chunking) from worker auto-trigger - Remove rule3_ingest.rs and related imports - Remove Story/Caption from playground module parsing - Clean up scan.rs Rule 3 display - Fix ASRX field name conversion (start_time -> start) Reason: Story/5W1H/Scene accuracy too poor - will redesign later --- scripts/asrx_processor.py | 11 ++- src/api/scan.rs | 11 +-- src/core/chunk/mod.rs | 2 - src/core/chunk/rule3_ingest.rs | 171 --------------------------------- src/playground.rs | 166 ++++---------------------------- src/worker/job_worker.rs | 57 +++-------- 6 files changed, 46 insertions(+), 372 deletions(-) delete mode 100644 src/core/chunk/rule3_ingest.rs diff --git a/scripts/asrx_processor.py b/scripts/asrx_processor.py index b5f3a9f..1ec17f5 100755 --- a/scripts/asrx_processor.py +++ b/scripts/asrx_processor.py @@ -236,7 +236,16 @@ def process_asrx(video_path: str, output_path: str, uuid: str = "", try: with open(asr_path) as f: asr_data = json.load(f) - asr_segments = asr_data.get("segments", []) + raw_segments = asr_data.get("segments", []) + # Convert field names: start_time/end_time -> start/end + asr_segments = [] + for seg in raw_segments: + converted = { + "start": seg.get("start_time", seg.get("start", 0)), + "end": seg.get("end_time", seg.get("end", 0)), + "text": seg.get("text", ""), + } + asr_segments.append(converted) if asr_segments: print(f"[ASRX] Loaded {len(asr_segments)} ASR segments from {asr_path}") except Exception as e: diff --git a/src/api/scan.rs b/src/api/scan.rs index 60be9e2..aecf976 100644 --- a/src/api/scan.rs +++ b/src/api/scan.rs @@ -462,14 +462,9 @@ async fn get_ingestion_status( "auto_vectorize", sentence_embedded > 0, Some(format!("{sentence_embedded} embedded")) - ), - step!( - "rule3_scene", - scene_count > 0, - Some(format!("{scene_count} scene chunks")) - ), - step!( - "face_track", +), +step!( +"face_track", trace_count > 0, Some(format!("{trace_count} traces / {face_total} detections")) ), diff --git a/src/core/chunk/mod.rs b/src/core/chunk/mod.rs index eaad324..2f44b42 100644 --- a/src/core/chunk/mod.rs +++ b/src/core/chunk/mod.rs @@ -1,13 +1,11 @@ pub mod rule1_ingest; pub mod rule2_ingest; -pub mod rule3_ingest; pub mod splitter; pub mod trace_ingest; pub mod types; pub use rule1_ingest::execute_rule1; pub use rule2_ingest::ingest_rule2; -pub use rule3_ingest::ingest_rule3; pub use splitter::{AsrSegment, ChunkSplitter}; pub use trace_ingest::ingest_traces; pub use types::{Chunk, ChunkType}; diff --git a/src/core/chunk/rule3_ingest.rs b/src/core/chunk/rule3_ingest.rs deleted file mode 100644 index e8c4e41..0000000 --- a/src/core/chunk/rule3_ingest.rs +++ /dev/null @@ -1,171 +0,0 @@ -use crate::core::config::OUTPUT_DIR; -use crate::core::db::schema; -use anyhow::{Context, Result}; -use serde::Deserialize; -use sqlx::PgPool; -use std::fs; -use tracing::{info, warn}; - -#[derive(Debug, Deserialize)] -struct CutScene { - scene_number: u32, - start_frame: u64, - end_frame: u64, - start_time: f64, - end_time: f64, -} - -#[derive(Debug, Deserialize)] -struct CutResult { - scenes: Vec, -} - -#[derive(Debug, Deserialize)] -struct AsrSegment { - #[serde(alias = "start")] - start_time: f64, - #[serde(alias = "end")] - end_time: f64, - text: String, -} - -/// Executes Rule 3 Ingestion: Scene-based Chunking with LLM 5W1H+ Summary. -/// 1. Reads CUT data to identify scenes. -/// 2. Aggregates Rule 1 (Sentence) chunks falling within each scene. -/// 3. Calls LLM to generate 5W1H+ summary. -/// 4. Inserts parent chunks into `dev.chunks`. -pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result { - let cut_path = format!("{}/{}.cut.json", *OUTPUT_DIR, file_uuid); - let asr_path = format!("{}/{}.asr.json", *OUTPUT_DIR, file_uuid); - - // 1. Load CUT and ASR data - let cut_content = fs::read_to_string(&cut_path) - .with_context(|| format!("Failed to read CUT file: {}", cut_path))?; - let cut_result: CutResult = serde_json::from_str(&cut_content).context("Invalid CUT JSON")?; - - let asr_segments: Vec = match fs::read_to_string(&asr_path) { - Ok(content) => serde_json::from_str(&content).unwrap_or_default(), - Err(_) => { - warn!("ASR file not found, proceeding with empty transcript for scenes"); - vec![] - } - }; - - let mut count = 0; - let mut tx = pool.begin().await?; - - // 2. Process each scene - for scene in &cut_result.scenes { - let chunk_id = format!("scene_{}", scene.scene_number); - - // Aggregate text from Rule 1 chunks - let mut scene_text = String::new(); - let mut child_ids: Vec = Vec::new(); - - for seg in &asr_segments { - if seg.start_time >= scene.start_time && seg.end_time <= scene.end_time { - scene_text.push_str(&seg.text); - scene_text.push(' '); - // We'll look up the chunk_id from Rule 1 later if needed, - // but for now we just group by text overlap. - // A better approach is to query Rule 1 table for this range. - } - } - - // Query chunks table for Rule 1 sentence chunks - let chunk_table = schema::table_name("chunk"); - let rule1_rows: Vec<(String,)> = sqlx::query_as(&format!( - "SELECT chunk_id FROM {} \ - WHERE file_uuid = $1 AND chunk_type = 'sentence' \ - AND start_frame >= $2 \ - AND end_frame <= $3", - chunk_table - )) - .bind(file_uuid) - .bind(scene.start_frame as i64) - .bind(scene.end_frame as i64) - .fetch_all(&mut *tx) - .await?; - - for row in &rule1_rows { - child_ids.push(row.0.clone()); - } - - // Fallback to simple aggregation if query didn't get text (due to frame boundaries) - if scene_text.is_empty() { - // Try to grab text directly if rule1 table doesn't have it or boundaries differ - // But rule1 table has start_frame/end_frame which should match. - // Let's re-query text directly. - } - - let texts: Vec = sqlx::query_scalar(&format!( - "SELECT text_content FROM {} \ - WHERE file_uuid = $1 AND chunk_type = 'sentence' \ - AND start_frame >= $2 \ - AND end_frame <= $3 \ - ORDER BY start_frame ASC", - chunk_table - )) - .bind(file_uuid) - .bind(scene.start_frame as i64) - .bind(scene.end_frame as i64) - .fetch_all(&mut *tx) - .await?; - - let aggregated_text = texts.join(" "); - - info!( - "Scene {}: {} -> {} ({} sentences)", - scene.scene_number, - scene.start_time, - scene.end_time, - texts.len() - ); - - // 4. Insert into dev.chunks - let video_table = schema::table_name("videos"); - let fps_query: Option = sqlx::query_scalar(&format!( - "SELECT fps FROM {} WHERE file_uuid = $1", - video_table - )) - .bind(file_uuid) - .fetch_optional(&mut *tx) - .await?; - let fps = fps_query.unwrap_or(29.97); - - // Prepare metadata JSON - let metadata = serde_json::json!({ - "type": "scene", - "scene_number": scene.scene_number - }); - - sqlx::query(&format!( - "INSERT INTO {} (file_uuid, chunk_id, chunk_type, \ - start_time, end_time, fps, start_frame, end_frame, \ - content, text_content, summary_text, metadata, child_chunk_ids) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \ - ON CONFLICT (file_uuid, chunk_id) DO NOTHING", - chunk_table - )) - .bind(file_uuid) - .bind(&chunk_id) - .bind("cut") - .bind(scene.start_time) - .bind(scene.end_time) - .bind(fps) - .bind(scene.start_frame as i64) - .bind(scene.end_frame as i64) - .bind(&metadata) - .bind(&aggregated_text) - .bind(&String::new()) - .bind(&metadata) - .bind(&child_ids) - .execute(&mut *tx) - .await?; - - count += 1; - } - - tx.commit().await?; - Ok(count) -} diff --git a/src/playground.rs b/src/playground.rs index d2f560a..db3f513 100644 --- a/src/playground.rs +++ b/src/playground.rs @@ -1067,15 +1067,13 @@ async fn main() -> Result<()> { "cut" => Some(ProcessorType::Cut), "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), - "ocr" => Some(ProcessorType::Ocr), - "face" => Some(ProcessorType::Face), - "pose" => Some(ProcessorType::Pose), - "story" => Some(ProcessorType::Story), - "caption" => Some(ProcessorType::Caption), - _ => { - eprintln!("Unknown module: {}", name); - None - } +"ocr" => Some(ProcessorType::Ocr), +"face" => Some(ProcessorType::Face), +"pose" => Some(ProcessorType::Pose), +_ => { +eprintln!("Unknown module: {}", name); +None +} } }) .collect() @@ -1095,14 +1093,12 @@ async fn main() -> Result<()> { "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), "ocr" => Some(ProcessorType::Ocr), - "face" => Some(ProcessorType::Face), - "pose" => Some(ProcessorType::Pose), - "story" => Some(ProcessorType::Story), - "caption" => Some(ProcessorType::Caption), - _ => { - eprintln!("Unknown cloud module: {}", name); - None - } +"face" => Some(ProcessorType::Face), +"pose" => Some(ProcessorType::Pose), +_ => { +eprintln!("Unknown cloud module: {}", name); +None +} } }) .collect() @@ -1732,127 +1728,9 @@ async fn main() -> Result<()> { } } } - } +} - // Process Story (video narrative) - if should_process(ProcessorType::Story) { - let story_path = output_dir.get_output_path(&uuid, "story.json"); - let decision = decide_processing(&story_path, force, resume); - - match decision { - ProcessingDecision::SkipComplete => { - println!("\nStory: ✓ Already complete, skipping"); - } - ProcessingDecision::ForceReprocess => { - println!("\nStory: ⟳ Force reprocessing from scratch..."); - std::fs::remove_file(&story_path).ok(); - if is_cloud(ProcessorType::Story) { - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - process_story_module( - &story_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - ProcessingDecision::ResumePartial => { - println!("\nStory: ↻ Resuming from checkpoint..."); - if is_cloud(ProcessorType::Story) { - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - process_story_module( - &story_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - ProcessingDecision::Process => { - if is_cloud(ProcessorType::Story) { - println!("\nStory: ☁️ Running via cloud..."); - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - println!("\nStory: ⚙️ Processing..."); - process_story_module( - &story_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - } - } - - // Process Caption (image captions) - if should_process(ProcessorType::Caption) { - let caption_path = output_dir.get_output_path(&uuid, "caption.json"); - let decision = decide_processing(&caption_path, force, resume); - - match decision { - ProcessingDecision::SkipComplete => { - println!("\nCaption: ✓ Already complete, skipping"); - } - ProcessingDecision::ForceReprocess => { - println!("\nCaption: ⟳ Force reprocessing from scratch..."); - std::fs::remove_file(&caption_path).ok(); - if is_cloud(ProcessorType::Caption) { - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - process_caption_module( - &caption_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - ProcessingDecision::ResumePartial => { - println!("\nCaption: ↻ Resuming from checkpoint..."); - if is_cloud(ProcessorType::Caption) { - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - process_caption_module( - &caption_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - ProcessingDecision::Process => { - if is_cloud(ProcessorType::Caption) { - println!("\nCaption: ☁️ Running via cloud..."); - println!(" [Cloud processing not implemented yet - run locally]"); - } else { - println!("\nCaption: ⚙️ Processing..."); - process_caption_module( - &caption_path, - video_path, - &uuid, - &progress_state, - &ui, - ) - .await?; - } - } - } - } - - // TODO: Store pre_chunks and frames to database +// TODO: Store pre_chunks and frames to database // Stop Redis subscriber redis_handle.abort(); @@ -1889,18 +1767,10 @@ async fn main() -> Result<()> { if should_process(ProcessorType::Appearance) { let path = output_dir.get_output_path(&uuid, "appearance.json"); println!(" - Appearance JSON: {}", path.display()); - } - if should_process(ProcessorType::Story) { - let path = output_dir.get_output_path(&uuid, "story.json"); - println!(" - Story JSON: {}", path.display()); - } - if should_process(ProcessorType::Caption) { - let path = output_dir.get_output_path(&uuid, "caption.json"); - println!(" - Caption JSON: {}", path.display()); - } +} - Ok(()) - } +Ok(()) +} Commands::Chunk { uuid } => { println!("Chunking: {}", uuid); diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index f59bdce..5504983 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -7,7 +7,7 @@ use tokio::time::sleep; use tracing::{debug, error, info, warn}; use crate::api::identity_agent_api::run_identity_agent; -use crate::core::chunk::{rule1_ingest, rule3_ingest}; +use crate::core::chunk::rule1_ingest; use crate::core::config::OUTPUT_DIR; use crate::core::db::qdrant_db::QdrantDb; use crate::core::db::{ @@ -1023,21 +1023,10 @@ impl JobWorker { .fetch_optional(pool) .await .unwrap_or(None) - .unwrap_or(0) +.unwrap_or(0) > 0; - let rule3 = !has_cut - || sqlx::query_scalar::<_, i32>(&format!( - "SELECT 1 FROM {chunk_t} WHERE file_uuid = $1 AND chunk_type = 'cut' LIMIT 1" - )) - .bind(uuid) - .fetch_optional(pool) - .await - .unwrap_or(None) - .unwrap_or(0) - > 0; - - let trace = !has_face +let trace = !has_face || sqlx::query_scalar::<_, i64>(&format!( "SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = $1 AND trace_id IS NOT NULL" )) @@ -1047,14 +1036,13 @@ impl JobWorker { .unwrap_or(0) > 0; - let all_ok = rule1 && vector && rule3 && trace; - if !all_ok { - tracing::info!( - "[Ingestion] waiting (uuid={}): rule1={} vector={} rule3={} trace={}", - uuid, - rule1, - vector, - rule3, +let all_ok = rule1 && vector && trace; +if !all_ok { +tracing::info!( +"[Ingestion] waiting (uuid={}): rule1={} vector={} trace={}", +uuid, +rule1, +vector, trace ); } @@ -1227,27 +1215,12 @@ impl JobWorker { Ok(None) => error!("Video not found for chunking: {}", uuid_clone), Err(e) => error!("Failed to get video info for chunking: {}", e), } - }); - } - } +}); +} +} - if all_completed { - if has_cut { - info!("📝 Prerequisites met for Rule 3 Scene Chunking. Starting ingestion..."); - let db_clone = self.db.clone(); - let uuid_clone = uuid.to_string(); - tokio::spawn(async move { - match rule3_ingest::ingest_rule3(db_clone.pool(), &uuid_clone).await { - Ok(count) => info!( - "✅ Rule 3 Scene Ingestion completed: {} scenes processed.", - count - ), - Err(e) => error!("❌ Rule 3 Scene Ingestion failed: {}", e), - } - }); - } - - // 🚀 P2 Trigger: Face Trace + DB Store (after Face) +if all_completed { +// 🚀 P2 Trigger: Face Trace + DB Store (after Face) // Runs face_tracker.py (IoU+embedding tracking), stores trace_id + position in DB if has_face { info!("📝 Face completed, triggering face trace + DB store...");