feat: add momentry_playground binary for development
- Add separate momentry_playground binary with distinct configuration - Production (momentry): Port 3002, Redis prefix 'momentry:' - Development (momentry_playground): Port 3003, Redis prefix 'momentry_dev:' - Add SERVER_PORT and REDIS_KEY_PREFIX config via environment variables - Replace all hardcoded Redis key prefixes with configurable values - Create .env.development for playground environment settings - Update .env with production defaults - Add dotenv dependency for environment file loading Configuration isolation allows running both binaries simultaneously without port conflicts or Redis key collisions.
This commit is contained in:
162
src/core/cache/redis_cache.rs
vendored
Normal file
162
src/core/cache/redis_cache.rs
vendored
Normal file
@@ -0,0 +1,162 @@
|
||||
use anyhow::Result;
|
||||
use redis::AsyncCommands;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::core::config::{cache as cache_config, REDIS_KEY_PREFIX};
|
||||
use crate::core::db::RedisClient;
|
||||
|
||||
pub struct RedisCache {
|
||||
client: Arc<RwLock<RedisClient>>,
|
||||
}
|
||||
|
||||
impl RedisCache {
|
||||
pub fn new() -> Result<Self> {
|
||||
let client = RedisClient::new()?;
|
||||
Ok(Self {
|
||||
client: Arc::new(RwLock::new(client)),
|
||||
})
|
||||
}
|
||||
|
||||
fn prefixed_key(&self, key: &str) -> String {
|
||||
format!("{}cache:{}", REDIS_KEY_PREFIX.as_str(), key)
|
||||
}
|
||||
|
||||
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let prefixed = self.prefixed_key(key);
|
||||
let value: Option<String> = conn.get(&prefixed).await?;
|
||||
|
||||
match value {
|
||||
Some(json) => {
|
||||
let result = serde_json::from_str(&json)?;
|
||||
Ok(Some(result))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set<T: Serialize>(&self, key: &str, value: &T, ttl_secs: u64) -> Result<()> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let prefixed = self.prefixed_key(key);
|
||||
let json = serde_json::to_string(value)?;
|
||||
let _: String = conn.set_ex(&prefixed, json, ttl_secs).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key: &str) -> Result<bool> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let prefixed = self.prefixed_key(key);
|
||||
let _: () = conn.del(&prefixed).await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub async fn exists(&self, key: &str) -> Result<bool> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let prefixed = self.prefixed_key(key);
|
||||
let exists: bool = conn.exists(&prefixed).await?;
|
||||
Ok(exists)
|
||||
}
|
||||
|
||||
pub async fn invalidate_pattern(&self, pattern: &str) -> Result<u64> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let prefixed_pattern = self.prefixed_key(pattern);
|
||||
let keys: Vec<String> = redis::cmd("KEYS")
|
||||
.arg(&prefixed_pattern)
|
||||
.query_async(&mut conn)
|
||||
.await?;
|
||||
let count = keys.len() as u64;
|
||||
|
||||
if !keys.is_empty() {
|
||||
let _: () = conn.del(&keys).await?;
|
||||
}
|
||||
|
||||
tracing::debug!("Invalidated {} keys matching pattern: {}", count, pattern);
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn get_or_fetch<F, Fut, T>(&self, key: &str, ttl_secs: u64, fetcher: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
Fut: std::future::Future<Output = Result<T>>,
|
||||
T: DeserializeOwned + Serialize,
|
||||
{
|
||||
if let Some(cached) = self.get::<T>(key).await? {
|
||||
tracing::debug!("Redis cache hit for key: {}", key);
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
tracing::debug!("Redis cache miss for key: {}", key);
|
||||
let value = fetcher().await?;
|
||||
if let Err(e) = self.set(key, &value, ttl_secs).await {
|
||||
tracing::warn!("Failed to cache value in Redis: {}", e);
|
||||
}
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn get_health(&self) -> Result<Option<String>> {
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let key = self.prefixed_key("health");
|
||||
let value: Option<String> = conn.get(&key).await?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn set_health(&self, status: &str) -> Result<()> {
|
||||
let ttl = *cache_config::REDIS_CACHE_TTL_HEALTH;
|
||||
let client = self.client.read().await;
|
||||
let mut conn = client.get_conn_internal().await?;
|
||||
let key = self.prefixed_key("health");
|
||||
let _: String = conn.set_ex(&key, status, ttl).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_video_meta(&self, uuid: &str) -> Result<Option<serde_json::Value>> {
|
||||
self.get(uuid).await
|
||||
}
|
||||
|
||||
pub async fn set_video_meta(&self, uuid: &str, value: &serde_json::Value) -> Result<()> {
|
||||
let ttl = *cache_config::REDIS_CACHE_TTL_VIDEO_META;
|
||||
self.set(uuid, value, ttl).await
|
||||
}
|
||||
|
||||
pub async fn invalidate_video_meta(&self, uuid: &str) -> Result<bool> {
|
||||
self.delete(uuid).await
|
||||
}
|
||||
|
||||
pub async fn invalidate_videos_list(&self) -> Result<u64> {
|
||||
self.invalidate_pattern("videos:*").await
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for RedisCache {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
client: Arc::clone(&self.client),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RedisCache {
|
||||
fn default() -> Self {
|
||||
Self::new().expect("Failed to create Redis cache")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_prefixed_key() {
|
||||
let cache = RedisCache::new().unwrap();
|
||||
assert_eq!(cache.prefixed_key("test"), "momentry:cache:test");
|
||||
assert_eq!(cache.prefixed_key("video:abc"), "momentry:cache:video:abc");
|
||||
}
|
||||
}
|
||||
139
src/core/config.rs
Normal file
139
src/core/config.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use once_cell::sync::Lazy;
|
||||
use std::env;
|
||||
|
||||
pub static DATABASE_URL: Lazy<String> = Lazy::new(|| {
|
||||
env::var("DATABASE_URL")
|
||||
.unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string())
|
||||
});
|
||||
|
||||
pub static MONGODB_URL: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MONGODB_URL").unwrap_or_else(|_| "mongodb://localhost:27017".to_string())
|
||||
});
|
||||
|
||||
pub static REDIS_URL: Lazy<String> = Lazy::new(|| {
|
||||
env::var("REDIS_URL").unwrap_or_else(|_| {
|
||||
let password = env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string());
|
||||
// Format: redis://[:password]@host:port (use default user)
|
||||
format!("redis://:{}@localhost:6379", password)
|
||||
})
|
||||
});
|
||||
|
||||
pub static REDIS_PASSWORD: Lazy<String> =
|
||||
Lazy::new(|| env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string()));
|
||||
|
||||
pub static OUTPUT_DIR: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_OUTPUT_DIR").unwrap_or_else(|_| "/Users/accusys/momentry/output".to_string())
|
||||
});
|
||||
|
||||
pub static BACKUP_DIR: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_BACKUP_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry/backup/momentry".to_string())
|
||||
});
|
||||
|
||||
pub static PYTHON_PATH: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_PYTHON_PATH").unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string())
|
||||
});
|
||||
|
||||
pub static SCRIPTS_DIR: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_SCRIPTS_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string())
|
||||
});
|
||||
|
||||
pub static LOG_LEVEL: Lazy<String> = Lazy::new(|| {
|
||||
env::var("RUST_LOG")
|
||||
.or_else(|_| env::var("MOMENTRY_LOG_LEVEL"))
|
||||
.unwrap_or_else(|_| "info".to_string())
|
||||
});
|
||||
|
||||
pub static MEDIA_BASE_URL: Lazy<String> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_MEDIA_BASE_URL")
|
||||
.unwrap_or_else(|_| "https://wp.momentry.ddns.net".to_string())
|
||||
});
|
||||
|
||||
pub static SERVER_PORT: Lazy<u16> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_SERVER_PORT")
|
||||
.unwrap_or_else(|_| "3002".to_string())
|
||||
.parse()
|
||||
.unwrap_or(3002)
|
||||
});
|
||||
|
||||
pub static REDIS_KEY_PREFIX: Lazy<String> =
|
||||
Lazy::new(|| env::var("MOMENTRY_REDIS_PREFIX").unwrap_or_else(|_| "momentry:".to_string()));
|
||||
|
||||
pub mod processor {
|
||||
use super::*;
|
||||
|
||||
pub static ASR_TIMEOUT_SECS: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_ASR_TIMEOUT")
|
||||
.unwrap_or_else(|_| "3600".to_string())
|
||||
.parse()
|
||||
.unwrap_or(3600)
|
||||
});
|
||||
|
||||
pub static CUT_TIMEOUT_SECS: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_CUT_TIMEOUT")
|
||||
.unwrap_or_else(|_| "3600".to_string())
|
||||
.parse()
|
||||
.unwrap_or(3600)
|
||||
});
|
||||
|
||||
pub static DEFAULT_TIMEOUT_SECS: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MOMENTRY_DEFAULT_TIMEOUT")
|
||||
.unwrap_or_else(|_| "7200".to_string())
|
||||
.parse()
|
||||
.unwrap_or(7200)
|
||||
});
|
||||
}
|
||||
|
||||
pub mod cache {
|
||||
use super::*;
|
||||
|
||||
pub static MONGODB_CACHE_ENABLED: Lazy<bool> = Lazy::new(|| {
|
||||
env::var("MONGODB_CACHE_ENABLED")
|
||||
.unwrap_or_else(|_| "true".to_string())
|
||||
.parse()
|
||||
.unwrap_or(true)
|
||||
});
|
||||
|
||||
pub static MONGODB_CACHE_TTL_VIDEOS: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MONGODB_CACHE_TTL_VIDEOS")
|
||||
.unwrap_or_else(|_| "300".to_string())
|
||||
.parse()
|
||||
.unwrap_or(300)
|
||||
});
|
||||
|
||||
pub static MONGODB_CACHE_TTL_SEARCH: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MONGODB_CACHE_TTL_SEARCH")
|
||||
.unwrap_or_else(|_| "300".to_string())
|
||||
.parse()
|
||||
.unwrap_or(300)
|
||||
});
|
||||
|
||||
pub static MONGODB_CACHE_TTL_HYBRID_SEARCH: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MONGODB_CACHE_TTL_HYBRID_SEARCH")
|
||||
.unwrap_or_else(|_| "600".to_string())
|
||||
.parse()
|
||||
.unwrap_or(600)
|
||||
});
|
||||
|
||||
pub static MONGODB_CACHE_TTL_VIDEO_META: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("MONGODB_CACHE_TTL_VIDEO_META")
|
||||
.unwrap_or_else(|_| "3600".to_string())
|
||||
.parse()
|
||||
.unwrap_or(3600)
|
||||
});
|
||||
|
||||
pub static REDIS_CACHE_TTL_HEALTH: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("REDIS_CACHE_TTL_HEALTH")
|
||||
.unwrap_or_else(|_| "30".to_string())
|
||||
.parse()
|
||||
.unwrap_or(30)
|
||||
});
|
||||
|
||||
pub static REDIS_CACHE_TTL_VIDEO_META: Lazy<u64> = Lazy::new(|| {
|
||||
env::var("REDIS_CACHE_TTL_VIDEO_META")
|
||||
.unwrap_or_else(|_| "3600".to_string())
|
||||
.parse()
|
||||
.unwrap_or(3600)
|
||||
});
|
||||
}
|
||||
@@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::core::config::REDIS_KEY_PREFIX;
|
||||
|
||||
pub struct RedisClient {
|
||||
client: Client,
|
||||
state: Arc<RwLock<RedisState>>,
|
||||
@@ -18,13 +20,8 @@ pub struct RedisState {
|
||||
|
||||
impl RedisClient {
|
||||
pub fn new() -> Result<Self> {
|
||||
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| {
|
||||
let password =
|
||||
std::env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string());
|
||||
format!("redis://:{}@localhost:6379", password)
|
||||
});
|
||||
|
||||
let client = Client::open(redis_url.as_str()).context("Failed to connect to Redis")?;
|
||||
let client = Client::open(crate::core::config::REDIS_URL.as_str())
|
||||
.context("Failed to connect to Redis")?;
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
@@ -49,7 +46,8 @@ impl RedisClient {
|
||||
|
||||
pub async fn get_job_status(&self, uuid: &str) -> Result<Option<JobStatus>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let key = format!("momentry:job:{}", uuid);
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}job:{}", prefix, uuid);
|
||||
|
||||
let status: Option<String> = conn.hget(&key, "status").await?;
|
||||
if status.is_none() {
|
||||
@@ -83,7 +81,8 @@ impl RedisClient {
|
||||
status: &ProcessorStatus,
|
||||
) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let key = format!("momentry:job:{}:processor:{}", uuid, processor);
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}job:{}:processor:{}", prefix, uuid, processor);
|
||||
|
||||
let _: Option<String> = conn
|
||||
.hset_multiple(
|
||||
@@ -111,7 +110,8 @@ impl RedisClient {
|
||||
processor: &str,
|
||||
) -> Result<Option<ProcessorStatus>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let key = format!("momentry:job:{}:processor:{}", uuid, processor);
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}job:{}:processor:{}", prefix, uuid, processor);
|
||||
|
||||
let status: Option<String> = conn.hget(&key, "status").await?;
|
||||
if status.is_none() {
|
||||
@@ -138,7 +138,8 @@ impl RedisClient {
|
||||
|
||||
pub async fn publish_progress(&self, uuid: &str, message: &ProgressMessage) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let channel = format!("momentry:progress:{}", uuid);
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let channel = format!("{}progress:{}", prefix, uuid);
|
||||
|
||||
let json = serde_json::to_string(message)?;
|
||||
let _: usize = conn.publish(&channel, json).await?;
|
||||
@@ -148,7 +149,8 @@ impl RedisClient {
|
||||
|
||||
pub async fn subscribe_progress(&self, uuid: &str) -> Result<redis::aio::PubSub> {
|
||||
let mut pubsub = self.client.get_async_pubsub().await?;
|
||||
let channel = format!("momentry:progress:{}", uuid);
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let channel = format!("{}progress:{}", prefix, uuid);
|
||||
|
||||
pubsub.subscribe(channel).await?;
|
||||
|
||||
@@ -174,45 +176,285 @@ impl RedisClient {
|
||||
|
||||
pub async fn add_to_active_jobs(&self, uuid: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let _: usize = conn.sadd("momentry:jobs:active", uuid).await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let _: usize = conn.sadd(format!("{}jobs:active", prefix), uuid).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn move_to_completed_jobs(&self, uuid: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let _: bool = conn
|
||||
.smove("momentry:jobs:active", "momentry:jobs:completed", uuid)
|
||||
.smove(
|
||||
format!("{}jobs:active", prefix),
|
||||
format!("{}jobs:completed", prefix),
|
||||
uuid,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn move_to_failed_jobs(&self, uuid: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let _: bool = conn
|
||||
.smove("momentry:jobs:active", "momentry:jobs:failed", uuid)
|
||||
.smove(
|
||||
format!("{}jobs:active", prefix),
|
||||
format!("{}jobs:failed", prefix),
|
||||
uuid,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_active_jobs(&self) -> Result<Vec<String>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let jobs: Vec<String> = conn.smembers("momentry:jobs:active").await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let jobs: Vec<String> = conn.smembers(format!("{}jobs:active", prefix)).await?;
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
pub async fn set_health(&self, status: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let _: String = conn
|
||||
.set_ex("momentry:health:momentry_core", status, 60)
|
||||
.set_ex(format!("{}health:momentry_core", prefix), status, 60)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_health(&self) -> Result<Option<String>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let health: Option<String> = conn.get("momentry:health:momentry_core").await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let health: Option<String> = conn.get(format!("{}health:momentry_core", prefix)).await?;
|
||||
Ok(health)
|
||||
}
|
||||
|
||||
pub async fn sync_monitor_job(&self, job: &MonitorJobRedis) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}monitor:job:{}", prefix, job.uuid);
|
||||
|
||||
let _: Option<String> = conn
|
||||
.hset_multiple(
|
||||
&key,
|
||||
&[
|
||||
("uuid", job.uuid.as_str()),
|
||||
("status", job.status.as_str()),
|
||||
("current_processor", job.current_processor.as_str()),
|
||||
("progress_total", job.progress_total.to_string().as_str()),
|
||||
(
|
||||
"progress_current",
|
||||
job.progress_current.to_string().as_str(),
|
||||
),
|
||||
("error_count", job.error_count.to_string().as_str()),
|
||||
("started_at", job.started_at.as_str()),
|
||||
("updated_at", job.updated_at.as_str()),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _: bool = conn.expire(&key, 86400).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish_job_error(&self, uuid: &str, error: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let channel = format!("{}errors:{}", prefix, uuid);
|
||||
|
||||
let error_msg = JobErrorMessage {
|
||||
uuid: uuid.to_string(),
|
||||
error: error.to_string(),
|
||||
timestamp: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
let json = serde_json::to_string(&error_msg)?;
|
||||
let _: usize = conn.publish(&channel, json).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish_anomaly_alert(&self, alert: &AnomalyAlertMessage) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let channel = format!("{}anomaly:alerts", prefix);
|
||||
|
||||
let json = serde_json::to_string(alert)?;
|
||||
let _: usize = conn.publish(&channel, json).await?;
|
||||
|
||||
let key = format!("{}anomaly:key:{}", prefix, alert.key_id);
|
||||
let alert_data = serde_json::json!({
|
||||
"key_id": alert.key_id,
|
||||
"anomaly_type": alert.anomaly_type,
|
||||
"severity": alert.severity,
|
||||
"timestamp": alert.timestamp,
|
||||
"message": alert.message,
|
||||
});
|
||||
let _: Option<String> = conn
|
||||
.hset(&key, "latest", &serde_json::to_string(&alert_data)?)
|
||||
.await?;
|
||||
let _: bool = conn.expire(&key, 86400).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn subscribe_anomaly_alerts(&self) -> Result<redis::aio::PubSub> {
|
||||
let mut pubsub = self.client.get_async_pubsub().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
pubsub
|
||||
.subscribe(format!("{}anomaly:alerts", prefix))
|
||||
.await?;
|
||||
Ok(pubsub)
|
||||
}
|
||||
|
||||
pub async fn get_latest_anomaly(&self, key_id: &str) -> Result<Option<AnomalyAlertMessage>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}anomaly:key:{}", prefix, key_id);
|
||||
let latest: Option<String> = conn.hget(&key, "latest").await?;
|
||||
|
||||
if let Some(json) = latest {
|
||||
let alert: AnomalyAlertMessage = serde_json::from_str(&json)?;
|
||||
Ok(Some(alert))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn subscribe_job_errors(&self, uuid: &str) -> Result<redis::aio::PubSub> {
|
||||
let mut pubsub = self.client.get_async_pubsub().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let channel = format!("{}errors:{}", prefix, uuid);
|
||||
|
||||
pubsub.subscribe(channel).await?;
|
||||
|
||||
Ok(pubsub)
|
||||
}
|
||||
|
||||
pub async fn update_worker_job_status(
|
||||
&self,
|
||||
uuid: &str,
|
||||
job_id: i64,
|
||||
status: &str,
|
||||
current_processor: Option<&str>,
|
||||
progress: i32,
|
||||
total: i32,
|
||||
) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}worker:job:{}", prefix, uuid);
|
||||
|
||||
let _: Option<String> = conn
|
||||
.hset_multiple(
|
||||
&key,
|
||||
&[
|
||||
("job_id", job_id.to_string().as_str()),
|
||||
("status", status),
|
||||
("current_processor", current_processor.unwrap_or("")),
|
||||
("progress_current", progress.to_string().as_str()),
|
||||
("progress_total", total.to_string().as_str()),
|
||||
("updated_at", &chrono::Utc::now().to_rfc3339()),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _: bool = conn.expire(&key, 86400).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_worker_processor_status(
|
||||
&self,
|
||||
uuid: &str,
|
||||
processor: &str,
|
||||
status: &str,
|
||||
error: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}worker:job:{}:processor:{}", prefix, uuid, processor);
|
||||
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
let mut fields: Vec<(&str, &str)> = vec![("status", status), ("updated_at", &now)];
|
||||
|
||||
if let Some(err) = error {
|
||||
fields.push(("error", err));
|
||||
}
|
||||
|
||||
let _: Option<String> = conn.hset_multiple(&key, &fields).await?;
|
||||
let _: bool = conn.expire(&key, 86400).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_worker_job_status(&self, uuid: &str) -> Result<Option<WorkerJobStatus>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let key = format!("{}worker:job:{}", prefix, uuid);
|
||||
|
||||
let exists: bool = conn.exists(&key).await?;
|
||||
if !exists {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let status: String = conn.hget(&key, "status").await?;
|
||||
let job_id: i64 = conn.hget(&key, "job_id").await?;
|
||||
let current_processor: String = conn.hget(&key, "current_processor").await?;
|
||||
let progress_current: i32 = conn.hget(&key, "progress_current").await?;
|
||||
let progress_total: i32 = conn.hget(&key, "progress_total").await?;
|
||||
let updated_at: String = conn.hget(&key, "updated_at").await?;
|
||||
|
||||
Ok(Some(WorkerJobStatus {
|
||||
uuid: uuid.to_string(),
|
||||
job_id,
|
||||
status,
|
||||
current_processor,
|
||||
progress_current,
|
||||
progress_total,
|
||||
updated_at,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn delete_worker_job(&self, uuid: &str) -> Result<()> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
|
||||
let key = format!("{}worker:job:{}", prefix, uuid);
|
||||
let _: i32 = conn.del(&key).await?;
|
||||
|
||||
let processor_types = ["asr", "cut", "yolo", "ocr", "face", "pose", "asrx"];
|
||||
for ptype in processor_types {
|
||||
let proc_key = format!("{}worker:job:{}:processor:{}", prefix, uuid, ptype);
|
||||
let _: i32 = conn.del(&proc_key).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_all_worker_jobs(&self) -> Result<Vec<WorkerJobInfo>> {
|
||||
let mut conn = self.get_conn_internal().await?;
|
||||
let prefix = REDIS_KEY_PREFIX.as_str();
|
||||
let keys: Vec<String> = conn.keys(format!("{}worker:job:*", prefix)).await?;
|
||||
|
||||
let mut jobs = Vec::new();
|
||||
for key in keys {
|
||||
let uuid = key.replace(&format!("{}worker:job:", prefix), "");
|
||||
if let Some(status) = self.get_worker_job_status(&uuid).await? {
|
||||
jobs.push(WorkerJobInfo {
|
||||
uuid,
|
||||
job_id: status.job_id,
|
||||
status: status.status,
|
||||
progress_current: status.progress_current,
|
||||
progress_total: status.progress_total,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RedisClient {
|
||||
@@ -260,3 +502,55 @@ pub struct ProgressData {
|
||||
pub current: Option<i32>,
|
||||
pub total: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MonitorJobRedis {
|
||||
pub uuid: String,
|
||||
pub status: String,
|
||||
pub current_processor: String,
|
||||
pub progress_total: i32,
|
||||
pub progress_current: i32,
|
||||
pub error_count: i32,
|
||||
pub started_at: String,
|
||||
pub updated_at: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct JobErrorMessage {
|
||||
pub uuid: String,
|
||||
pub error: String,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AnomalyAlertMessage {
|
||||
pub key_id: String,
|
||||
pub anomaly_type: String,
|
||||
pub severity: String,
|
||||
pub ip_address: Option<String>,
|
||||
pub request_count: Option<i32>,
|
||||
pub error_rate: Option<f64>,
|
||||
pub unique_ips: Option<i32>,
|
||||
pub timestamp: i64,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WorkerJobStatus {
|
||||
pub uuid: String,
|
||||
pub job_id: i64,
|
||||
pub status: String,
|
||||
pub current_processor: String,
|
||||
pub progress_current: i32,
|
||||
pub progress_total: i32,
|
||||
pub updated_at: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WorkerJobInfo {
|
||||
pub uuid: String,
|
||||
pub job_id: i64,
|
||||
pub status: String,
|
||||
pub progress_current: i32,
|
||||
pub progress_total: i32,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user