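Momentry Core Tiered Cache Architecture Development Plan
Version: V1.0
Date: 2026-03-24
Goal: Implement a tiered Redis + MongoDB cache architecture
1. Overview
1.1 Goals
Implement a tiered cache architecture in Momentry Core:
- Small, frequently accessed data → Redis
- Medium-sized, query-oriented data → MongoDB
1.2 Current Architecture
| Component | Status | Purpose |
|---|---|---|
| Redis | ✅ Implemented | Job progress, Pub/Sub, health checks, API Key (Moka) |
| MongoDB | ⚠️ HTTP driver | Used only to store chunks |
| In-memory cache | Moka + RwLock | API keys, video records |
1.3 Target Architecture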
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Redis Cache │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Job Progress │ │ Health Status │ │
│ │ (existing) │ │ (new) │ │
│ └─────────────────┘ └─────────────────┘ │
│ ┌─────────────────┐ │
│ │ Video Meta hot reads │ │
│ │ (new) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ Cache Miss
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: MongoDB Cache │
│ Collection: momento.cache │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Videos List │ │ Search Results │ │
│ │ (new) │ │ (new) │ │
│ └─────────────────┘ └─────────────────┘ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Hybrid Search │ │ N8n Search │ │
│ │ (new) │ │ (new) │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ Cache Miss
▼
┌─────────────────────────────────────────────────────────────┐
│ PostgreSQL / Qdrant │
└─────────────────────────────────────────────────────────────┘
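The read path in the diagram can be sketched as a fall-through lookup. The following is a minimal in-memory sketch, not the project's implementation: `TieredStore`, `Source`, and the three `HashMap`s (standing in for Redis, MongoDB, and PostgreSQL/Qdrant) are hypothetical names, and back-filling the faster tiers after a miss is an assumption that the plan does not spell out.

```rust
use std::collections::HashMap;

/// Which tier answered a lookup.
#[derive(Debug, PartialEq)]
pub enum Source {
    Layer1,
    Layer2,
    Origin,
}

/// In-memory stand-in for the Redis -> MongoDB -> PostgreSQL/Qdrant chain.
pub struct TieredStore {
    layer1: HashMap<String, String>, // stands in for Redis
    layer2: HashMap<String, String>, // stands in for MongoDB
    origin: HashMap<String, String>, // stands in for PostgreSQL / Qdrant
}

impl TieredStore {
    pub fn new(origin: HashMap<String, String>) -> Self {
        Self {
            layer1: HashMap::new(),
            layer2: HashMap::new(),
            origin,
        }
    }

    /// Look up `key`, falling through on each miss and back-filling
    /// the faster tiers with whatever the slower tier returned.
    pub fn get(&mut self, key: &str) -> Option<(String, Source)> {
        if let Some(v) = self.layer1.get(key) {
            return Some((v.clone(), Source::Layer1));
        }
        if let Some(v) = self.layer2.get(key).cloned() {
            self.layer1.insert(key.to_string(), v.clone());
            return Some((v, Source::Layer2));
        }
        // Miss in both cache layers: ask the source of truth.
        let v = self.origin.get(key).cloned()?;
        self.layer2.insert(key.to_string(), v.clone());
        self.layer1.insert(key.to_string(), v.clone());
        Some((v, Source::Origin))
    }
}
```

In this sketch a first read is answered by the origin and every later read of the same key by layer 1; a key that is absent everywhere returns `None` and is not cached.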
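2. Tech Stack Changes
2.1 Cargo.toml Changes
# Current
mongodb = { version = "2", features = ["tokio-sync"] }
# Change to (tokio-runtime, the async Tokio API, is a default feature in 2.x)
mongodb = { version = "2", features = ["bson-chrono-0_4"] }
Notes:
- tokio-runtime: enables the async Tokio runtime API. It is a default feature of the 2.x driver, so dropping the blocking tokio-sync feature is enough.
- bson-chrono-0_4: lets chrono::DateTime values be converted to and from BSON dates. BSON serialization/deserialization itself is always available through the re-exported mongodb::bson module.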
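3. New Module Structure
3.1 Directory Layout
src/core/
├── cache/ # new directory
│ ├── mod.rs # module entry point
│ ├── mongo_cache.rs # MongoDB cache implementation
│ ├── redis_cache.rs # Redis cache wrapper
│ ├── keys.rs # cache key helper functions
│ └── config.rs # cache configuration
├── db/
│ ├── mod.rs # adds the cache exports
│ ├── mongodb_db.rs # refactored to use the native driver
│ └── ...
└── ...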
3.2 File List
| Action | File path | Description |
|---|---|---|
| Add | src/core/cache/mod.rs | Cache module entry point |
| Add | src/core/cache/mongo_cache.rs | MongoDB cache implementation |
| Add | src/core/cache/redis_cache.rs | Redis cache wrapper |
| Add | src/core/cache/keys.rs | Cache key helper functions |
| Add | src/core/cache/config.rs | Cache configuration |
| Modify | src/core/db/mongodb_db.rs | Switch to the native mongodb crate |
| Modify | src/core/db/mod.rs | Export the new modules |
| Modify | src/api/server.rs | Integrate caching into the API handlers |
| Modify | src/core/config.rs | Add MongoDB cache settings |
| Modify | Cargo.toml | Update the mongodb features |
4. Configuration Design
4.1 Environment Variables
# MongoDB cache settings (new)
MONGODB_URL=mongodb://localhost:27017
MONGODB_CACHE_ENABLED=true
MONGODB_CACHE_TTL_VIDEOS=300 # 5 minutes
MONGODB_CACHE_TTL_SEARCH=300 # 5 minutes
MONGODB_CACHE_TTL_HYBRID_SEARCH=600 # 10 minutes
MONGODB_CACHE_TTL_VIDEO_META=3600 # 60 minutes
# Redis cache settings (new)
REDIS_CACHE_TTL_HEALTH=30 # 30 seconds
REDIS_CACHE_TTL_VIDEO_META=3600 # 60 minutes
4.2 config.rs Structure
// src/core/config.rs
pub mod cache {
use super::*;
pub static MONGODB_URL: Lazy<String> = Lazy::new(|| {
env::var("MONGODB_URL")
.unwrap_or_else(|_| "mongodb://localhost:27017".to_string())
});
pub static MONGODB_CACHE_ENABLED: Lazy<bool> = Lazy::new(|| {
env::var("MONGODB_CACHE_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true)
});
pub static MONGODB_CACHE_TTL_VIDEOS: Lazy<u64> = Lazy::new(|| {
env::var("MONGODB_CACHE_TTL_VIDEOS")
.unwrap_or_else(|_| "300".to_string())
.parse()
.unwrap_or(300)
});
pub static MONGODB_CACHE_TTL_SEARCH: Lazy<u64> = Lazy::new(|| {
env::var("MONGODB_CACHE_TTL_SEARCH")
.unwrap_or_else(|_| "300".to_string())
.parse()
.unwrap_or(300)
});
pub static MONGODB_CACHE_TTL_HYBRID_SEARCH: Lazy<u64> = Lazy::new(|| {
env::var("MONGODB_CACHE_TTL_HYBRID_SEARCH")
.unwrap_or_else(|_| "600".to_string())
.parse()
.unwrap_or(600)
});
pub static MONGODB_CACHE_TTL_VIDEO_META: Lazy<u64> = Lazy::new(|| {
env::var("MONGODB_CACHE_TTL_VIDEO_META")
.unwrap_or_else(|_| "3600".to_string())
.parse()
.unwrap_or(3600)
});
pub static REDIS_CACHE_TTL_HEALTH: Lazy<u64> = Lazy::new(|| {
env::var("REDIS_CACHE_TTL_HEALTH")
.unwrap_or_else(|_| "30".to_string())
.parse()
.unwrap_or(30)
});
}
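Each setting above repeats the same read, parse, and fall back to a default sequence. As an illustration only, that pattern could be factored into one generic helper; `parse_or` and `env_or` are hypothetical names that do not appear in the plan, and trimming whitespace before parsing is an added assumption.

```rust
use std::env;
use std::str::FromStr;

/// Parse `raw` as `T`, falling back to `default` when the value is
/// absent or cannot be parsed.
pub fn parse_or<T: FromStr>(raw: Option<&str>, default: T) -> T {
    raw.and_then(|s| s.trim().parse().ok()).unwrap_or(default)
}

/// Read the environment variable `name` and parse it as `T`,
/// using `default` when it is unset or malformed.
pub fn env_or<T: FromStr>(name: &str, default: T) -> T {
    parse_or(env::var(name).ok().as_deref(), default)
}
```

With such a helper, a setting like the hybrid-search TTL would reduce to `Lazy::new(|| env_or("MONGODB_CACHE_TTL_HYBRID_SEARCH", 600))`, keeping the default in one place instead of two.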
5. MongoDB Cache Design
5.1 Collection Structure
// Collection: momento.cache
// Database: momento
{
"_id": ObjectId("..."),
"key": "videos:list:page=1:limit=20",
"value": {
"videos": [
{
"uuid": "xxx",
"file_path": "/path/to/video.mp4",
"file_name": "video.mp4",
"duration": 120.5,
"width": 1920,
"height": 1080
}
]
},
"category": "videos",
"created_at": ISODate("2026-03-24T08:00:00Z"),
"expires_at": ISODate("2026-03-24T08:05:00Z"),
"hit_count": 0,
"last_access": ISODate("2026-03-24T08:00:00Z")
}
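5.2 Index Design
// Run against the momento database (use momento), so db.cache is momento.cache
// TTL index - automatically deletes expired documents
db.cache.createIndex(
{ "expires_at": 1 },
{ expireAfterSeconds: 0 }
)
// Unique index - prevents duplicate keys
db.cache.createIndex(
{ "key": 1 },
{ unique: true }
)
// Category index - used for bulk invalidation
db.cache.createIndex({ "category": 1 })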
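MongoDB removes TTL-expired documents with a background task that runs roughly once a minute, so a document can stay readable for a short time after its `expires_at`. A reader that needs strict TTLs can compare the timestamp itself. The sketch below shows only that comparison; `Entry` is a hypothetical stand-in for the timestamp fields of a cache document, not the plan's `CacheEntry`.

```rust
use std::time::{Duration, SystemTime};

/// Minimal stand-in for the timestamp fields of a cache document.
pub struct Entry {
    pub created_at: SystemTime,
    pub expires_at: SystemTime,
}

impl Entry {
    /// Create an entry written at `now` that lives for `ttl_secs` seconds.
    pub fn new(now: SystemTime, ttl_secs: u64) -> Self {
        Self {
            created_at: now,
            expires_at: now + Duration::from_secs(ttl_secs),
        }
    }

    /// An entry is usable only while `now` is strictly before `expires_at`,
    /// regardless of whether the TTL task has deleted it yet.
    pub fn is_fresh(&self, now: SystemTime) -> bool {
        now < self.expires_at
    }
}
```

In a query, the same check corresponds to adding an `expires_at > now` condition to the lookup filter.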
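5.3 CacheEntry Structure
// src/core/cache/mongo_cache.rs
use chrono::{DateTime, Utc};
use mongodb::bson::oid::ObjectId;
// Stores chrono timestamps as BSON dates. The TTL index only acts on date
// fields, and serde would otherwise write chrono values as strings.
// Requires the bson-chrono-0_4 feature of the mongodb crate.
use mongodb::bson::serde_helpers::chrono_datetime_as_bson_datetime;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEntry {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub id: Option<ObjectId>,
    pub key: String,
    pub value: serde_json::Value,
    pub category: String,
    #[serde(with = "chrono_datetime_as_bson_datetime")]
    pub created_at: DateTime<Utc>,
    #[serde(with = "chrono_datetime_as_bson_datetime")]
    pub expires_at: DateTime<Utc>,
    #[serde(default)]
    pub hit_count: i64,
    #[serde(default, with = "chrono_datetime_as_bson_datetime")]
    pub last_access: DateTime<Utc>,
}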
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
pub enabled: bool,
pub ttl_videos: u64,
pub ttl_search: u64,
pub ttl_hybrid_search: u64,
pub ttl_video_meta: u64,
}
6. API Cache Strategy
6.1 Cache Matrix
| API | Cache Layer | Key Pattern | TTL | Invalidated on |
|---|---|---|---|---|
| GET /api/v1/videos | MongoDB | videos:list:page={p}:limit={l} | 5min | register/delete |
| GET /api/v1/lookup | Redis | momentry:cache:video:{uuid} | 60min | update/delete |
| POST /api/v1/search | MongoDB | search:{hash} | 5min | vectorize |
| POST /api/v1/search/hybrid | MongoDB | search:hybrid:{hash} | 10min | vectorize |
| POST /api/v1/n8n/search | MongoDB | search:n8n:{hash} | 5min | vectorize |
| GET /health | Redis | momentry:cache:health | 30s | - |
6.2 Cache Key Naming Convention
// src/core/cache/keys.rs
pub mod keys {
pub const CATEGORY_VIDEOS: &str = "videos";
pub const CATEGORY_SEARCH: &str = "search";
pub const CATEGORY_HYBRID_SEARCH: &str = "hybrid_search";
pub const CATEGORY_N8N_SEARCH: &str = "n8n_search";
pub const CATEGORY_VIDEO_META: &str = "video_meta";
pub const CATEGORY_HEALTH: &str = "health";
pub fn videos_list(page: usize, limit: usize) -> String {
format!("videos:list:page={}:limit={}", page, limit)
}
pub fn video_meta(uuid: &str) -> String {
format!("video:{}", uuid)
}
pub fn search(query_hash: &str) -> String {
format!("search:{}", query_hash)
}
pub fn hybrid_search(query_hash: &str) -> String {
format!("search:hybrid:{}", query_hash)
}
pub fn n8n_search(query_hash: &str) -> String {
format!("search:n8n:{}", query_hash)
}
pub fn health() -> String {
"health:basic".to_string()
}
}
7. Implementation Details
7.1 MongoCache Implementation
// src/core/cache/mongo_cache.rs
// Written against the mongodb 2.x API, where every operation takes an
// explicit options argument (None selects the defaults).
use anyhow::Result;
use chrono::{Duration, Utc};
use mongodb::bson::{self, doc};
use mongodb::options::{IndexOptions, UpdateOptions};
use mongodb::{Client, Collection, Database, IndexModel};
use serde::{de::DeserializeOwned, Serialize};
use super::config::CacheConfig;
use crate::core::config::cache as cache_config;
#[derive(Clone)]
pub struct MongoCache {
    client: Client,
    db: Database,
    collection: Collection<CacheEntry>,
    config: CacheConfig,
}
impl MongoCache {
    pub async fn init() -> Result<Self> {
        let uri = cache_config::MONGODB_URL.as_str();
        let client = Client::with_uri_str(uri).await?;
        let db = client.database("momento");
        let collection = db.collection::<CacheEntry>("cache");
        let config = CacheConfig {
            enabled: *cache_config::MONGODB_CACHE_ENABLED,
            ttl_videos: *cache_config::MONGODB_CACHE_TTL_VIDEOS,
            ttl_search: *cache_config::MONGODB_CACHE_TTL_SEARCH,
            ttl_hybrid_search: *cache_config::MONGODB_CACHE_TTL_HYBRID_SEARCH,
            ttl_video_meta: *cache_config::MONGODB_CACHE_TTL_VIDEO_META,
        };
        // Ensure indexes exist
        Self::ensure_indexes(&collection).await?;
        Ok(Self { client, db, collection, config })
    }
    async fn ensure_indexes(collection: &Collection<CacheEntry>) -> Result<()> {
        // TTL index: documents are removed once expires_at has passed
        let ttl_index = IndexModel::builder()
            .keys(doc! { "expires_at": 1 })
            .options(
                IndexOptions::builder()
                    .expire_after(std::time::Duration::from_secs(0))
                    .build(),
            )
            .build();
        // Unique key index
        let key_index = IndexModel::builder()
            .keys(doc! { "key": 1 })
            .options(IndexOptions::builder().unique(true).build())
            .build();
        // Category index, used by invalidate_category (see 5.2)
        let category_index = IndexModel::builder()
            .keys(doc! { "category": 1 })
            .build();
        collection
            .create_indexes([ttl_index, key_index, category_index], None)
            .await?;
        Ok(())
    }
    pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
        if !self.config.enabled {
            return Ok(None);
        }
        // The TTL task deletes expired documents only periodically,
        // so entries past expires_at are filtered out here as well.
        let filter = doc! { "key": key, "expires_at": { "$gt": Utc::now() } };
        let result = self.collection.find_one(filter, None).await?;
        if let Some(entry) = result {
            // Update hit count and last_access
            let update = doc! {
                "$inc": { "hit_count": 1 },
                "$set": { "last_access": Utc::now() }
            };
            self.collection.update_one(doc! { "key": key }, update, None).await?;
            // Deserialize value
            let value = serde_json::from_value(entry.value)?;
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }
    pub async fn set<T: Serialize>(&self, key: &str, value: &T, ttl_secs: u64, category: &str) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }
        let now = Utc::now();
        let expires_at = now + Duration::seconds(ttl_secs as i64);
        // Convert the value to BSON so it can be embedded in the update document
        let bson_value = bson::to_bson(value)?;
        let filter = doc! { "key": key };
        let update = doc! {
            "$set": {
                "value": bson_value,
                "category": category,
                "expires_at": expires_at,
                "last_access": now,
            },
            "$setOnInsert": {
                "key": key,
                "created_at": now,
                "hit_count": 0i64,
            }
        };
        // Without upsert, update_one never creates the entry for a new key
        let options = UpdateOptions::builder().upsert(true).build();
        self.collection.update_one(filter, update, options).await?;
        Ok(())
    }
    pub async fn invalidate_category(&self, category: &str) -> Result<u64> {
        if !self.config.enabled {
            return Ok(0);
        }
        let result = self.collection.delete_many(doc! { "category": category }, None).await?;
        Ok(result.deleted_count)
    }
    pub async fn invalidate_prefix(&self, prefix: &str) -> Result<u64> {
        if !self.config.enabled {
            return Ok(0);
        }
        // The prefix is interpreted as a regular expression, so callers
        // should pass literal key prefixes such as "videos:list:"
        let filter = doc! { "key": { "$regex": format!("^{}", prefix) } };
        let result = self.collection.delete_many(filter, None).await?;
        Ok(result.deleted_count)
    }
    pub async fn get_or_fetch<F, Fut, T>(&self, key: &str, ttl_secs: u64, category: &str, fetcher: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
        T: DeserializeOwned + Serialize,
    {
        // Try cache first
        if let Some(cached) = self.get::<T>(key).await? {
            tracing::debug!("Cache hit for key: {}", key);
            return Ok(cached);
        }
        // Cache miss - fetch from source
        tracing::debug!("Cache miss for key: {}", key);
        let value = fetcher().await?;
        // Store in cache
        self.set(key, &value, ttl_secs, category).await?;
        Ok(value)
    }
}
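The read-through behaviour of `get_or_fetch` can be exercised without a database. The following is a synchronous, in-memory sketch under stated assumptions: `MemoryCache` is a hypothetical stand-in for `MongoCache`, values are plain strings, and the current time is passed in explicitly so that expiry can be tested deterministically.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory read-through cache; each entry stores its value and expiry time.
pub struct MemoryCache {
    entries: HashMap<String, (String, Instant)>,
}

impl MemoryCache {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Return the cached value for `key` if it has not expired. Otherwise
    /// call `fetcher`, store its result for `ttl`, and return it.
    /// Fetch errors are passed through and never cached.
    pub fn get_or_fetch<F>(
        &mut self,
        key: &str,
        ttl: Duration,
        now: Instant,
        fetcher: F,
    ) -> Result<String, String>
    where
        F: FnOnce() -> Result<String, String>,
    {
        if let Some((value, expires_at)) = self.entries.get(key) {
            if now < *expires_at {
                return Ok(value.clone()); // cache hit
            }
        }
        // Cache miss or expired entry: go to the source and store the result.
        let value = fetcher()?;
        self.entries.insert(key.to_string(), (value.clone(), now + ttl));
        Ok(value)
    }
}
```

Two reads of the same key within the TTL invoke the fetcher once; a read after the TTL invokes it again and overwrites the stored entry.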
7.2 RedisCache Implementation
// src/core/cache/redis_cache.rs
use anyhow::Result;
use redis::AsyncCommands;
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use crate::core::config::cache as cache_config;
#[derive(Clone)]
pub struct RedisCache {
client: crate::core::db::RedisClient,
}
impl RedisCache {
pub fn new() -> Result<Self> {
let client = crate::core::db::RedisClient::new()?;
Ok(Self { client })
}
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
let mut conn = self.client.get_conn_internal().await?;
let value: Option<String> = conn.get(key).await?;
match value {
Some(json) => {
let result = serde_json::from_str(&json)?;
Ok(Some(result))
}
None => Ok(None),
}
}
pub async fn set<T: Serialize>(&self, key: &str, value: &T, ttl_secs: u64) -> Result<()> {
let mut conn = self.client.get_conn_internal().await?;
let json = serde_json::to_string(value)?;
let _: String = conn.set_ex(key, json, ttl_secs).await?;
Ok(())
}
pub async fn delete(&self, key: &str) -> Result<()> {
let mut conn = self.client.get_conn_internal().await?;
let _: () = conn.del(key).await?;
Ok(())
}
pub async fn invalidate_pattern(&self, pattern: &str) -> Result<u64> {
let mut conn = self.client.get_conn_internal().await?;
let keys: Vec<String> = conn.keys(pattern).await?;
let count = keys.len() as u64;
if !keys.is_empty() {
let _: () = conn.del(keys).await?;
}
Ok(count)
}
pub async fn get_or_fetch<F, Fut, T>(&self, key: &str, ttl_secs: u64, fetcher: F) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
T: DeserializeOwned + Serialize,
{
// Try cache first
if let Some(cached) = self.get::<T>(key).await? {
return Ok(cached);
}
// Cache miss
let value = fetcher().await?;
self.set(key, &value, ttl_secs).await?;
Ok(value)
}
pub async fn get_health(&self) -> Result<Option<String>> {
let mut conn = self.client.get_conn_internal().await?;
let key = "momentry:cache:health";
let value: Option<String> = conn.get(key).await?;
Ok(value)
}
pub async fn set_health(&self, status: &str) -> Result<()> {
let ttl = *cache_config::REDIS_CACHE_TTL_HEALTH;
let mut conn = self.client.get_conn_internal().await?;
let key = "momentry:cache:health";
let _: String = conn.set_ex(key, status, ttl).await?;
Ok(())
}
}
8. API Handler Integration
8.1 AppState Extension
// src/api/server.rs
#[derive(Clone)]
struct AppState {
    embedder: Arc<Embedder>,
    embedder_model: String,
    mongo_cache: Arc<MongoCache>, // new
    redis_cache: Arc<RedisCache>, // new
}
8.2 Videos List Handler
// src/api/server.rs
use crate::core::cache::{MongoCache, RedisCache, keys};
async fn list_videos(
    State(state): State<AppState>,
    Query(params): Query<VideosQuery>,
) -> Result<Json<VideosResponse>, StatusCode> {
    let page = params.page.unwrap_or(1);
    let limit = params.limit.unwrap_or(20);
    let cache_key = keys::videos_list(page, limit);
    // Try cache first; the fetcher closure runs only on a cache miss
    let response = state.mongo_cache
        .get_or_fetch::<_, _, VideosResponse>(
            &cache_key,
            300, // 5 min TTL
            keys::CATEGORY_VIDEOS,
            || async {
                // The fetcher returns anyhow::Result, so errors are mapped to
                // anyhow here and to a status code only at the handler boundary
                let db = PostgresDb::init().await
                    .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
                let videos = db.list_videos().await
                    .map_err(|e| anyhow::anyhow!("list_videos failed: {}", e))?;
                let video_infos: Vec<VideoInfoResponse> = videos
                    .into_iter()
                    .map(|v| VideoInfoResponse {
                        uuid: v.uuid,
                        file_path: v.file_path,
                        file_name: v.file_name,
                        duration: v.duration,
                        width: v.width,
                        height: v.height,
                    })
                    .collect();
                Ok(VideosResponse { videos: video_infos })
            },
        )
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(response))
}
8.3 Lookup Handler
// src/api/server.rs
async fn lookup(
    State(state): State<AppState>,
    Query(query): Query<LookupQuery>,
) -> Result<Json<LookupResponse>, StatusCode> {
    // A path lookup only derives the UUID and needs no cache
    if let Some(path) = query.path {
        let uuid = crate::uuid::compute_uuid_from_path(&path);
        return Ok(Json(LookupResponse {
            uuid,
            file_path: None,
            file_name: None,
            duration: None,
        }));
    }
    if let Some(uuid) = query.uuid {
        let cache_key = keys::video_meta(&uuid);
        // Try Redis cache first, fallback to DB
        let video = state.redis_cache
            .get_or_fetch::<_, _, Option<VideoRecord>>(
                &cache_key,
                3600, // 60 min TTL
                || async {
                    // Errors stay anyhow errors inside the fetcher and become
                    // a status code only at the handler boundary
                    let db = PostgresDb::init().await
                        .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
                    db.get_video_by_uuid(&uuid).await
                        .map_err(|e| anyhow::anyhow!(e))
                },
            )
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        if let Some(v) = video {
            return Ok(Json(LookupResponse {
                uuid: v.uuid,
                file_path: Some(v.file_path),
                file_name: Some(v.file_name),
                duration: Some(v.duration),
            }));
        }
    }
    Err(StatusCode::NOT_FOUND)
}
8.4 Search Handler
// src/api/server.rs
use sha2::{Sha256, Digest};
async fn search(
State(state): State<AppState>,
Json(req): Json<SearchRequest>,
) -> Result<Json<SearchResponse>, StatusCode> {
let limit = req.limit.unwrap_or(10);
// Generate cache key from query hash
let query_for_hash = serde_json::json!({
"query": req.query,
"limit": limit,
"uuid": req.uuid,
});
let query_hash = format!("{:x}", Sha256::digest(&serde_json::to_string(&query_for_hash).unwrap()));
let cache_key = keys::search(&query_hash);
let response = state.mongo_cache
.get_or_fetch::<_, _, SearchResponse>(
&cache_key,
300, // 5 min TTL
keys::CATEGORY_SEARCH,
|| async {
// Original search logic here
let query_vector = state.embedder.embed_query(&req.query).await
.map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?;
let qdrant = QdrantDb::init().await
.map_err(|e| anyhow::anyhow!("Qdrant init failed: {}", e))?;
let pg = PostgresDb::init().await
.map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?;
let search_results = if let Some(ref uuid) = req.uuid {
let query_f64: Vec<f64> = query_vector.iter().map(|&x| x as f64).collect();
qdrant.search_in_uuid(&query_f64, uuid, limit).await?
} else {
let query_f64: Vec<f64> = query_vector.iter().map(|&x| x as f64).collect();
qdrant.search(&query_f64, limit).await?
};
let mut results = Vec::new();
for r in search_results {
if let Some(chunk) = pg.get_chunk_by_chunk_id(&r.chunk_id).await.ok().flatten() {
let text = chunk.content.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
results.push(SearchResult {
uuid: chunk.uuid,
chunk_id: chunk.chunk_id,
chunk_type: chunk.chunk_type.as_str().to_string(),
start_time: chunk.start_time,
end_time: chunk.end_time,
text,
score: r.score,
});
}
}
Ok(SearchResponse { results, query: req.query })
},
)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(response))
}
8.5 Health Handler
// src/api/server.rs
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
// Try Redis cache first
if let Some(status) = state.redis_cache.get_health().await.ok().flatten() {
return Json(HealthResponse {
status,
version: env!("CARGO_PKG_VERSION").to_string(),
uptime_ms: get_uptime_ms(),
});
}
// Cache miss - compute and cache
let status = "ok".to_string();
state.redis_cache.set_health(&status).await.ok();
Json(HealthResponse {
status,
version: env!("CARGO_PKG_VERSION").to_string(),
uptime_ms: get_uptime_ms(),
})
}
8.6 Register Handler (cache invalidation)
// src/api/server.rs
async fn register(
State(state): State<AppState>,
Json(req): Json<RegisterRequest>,
) -> Result<Json<RegisterResponse>, StatusCode> {
// ... existing registration logic ...
let video_id = db.register_video(&record).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Invalidate videos list cache
state.mongo_cache.invalidate_prefix("videos:list:").await.ok();
Ok(Json(RegisterResponse {
uuid,
video_id,
file_name,
duration,
width,
height,
}))
}
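9. Invalidation Strategy
9.1 Invalidation Triggered by Writes
| Operation | Invalidated keys |
|---|---|
| POST /api/v1/register | videos:* |
| Delete video | video:{uuid}, videos:* |
| Update video | video:{uuid} |
| Vector update | search:*, search:hybrid:*, search:n8n:* |
9.2 Invalidation Implementation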
// Invalidation helper methods
impl MongoCache {
pub async fn invalidate_videos_list(&self) -> Result<u64> {
self.invalidate_category(keys::CATEGORY_VIDEOS).await
}
pub async fn invalidate_video(&self, uuid: &str) -> Result<u64> {
let key = keys::video_meta(uuid);
let count = self.invalidate_prefix(&key).await?;
Ok(count + self.invalidate_videos_list().await?)
}
pub async fn invalidate_all_search(&self) -> Result<u64> {
let count = self.invalidate_category(keys::CATEGORY_SEARCH).await?;
let count2 = self.invalidate_category(keys::CATEGORY_HYBRID_SEARCH).await?;
let count3 = self.invalidate_category(keys::CATEGORY_N8N_SEARCH).await?;
Ok(count + count2 + count3)
}
}
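Prefix invalidation builds a `^prefix` regular expression from the key prefix. The key helpers in this plan produce only literal-safe characters, but a prefix containing regex metacharacters would match more than intended. One possible safeguard, shown as a sketch only, is to escape the prefix first; `escape_regex` and `prefix_pattern` are hypothetical helpers that are not part of the plan.

```rust
/// Escape regex metacharacters so that `input` is matched literally.
pub fn escape_regex(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        if matches!(
            c,
            '\\' | '.' | '+' | '*' | '?' | '(' | ')' | '|' | '[' | ']' | '{' | '}' | '^' | '$'
        ) {
            out.push('\\');
        }
        out.push(c);
    }
    out
}

/// Build the anchored pattern that matches keys starting with `prefix`.
pub fn prefix_pattern(prefix: &str) -> String {
    format!("^{}", escape_regex(prefix))
}
```

A prefix such as `videos:list:` passes through unchanged apart from the leading anchor, so existing call sites would behave the same.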
10. Implementation Steps
Phase 1: Infrastructure
| Step | Task | File |
|---|---|---|
| 1.1 | Update the mongodb features in Cargo.toml | Cargo.toml |
| 1.2 | Add MongoDB settings to config.rs | src/core/config.rs |
| 1.3 | Create the cache module directory | src/core/cache/ |
| 1.4 | Implement CacheEntry and the key helpers | src/core/cache/keys.rs |
| 1.5 | Implement CacheConfig | src/core/cache/config.rs |
| 1.6 | Refactor MongoDb to use the native driver | src/core/db/mongodb_db.rs |
| 1.7 | Implement MongoCache | src/core/cache/mongo_cache.rs |
| 1.8 | Implement RedisCache | src/core/cache/redis_cache.rs |
| 1.9 | Update the db/mod.rs exports | src/core/db/mod.rs |
Phase 2: API Integration
| Step | Task | File |
|---|---|---|
| 2.1 | Extend AppState | src/api/server.rs |
| 2.2 | Integrate list_videos caching | src/api/server.rs |
| 2.3 | Integrate lookup caching | src/api/server.rs |
| 2.4 | Integrate search caching | src/api/server.rs |
| 2.5 | Integrate hybrid_search caching | src/api/server.rs |
| 2.6 | Integrate n8n_search caching | src/api/server.rs |
| 2.7 | Integrate health caching | src/api/server.rs |
| 2.8 | Add cache invalidation to register | src/api/server.rs |
Phase 3: Testing and Verification
| Step | Task |
|---|---|
| 3.1 | cargo check |
| 3.2 | cargo build |
| 3.3 | cargo clippy |
| 3.4 | cargo fmt |
| 3.5 | cargo test |
| 3.6 | Manual API testing |
11. Testing Strategy
11.1 Unit Tests
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_cache_key_generation() {
assert_eq!(
keys::videos_list(1, 20),
"videos:list:page=1:limit=20"
);
assert_eq!(
keys::video_meta("abc123"),
"video:abc123"
);
}
#[tokio::test]
async fn test_cache_hit_miss() {
let cache = MongoCache::init().await.unwrap();
// Set value
cache.set("test_key", &"test_value".to_string(), 60, "test").await.unwrap();
// Get value
let value: Option<String> = cache.get("test_key").await.unwrap();
assert_eq!(value, Some("test_value".to_string()));
// Invalidate
cache.invalidate_category("test").await.unwrap();
// Get again
let value: Option<String> = cache.get("test_key").await.unwrap();
assert_eq!(value, None);
}
}
11.2 API Test Script
# Test cache hit
curl -s http://localhost:8080/api/v1/videos | jq '.videos | length'
# Should return the cached count
# Force cache miss (wait for TTL or invalidate)
curl -s -X POST http://localhost:8080/api/v1/register \
-H "Content-Type: application/json" \
-d '{"path": "/path/to/new/video.mp4"}'
# Verify cache was invalidated
curl -s http://localhost:8080/api/v1/videos | jq '.videos | length'
# Should trigger a fresh query
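12. Monitoring
12.1 Logging
// Log on cache hit/miss
tracing::debug!("Cache hit for key: {}", key);
tracing::debug!("Cache miss for key: {}", key);
// Log on invalidation
tracing::info!("Invalidated {} entries in category: {}", count, category);
12.2 Optional Metrics
| Metric | Description |
|---|---|
| cache_hit_total | Total cache hits |
| cache_miss_total | Total cache misses |
| cache_invalidations_total | Total cache invalidations |
| cache_operation_duration_seconds | Cache operation latency |
13. Risks and Mitigations
| Risk | Impact | Mitigation |
|---|---|---|
| MongoDB connection failure | Degrades to running without a cache | Handle errors from cache operations with .ok() |
| Stale or inconsistent cached data | Users see outdated data | Reasonable TTL values + invalidation on write |
| Cache key collisions | Wrong data is returned | Derive keys from a SHA-256 hash of the request, which makes collisions practically negligible |
| Cache growth | Excessive memory/disk usage | Automatic TTL expiry + a maximum entry limit |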
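The first mitigation, degrading to "no cache" when the cache backend fails, can be sketched as treating a cache error as a miss. This is an illustration under assumptions: `read_with_fallback` is a hypothetical helper, and both the cache and the source of truth are represented by closures returning string values.

```rust
/// Read through a cache that may itself fail. A cache error is treated
/// like a miss, so the request is still served from the source of truth.
pub fn read_with_fallback<C, S>(cache_get: C, source_get: S) -> Result<String, String>
where
    C: FnOnce() -> Result<Option<String>, String>,
    S: FnOnce() -> Result<String, String>,
{
    match cache_get() {
        Ok(Some(hit)) => Ok(hit),
        Ok(None) => source_get(),
        // Cache backend unavailable: degrade to an uncached read.
        Err(_cache_error) => source_get(),
    }
}
```

Only a failure of the source itself surfaces as an error to the caller; a production version would presumably also log the swallowed cache error.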
14. Expected Benefits
| Metric | Before | Expected after |
|---|---|---|
| GET /api/v1/videos latency | ~200ms | ~20ms (Cache Hit) |
| GET /api/v1/lookup latency | ~50ms | ~5ms (Cache Hit) |
| POST /api/v1/search latency | ~500ms | ~50ms (Cache Hit) |
| Database load | 100% | ~30% |
| API throughput | 100 RPS | ~300 RPS |
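The per-request figures in the table apply to cache hits only, so the average improvement depends on the hit ratio. The sketch below shows the arithmetic. The 70% hit ratio used in the note is an assumption inferred from the ~30% database load row, not a number the plan states, and the model ignores the extra cache lookup cost on a miss.

```rust
/// Average latency in milliseconds for a given cache hit ratio (0.0 to 1.0).
pub fn expected_latency_ms(hit_ratio: f64, hit_ms: f64, miss_ms: f64) -> f64 {
    hit_ratio * hit_ms + (1.0 - hit_ratio) * miss_ms
}

/// Fraction of requests that still reach the database.
pub fn db_load_fraction(hit_ratio: f64) -> f64 {
    1.0 - hit_ratio
}
```

For GET /api/v1/videos with a 70% hit ratio, the average works out to 0.7 × 20 ms + 0.3 × 200 ms = 74 ms, which is noticeably higher than the ~20 ms hit latency alone.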
15. Appendix
A. MongoDB Initialization Script
// Initialize the momento.cache collection and its indexes
use momento;
db.cache.drop();
db.cache.insertOne({
key: "init",
value: { initialized: true },
category: "system",
created_at: new Date(),
expires_at: new Date(Date.now() + 86400000),
hit_count: 0,
last_access: new Date()
});
db.cache.createIndex(
{ "expires_at": 1 },
{ expireAfterSeconds: 0 }
);
db.cache.createIndex(
{ "key": 1 },
{ unique: true }
);
db.cache.createIndex({ "category": 1 });
db.cache.deleteOne({ key: "init" });
print("Cache collection initialized successfully");
B. Environment Variable Reference
# .env or shell environment
MONGODB_URL=mongodb://localhost:27017
MONGODB_CACHE_ENABLED=true
MONGODB_CACHE_TTL_VIDEOS=300
MONGODB_CACHE_TTL_SEARCH=300
MONGODB_CACHE_TTL_HYBRID_SEARCH=600
MONGODB_CACHE_TTL_VIDEO_META=3600
REDIS_CACHE_TTL_HEALTH=30
REDIS_CACHE_TTL_VIDEO_META=3600