diff --git a/src/core/chunk/rule1_ingest.rs b/src/core/chunk/rule1_ingest.rs index fabbaff..30644a9 100644 --- a/src/core/chunk/rule1_ingest.rs +++ b/src/core/chunk/rule1_ingest.rs @@ -38,6 +38,11 @@ pub async fn execute_rule1(db: &PostgresDb, file_uuid: &str, fps: f64) -> Result format!("{} {}", seg.text, ocr_text) }; + // Skip chunks with no text (empty ASRX and no OCR) + if combined_text.trim().is_empty() { + continue; + } + let metadata = serde_json::json!({ "language": seg.language, }); @@ -50,7 +55,7 @@ pub async fn execute_rule1(db: &PostgresDb, file_uuid: &str, fps: f64) -> Result let chunk = Chunk::from_seconds( file_id as i32, file_uuid.to_string(), - format!("{}", idx), + format!("{}", count), ChunkType::Sentence, ChunkRule::Rule1, seg.start_time, diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 2556d3b..4b5cb34 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -361,6 +361,58 @@ impl JobWorker { )); debug!("Checking output file: {:?}", output_path); + // Check for stale .tmp file (process crashed before renaming) + let tmp_path = PathBuf::from(OUTPUT_DIR.as_str()).join(format!( + "{}.{}.json.tmp", + job.uuid, + processor_type.as_str() + )); + if tmp_path.exists() && !output_path.exists() { + if let Ok(meta) = std::fs::metadata(&tmp_path) { + // 條件 1: 檔案 > 1KB + let has_content = meta.len() > 1024; + + // 條件 2: 檔案超過 120 秒未修改(確定沒人還在寫) + let is_stale = if let Ok(modified) = meta.modified() { + if let Ok(elapsed) = modified.elapsed() { + elapsed.as_secs() > 120 + } else { false } + } else { false }; + + // 條件 3: 檢查程序是否還在跑 + let proc_name = processor_type.as_str(); + let process_running = std::process::Command::new("ps") + .args(["aux"]) + .output() + .ok() + .and_then(|out| String::from_utf8(out.stdout).ok()) + .map(|out| { + out.contains(&format!("{}_processor", proc_name)) || + out.contains(&format!("{}_processor", proc_name)) + }) + .unwrap_or(false); + + if has_content && is_stale && !process_running { + info!( + "Found stale .tmp file ({} bytes, {}s old, process={}), renaming to .json for {}", + meta.len(), + meta.modified().ok().and_then(|m| m.elapsed().ok()).map(|e| e.as_secs()).unwrap_or(0), + if process_running { "running" } else { "dead" }, + processor_type.as_str() + ); + if std::fs::rename(&tmp_path, &output_path).is_ok() { + info!("Recovered {} from .tmp file", processor_type.as_str()); + } + } else if !has_content { + debug!("Skipping .tmp file (too small): {} bytes", meta.len()); + } else if !is_stale { + debug!("Skipping .tmp file (recently modified)"); + } else if process_running { + debug!("Skipping .tmp file (process still running)"); + } + } + } + // Special case: Pose processor should NOT be skipped even if pose.json exists // because swift_face_pose creates it and pose.rs needs to interpolate let skip_check = if *processor_type == crate::core::db::ProcessorType::Pose {