diff --git a/migrations/034_processor_state_machine.sql b/migrations/034_processor_state_machine.sql new file mode 100644 index 0000000..6dedcc9 --- /dev/null +++ b/migrations/034_processor_state_machine.sql @@ -0,0 +1,23 @@ +-- Migration: 034_processor_state_machine +-- Purpose: Add processor_alerts table for State Machine alert mechanism +-- Date: 2026-05-30 + +-- Create processor_alerts table +CREATE TABLE IF NOT EXISTS processor_alerts ( + id SERIAL PRIMARY KEY, + file_uuid VARCHAR(32), + processor_type VARCHAR(32) NOT NULL, + alert_type VARCHAR(32) NOT NULL, + message TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- Create indexes for efficient querying +CREATE INDEX IF NOT EXISTS idx_alerts_file_uuid ON processor_alerts(file_uuid); +CREATE INDEX IF NOT EXISTS idx_alerts_processor_type ON processor_alerts(processor_type); +CREATE INDEX IF NOT EXISTS idx_alerts_alert_type ON processor_alerts(alert_type); +CREATE INDEX IF NOT EXISTS idx_alerts_created_at ON processor_alerts(created_at); + +-- Add comments +COMMENT ON TABLE processor_alerts IS 'Processor state machine alerts for dependency/resource/output issues'; +COMMENT ON COLUMN processor_alerts.alert_type IS 'Alert types: dependency_not_met, resource_exhausted, output_invalid, timeout'; \ No newline at end of file diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 9eb1be0..2e709df 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -34,7 +34,7 @@ pub struct IdentityRecord { pub uuid: Uuid, pub name: String, pub metadata: serde_json::Value, - pub created_at: Option>, + pub created_at: Option, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -141,6 +141,26 @@ pub struct IdentityFaceRecord { pub confidence: f64, } +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct FileFaceRecord { + pub id: i64, + pub file_uuid: String, + pub frame_number: i64, + pub timestamp_secs: Option, + pub face_id: Option, + pub trace_id: Option, + pub x: f64, + pub y: f64, + pub width: f64, + pub height: f64, + pub confidence: f64, + pub identity_id: Option, + pub stranger_id: Option, + pub identity_uuid: Option, + pub identity_name: Option, + pub stranger_metadata: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct IdentityChunkRecord { pub id: i32, @@ -406,14 +426,12 @@ pub enum PipelineType { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "snake_case")] pub enum ProcessorType { - Asr, Cut, Yolo, Ocr, Face, Pose, Asrx, - VisualChunk, Scene, Story, FiveW1H, @@ -443,14 +461,12 @@ impl<'r> sqlx::Decode<'r, sqlx::Postgres> for ProcessorType { impl ProcessorType { pub fn as_str(&self) -> &'static str { match self { - ProcessorType::Asr => "asr", ProcessorType::Cut => "cut", ProcessorType::Yolo => "yolo", ProcessorType::Ocr => "ocr", ProcessorType::Face => "face", ProcessorType::Pose => "pose", ProcessorType::Asrx => "asrx", - ProcessorType::VisualChunk => "visual_chunk", ProcessorType::Scene => "scene", ProcessorType::Story => "story", ProcessorType::FiveW1H => "5w1h", @@ -459,14 +475,12 @@ impl ProcessorType { pub fn from_db_str(s: &str) -> Option { match s { - "asr" => Some(ProcessorType::Asr), "cut" => Some(ProcessorType::Cut), "yolo" => Some(ProcessorType::Yolo), "ocr" => Some(ProcessorType::Ocr), "face" => Some(ProcessorType::Face), "pose" => Some(ProcessorType::Pose), "asrx" => Some(ProcessorType::Asrx), - "visual_chunk" => Some(ProcessorType::VisualChunk), "scene" => Some(ProcessorType::Scene), "story" => Some(ProcessorType::Story), "5w1h" => Some(ProcessorType::FiveW1H), @@ -474,24 +488,20 @@ impl ProcessorType { } } - /// 預估 CPU 使用率(0.0 ~ 1.0, 1.0 = 一個完整核心) pub fn estimated_cpu(&self) -> f64 { match self { - ProcessorType::Asr => 1.0, ProcessorType::Cut => 0.5, ProcessorType::Yolo => 0.3, ProcessorType::Ocr => 0.8, ProcessorType::Face => 0.6, ProcessorType::Pose => 0.4, ProcessorType::Asrx => 0.8, - ProcessorType::VisualChunk => 0.3, ProcessorType::Scene => 0.3, ProcessorType::Story => 0.1, ProcessorType::FiveW1H => 0.1, } } - /// 是否使用 GPU pub fn uses_gpu(&self) -> bool { match self { ProcessorType::Yolo | ProcessorType::Face | ProcessorType::Pose => true, @@ -499,49 +509,39 @@ impl ProcessorType { } } - /// 預估記憶體使用量 (MB) pub fn estimated_memory_mb(&self) -> u64 { match self { - ProcessorType::Asr => 2048, ProcessorType::Cut => 512, ProcessorType::Yolo => 1024, ProcessorType::Ocr => 1024, ProcessorType::Face => 1536, ProcessorType::Pose => 1024, ProcessorType::Asrx => 2048, - ProcessorType::VisualChunk => 512, ProcessorType::Scene => 512, ProcessorType::Story => 256, ProcessorType::FiveW1H => 256, } } - /// 使用的模型名稱(如有) pub fn model_name(&self) -> Option<&'static str> { match self { - ProcessorType::Asr => Some("faster-whisper"), ProcessorType::Cut => None, ProcessorType::Yolo => Some("yolov8n"), ProcessorType::Ocr => Some("paddleocr"), ProcessorType::Face => Some("insightface/buffalo_l"), ProcessorType::Pose => Some("mediapipe/pose"), ProcessorType::Asrx => Some("speechbrain/ecapa-tdnn"), - ProcessorType::VisualChunk => None, ProcessorType::Scene => Some("places365"), ProcessorType::Story => None, ProcessorType::FiveW1H => Some("gemma4"), } } - /// 依賴的其他 Processor(需先完成才能執行) pub fn dependencies(&self) -> Vec { match self { - ProcessorType::Asr => vec![ProcessorType::Cut], - ProcessorType::Asrx => vec![ProcessorType::Asr], - ProcessorType::VisualChunk => vec![ProcessorType::Yolo], + ProcessorType::Asrx => vec![ProcessorType::Cut], ProcessorType::Scene => vec![ProcessorType::Cut], ProcessorType::Story => vec![ - ProcessorType::Asr, ProcessorType::Asrx, ProcessorType::Cut, ProcessorType::Yolo, @@ -555,16 +555,12 @@ impl ProcessorType { pub fn all() -> Vec { vec![ ProcessorType::Cut, - // Scene (Places365) removed — replaced by heuristic_scene_metadata post-processor - ProcessorType::Asr, ProcessorType::Asrx, ProcessorType::Yolo, ProcessorType::Ocr, ProcessorType::Face, ProcessorType::Pose, - ProcessorType::VisualChunk, ProcessorType::Story, - ProcessorType::FiveW1H, ] } @@ -573,11 +569,9 @@ impl ProcessorType { ProcessorType::Yolo | ProcessorType::Ocr | ProcessorType::Face - | ProcessorType::Pose - | ProcessorType::VisualChunk => PipelineType::Frame, + | ProcessorType::Pose => PipelineType::Frame, - ProcessorType::Asr - | ProcessorType::Cut + ProcessorType::Cut | ProcessorType::Asrx | ProcessorType::Scene | ProcessorType::Story @@ -589,6 +583,9 @@ impl ProcessorType { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "snake_case")] pub enum ProcessorJobStatus { + Idle, + Waiting, + Ready, Pending, Running, Completed, @@ -621,6 +618,9 @@ impl<'r> sqlx::Decode<'r, sqlx::Postgres> for ProcessorJobStatus { impl ProcessorJobStatus { pub fn as_str(&self) -> &'static str { match self { + ProcessorJobStatus::Idle => "idle", + ProcessorJobStatus::Waiting => "waiting", + ProcessorJobStatus::Ready => "ready", ProcessorJobStatus::Pending => "pending", ProcessorJobStatus::Running => "running", ProcessorJobStatus::Completed => "completed", @@ -631,6 +631,9 @@ impl ProcessorJobStatus { pub fn from_db_str(s: &str) -> Option { match s { + "idle" => Some(ProcessorJobStatus::Idle), + "waiting" => Some(ProcessorJobStatus::Waiting), + "ready" => Some(ProcessorJobStatus::Ready), "pending" => Some(ProcessorJobStatus::Pending), "running" => Some(ProcessorJobStatus::Running), "completed" => Some(ProcessorJobStatus::Completed), @@ -793,6 +796,7 @@ pub struct PostgresCache { #[derive(Debug, serde::Serialize, sqlx::FromRow)] pub struct SemanticSearchResult { pub id: i32, + pub file_uuid: Option, // Added for global search pub scene_order: i32, pub start_frame: i64, pub end_frame: i64, @@ -832,9 +836,9 @@ impl PostgresDb { pub async fn new(database_url: &str) -> Result { let max_connections = std::env::var("DB_MAX_CONNECTIONS") - .unwrap_or_else(|_| "10".to_string()) + .unwrap_or_else(|_| "30".to_string()) .parse::() - .unwrap_or(10); + .unwrap_or(30); let acquire_timeout_secs = std::env::var("DB_ACQUIRE_TIMEOUT") .unwrap_or_else(|_| "60".to_string()) @@ -908,7 +912,7 @@ impl PostgresDb { sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_file ON chunk(file_uuid)") .execute(pool) .await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_type ON chunks(chunk_type)") + sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_type ON chunk(chunk_type)") .execute(pool) .await?; @@ -921,8 +925,17 @@ impl PostgresDb { // Processor Results sqlx::query("CREATE TABLE IF NOT EXISTS processor_results (id SERIAL PRIMARY KEY, job_id INTEGER, file_uuid VARCHAR(255) NOT NULL, processor VARCHAR(64), processor_type VARCHAR(64) NOT NULL, status VARCHAR(32) DEFAULT 'pending', result JSONB, error_message TEXT, started_at TIMESTAMP WITH TIME ZONE, completed_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - // Talents & Identity Bindings + // Talents sqlx::query("CREATE TABLE IF NOT EXISTS talents (id BIGSERIAL PRIMARY KEY, real_name VARCHAR(255) NOT NULL UNIQUE, actor_name VARCHAR(255), voice_embedding TEXT, face_embedding TEXT, metadata JSONB DEFAULT '{}', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; + + // Identity History (Undo/Redo Support - for PATCH operations only) + // Create before identity_bindings to avoid dependency issues + let history_table = schema::table_name("identity_history"); + sqlx::query(&format!("CREATE TABLE IF NOT EXISTS {} (id BIGSERIAL PRIMARY KEY, identity_id INTEGER NOT NULL, operation VARCHAR(20) NOT NULL, before_snapshot JSONB, after_snapshot JSONB, is_undone BOOLEAN DEFAULT FALSE, undone_at TIMESTAMPTZ, user_id VARCHAR(100), user_source VARCHAR(50), created_at TIMESTAMPTZ DEFAULT NOW())", history_table)).execute(pool).await?; + sqlx::query(&format!("CREATE INDEX IF NOT EXISTS idx_identity_history_identity_time ON {}(identity_id, created_at DESC)", history_table)).execute(pool).await?; + sqlx::query(&format!("CREATE INDEX IF NOT EXISTS idx_identity_history_not_undone ON {}(identity_id, created_at DESC) WHERE NOT is_undone", history_table)).execute(pool).await?; + + // Identity Bindings (depends on identities table) sqlx::query("CREATE TABLE IF NOT EXISTS identity_bindings (id BIGSERIAL PRIMARY KEY, identity_id BIGINT REFERENCES identities(id) ON DELETE CASCADE, identity_type VARCHAR(20) NOT NULL, identity_value VARCHAR(100) NOT NULL, metadata JSONB DEFAULT '{}', confidence DOUBLE PRECISION DEFAULT 1.0, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, UNIQUE(identity_id, identity_type, identity_value))").execute(pool).await?; // API Keys @@ -955,7 +968,30 @@ impl PostgresDb { .execute(pool) .await?; - sqlx::query("DROP TRIGGER IF EXISTS chunks_search_vector_trigger ON chunks") + sqlx::query( + r#"CREATE OR REPLACE FUNCTION jsonb_deep_merge(base jsonb, override jsonb) RETURNS jsonb + IMMUTABLE LANGUAGE plpgsql AS $$ + DECLARE + result jsonb := COALESCE(base, '{}'::jsonb); + key text; + val jsonb; + BEGIN + FOR key, val IN SELECT * FROM jsonb_each(override) + LOOP + IF jsonb_typeof(val) = 'object' AND jsonb_typeof(result -> key) = 'object' THEN + result := jsonb_set(result, ARRAY[key], jsonb_deep_merge(result -> key, val)); + ELSE + result := jsonb_set(result, ARRAY[key], val); + END IF; + END LOOP; + RETURN result; + END; + $$;"#, + ) + .execute(pool) + .await?; + + sqlx::query("DROP TRIGGER IF EXISTS chunks_search_vector_trigger ON chunk") .execute(pool) .await?; @@ -968,13 +1004,13 @@ impl PostgresDb { .await?; // Chunks Rule 1 - sqlx::query("CREATE TABLE IF NOT EXISTS chunks_rule1 (id UUID PRIMARY KEY DEFAULT gen_random_uuid(), file_uuid VARCHAR(32) NOT NULL REFERENCES videos(uuid) ON DELETE CASCADE, start_frame BIGINT NOT NULL, end_frame BIGINT NOT NULL, content TEXT NOT NULL, speaker_id VARCHAR(50), created_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_rule1_asset ON chunks_rule1(file_uuid)") - .execute(pool) - .await?; + // Temporarily disabled for debugging + // sqlx::query("CREATE TABLE IF NOT EXISTS chunks_rule1 ...").execute(pool).await?; + // sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_rule1_asset ...").execute(pool).await?; // Jobs (Legacy/P0) - sqlx::query("CREATE TABLE IF NOT EXISTS jobs (id UUID PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL REFERENCES videos(uuid) ON DELETE CASCADE, processor_list TEXT[], assigned_processor_id UUID, rule VARCHAR(20), status VARCHAR(20) DEFAULT 'QUEUED', total_frames BIGINT DEFAULT 0, processed_frames BIGINT DEFAULT 0, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; + tracing::info!("Creating jobs table..."); + sqlx::query("CREATE TABLE IF NOT EXISTS jobs (id UUID PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL REFERENCES videos(file_uuid) ON DELETE CASCADE, processor_list TEXT[], assigned_processor_id UUID, rule VARCHAR(20), status VARCHAR(20) DEFAULT 'QUEUED', total_frames BIGINT DEFAULT 0, processed_frames BIGINT DEFAULT 0, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW())").execute(pool).await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") .execute(pool) .await?; @@ -1372,10 +1408,10 @@ impl PostgresDb { let pre_chunks = schema::table_name("pre_chunks"); sqlx::query(&format!( - "DELETE FROM {} WHERE file_uuid = $1::uuid", + "DELETE FROM {} WHERE REPLACE(file_uuid, '-', '') = $1", pre_chunks )) - .bind(uuid) + .bind(&uuid.replace('-', "")) .execute(&self.pool) .await?; @@ -1455,22 +1491,48 @@ impl PostgresDb { // Convert to i64 for monitor_jobs.video_id (BIGINT) let video_id_i64 = video_id.map(|v| v as i64); - let row = sqlx::query( - &format!( - r#" - INSERT INTO {} (uuid, video_path, status, video_id) - VALUES ($1, $2, 'pending', $3) - ON CONFLICT (uuid) DO UPDATE SET status = 'pending', updated_at = NOW() - RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id - "#, - jobs_table + // Check if job already exists + let existing_id: Option = + sqlx::query_scalar(&format!("SELECT id FROM {} WHERE uuid = $1", jobs_table)) + .bind(uuid) + .fetch_optional(&self.pool) + .await?; + + let row = if let Some(job_id) = existing_id { + // Update existing job + sqlx::query( + &format!( + r#" + UPDATE {} SET status = 'pending', video_path = $1, video_id = $2, updated_at = NOW() + WHERE id = $3 + RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id + "#, + jobs_table + ) ) - ) - .bind(uuid) - .bind(video_path) - .bind(video_id_i64) - .fetch_one(&self.pool) - .await?; + .bind(video_path) + .bind(video_id_i64) + .bind(job_id) + .fetch_one(&self.pool) + .await? + } else { + // Insert new job + sqlx::query( + &format!( + r#" + INSERT INTO {} (uuid, video_path, status, video_id) + VALUES ($1, $2, 'pending', $3) + RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id + "#, + jobs_table + ) + ) + .bind(uuid) + .bind(video_path) + .bind(video_id_i64) + .fetch_one(&self.pool) + .await? + }; let status_str: String = row.get(3); let status = @@ -2062,7 +2124,7 @@ impl PostgresDb { let results = sqlx::query_as::<_, SemanticSearchResult>( &format!( "SELECT \ - id, id as scene_order, \ + id, file_uuid, id as scene_order, \ (start_time * fps)::bigint as start_frame, (end_time * fps)::bigint as end_frame, \ fps, start_time, end_time, \ COALESCE(summary_text, text_content, '') as summary, \ @@ -2084,6 +2146,41 @@ impl PostgresDb { Ok(results) } + /// Global semantic search across all files + pub async fn search_parent_chunks_semantic_global( + &self, + query_vector: &[f32], + limit: usize, + ) -> Result> { + // Convert Vec to JSON string for vector cast + let vector_json = serde_json::to_string(query_vector) + .map_err(|e| anyhow::anyhow!("Vector serialize error: {}", e))?; + + let chunk_table = schema::table_name("chunk"); + let results = sqlx::query_as::<_, SemanticSearchResult>( + &format!( + "SELECT \ + id, file_uuid, id as scene_order, \ + (start_time * fps)::bigint as start_frame, (end_time * fps)::bigint as end_frame, \ + fps, start_time, end_time, \ + COALESCE(summary_text, text_content, '') as summary, \ + metadata, \ + (1 - (embedding <=> $1::vector)) as similarity \ + FROM {} \ + WHERE chunk_type IN ('sentence', 'story_parent', 'llm_parent') AND embedding IS NOT NULL \ + ORDER BY embedding <=> $1::vector \ + LIMIT $2", + chunk_table + ), + ) + .bind(&vector_json) + .bind(limit as i64) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + /// Get children for a list of parent IDs pub async fn get_children_for_parents( &self, @@ -2368,7 +2465,6 @@ impl PostgresDb { "story" | "story_parent" | "story_child" => { crate::core::chunk::types::ChunkType::Story } - "visual" => crate::core::chunk::types::ChunkType::Visual, _ => crate::core::chunk::types::ChunkType::Story, }; let start_frame = (st * fps).round() as i64; @@ -2503,7 +2599,7 @@ impl PostgresDb { id: r.get("id"), job_id: r.get("job_id"), processor_type: crate::core::db::ProcessorType::from_db_str(ptype) - .unwrap_or(crate::core::db::ProcessorType::Asr), + .unwrap_or(crate::core::db::ProcessorType::Cut), status: crate::core::db::ProcessorJobStatus::from_db_str(st) .unwrap_or(crate::core::db::ProcessorJobStatus::Pending), started_at: r @@ -2549,7 +2645,7 @@ impl PostgresDb { id: r.get("id"), job_id: r.get("job_id"), processor_type: crate::core::db::ProcessorType::from_db_str(ptype) - .unwrap_or(crate::core::db::ProcessorType::Asr), + .unwrap_or(crate::core::db::ProcessorType::Cut), status: crate::core::db::ProcessorJobStatus::from_db_str(st) .unwrap_or(crate::core::db::ProcessorJobStatus::Pending), started_at: r @@ -2954,6 +3050,65 @@ impl PostgresDb { .collect()) } + pub async fn get_file_faces( + &self, + file_uuid: &str, + limit: i32, + offset: i64, + ) -> Result> { + let fd_table = schema::table_name("face_detections"); + let video_table = schema::table_name("videos"); + let id_table = schema::table_name("identities"); + let st_table = schema::table_name("strangers"); + use sqlx::Row; + let rows = sqlx::query(&format!( + "SELECT fd.id::bigint as id, fd.file_uuid, \ + fd.frame_number::bigint as frame_number, \ + (fd.frame_number::float8 / NULLIF(v.fps, 0)) as timestamp_secs, \ + fd.face_id, fd.trace_id, \ + fd.x::float8 as x, fd.y::float8 as y, \ + fd.width::float8 as width, fd.height::float8 as height, \ + fd.confidence::float8 as confidence, \ + fd.identity_id, fd.stranger_id, \ + i.uuid::text as identity_uuid, i.name as identity_name, \ + s.metadata as stranger_metadata \ + FROM {} fd \ + JOIN {} v ON v.file_uuid = fd.file_uuid \ + LEFT JOIN {} i ON i.id = fd.identity_id \ + LEFT JOIN {} s ON s.id = fd.stranger_id \ + WHERE fd.file_uuid = $1 \ + ORDER BY fd.frame_number, fd.trace_id \ + LIMIT $2 OFFSET $3", + fd_table, video_table, id_table, st_table + )) + .bind(file_uuid) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + Ok(rows + .into_iter() + .map(|r| super::FileFaceRecord { + id: r.get("id"), + file_uuid: r.get("file_uuid"), + frame_number: r.get("frame_number"), + timestamp_secs: r.get("timestamp_secs"), + face_id: r.get("face_id"), + trace_id: r.get("trace_id"), + x: r.get("x"), + y: r.get("y"), + width: r.get("width"), + height: r.get("height"), + confidence: r.get("confidence"), + identity_id: r.get("identity_id"), + stranger_id: r.get("stranger_id"), + identity_uuid: r.get("identity_uuid"), + identity_name: r.get("identity_name"), + stranger_metadata: r.get("stranger_metadata"), + }) + .collect()) + } + pub async fn get_identity_chunks( &self, uuid_str: &str, @@ -2963,16 +3118,48 @@ impl PostgresDb { let id_table = schema::table_name("identities"); let fd_table = schema::table_name("face_detections"); let chunk_table = schema::table_name("chunk"); + let ib_table = schema::table_name("identity_bindings"); + let pc_table = schema::table_name("pre_chunks"); use sqlx::Row; - let rows = sqlx::query( - &format!("SELECT c.file_uuid, c.chunk_id, (c.start_time * c.fps)::bigint as start_frame, (c.end_time * c.fps)::bigint as end_frame, c.fps, c.start_time, c.end_time, c.text_content, 'sentence' as chunk_type \ - FROM {} c JOIN {} fd ON fd.file_uuid = c.file_uuid \ - AND fd.frame_number BETWEEN c.start_frame AND c.end_frame \ - WHERE fd.identity_id = (SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1) \ - GROUP BY c.file_uuid, c.chunk_id, c.start_frame, c.end_frame, c.fps, c.start_time, c.end_time, c.text_content LIMIT $2 OFFSET $3", chunk_table, fd_table, id_table) - ) - .bind(uuid_str).bind(limit).bind(offset) - .fetch_all(&self.pool).await?; + let subq = format!( + "SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1", + id_table + ); + let rows = sqlx::query(&format!( + "SELECT c.file_uuid, c.chunk_id, \ + (c.start_time * c.fps)::bigint as start_frame, \ + (c.end_time * c.fps)::bigint as end_frame, \ + c.fps, c.start_time, c.end_time, c.text_content, \ + 'sentence' as chunk_type \ + FROM {} c \ + JOIN {} fd ON fd.file_uuid = c.file_uuid \ + AND fd.frame_number BETWEEN c.start_frame AND c.end_frame \ + WHERE fd.identity_id = ({}) \ + GROUP BY c.file_uuid, c.chunk_id, c.start_frame, c.end_frame, \ + c.fps, c.start_time, c.end_time, c.text_content \ + UNION ALL \ + SELECT c.file_uuid, c.chunk_id, \ + c.start_frame::bigint, c.end_frame::bigint, \ + c.fps, c.start_time, c.end_time, c.text_content, \ + 'sentence' as chunk_type \ + FROM {} c \ + JOIN {} pc ON pc.file_uuid = c.file_uuid \ + AND pc.processor_type = 'asrx' \ + AND c.start_time <= (pc.data->>'timestamp')::double precision \ + AND c.end_time >= (pc.data->>'timestamp')::double precision \ + JOIN {} ib ON ib.identity_value = pc.data->>'speaker_id' \ + AND ib.identity_type = 'speaker' \ + AND ib.file_uuid = pc.file_uuid \ + WHERE ib.identity_id = ({}) \ + ORDER BY start_time \ + LIMIT $2 OFFSET $3", + chunk_table, fd_table, subq, chunk_table, pc_table, ib_table, subq + )) + .bind(uuid_str) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; Ok(rows .into_iter() .map(|r| super::IdentityChunkRecord { @@ -2999,7 +3186,7 @@ impl PostgresDb { let clean = uuid_str.replace('-', ""); use sqlx::Row; let row = sqlx::query( - &format!("SELECT id::bigint as id, uuid::text, name, identity_type, source, status, metadata, reference_data, \ + &format!("SELECT id::bigint as id, uuid::text, name, identity_type, source, status, metadata, COALESCE(reference_data, '{{}}'::jsonb) as reference_data, \ NULL::real[] as voice_embedding, NULL::real[] as identity_embedding, \ face_embedding::real[] as face_embedding, \ tmdb_id, tmdb_profile, created_at::timestamptz as created_at, NULL::timestamptz as updated_at \ @@ -3343,7 +3530,6 @@ impl crate::core::db::ChunkStore for PostgresDb { "story" | "story_parent" | "story_child" => { crate::core::chunk::types::ChunkType::Story } - "visual" => crate::core::chunk::types::ChunkType::Visual, _ => crate::core::chunk::types::ChunkType::Story, }; let start_frame = (st * fps).round() as i64; @@ -3680,4 +3866,30 @@ mod tests { assert!(result.is_ok()); assert_eq!(result.unwrap(), "__no_match__:*"); } + + #[tokio::test] + async fn test_check_videos_file_uuid_column() { + let url = crate::core::config::DATABASE_URL.as_str(); + let pool = match PgPoolOptions::new().max_connections(1).connect(url).await { + Ok(p) => p, + Err(_) => { + eprintln!("SKIP: DB unavailable"); + return; + } + }; + let schema = crate::core::config::DATABASE_SCHEMA.as_str(); + let cols: Vec = sqlx::query_scalar( + "SELECT column_name FROM information_schema.columns WHERE table_schema=$1 AND table_name='videos' ORDER BY ordinal_position" + ) + .bind(schema) + .fetch_all(&pool) + .await + .unwrap_or_default(); + eprintln!("videos columns in schema '{}': {:?}", schema, cols); + assert!( + cols.contains(&"file_uuid".to_string()), + "videos must have file_uuid column, got: {:?}", + cols + ); + } } diff --git a/src/core/db/redis_client.rs b/src/core/db/redis_client.rs index 3dcb502..10d3887 100644 --- a/src/core/db/redis_client.rs +++ b/src/core/db/redis_client.rs @@ -300,6 +300,38 @@ impl RedisClient { Ok(()) } + pub async fn emit_processor_alert( + &self, + file_uuid: &str, + processor_type: &str, + alert_type: &str, + message: &str, + ) -> Result<()> { + let mut conn = self.get_conn_internal().await?; + let prefix = REDIS_KEY_PREFIX.as_str(); + let channel = format!("{}processor:alerts", prefix); + + let alert_json = serde_json::json!({ + "file_uuid": file_uuid, + "processor_type": processor_type, + "alert_type": alert_type, + "message": message, + "timestamp": chrono::Utc::now().to_rfc3339(), + }); + + let _: usize = conn.publish(&channel, serde_json::to_string(&alert_json)?).await?; + + tracing::warn!( + "Processor alert: {} | {} | {} | {}", + file_uuid, + processor_type, + alert_type, + message + ); + + Ok(()) + } + pub async fn subscribe_anomaly_alerts(&self) -> Result { let mut pubsub = self.client.get_async_pubsub().await?; let prefix = REDIS_KEY_PREFIX.as_str(); @@ -441,12 +473,27 @@ impl RedisClient { let key = format!("{}job:{}", prefix, uuid); let _: i32 = conn.del(&key).await?; - let processor_types = ["asr", "cut", "yolo", "ocr", "face", "pose", "asrx"]; + let processor_types = [ + "asr", + "cut", + "yolo", + "ocr", + "face", + "pose", + "asrx", + "visual_chunk", + "story", + "tmdb_probe", + "embedding", + ]; for ptype in processor_types { let proc_key = format!("{}job:{}:processor:{}", prefix, uuid, ptype); let _: i32 = conn.del(&proc_key).await?; } + let progress_key = format!("{}progress:{}", prefix, uuid); + let _: i32 = conn.del(&progress_key).await?; + Ok(()) } diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 88ee80c..9f22450 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -6,7 +6,6 @@ use std::time::Duration; use tokio::time::sleep; use tracing::{error, info, warn}; -use crate::api::five_w1h_agent_api::run_5w1h_agent; use crate::api::identity_agent_api::run_identity_agent; use crate::core::chunk::{rule1_ingest, rule3_ingest}; use crate::core::config::OUTPUT_DIR; @@ -22,6 +21,29 @@ use crate::worker::processor::{ProcessorPool, ProcessorTask}; use crate::worker::resources::SystemResources; use sqlx::PgPool; +enum ConditionResult { + Ready, + Waiting(Vec), +} + +fn check_dependencies( + processor: crate::core::db::ProcessorType, + completed: &[crate::core::db::ProcessorType], +) -> ConditionResult { + let deps = processor.dependencies(); + let missing: Vec<_> = deps + .iter() + .filter(|d| !completed.contains(d)) + .cloned() + .collect(); + + if missing.is_empty() { + ConditionResult::Ready + } else { + ConditionResult::Waiting(missing) + } +} + pub struct JobWorker { db: Arc, redis: Arc, @@ -250,38 +272,6 @@ impl JobWorker { .collect() }; - // 長影片動態調整:若 CUT 場景過長,Face 需在 ASR 之前執行 - if let Ok(Some(video)) = self.db.get_video_by_uuid(&job.uuid).await { - // 條件:cut_done 且場景數 <= 3 且最長場景 > 600s(10分鐘) - if video.cut_done && video.cut_count <= 3 && video.cut_max_duration > 600.0 { - info!( - "[DYNAMIC] Long cut detected: {} scenes, max_dur={:.0}s for {}. Moving Face before ASR.", - video.cut_count, video.cut_max_duration, job.uuid - ); - // 確保 Face 在 ASR 之前 - if let Some(asr_pos) = processors_to_run - .iter() - .position(|p| matches!(p, crate::core::db::ProcessorType::Asr)) - { - if let Some(face_pos) = processors_to_run - .iter() - .position(|p| matches!(p, crate::core::db::ProcessorType::Face)) - { - if face_pos > asr_pos { - // 將 Face 移到 ASR 前面 - let face = processors_to_run.remove(face_pos); - let insert_pos = processors_to_run - .iter() - .position(|p| matches!(p, crate::core::db::ProcessorType::Asr)) - .unwrap(); - processors_to_run.insert(insert_pos, face); - info!("[DYNAMIC] Reordered processors: Face now ahead of ASR"); - } - } - } - } - } - let total_processor_types = processors_to_run.len() as i32; // Get video total_frames for progress tracking @@ -381,45 +371,73 @@ impl JobWorker { // Load output file and store to pre_chunks if let Ok(json_str) = std::fs::read_to_string(&output_path) { let store_result = match processor_type { - crate::core::db::ProcessorType::Asr => { - if let Ok(result) = serde_json::from_str::(&json_str) { - ProcessorPool::store_asr_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } - } crate::core::db::ProcessorType::Asrx => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = serde_json::from_str::< + crate::core::processor::AsrxResult, + >(&json_str) + { ProcessorPool::store_asrx_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } crate::core::db::ProcessorType::Cut => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = + serde_json::from_str::(&json_str) + { ProcessorPool::store_cut_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } crate::core::db::ProcessorType::Yolo => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = serde_json::from_str::< + crate::core::processor::YoloResult, + >(&json_str) + { ProcessorPool::store_yolo_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } crate::core::db::ProcessorType::Ocr => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = + serde_json::from_str::(&json_str) + { ProcessorPool::store_ocr_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } crate::core::db::ProcessorType::Face => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = serde_json::from_str::< + crate::core::processor::FaceResult, + >(&json_str) + { ProcessorPool::store_face_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } crate::core::db::ProcessorType::Pose => { - if let Ok(result) = serde_json::from_str::(&json_str) { + if let Ok(result) = serde_json::from_str::< + crate::core::processor::PoseResult, + >(&json_str) + { ProcessorPool::store_pose_chunks(&self.db, &job.uuid, &result).await - } else { Ok(()) } + } else { + Ok(()) + } } _ => Ok(()), }; if let Err(e) = store_result { - error!("Failed to store {} chunks for {}: {}", processor_type.as_str(), job.uuid, e); + error!( + "Failed to store {} chunks for {}: {}", + processor_type.as_str(), + job.uuid, + e + ); } } started_count += 1; @@ -459,10 +477,28 @@ impl JobWorker { continue; } ProcessorJobStatus::Failed => { + if result.retry_count >= 3 { + info!( + "Processor {} failed {} times, max retries reached (3), skipping", + processor_type.as_str(), + result.retry_count + ); + started_count += 1; + continue; + } info!( - "Processor {} previously failed, retrying", - processor_type.as_str() + "Processor {} previously failed (retry {}/3), retrying", + processor_type.as_str(), + result.retry_count + 1 ); + let _ = sqlx::query(&format!( + "UPDATE {} SET retry_count = retry_count + 1 WHERE job_id = $1 AND processor = $2", + schema::table_name("processor_results") + )) + .bind(job.id) + .bind(processor_type.as_str()) + .execute(self.db.pool()) + .await; } ProcessorJobStatus::Running => { info!( @@ -472,8 +508,11 @@ impl JobWorker { started_count += 1; continue; } - // Skipped 不視為 terminal — 允許重新啟動 - ProcessorJobStatus::Skipped | ProcessorJobStatus::Pending => { + ProcessorJobStatus::Idle + | ProcessorJobStatus::Waiting + | ProcessorJobStatus::Ready + | ProcessorJobStatus::Skipped + | ProcessorJobStatus::Pending => { // Continue to start processor } } @@ -531,6 +570,26 @@ impl JobWorker { processor_type.as_str(), deps.iter().map(|d| d.as_str()).collect::>(), ); + let missing_deps: Vec = deps + .iter() + .filter(|d| !matches!( + result_map.get(d).map(|r| &r.status), + Some(ProcessorJobStatus::Completed) + )) + .map(|d| d.as_str().to_string()) + .collect(); + if let Err(e) = self + .redis + .emit_processor_alert( + &job.uuid, + processor_type.as_str(), + "dependency_not_met", + &format!("Waiting for: {}", missing_deps.join(", ")), + ) + .await + { + error!("Failed to emit processor alert: {}", e); + } continue; } } @@ -669,21 +728,10 @@ impl JobWorker { "SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' LIMIT 1" )); let trace = check!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = '{fu}' AND trace_id IS NOT NULL")); - let tkg = check!(&format!( - "SELECT 1 FROM {} WHERE file_uuid = '{fu}' LIMIT 1", - schema::table_name("tkg_nodes") - )); - let scene_meta = std::path::Path::new(&format!( - "{}/{fu}.scene_meta.json", - crate::core::config::OUTPUT_DIR.as_str() - )) - .exists(); - let five_w1h = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != '' LIMIT 1")); - - let all_ok = rule1 && vector && rule3 && trace && tkg && scene_meta && five_w1h; + let all_ok = rule1 && vector && rule3 && trace; if !all_ok { tracing::info!( - "[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace} tkg={tkg} scene={scene_meta} 5w1h={five_w1h}" + "[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace}" ); } all_ok @@ -708,7 +756,7 @@ impl JobWorker { // 例如:Rule 1 只需 ASR+ASRX 完成即可觸發,不須等 face/pose/story 完成 // 定義必要 processor(必須完成的才算 job 成功) - let essential_processors = ["cut", "asr", "yolo"]; + let essential_processors = ["cut", "asrx", "yolo"]; let essential_completed = essential_processors.iter().all(|ep| { results.iter().any(|r| { @@ -763,20 +811,16 @@ impl JobWorker { .map(|r| r.processor_type.as_str().to_string()) .collect(); - // Check prerequisites for post-processing triggers - let has_asr = completed_processors.iter().any(|p| p == "asr"); let has_asrx = completed_processors.iter().any(|p| p == "asrx"); let has_cut = completed_processors.iter().any(|p| p == "cut"); let has_face = completed_processors.iter().any(|p| p == "face"); let has_yolo = completed_processors.iter().any(|p| p == "yolo"); - // Update processor arrays in job record self.db .update_job_processors_arrays(job_id, completed_processors, failed_processors.clone()) .await?; - // 🚀 P1 Trigger: Rule 1 Chunking(僅需 ASR + ASRX) - if has_asr && has_asrx { + if has_asrx { info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion..."); let db_clone = self.db.clone(); let uuid_clone = uuid.to_string(); @@ -787,7 +831,6 @@ impl JobWorker { match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await { Ok(count) => { info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count); - // Automatically vectorize new sentence chunks if count > 0 { info!( "📝 Starting automatic vectorize for {} chunks...", @@ -802,7 +845,6 @@ impl JobWorker { ); } } - // Phase 1 release: sentence chunk embedding 交付 info!("📦 Phase 1 release packaging..."); let executor = match crate::core::processor::PythonExecutor::new() { Ok(ex) => ex, @@ -836,10 +878,8 @@ impl JobWorker { }); } - // Rule 3 / Trace / Identity 需要 all_completed(含非必要 processor 失敗也可) if all_completed { - // 🚀 P1 Trigger: Rule 3 Scene Chunking - if has_cut && has_asr { + if has_cut { info!("📝 Prerequisites met for Rule 3 Scene Chunking. Starting ingestion..."); let db_clone = self.db.clone(); let uuid_clone = uuid.to_string(); @@ -881,16 +921,6 @@ impl JobWorker { Ok(()) => { info!("✅ Face trace + DB store completed for {}", uuid_clone); - // Sync face embeddings to Qdrant for ANN search - info!("📝 Syncing face embeddings to Qdrant..."); - if let Err(e) = - crate::core::db::qdrant_db::sync_face_embeddings(&uuid_clone).await - { - error!("❌ Qdrant face sync failed for {}: {}", uuid_clone, e); - } else { - info!("✅ Qdrant face sync completed for {}", uuid_clone); - } - // Generate trace chunks from face_detections + ASR text info!("📝 Generating trace chunks..."); match crate::core::chunk::trace_ingest::ingest_traces( @@ -902,26 +932,6 @@ impl JobWorker { Ok(n) => info!("✅ {} trace chunks created for {}", n, uuid_clone), Err(e) => error!("❌ Trace chunk ingestion failed: {}", e), } - - // Build Temporal Knowledge Graph (TKG) — native Rust - info!("📝 Building TKG graph (Rust)..."); - let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") - .unwrap_or_else(|_| ".".to_string()); - match crate::core::processor::tkg::build_tkg( - db_clone.as_ref(), - &uuid_clone, - &output_dir, - ) - .await - { - Ok(r) => info!( - "✅ TKG built for {}: {} face, {} obj, {} spk, {} co, {} sf, {} ff edges", - uuid_clone, - r.face_trace_nodes, r.object_nodes, r.speaker_nodes, - r.co_occurrence_edges, r.speaker_face_edges, r.face_face_edges, - ), - Err(e) => error!("❌ TKG build failed for {}: {}", uuid_clone, e), - } } Err(e) => { error!("❌ Face trace + DB store failed for {}: {}", uuid_clone, e) @@ -1011,47 +1021,9 @@ impl JobWorker { }); } - // 🚀 P4 Trigger: 5W1H Agent (after Rule 3 completion) - if has_cut && has_asr { - info!("📝 Prerequisites met for 5W1H Agent. Starting..."); - let db_clone = self.db.clone(); - let uuid_clone = uuid.to_string(); - tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; - match run_5w1h_agent(&db_clone, &uuid_clone).await { - Ok(()) => { - info!("✅ 5W1H Agent completed for {}", uuid_clone); - // Phase 2 release: full pipeline 交付 - info!("📦 Phase 2 release packaging..."); - let executor = match crate::core::processor::PythonExecutor::new() { - Ok(ex) => ex, - Err(e) => { - error!("Failed PythonExecutor for release pack: {}", e); - return; - } - }; - match executor - .run( - "release_pack.py", - &["--phase", "2", "--file-uuid", &uuid_clone], - None, - "RELEASE_P2", - Some(std::time::Duration::from_secs(120)), - ) - .await - { - Ok(()) => info!("✅ Phase 2 release packaged for {}", uuid_clone), - Err(e) => error!("❌ Phase 2 release pack failed: {}", e), - } - } - Err(e) => error!("❌ 5W1H Agent failed for {}: {}", uuid_clone, e), - } - }); - } - if !Self::ingestion_complete(self.db.pool(), uuid).await { info!( - "Job {}: all 10 processors done, waiting for ingestion...", + "Job {}: all processors done, waiting for ingestion...", job_id ); return Ok(());