diff --git a/docs/M4_M5_COLLABORATION_PROTOCOL.md b/docs/M4_M5_COLLABORATION_PROTOCOL.md new file mode 100644 index 0000000..8be51c7 --- /dev/null +++ b/docs/M4_M5_COLLABORATION_PROTOCOL.md @@ -0,0 +1,50 @@ +# M4 / M5 協作協議 + +## 核心原則:檔案是 source of truth + +所有 processor 的產出是 `{uuid}.{processor}.json` 檔案。 +**檔案存在 = 處理完成**,優先於 DB 或 Redis 的任何狀態記錄。 + +## 絕對禁止 + +### 1. 不可刪除已存在的輸出檔 +- 任何 `{uuid}.{processor}.*` 檔案,無論是 `.json`、`.json.tmp`、`.json.partial`、`.json.err` +- 一律不允許 `rm`、`unlink`、`delete` +- 唯一例外:明確的人工指令 `rm` / `Delete this file` + +### 2. 不可覆蓋已存在的輸出檔 +- 重新執行 processor 前,必須先 **copy(非 rename)** 加上時間戳備份 +- 備份命名:`{uuid}.{processor}.{timestamp}.{original_extension}` +- 若備份名已存在,跳過(不覆蓋,也不另加 counter 後綴) +- 原檔保留不動 + +### 3. 不可跨域操作 +- M4 只能在 M4 機器(Mac Mini)上操作 +- M5 只能在 M5 機器(MacBook Pro)上操作 +- 禁止任何跨機器的檔案操作或 cleanup + +## 重跑 processor 的正確流程 + +1. Worker 檢查 `{uuid}.{processor}.json` 是否存在 +2. **存在 → 跳過**(無論 DB/Redis 狀態) +3. 不存在 → copy 備份既有 `{uuid}.{processor}.*` → 執行 processor +4. Processor 輸出寫入 `.json.tmp` → 完成後 rename 為 `.json` + +## 例外處理 + +| 狀態 | 行為 | +|------|------| +| `.json` 存在 | 跳過,視為完成 | +| `.json.tmp` 存在(無 `.json`) | 視為未完成,備份後重跑 | +| `.json.partial` 存在(無 `.json`) | 視為未完成,備份後重跑 | +| `.json.err` 存在(無 `.json`) | 視為未完成,備份後重跑 | +| Process 被 kill(SIGKILL) | 殘留的 partial 輸出存為 `.json.partial`(非 `.json`),不視為完成 | + +## 違規後果 + +2026-05-09 事故:M4 release 打包未含 .json → 跨域操作 → M5 cleanup 誤刪 asr.json +→ 導致 ASR 需重跑(完整電影約 6780s,近 1.9hr) +→ YOLO 需重跑 +→ 損失已完成的 pipeline 進度 + +此類違規不可再發生。 diff --git a/docs/M4_RELEASE_INCIDENT_2026-05-09.md b/docs/M4_RELEASE_INCIDENT_2026-05-09.md new file mode 100644 index 0000000..c4443b2 --- /dev/null +++ b/docs/M4_RELEASE_INCIDENT_2026-05-09.md @@ -0,0 +1,31 @@ +# M4 Release Incident — 2026-05-09 + +## Summary + +M4 在進行 release 打包作業時,未依照計畫包含 output `.json` 檔案,僅在 database 中保留 records。此外 M4 違反操作邊界進入 M5 管轄範圍,M5 執行 cleanup 時將已完成的 `asr.json` 一併刪除。 + +## Impact + +| 檔案 | 狀態 | 說明 | +|------|------|------| +| `{uuid}.asr.json` | ❌ 遺失 | 已完成的 ASR 輸出被 M5 cleanup 誤刪 | +| `{uuid}.yolo.json` | ❌ 
#!/bin/bash
#
# Momentry Core startup sequence.
#
# Brings up, in order: PostgreSQL, Redis, Qdrant, the Qdrant collection,
# the llama.cpp LLM server, the embedding server and the Playground API
# server. A service that already answers its health probe is left alone.
#
# Environment (all optional):
#   QDRANT_COLLECTION   collection name (may also come from .env.development)
#   PLAYGROUND_API_KEY  key sent to the Playground status endpoint
#
# Exit status: 0 when every step succeeded, 1 when at least one step failed.

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
ENV_FILE="${PROJECT_DIR}/.env.development"

# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
CYAN='\033[0;36m'
NC='\033[0m'

