diff --git a/scripts/start_momentry.sh b/scripts/start_momentry.sh index 1b5ade6..9f19fd8 100755 --- a/scripts/start_momentry.sh +++ b/scripts/start_momentry.sh @@ -25,8 +25,8 @@ echo "" LOG_DIR="/Users/accusys/momentry/logs" # ── 1. PostgreSQL ── -echo -e "${YELLOW}[1/10] PostgreSQL${NC}" -PG_DATA="/Users/accusys/momentry/var/postgresql" +echo -e "${YELLOW}[1/8] PostgreSQL${NC}" +PG_DATA="/Users/accusys/pgsql/data" PG_BIN="/Users/accusys/pgsql/18.3/bin" if $PG_BIN/pg_isready -q 2>/dev/null; then echo -e " ${GREEN}✅${NC} already running" @@ -37,7 +37,7 @@ else fi # ── 2. Redis ── -echo -e "${YELLOW}[2/10] Redis${NC}" +echo -e "${YELLOW}[2/8] Redis${NC}" if redis-cli ping 2>/dev/null | grep -q PONG; then echo -e " ${GREEN}✅${NC} already running" else @@ -47,15 +47,14 @@ else fi # ── 3. Qdrant ── -echo -e "${YELLOW}[3/10] Qdrant${NC}" +echo -e "${YELLOW}[3/8] Qdrant${NC}" QDRANT_BIN="/Users/accusys/momentry_resources/bin/qdrant" QDRANT_STORAGE="/Users/accusys/momentry/qdrant_storage" if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://localhost:6333/healthz 2>/dev/null | grep -q 200; then echo -e " ${GREEN}✅${NC} already running" else mkdir -p "$QDRANT_STORAGE" - cd "$QDRANT_STORAGE" && "$QDRANT_BIN" > "$LOG_DIR/qdrant.log" 2>&1 & - cd "$PROJECT_DIR" + "$QDRANT_BIN" > "$LOG_DIR/qdrant.log" 2>&1 & for i in $(seq 1 15); do sleep 2 if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 2 http://localhost:6333/healthz 2>/dev/null | grep -q 200; then @@ -66,7 +65,7 @@ else fi # ── 4. Qdrant Collection ── -echo -e "${YELLOW}[4/10] Qdrant Collection${NC}" +echo -e "${YELLOW}[4/8] Qdrant Collection${NC}" source "$ENV_FILE" 2>/dev/null || true COLLECTION="${QDRANT_COLLECTION:-momentry_dev_rule1_v2}" EXISTS=$(curl -s "http://localhost:6333/collections/$COLLECTION" 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('result',{}).get('status','not_found'))" 2>/dev/null) @@ -80,7 +79,7 @@ curl -s "http://localhost:6333/collections/$COLLECTION" 2>/dev/null | python3 -c check "collection '$COLLECTION' ready" # ── 5. LLM (Gemma4 / llama.cpp) ── -echo -e "${YELLOW}[5/10] LLM Server (Gemma4)${NC}" +echo -e "${YELLOW}[5/8] LLM Server (Gemma4)${NC}" if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 5 http://localhost:8082/health 2>/dev/null | grep -q 200; then echo -e " ${GREEN}✅${NC} already running" else @@ -97,27 +96,8 @@ else curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://localhost:8082/health 2>/dev/null | grep -q 200; check "started" fi -# ── 6. MariaDB ── -echo -e "${YELLOW}[6/10] MariaDB${NC}" -MARIADB_BIN="/Users/accusys/momentry_resources/mariadb/bin/mariadbd" -MARIADB_DATA="/Users/accusys/momentry/var/mysql" -if [ -S /tmp/mysql.sock ] || /Users/accusys/momentry_resources/mariadb/bin/mariadb-admin ping --silent 2>/dev/null; then - echo -e " ${GREEN}✅${NC} already running" -else - mkdir -p "$MARIADB_DATA" - "$MARIADB_BIN" --datadir="$MARIADB_DATA" --socket=/tmp/mysql.sock --port=3306 \ - > "$LOG_DIR/mariadb.log" 2>&1 & - for i in $(seq 1 10); do - sleep 2 - if /Users/accusys/momentry_resources/mariadb/bin/mariadb-admin ping --silent 2>/dev/null; then - break - fi - done - /Users/accusys/momentry_resources/mariadb/bin/mariadb-admin ping --silent 2>/dev/null; check "started" -fi - -# ── 7. Embedding Server ── -echo -e "${YELLOW}[7/10] EmbeddingGemma${NC}" +# ── 6. Embedding Server ── +echo -e "${YELLOW}[6/8] EmbeddingGemma${NC}" if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 5 http://localhost:11436/health 2>/dev/null | grep -q 200; then echo -e " ${GREEN}✅${NC} already running" else @@ -132,8 +112,8 @@ else curl -s -o /dev/null -w "%{http_code}" --connect-timeout 5 http://localhost:11436/health 2>/dev/null | grep -q 200; check "started" fi -# ── 8. Playground Server ── -echo -e "${YELLOW}[8/10] Playground API Server${NC}" +# ── 7. Playground Server ── +echo -e "${YELLOW}[7/8] Playground API Server${NC}" if curl -s -o /dev/null -w "%{http_code}" -H "X-API-Key: muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69" --connect-timeout 5 http://127.0.0.1:3003/api/v1/agents/5w1h/status 2>/dev/null | grep -q 200; then echo -e " ${GREEN}✅${NC} already running" else @@ -143,27 +123,19 @@ else curl -s -o /dev/null -w "%{http_code}" -H "X-API-Key: muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69" --connect-timeout 5 http://127.0.0.1:3003/api/v1/agents/5w1h/status 2>/dev/null | grep -q 200; check "started" fi -# ── 9. Caddy ── -echo -e "${YELLOW}[9/10] Caddy${NC}" -CADDY_BIN="/Users/accusys/momentry_resources/bin/caddy" -CADDY_CONFIG="/Users/accusys/momentry/etc/Caddyfile" -if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://127.0.0.1:2019/config/ 2>/dev/null | grep -q 200; then +# ── 8. Ollama (Gemma4 E4B) ── +echo -e "${YELLOW}[8/8] Ollama (Gemma4 E4B)${NC}" +if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 5 http://localhost:11434/api/tags 2>/dev/null | grep -q 200; then echo -e " ${GREEN}✅${NC} already running" else - "$CADDY_BIN" run --config "$CADDY_CONFIG" > "$LOG_DIR/caddy.log" 2>&1 & - sleep 3 - curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://127.0.0.1:2019/config/ 2>/dev/null | grep -q 200; check "started" -fi - -# ── 10. Gitea ── -echo -e "${YELLOW}[10/10] Gitea${NC}" -GITEA_BIN="/Users/accusys/momentry_resources/bin/gitea" -if curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://127.0.0.1:3000/api/v1/version 2>/dev/null | grep -q 200; then - echo -e " ${GREEN}✅${NC} already running" -else - "$GITEA_BIN" web --port 3000 > "$LOG_DIR/gitea.log" 2>&1 & - sleep 4 - curl -s -o /dev/null -w "%{http_code}" --connect-timeout 3 http://127.0.0.1:3000/api/v1/version 2>/dev/null | grep -q 200; check "started" + OLLAMA_BIN="/Users/accusys/momentry_resources/bin/ollama" + if [ ! -f "$OLLAMA_BIN" ]; then + echo -e " ${YELLOW}⚠ ollama binary not found, skipping${NC}" + else + "$OLLAMA_BIN" serve > "$LOG_DIR/ollama.log" 2>&1 & + sleep 3 + curl -s -o /dev/null -w "%{http_code}" --connect-timeout 5 http://localhost:11434/api/tags 2>/dev/null | grep -q 200; check "started" + fi fi echo "" @@ -179,13 +151,10 @@ else fi echo "" echo " Playground: http://127.0.0.1:3003" -echo " Caddy: http://127.0.0.1:2019" -echo " WordPress: http://m5max128wp.momentry.ddns.net:8081" echo " LLM: http://127.0.0.1:8082" echo " Embedding: http://127.0.0.1:11436" +echo " Ollama: http://localhost:11434" echo " Qdrant: http://localhost:6333" echo " PostgreSQL: localhost:5432" echo " Redis: localhost:6379" -echo " MariaDB: localhost:3306" -echo " Gitea: http://127.0.0.1:3000" echo "" diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 8de4f36..cc33da5 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -559,31 +559,6 @@ impl ProcessorType { ProcessorType::FiveW1H, ] } - - /// Pipeline type for scheduling: Frame-based, Time-based, or Cross (needs both). - pub fn pipeline(&self) -> PipelineType { - match self { - Self::Cut - | Self::Yolo - | Self::Face - | Self::Ocr - | Self::Pose - | Self::VisualChunk - | Self::Scene => PipelineType::Frame, - - Self::Asr | Self::Asrx => PipelineType::Time, - - Self::Story | Self::FiveW1H => PipelineType::Cross, - } - } -} - -/// Pipeline classification for worker scheduling. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PipelineType { - Frame, - Time, - Cross, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] @@ -2069,7 +2044,7 @@ impl PostgresDb { metadata, \ (1 - (embedding <=> $1::vector)) as similarity \ FROM {} \ - WHERE file_uuid = $2 AND chunk_type IN ('story_parent', 'llm_parent') AND embedding IS NOT NULL \ + WHERE file_uuid = $2 AND chunk_type IN ('sentence', 'story_parent', 'llm_parent') AND embedding IS NOT NULL \ ORDER BY embedding <=> $1::vector \ LIMIT $3", chunk_table @@ -3079,8 +3054,31 @@ impl PostgresDb { Ok(()) } - pub async fn store_vector(&self, _chunk_id: &str, _vector: &[f32], _uuid: &str) -> Result<()> { - tracing::warn!("[PostgresDb] store_vector called; Qdrant handles vectors"); + pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> { + let chunk_table = schema::table_name("chunk"); + let vector_json = serde_json::to_string(vector)?; + sqlx::query(&format!( + "UPDATE {} SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3", + chunk_table + )) + .bind(&vector_json) + .bind(chunk_id) + .bind(uuid) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> { + let chunk_table = schema::table_name("chunk"); + sqlx::query(&format!( + "UPDATE {} SET vector_id = $1 WHERE chunk_id = $2", + chunk_table + )) + .bind(vector_id) + .bind(chunk_id) + .execute(&self.pool) + .await?; Ok(()) } @@ -3176,11 +3174,6 @@ impl PostgresDb { ChunkStore::get_chunks_by_uuid(self, uuid).await } - pub async fn update_vector_id(&self, _chunk_id: &str, _vector_id: &str) -> Result<()> { - tracing::warn!("[PostgresDb] update_vector_id stub"); - Ok(()) - } - pub async fn create_gitea_token( &self, _id: i64, diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 4dcb8cb..1985c6b 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -12,8 +12,8 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest}; use crate::core::config::OUTPUT_DIR; use crate::core::db::qdrant_db::QdrantDb; use crate::core::db::{ - schema, MonitorJobStatus, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType, - RedisClient, VectorPayload, VideoStatus, + schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload, + VideoStatus, }; use crate::core::embedding::Embedder; use crate::core::processor::heuristic_scene::generate_scene_meta; @@ -338,81 +338,62 @@ impl JobWorker { .await?; // Check if output file already exists on disk (source of truth) - // and validate that it's a parseable JSON with expected structure. let output_path = PathBuf::from(OUTPUT_DIR.as_str()).join(format!( "{}.{}.json", job.uuid, processor_type.as_str() )); if output_path.exists() { - match validate_output_file(&output_path, *processor_type) { - Ok(true) => { - info!( - "Processor {} output file exists and valid, marking completed", - processor_type.as_str() - ); - self.db - .update_processor_progress( - &job.uuid, - processor_type.as_str(), - total_frames, - total_frames, - "completed", - ) - .await?; - let total = total_frames as i32; - self.redis - .update_worker_processor_status( - &job.uuid, - processor_type.as_str(), - "completed", - None, - total, - total, - 0, - 0, - 0, - ) - .await?; - started_count += 1; - result_map.insert( - *processor_type, - crate::core::db::ProcessorResult { - id: 0, - job_id: job.id, - processor_type: *processor_type, - status: ProcessorJobStatus::Completed, - started_at: None, - completed_at: None, - duration_secs: None, - chunks_produced: 0, - frames_processed: total_frames as i32, - output_size_bytes: 0, - error_message: None, - output_data: None, - retry_count: 0, - created_at: String::new(), - updated_at: String::new(), - }, - ); - continue; - } - Ok(false) => { - warn!( - "Processor {} output file exists but content invalid, will reprocess", - processor_type.as_str() - ); - // fall through → reprocess - } - Err(e) => { - warn!( - "Processor {} output validation error: {}, will reprocess", - processor_type.as_str(), - e - ); - // fall through → reprocess - } - } + info!( + "Processor {} output file exists, marking completed and skipping", + processor_type.as_str() + ); + self.db + .update_processor_progress( + &job.uuid, + processor_type.as_str(), + total_frames, + total_frames, + "completed", + ) + .await?; + let total = total_frames as i32; + self.redis + .update_worker_processor_status( + &job.uuid, + processor_type.as_str(), + "completed", + None, + total, + total, + 0, + 0, + 0, + ) + .await?; + started_count += 1; + // 覆寫 result_map 讓相依性檢查能正確判斷 + result_map.insert( + *processor_type, + crate::core::db::ProcessorResult { + id: 0, + job_id: job.id, + processor_type: *processor_type, + status: ProcessorJobStatus::Completed, + started_at: None, + completed_at: None, + duration_secs: None, + chunks_produced: 0, + frames_processed: total_frames as i32, + output_size_bytes: 0, + error_message: None, + output_data: None, + retry_count: 0, + created_at: String::new(), + updated_at: String::new(), + }, + ); + continue; } // Check if processor already in terminal state @@ -503,15 +484,10 @@ impl JobWorker { } } - // Check pipeline capacity before starting processor - if !self.processor_pool.can_start_for(processor_type.pipeline()).await { + // Check capacity before starting processor + if !self.processor_pool.can_start().await { info!( - "Max {} processors reached, skipping remaining processors for job {}", - match processor_type.pipeline() { - PipelineType::Frame => "frame", - PipelineType::Time => "time", - PipelineType::Cross => "cross", - }, + "Max concurrent processors reached, skipping remaining processors for job {}", job.uuid ); // 為所有未啟動的 processors 創建 Skipped 記錄 @@ -677,16 +653,8 @@ impl JobWorker { expected_count ); - // 如果 processor_results 筆數少於期望的 processor 數,代表有 processor 尚未啟動(如依賴未滿足) - if results.len() < expected_count { - info!( - "Job {} has {}/{} processor results, not all processors created yet. Skipping completion check.", - uuid, - results.len(), - expected_count - ); - return Ok(()); - } + // 依實際依賴狀態觸發後處理,不需等待所有 processor 結果 + // 例如:Rule 1 只需 ASR+ASRX 完成即可觸發,不須等 face/pose/story 完成 // 定義必要 processor(必須完成的才算 job 成功) let essential_processors = ["cut", "asr", "yolo"]; @@ -1204,35 +1172,6 @@ impl JobWorker { } } -/// 驗證 processor 輸出檔案的完整性。 -/// 回傳 Ok(true) 表示有效,Ok(false) 表示檔案存在但內容異常需重跑,Err 表示檢查失敗。 -fn validate_output_file(path: &std::path::Path, processor_type: ProcessorType) -> Result { - let content = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => return Ok(false), - }; - if content.trim().is_empty() { - return Ok(false); - } - let json: serde_json::Value = match serde_json::from_str(&content) { - Ok(v) => v, - Err(_) => return Ok(false), - }; - // 依 processor type 檢查必要欄位 - let valid = match processor_type { - ProcessorType::Asr => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), - ProcessorType::Asrx => json.get("segments").and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), - ProcessorType::Yolo => json.get("frames").and_then(|f| f.as_object()).is_some(), - ProcessorType::Face => json.get("frames").and_then(|f| f.as_object()).is_some(), - ProcessorType::Ocr => json.get("frames").and_then(|f| f.as_object()).is_some(), - ProcessorType::Pose => json.get("frames").and_then(|f| f.as_object()).is_some(), - ProcessorType::Cut => json.get("segments").or_else(|| json.get("scenes")).and_then(|s| s.as_array()).map_or(false, |a| !a.is_empty()), - // VisualChunk / Scene / Story / FiveW1H: 只檢查是 valid JSON 即可 - _ => true, - }; - Ok(valid) -} - #[cfg(test)] mod tests { use super::*; @@ -1243,34 +1182,4 @@ mod tests { assert!(config.enabled); assert!(config.max_concurrent >= 1); } - - fn test_validate_path(name: &str) -> std::path::PathBuf { - let dir = std::env::temp_dir().join(format!("test_validate_{}", name)); - let _ = std::fs::create_dir_all(&dir); - dir.join("output.json") - } - - #[test] - fn test_validate_output_empty() { - let path = test_validate_path("empty"); - std::fs::write(&path, "").unwrap(); - assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); - let _ = std::fs::remove_file(&path); - } - - #[test] - fn test_validate_output_invalid_json() { - let path = test_validate_path("invalid"); - std::fs::write(&path, "not json").unwrap(); - assert!(!validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); - let _ = std::fs::remove_file(&path); - } - - #[test] - fn test_validate_output_yolo_ok() { - let path = test_validate_path("yolo_ok"); - std::fs::write(&path, r#"{"frames":{"1":{"detections":[]}}}"#).unwrap(); - assert!(validate_output_file(&path, ProcessorType::Yolo).unwrap_or(false)); - let _ = std::fs::remove_file(&path); - } }