refactor: remove Rule 3, Story, and Caption processors

- Remove Rule 3 (Scene Chunking) from worker auto-trigger
- Remove rule3_ingest.rs and related imports
- Remove Story/Caption from playground module parsing
- Clean up scan.rs Rule 3 display
- Fix ASRX field name conversion (start_time -> start)

Reason: Story/5W1H/Scene accuracy too poor - will redesign later
This commit is contained in:
Accusys
2026-06-22 15:34:02 +08:00
parent 22f13eca4b
commit 70e849d3ae
6 changed files with 46 additions and 372 deletions

View File

@@ -236,7 +236,16 @@ def process_asrx(video_path: str, output_path: str, uuid: str = "",
try:
with open(asr_path) as f:
asr_data = json.load(f)
asr_segments = asr_data.get("segments", [])
raw_segments = asr_data.get("segments", [])
# Convert field names: start_time/end_time -> start/end
asr_segments = []
for seg in raw_segments:
converted = {
"start": seg.get("start_time", seg.get("start", 0)),
"end": seg.get("end_time", seg.get("end", 0)),
"text": seg.get("text", ""),
}
asr_segments.append(converted)
if asr_segments:
print(f"[ASRX] Loaded {len(asr_segments)} ASR segments from {asr_path}")
except Exception as e:

View File

@@ -462,14 +462,9 @@ async fn get_ingestion_status(
"auto_vectorize",
sentence_embedded > 0,
Some(format!("{sentence_embedded} embedded"))
),
step!(
"rule3_scene",
scene_count > 0,
Some(format!("{scene_count} scene chunks"))
),
step!(
"face_track",
),
step!(
"face_track",
trace_count > 0,
Some(format!("{trace_count} traces / {face_total} detections"))
),

View File

@@ -1,13 +1,11 @@
pub mod rule1_ingest;
pub mod rule2_ingest;
pub mod rule3_ingest;
pub mod splitter;
pub mod trace_ingest;
pub mod types;
pub use rule1_ingest::execute_rule1;
pub use rule2_ingest::ingest_rule2;
pub use rule3_ingest::ingest_rule3;
pub use splitter::{AsrSegment, ChunkSplitter};
pub use trace_ingest::ingest_traces;
pub use types::{Chunk, ChunkType};

View File

@@ -1,171 +0,0 @@
use crate::core::config::OUTPUT_DIR;
use crate::core::db::schema;
use anyhow::{Context, Result};
use serde::Deserialize;
use sqlx::PgPool;
use std::fs;
use tracing::{info, warn};
#[derive(Debug, Deserialize)]
struct CutScene {
scene_number: u32,
start_frame: u64,
end_frame: u64,
start_time: f64,
end_time: f64,
}
#[derive(Debug, Deserialize)]
struct CutResult {
scenes: Vec<CutScene>,
}
#[derive(Debug, Deserialize)]
struct AsrSegment {
#[serde(alias = "start")]
start_time: f64,
#[serde(alias = "end")]
end_time: f64,
text: String,
}
/// Executes Rule 3 Ingestion: Scene-based Chunking with LLM 5W1H+ Summary.
/// 1. Reads CUT data to identify scenes.
/// 2. Aggregates Rule 1 (Sentence) chunks falling within each scene.
/// 3. Calls LLM to generate 5W1H+ summary.
/// 4. Inserts parent chunks into `dev.chunks`.
pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result<usize> {
let cut_path = format!("{}/{}.cut.json", *OUTPUT_DIR, file_uuid);
let asr_path = format!("{}/{}.asr.json", *OUTPUT_DIR, file_uuid);
// 1. Load CUT and ASR data
let cut_content = fs::read_to_string(&cut_path)
.with_context(|| format!("Failed to read CUT file: {}", cut_path))?;
let cut_result: CutResult = serde_json::from_str(&cut_content).context("Invalid CUT JSON")?;
let asr_segments: Vec<AsrSegment> = match fs::read_to_string(&asr_path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => {
warn!("ASR file not found, proceeding with empty transcript for scenes");
vec![]
}
};
let mut count = 0;
let mut tx = pool.begin().await?;
// 2. Process each scene
for scene in &cut_result.scenes {
let chunk_id = format!("scene_{}", scene.scene_number);
// Aggregate text from Rule 1 chunks
let mut scene_text = String::new();
let mut child_ids: Vec<String> = Vec::new();
for seg in &asr_segments {
if seg.start_time >= scene.start_time && seg.end_time <= scene.end_time {
scene_text.push_str(&seg.text);
scene_text.push(' ');
// We'll look up the chunk_id from Rule 1 later if needed,
// but for now we just group by text overlap.
// A better approach is to query Rule 1 table for this range.
}
}
// Query chunks table for Rule 1 sentence chunks
let chunk_table = schema::table_name("chunk");
let rule1_rows: Vec<(String,)> = sqlx::query_as(&format!(
"SELECT chunk_id FROM {} \
WHERE file_uuid = $1 AND chunk_type = 'sentence' \
AND start_frame >= $2 \
AND end_frame <= $3",
chunk_table
))
.bind(file_uuid)
.bind(scene.start_frame as i64)
.bind(scene.end_frame as i64)
.fetch_all(&mut *tx)
.await?;
for row in &rule1_rows {
child_ids.push(row.0.clone());
}
// Fallback to simple aggregation if query didn't get text (due to frame boundaries)
if scene_text.is_empty() {
// Try to grab text directly if rule1 table doesn't have it or boundaries differ
// But rule1 table has start_frame/end_frame which should match.
// Let's re-query text directly.
}
let texts: Vec<String> = sqlx::query_scalar(&format!(
"SELECT text_content FROM {} \
WHERE file_uuid = $1 AND chunk_type = 'sentence' \
AND start_frame >= $2 \
AND end_frame <= $3 \
ORDER BY start_frame ASC",
chunk_table
))
.bind(file_uuid)
.bind(scene.start_frame as i64)
.bind(scene.end_frame as i64)
.fetch_all(&mut *tx)
.await?;
let aggregated_text = texts.join(" ");
info!(
"Scene {}: {} -> {} ({} sentences)",
scene.scene_number,
scene.start_time,
scene.end_time,
texts.len()
);
// 4. Insert into dev.chunks
let video_table = schema::table_name("videos");
let fps_query: Option<f64> = sqlx::query_scalar(&format!(
"SELECT fps FROM {} WHERE file_uuid = $1",
video_table
))
.bind(file_uuid)
.fetch_optional(&mut *tx)
.await?;
let fps = fps_query.unwrap_or(29.97);
// Prepare metadata JSON
let metadata = serde_json::json!({
"type": "scene",
"scene_number": scene.scene_number
});
sqlx::query(&format!(
"INSERT INTO {} (file_uuid, chunk_id, chunk_type, \
start_time, end_time, fps, start_frame, end_frame, \
content, text_content, summary_text, metadata, child_chunk_ids) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \
ON CONFLICT (file_uuid, chunk_id) DO NOTHING",
chunk_table
))
.bind(file_uuid)
.bind(&chunk_id)
.bind("cut")
.bind(scene.start_time)
.bind(scene.end_time)
.bind(fps)
.bind(scene.start_frame as i64)
.bind(scene.end_frame as i64)
.bind(&metadata)
.bind(&aggregated_text)
.bind(&String::new())
.bind(&metadata)
.bind(&child_ids)
.execute(&mut *tx)
.await?;
count += 1;
}
tx.commit().await?;
Ok(count)
}

View File

@@ -1067,15 +1067,13 @@ async fn main() -> Result<()> {
"cut" => Some(ProcessorType::Cut),
"asrx" => Some(ProcessorType::Asrx),
"yolo" => Some(ProcessorType::Yolo),
"ocr" => Some(ProcessorType::Ocr),
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
"story" => Some(ProcessorType::Story),
"caption" => Some(ProcessorType::Caption),
_ => {
eprintln!("Unknown module: {}", name);
None
}
"ocr" => Some(ProcessorType::Ocr),
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
_ => {
eprintln!("Unknown module: {}", name);
None
}
}
})
.collect()
@@ -1095,14 +1093,12 @@ async fn main() -> Result<()> {
"asrx" => Some(ProcessorType::Asrx),
"yolo" => Some(ProcessorType::Yolo),
"ocr" => Some(ProcessorType::Ocr),
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
"story" => Some(ProcessorType::Story),
"caption" => Some(ProcessorType::Caption),
_ => {
eprintln!("Unknown cloud module: {}", name);
None
}
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
_ => {
eprintln!("Unknown cloud module: {}", name);
None
}
}
})
.collect()
@@ -1732,127 +1728,9 @@ async fn main() -> Result<()> {
}
}
}
}
}
// Process Story (video narrative)
if should_process(ProcessorType::Story) {
let story_path = output_dir.get_output_path(&uuid, "story.json");
let decision = decide_processing(&story_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nStory: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nStory: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&story_path).ok();
if is_cloud(ProcessorType::Story) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nStory: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Story) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Story) {
println!("\nStory: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nStory: ⚙️ Processing...");
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process Caption (image captions)
if should_process(ProcessorType::Caption) {
let caption_path = output_dir.get_output_path(&uuid, "caption.json");
let decision = decide_processing(&caption_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nCaption: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nCaption: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&caption_path).ok();
if is_cloud(ProcessorType::Caption) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nCaption: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Caption) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Caption) {
println!("\nCaption: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nCaption: ⚙️ Processing...");
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// TODO: Store pre_chunks and frames to database
// TODO: Store pre_chunks and frames to database
// Stop Redis subscriber
redis_handle.abort();
@@ -1889,18 +1767,10 @@ async fn main() -> Result<()> {
if should_process(ProcessorType::Appearance) {
let path = output_dir.get_output_path(&uuid, "appearance.json");
println!(" - Appearance JSON: {}", path.display());
}
if should_process(ProcessorType::Story) {
let path = output_dir.get_output_path(&uuid, "story.json");
println!(" - Story JSON: {}", path.display());
}
if should_process(ProcessorType::Caption) {
let path = output_dir.get_output_path(&uuid, "caption.json");
println!(" - Caption JSON: {}", path.display());
}
}
Ok(())
}
Ok(())
}
Commands::Chunk { uuid } => {
println!("Chunking: {}", uuid);

View File

@@ -7,7 +7,7 @@ use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use crate::api::identity_agent_api::run_identity_agent;
use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::chunk::rule1_ingest;
use crate::core::config::OUTPUT_DIR;
use crate::core::db::qdrant_db::QdrantDb;
use crate::core::db::{
@@ -1023,21 +1023,10 @@ impl JobWorker {
.fetch_optional(pool)
.await
.unwrap_or(None)
.unwrap_or(0)
.unwrap_or(0)
> 0;
let rule3 = !has_cut
|| sqlx::query_scalar::<_, i32>(&format!(
"SELECT 1 FROM {chunk_t} WHERE file_uuid = $1 AND chunk_type = 'cut' LIMIT 1"
))
.bind(uuid)
.fetch_optional(pool)
.await
.unwrap_or(None)
.unwrap_or(0)
> 0;
let trace = !has_face
let trace = !has_face
|| sqlx::query_scalar::<_, i64>(&format!(
"SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = $1 AND trace_id IS NOT NULL"
))
@@ -1047,14 +1036,13 @@ impl JobWorker {
.unwrap_or(0)
> 0;
let all_ok = rule1 && vector && rule3 && trace;
if !all_ok {
tracing::info!(
"[Ingestion] waiting (uuid={}): rule1={} vector={} rule3={} trace={}",
uuid,
rule1,
vector,
rule3,
let all_ok = rule1 && vector && trace;
if !all_ok {
tracing::info!(
"[Ingestion] waiting (uuid={}): rule1={} vector={} trace={}",
uuid,
rule1,
vector,
trace
);
}
@@ -1227,27 +1215,12 @@ impl JobWorker {
Ok(None) => error!("Video not found for chunking: {}", uuid_clone),
Err(e) => error!("Failed to get video info for chunking: {}", e),
}
});
}
}
});
}
}
if all_completed {
if has_cut {
info!("📝 Prerequisites met for Rule 3 Scene Chunking. Starting ingestion...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
match rule3_ingest::ingest_rule3(db_clone.pool(), &uuid_clone).await {
Ok(count) => info!(
"✅ Rule 3 Scene Ingestion completed: {} scenes processed.",
count
),
Err(e) => error!("❌ Rule 3 Scene Ingestion failed: {}", e),
}
});
}
// 🚀 P2 Trigger: Face Trace + DB Store (after Face)
if all_completed {
// 🚀 P2 Trigger: Face Trace + DB Store (after Face)
// Runs face_tracker.py (IoU+embedding tracking), stores trace_id + position in DB
if has_face {
info!("📝 Face completed, triggering face trace + DB store...");