# Human-readable "<step>: <label>" entries for every failed check.
FAILURES=()
# Name of the step currently running; set by step(), read by check().
CURRENT_STEP=""

# step INDEX NAME — print the section banner and remember NAME so that a
# later failure can be attributed to the right service.
step() {
    CURRENT_STEP="$2"
    echo -e "${YELLOW}[$1] $2${NC}"
}

# check LABEL — report the exit status of the command that ran immediately
# before this call. Nothing may run between that command and check.
check() {
    local status=$?   # must stay the first statement, or $? is clobbered
    if [ "$status" -eq 0 ]; then
        echo -e " ${GREEN}✅${NC} $1"
    else
        echo -e " ${RED}❌${NC} $1"
        # Prefix with the step name: every service uses the label "started",
        # so the bare label alone would make the summary useless.
        FAILURES+=("${CURRENT_STEP}: $1")
    fi
}

# http_ok URL [CONNECT_TIMEOUT [CURL_ARGS...]] — succeed when URL answers 200.
http_ok() {
    local url="$1" timeout="${2:-3}"
    shift $(( $# > 2 ? 2 : $# ))
    [ "$(curl -s -o /dev/null -w '%{http_code}' --connect-timeout "$timeout" "$@" "$url" 2>/dev/null)" = "200" ]
}

# wait_http URL ATTEMPTS [CURL_ARGS...] — poll URL every 2 seconds until it
# answers 200 or ATTEMPTS polls have failed; the exit status is the verdict.
wait_http() {
    local url="$1" attempts="$2" i
    shift 2
    for ((i = 0; i < attempts; i++)); do
        sleep 2
        http_ok "$url" 2 "$@" && return 0
    done
    http_ok "$url" 3 "$@"
}

# collection_status NAME — print the Qdrant status of collection NAME, or
# "not_found" when the response carries no result. Prints nothing when
# Qdrant is unreachable or the response is not JSON.
collection_status() {
    curl -s "http://localhost:6333/collections/$1" 2>/dev/null |
        python3 -c "import sys,json; d=json.load(sys.stdin); print((d.get('result') or {}).get('status','not_found'))" 2>/dev/null
}

echo -e "${CYAN}====================================${NC}"
echo -e "${CYAN} Momentry Core - Startup Sequence${NC}"
echo -e "${CYAN}====================================${NC}"
echo ""

LOG_DIR="/Users/accusys/momentry/logs"
# Every service below redirects its output into LOG_DIR; without the
# directory the redirect fails and the service is never launched.
mkdir -p "$LOG_DIR"

# ── 1. PostgreSQL ──
step "1/7" "PostgreSQL"
PG_DATA="/Users/accusys/pgsql/data"
PG_BIN="/Users/accusys/pgsql/18.3/bin"
if "$PG_BIN/pg_isready" -q 2>/dev/null; then
    echo -e " ${GREEN}✅${NC} already running"
else
    "$PG_BIN/pg_ctl" -D "$PG_DATA" -l "$LOG_DIR/pg.log" start 2>/dev/null
    sleep 2
    "$PG_BIN/pg_isready" -q 2>/dev/null; check "started"
fi

# ── 2. Redis ──
step "2/7" "Redis"
if redis-cli ping 2>/dev/null | grep -q PONG; then
    echo -e " ${GREEN}✅${NC} already running"
else
    brew services start redis 2>/dev/null || redis-server --daemonize yes 2>/dev/null
    sleep 2
    redis-cli ping 2>/dev/null | grep -q PONG; check "started"
fi

# ── 3. Qdrant ──
step "3/7" "Qdrant"
QDRANT_BIN="${PROJECT_DIR}/services/qdrant/target/release/qdrant"
QDRANT_STORAGE="/Users/accusys/momentry/qdrant_storage"
QDRANT_HEALTH_URL="http://localhost:6333/healthz"
if http_ok "$QDRANT_HEALTH_URL" 3; then
    echo -e " ${GREEN}✅${NC} already running"
else
    mkdir -p "$QDRANT_STORAGE"
    # NOTE(review): QDRANT_STORAGE is created but never handed to Qdrant, so
    # the server presumably uses its default storage location relative to the
    # current directory. Confirm where existing data lives before wiring this
    # path in; switching it blindly could make existing collections vanish.
    nohup "$QDRANT_BIN" > "$LOG_DIR/qdrant.log" 2>&1 &
    wait_http "$QDRANT_HEALTH_URL" 15; check "started"
fi

# ── 4. Qdrant Collection ──
step "4/7" "Qdrant Collection"
# The env file is optional; a missing file must not abort the sequence.
source "$ENV_FILE" 2>/dev/null || true
COLLECTION="${QDRANT_COLLECTION:-momentry_dev_rule1_v2}"
if [ "$(collection_status "$COLLECTION")" = "not_found" ]; then
    curl -s -X PUT "http://localhost:6333/collections/$COLLECTION" \
        -H "Content-Type: application/json" \
        -d '{"vectors":{"size":768,"distance":"Cosine"}}' > /dev/null 2>&1
    sleep 1
fi
COLLECTION_STATUS="$(collection_status "$COLLECTION")"
[ "$COLLECTION_STATUS" = "green" ] || [ "$COLLECTION_STATUS" = "ok" ]
check "collection '$COLLECTION' ready"

# ── 5. LLM (Gemma4 / llama.cpp) ──
step "5/7" "LLM Server (Gemma4)"
LLM_HEALTH_URL="http://localhost:8082/health"
if http_ok "$LLM_HEALTH_URL" 5; then
    echo -e " ${GREEN}✅${NC} already running"
else
    LLM_BIN="/Users/accusys/llama/bin/llama-server"
    LLM_MODEL="/Users/accusys/models/google_gemma-4-26B-A4B-it-Q5_K_M.gguf"
    nohup "$LLM_BIN" -m "$LLM_MODEL" --host 0.0.0.0 --port 8082 -ngl 99 -c 16384 --temp 0.1 --mlock > "$LOG_DIR/llama_server.log" 2>&1 &
    echo -e " ${YELLOW}⏳ loading model (~30s)...${NC}"
    wait_http "$LLM_HEALTH_URL" 30; check "started"
fi

# ── 6. Embedding Server ──
step "6/7" "EmbeddingGemma"
EMBED_HEALTH_URL="http://localhost:11436/health"
if http_ok "$EMBED_HEALTH_URL" 5; then
    echo -e " ${GREEN}✅${NC} already running"
else
    VENV_PYTHON="${PROJECT_DIR}/venv/bin/python"
    EMBED_SCRIPT="${PROJECT_DIR}/scripts/embeddinggemma_server.py"
    if [ ! -f "$VENV_PYTHON" ]; then
        VENV_PYTHON="/opt/homebrew/bin/python3.11"
        # Install through the interpreter that will run the server; a bare
        # `pip` may belong to a different Python installation.
        "$VENV_PYTHON" -m pip install flask 2>/dev/null || true
    fi
    nohup "$VENV_PYTHON" "$EMBED_SCRIPT" --port 11436 > "$LOG_DIR/embed.log" 2>&1 &
    # Poll instead of a single fixed sleep: startup time varies with load.
    wait_http "$EMBED_HEALTH_URL" 15; check "started"
fi

# ── 7. Playground Server ──
step "7/7" "Playground API Server"
# NOTE(review): this default key is committed to the repository. Rotate it
# and supply PLAYGROUND_API_KEY from the environment or .env.development.
PLAYGROUND_API_KEY="${PLAYGROUND_API_KEY:-muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69}"
PLAYGROUND_STATUS_URL="http://127.0.0.1:3003/api/v1/agents/5w1h/status"
if http_ok "$PLAYGROUND_STATUS_URL" 5 -H "X-API-Key: $PLAYGROUND_API_KEY"; then
    echo -e " ${GREEN}✅${NC} already running"
else
    # The binary is launched by relative path from the project root.
    if cd "$PROJECT_DIR"; then
        nohup target/debug/momentry_playground server > "$LOG_DIR/playground.log" 2>&1 &
    fi
    wait_http "$PLAYGROUND_STATUS_URL" 10 -H "X-API-Key: $PLAYGROUND_API_KEY"; check "started"
fi

echo ""
if [ ${#FAILURES[@]} -eq 0 ]; then
    echo -e "${GREEN}====================================${NC}"
    echo -e "${GREEN} All services running${NC}"
    echo -e "${GREEN}====================================${NC}"
else
    echo -e "${RED}====================================${NC}"
    echo -e "${RED} ${#FAILURES[@]} service(s) failed to start${NC}"
    echo -e "${RED}====================================${NC}"
    for f in "${FAILURES[@]}"; do echo -e " ${RED}❌${NC} $f"; done
fi
echo ""
echo " Playground: http://127.0.0.1:3003"
echo " LLM: http://127.0.0.1:8082"
echo " Embedding: http://127.0.0.1:11436"
echo " Qdrant: http://localhost:6333"
echo " PostgreSQL: localhost:5432"
echo " Redis: localhost:6379"
echo ""

# Let callers (launchd, CI, other scripts) detect a failed startup.
[ ${#FAILURES[@]} -eq 0 ] || exit 1
preserved: {:?}", partial_path); } else { let mut err_path = out.to_path_buf(); err_path.set_extension("json.err"); diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 998502e..0f0ea1e 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -1,11 +1,13 @@ use anyhow::Result; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; use tracing::{error, info, warn}; use crate::core::chunk::{rule1_ingest, rule3_ingest}; +use crate::core::config::OUTPUT_DIR; use crate::core::db::qdrant_db::QdrantDb; use crate::api::five_w1h_agent_api::run_5w1h_agent; use crate::api::identity_agent_api::run_identity_agent; @@ -63,6 +65,21 @@ impl JobWorker { let dynamic_max = resources.safe_max_concurrent(self.config.max_concurrent); self.processor_pool.sweep_stale().await; + // Reset stale running jobs: jobs stuck in 'running' with no active processor results + if let Err(e) = sqlx::query( + "UPDATE dev.monitor_jobs SET status = 'pending', updated_at = NOW() + WHERE status = 'running' + AND id NOT IN ( + SELECT DISTINCT job_id FROM dev.processor_results + WHERE status IN ('pending', 'running') + )" + ) + .execute(self.db.pool()) + .await + { + error!("Failed to reset stale running jobs: {}", e); + } + let health_key = format!("{}health", crate::core::config::REDIS_KEY_PREFIX.as_str()); let now = chrono::Utc::now().to_rfc3339(); let running_count = self.processor_pool.get_running_count().await; @@ -311,6 +328,59 @@ impl JobWorker { ) .await?; + // Check if output file already exists on disk (source of truth) + let output_path = + PathBuf::from(OUTPUT_DIR.as_str()).join(format!("{}.{}.json", job.uuid, processor_type.as_str())); + if output_path.exists() { + info!( + "Processor {} output file exists, marking completed and skipping", + processor_type.as_str() + ); + self.db + .update_processor_progress( + &job.uuid, + processor_type.as_str(), + total_frames, + total_frames, + "completed", 
+ ) + .await?; + let total = total_frames as i32; + self.redis + .update_worker_processor_status( + &job.uuid, + processor_type.as_str(), + "completed", + None, + total, + total, + 0, + 0, + 0, + ) + .await?; + started_count += 1; + // 覆寫 result_map 讓相依性檢查能正確判斷 + result_map.insert(*processor_type, crate::core::db::ProcessorResult { + id: 0, + job_id: job.id, + processor_type: *processor_type, + status: ProcessorJobStatus::Completed, + started_at: None, + completed_at: None, + duration_secs: None, + chunks_produced: 0, + frames_processed: total_frames as i32, + output_size_bytes: 0, + error_message: None, + output_data: None, + retry_count: 0, + created_at: String::new(), + updated_at: String::new(), + }); + continue; + } + // Check if processor already in terminal state if let Some(result) = result_map.get(processor_type) { match result.status { @@ -432,6 +502,35 @@ impl JobWorker { break; } + // 備份既有輸出檔(copy + timestamp suffix,不刪原檔) + let uuid = &job.uuid; + let proc = processor_type.as_str(); + let prefix = format!("{}.{}.", uuid, proc); + if let Ok(dir) = std::fs::read_dir(OUTPUT_DIR.as_str()) { + let ts = chrono::Utc::now().format("%Y%m%d_%H%M%S"); + for entry in dir.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if !name.starts_with(&prefix) { + continue; + } + let suffix = &name[prefix.len()..]; + if suffix.starts_with(|c: char| c.is_ascii_digit()) { + continue; + } + let bak_name = format!("{}{}.{}", prefix, ts, &name[prefix.len()..]); + let bak_path = PathBuf::from(OUTPUT_DIR.as_str()).join(&bak_name); + if bak_path.exists() { + info!("Backup already exists: {}, skipping", bak_path.display()); + } else { + match std::fs::copy(entry.path(), &bak_path) { + Ok(bytes) => info!("Backed up {} -> {} ({} bytes)", name, bak_path.display(), bytes), + Err(e) => warn!("Failed to backup {}: {}", name, e), + } + } + } + } + let processor_result_id = self .db .create_processor_result(job.id, *processor_type, &job.uuid)