From 127d646ef1d98050c3fefa6d25e546a576d8001b Mon Sep 17 00:00:00 2001 From: Accusys Date: Tue, 26 May 2026 04:35:51 +0800 Subject: [PATCH] fix: worker processor_results + rule3 SQL + unregister cleanup bugs - job_worker.rs: add upsert_processor_result when output file exists - job_worker.rs: add load JSON and store to pre_chunks when output exists - rule3_ingest.rs: fix SQL bind order (scene_number was occupying chunk_type slot) - files.rs: fix unregister WHERE clause (uuid -> file_uuid) + add pre_chunks delete - asrx_self/main_fixed.py: fix KeyError (s['start'] -> s['start_time']) - wrapper_worker_playground.sh: add Worker launchd script - com.momentry.playground.plist: add Playground launchd config --- .../plist/com.momentry.playground.plist | 34 ++++++++++++ scripts/asrx_self/main_fixed.py | 4 +- scripts/wrapper_worker_playground.sh | 14 +++++ src/api/files.rs | 17 +++++- src/core/chunk/rule3_ingest.rs | 13 +++-- src/worker/job_worker.rs | 53 ++++++++++++++++++- 6 files changed, 124 insertions(+), 11 deletions(-) create mode 100644 momentry_runtime/plist/com.momentry.playground.plist create mode 100755 scripts/wrapper_worker_playground.sh diff --git a/momentry_runtime/plist/com.momentry.playground.plist b/momentry_runtime/plist/com.momentry.playground.plist new file mode 100644 index 0000000..53c596b --- /dev/null +++ b/momentry_runtime/plist/com.momentry.playground.plist @@ -0,0 +1,34 @@ + + + + + Label + com.momentry.playground + + UserName + accusys + + GroupName + staff + + WorkingDirectory + /Users/accusys/momentry_core + + ProgramArguments + + /Users/accusys/momentry_core/scripts/wrapper_playground.sh + + + RunAtLoad + + + KeepAlive + + + StandardOutPath + /Users/accusys/momentry/log/playground.log + + StandardErrorPath + /Users/accusys/momentry/log/playground.error.log + + diff --git a/scripts/asrx_self/main_fixed.py b/scripts/asrx_self/main_fixed.py index f84ac21..34b498f 100755 --- a/scripts/asrx_self/main_fixed.py +++ b/scripts/asrx_self/main_fixed.py @@ -174,8 +174,8 @@ class SelfASRXFixed: wav = np.mean(wav, axis=1) # 轉 mono print(f" Audio loaded: {len(wav)/sample_rate:.2f}s, {sample_rate}Hz") - # 使用 ASR segments 取代 VAD - speech_segments = [(s["start"], s["end"]) for s in asr_segments] + # 使用 ASR segments 取代 VAD (audio处理用time) + speech_segments = [(s["start_time"], s["end_time"]) for s in asr_segments] print(f" Speech segments from ASR: {len(speech_segments)}") if len(speech_segments) == 0: diff --git a/scripts/wrapper_worker_playground.sh b/scripts/wrapper_worker_playground.sh new file mode 100755 index 0000000..7d2b54d --- /dev/null +++ b/scripts/wrapper_worker_playground.sh @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" + +# Source environment (silently) +source "$PROJECT_DIR/.env" 2>/dev/null || true +source "$PROJECT_DIR/.env.development" 2>/dev/null || true + +# Ensure PATH is set +export PATH="/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:$PATH" + +exec "$PROJECT_DIR/target/debug/momentry_playground" worker \ No newline at end of file diff --git a/src/api/files.rs b/src/api/files.rs index cd272d2..5793f52 100644 --- a/src/api/files.rs +++ b/src/api/files.rs @@ -1062,7 +1062,7 @@ async fn unregister( })? .rows_affected() as i64; - let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE uuid = $1", chunks_table)) + let deleted_chunks: i64 = sqlx::query(&format!("DELETE FROM {} WHERE file_uuid = $1", chunks_table)) .bind(&uuid) .execute(state.db.pool()) .await @@ -1072,6 +1072,21 @@ async fn unregister( })? .rows_affected() as i64; + // Delete pre_chunks + let pre_chunks_table = schema::table_name("pre_chunks"); + let deleted_pre_chunks: i64 = sqlx::query(&format!( + "DELETE FROM {} WHERE file_uuid = $1", + pre_chunks_table + )) + .bind(&uuid) + .execute(state.db.pool()) + .await + .map_err(|e| { + tracing::error!("[unregister] Failed to delete pre_chunks: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })? + .rows_affected() as i64; + sqlx::query(&format!( "DELETE FROM {} WHERE file_uuid = $1", videos_table diff --git a/src/core/chunk/rule3_ingest.rs b/src/core/chunk/rule3_ingest.rs index bb73ff0..58d5726 100644 --- a/src/core/chunk/rule3_ingest.rs +++ b/src/core/chunk/rule3_ingest.rs @@ -160,18 +160,17 @@ pub async fn ingest_rule3(pool: &PgPool, file_uuid: &str) -> Result { )) .bind(file_uuid) .bind(&chunk_id) - .bind(scene.scene_number as i32) - .bind("cut") // Chunk type + .bind("cut") .bind(scene.start_time) .bind(scene.end_time) .bind(fps) .bind(scene.start_frame as i64) .bind(scene.end_frame as i64) - .bind(&metadata) // Content JSON - .bind(&aggregated_text) // Text content - .bind(&summary) // Summary - .bind(&metadata) // Metadata - .bind(&child_ids) // Child IDs + .bind(&metadata) + .bind(&aggregated_text) + .bind(&summary) + .bind(&metadata) + .bind(&child_ids) .execute(&mut *tx) .await?; diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 1985c6b..88ee80c 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -371,6 +371,57 @@ impl JobWorker { 0, ) .await?; + if let Err(e) = self + .db + .upsert_processor_result(job.id, *processor_type, &job.uuid, "completed") + .await + { + error!("Failed to create completed processor result: {}", e); + } + // Load output file and store to pre_chunks + if let Ok(json_str) = std::fs::read_to_string(&output_path) { + let store_result = match processor_type { + crate::core::db::ProcessorType::Asr => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_asr_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Asrx => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_asrx_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Cut => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_cut_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Yolo => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_yolo_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Ocr => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_ocr_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Face => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_face_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + crate::core::db::ProcessorType::Pose => { + if let Ok(result) = serde_json::from_str::(&json_str) { + ProcessorPool::store_pose_chunks(&self.db, &job.uuid, &result).await + } else { Ok(()) } + } + _ => Ok(()), + }; + if let Err(e) = store_result { + error!("Failed to store {} chunks for {}: {}", processor_type.as_str(), job.uuid, e); + } + } started_count += 1; // 覆寫 result_map 讓相依性檢查能正確判斷 result_map.insert( @@ -1091,7 +1142,7 @@ impl JobWorker { /// 自動對 sentence chunks 產生 vector embedding 並存入 PG + Qdrant async fn vectorize_chunks(db: &PostgresDb, uuid: &str) -> anyhow::Result<()> { - let embedder = Embedder::new("mxbai-embed-large:latest".to_string()); + let embedder = Embedder::new("embeddinggemma-300m".to_string()); let qdrant = QdrantDb::new(); let pool = db.pool();