Initial commit: Momentry Core v0.1
- Rust-based digital asset management system
- Video analysis: ASR, OCR, YOLO, Face, Pose
- RAG capabilities with Qdrant vector database
- Multi-database support: PostgreSQL, Redis, MongoDB
- Monitoring system with launchd plists
- n8n workflow automation integration
This commit is contained in:
40
src/core/db/mod.rs
Normal file
40
src/core/db/mod.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::core::chunk::Chunk;
|
||||
|
||||
/// One hit returned by a vector search (see `VectorStore::search`).
#[derive(Clone, Debug)]
pub struct SearchResult {
    /// Identifier of the chunk whose stored vector matched the query.
    pub chunk_id: String,
    /// Score the store assigned to this match.
    pub score: f32,
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Database: Send + Sync {
|
||||
async fn init() -> Result<Self>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ChunkStore: Send + Sync {
|
||||
async fn store_chunk(&self, chunk: &Chunk) -> Result<()>;
|
||||
async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>>;
|
||||
async fn get_all_chunks(&self) -> Result<Vec<Chunk>>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait VectorStore: Send + Sync {
|
||||
async fn store_vector(&self, chunk_id: &str, vector: &[f32]) -> Result<()>;
|
||||
async fn search(&self, query_vector: &[f32], limit: usize) -> Result<Vec<SearchResult>>;
|
||||
}
|
||||
|
||||
pub mod mongodb_db;
|
||||
pub mod postgres_db;
|
||||
pub mod qdrant_db;
|
||||
pub mod redis_db;
|
||||
|
||||
pub use mongodb_db::MongoDb;
|
||||
pub use postgres_db::{PostgresDb, VideoRecord};
|
||||
pub use qdrant_db::QdrantDb;
|
||||
pub use redis_db::RedisDb;
|
||||
53
src/core/db/mongodb_db.rs
Normal file
53
src/core/db/mongodb_db.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::Database;
|
||||
|
||||
pub struct MongoDb {
|
||||
cache: Arc<RwLock<MongoCache>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct MongoCache {
|
||||
documents: std::collections::HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct VideoDocument {
|
||||
pub uuid: String,
|
||||
pub file_path: String,
|
||||
pub file_name: String,
|
||||
pub probe: serde_json::Value,
|
||||
pub asr: Option<serde_json::Value>,
|
||||
pub asrx: Option<serde_json::Value>,
|
||||
pub ocr: Option<serde_json::Value>,
|
||||
pub yolo: Option<serde_json::Value>,
|
||||
pub face: Option<serde_json::Value>,
|
||||
pub pose: Option<serde_json::Value>,
|
||||
pub created_at: String,
|
||||
pub updated_at: String,
|
||||
}
|
||||
|
||||
impl MongoDb {
|
||||
pub async fn store_video(&self, _doc: &VideoDocument) -> Result<()> {
|
||||
// TODO: Implement MongoDB client
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_video(&self, _uuid: &str) -> Result<Option<VideoDocument>> {
|
||||
// TODO: Implement MongoDB client
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for MongoDb {
|
||||
async fn init() -> Result<Self> {
|
||||
// TODO: Initialize MongoDB client
|
||||
Ok(Self {
|
||||
cache: Arc::new(RwLock::new(MongoCache::default())),
|
||||
})
|
||||
}
|
||||
}
|
||||
286
src/core/db/postgres_db.rs
Normal file
286
src/core/db/postgres_db.rs
Normal file
@@ -0,0 +1,286 @@
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{PgPool, Row};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::Database;
|
||||
use crate::core::chunk::{Chunk, ChunkType};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct VideoRecord {
|
||||
pub id: i64,
|
||||
pub uuid: String,
|
||||
pub file_path: String,
|
||||
pub file_name: String,
|
||||
pub duration: f64,
|
||||
pub width: u32,
|
||||
pub height: u32,
|
||||
pub fps: f64,
|
||||
pub probe_json: Option<String>,
|
||||
pub created_at: String,
|
||||
}
|
||||
|
||||
pub struct PostgresDb {
|
||||
pool: PgPool,
|
||||
cache: Arc<RwLock<PostgresCache>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PostgresCache {
|
||||
videos: std::collections::HashMap<String, VideoRecord>,
|
||||
chunks: std::collections::HashMap<String, Vec<Chunk>>,
|
||||
}
|
||||
|
||||
impl PostgresDb {
|
||||
pub async fn new(database_url: &str) -> Result<Self> {
|
||||
let pool = PgPool::connect(database_url).await?;
|
||||
|
||||
let db = Self {
|
||||
pool,
|
||||
cache: Arc::new(RwLock::new(PostgresCache::default())),
|
||||
};
|
||||
|
||||
db.init_schema().await?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
pub async fn register_video(&self, record: &VideoRecord) -> Result<i64> {
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (uuid) DO UPDATE SET
|
||||
file_path = EXCLUDED.file_path,
|
||||
file_name = EXCLUDED.file_name,
|
||||
duration = EXCLUDED.duration,
|
||||
width = EXCLUDED.width,
|
||||
height = EXCLUDED.height,
|
||||
fps = EXCLUDED.fps,
|
||||
probe_json = EXCLUDED.probe_json,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
RETURNING id::bigint
|
||||
"#
|
||||
)
|
||||
.bind(&record.uuid)
|
||||
.bind(&record.file_path)
|
||||
.bind(&record.file_name)
|
||||
.bind(record.duration)
|
||||
.bind(record.width as i32)
|
||||
.bind(record.height as i32)
|
||||
.bind(record.fps)
|
||||
.bind(&record.probe_json)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
let id: i64 = result.get(0);
|
||||
|
||||
// Update cache
|
||||
let mut cache = self.cache.write().await;
|
||||
let mut record = record.clone();
|
||||
record.id = id as i64;
|
||||
cache.videos.insert(record.uuid.clone(), record);
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub async fn get_video_by_uuid(&self, uuid: &str) -> Result<Option<VideoRecord>> {
|
||||
// Check cache first
|
||||
{
|
||||
let cache = self.cache.read().await;
|
||||
if let Some(video) = cache.videos.get(uuid) {
|
||||
return Ok(Some(video.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
|
||||
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos WHERE uuid = $1"
|
||||
)
|
||||
.bind(uuid)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(r) = result {
|
||||
let video = VideoRecord {
|
||||
id: r.0 as i64,
|
||||
uuid: r.1,
|
||||
file_path: r.2,
|
||||
file_name: r.3,
|
||||
duration: r.4,
|
||||
width: r.5 as u32,
|
||||
height: r.6 as u32,
|
||||
fps: r.7,
|
||||
probe_json: r.8,
|
||||
created_at: String::new(),
|
||||
};
|
||||
|
||||
// Update cache
|
||||
let mut cache = self.cache.write().await;
|
||||
cache.videos.insert(uuid.to_string(), video.clone());
|
||||
|
||||
Ok(Some(video))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_videos(&self) -> Result<Vec<VideoRecord>> {
|
||||
let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
|
||||
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos ORDER BY id DESC"
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let videos: Vec<VideoRecord> = rows
|
||||
.into_iter()
|
||||
.map(|r| VideoRecord {
|
||||
id: r.0 as i64,
|
||||
uuid: r.1,
|
||||
file_path: r.2,
|
||||
file_name: r.3,
|
||||
duration: r.4,
|
||||
width: r.5 as u32,
|
||||
height: r.6 as u32,
|
||||
fps: r.7,
|
||||
probe_json: r.8,
|
||||
created_at: String::new(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(videos)
|
||||
}
|
||||
|
||||
async fn init_schema(&self) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS videos (
|
||||
id SERIAL PRIMARY KEY,
|
||||
uuid VARCHAR(32) UNIQUE NOT NULL,
|
||||
file_path TEXT NOT NULL,
|
||||
file_name TEXT NOT NULL,
|
||||
duration DOUBLE PRECISION,
|
||||
width INTEGER,
|
||||
height INTEGER,
|
||||
fps DOUBLE PRECISION,
|
||||
probe_json TEXT,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_videos_uuid ON videos(uuid)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS chunks (
|
||||
id SERIAL PRIMARY KEY,
|
||||
uuid VARCHAR(32) NOT NULL,
|
||||
chunk_id VARCHAR(64) NOT NULL,
|
||||
chunk_index INTEGER NOT NULL,
|
||||
chunk_type VARCHAR(32) NOT NULL,
|
||||
start_time DOUBLE PRECISION NOT NULL,
|
||||
end_time DOUBLE PRECISION NOT NULL,
|
||||
content JSONB NOT NULL,
|
||||
vector_id VARCHAR(64),
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(uuid, chunk_id)
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_uuid ON chunks(uuid)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_type ON chunks(chunk_type)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_time ON chunks(start_time, end_time)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO chunks (uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
|
||||
ON CONFLICT (uuid, chunk_id) DO UPDATE SET
|
||||
start_time = EXCLUDED.start_time,
|
||||
end_time = EXCLUDED.end_time,
|
||||
content = EXCLUDED.content,
|
||||
vector_id = EXCLUDED.vector_id
|
||||
"#
|
||||
)
|
||||
.bind(&chunk.uuid)
|
||||
.bind(&chunk.chunk_id)
|
||||
.bind(chunk.chunk_index as i32)
|
||||
.bind(chunk.chunk_type.as_str())
|
||||
.bind(chunk.start_time)
|
||||
.bind(chunk.end_time)
|
||||
.bind(&chunk.content)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content FROM chunks WHERE uuid = $1 ORDER BY chunk_index"
|
||||
)
|
||||
.bind(uuid)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let chunks: Vec<Chunk> = rows
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
let chunk_type_str: String = r.get(3);
|
||||
let chunk_index: i32 = r.get(2);
|
||||
let chunk_type = match chunk_type_str.as_str() {
|
||||
"time_based" => ChunkType::TimeBased,
|
||||
"sentence" => ChunkType::Sentence,
|
||||
"cut" => ChunkType::Cut,
|
||||
_ => ChunkType::TimeBased,
|
||||
};
|
||||
|
||||
let content_json: String = r.get(6);
|
||||
let content: serde_json::Value =
|
||||
serde_json::from_str(&content_json).unwrap_or(serde_json::json!({}));
|
||||
|
||||
Chunk {
|
||||
uuid: r.get(0),
|
||||
chunk_id: r.get(1),
|
||||
chunk_index: chunk_index as u32,
|
||||
chunk_type,
|
||||
start_time: r.get(4),
|
||||
end_time: r.get(5),
|
||||
content,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(chunks)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for PostgresDb {
|
||||
async fn init() -> Result<Self> {
|
||||
let database_url = std::env::var("DATABASE_URL")
|
||||
.unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string());
|
||||
Self::new(&database_url).await
|
||||
}
|
||||
}
|
||||
88
src/core/db/qdrant_db.rs
Normal file
88
src/core/db/qdrant_db.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{Database, SearchResult, VectorStore};
|
||||
|
||||
pub struct QdrantDb {
|
||||
collection_name: String,
|
||||
cache: Arc<RwLock<QdrantCache>>,
|
||||
}
|
||||
|
||||
/// In-memory vector storage used by [`QdrantDb`].
#[derive(Default, Debug)]
pub struct QdrantCache {
    /// Stored vectors keyed by chunk id.
    vectors: std::collections::HashMap<String, Vec<f32>>,
}
|
||||
|
||||
impl QdrantDb {
|
||||
pub async fn init_collection(&self) -> Result<()> {
|
||||
// TODO: Implement actual Qdrant client
|
||||
// This is a placeholder
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn upsert_vector(&self, chunk_id: &str, vector: &[f32]) -> Result<()> {
|
||||
let mut cache = self.cache.write().await;
|
||||
cache.vectors.insert(chunk_id.to_string(), vector.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for QdrantDb {
|
||||
async fn init() -> Result<Self> {
|
||||
let collection_name =
|
||||
std::env::var("QDRANT_COLLECTION").unwrap_or_else(|_| "momentry_chunks".to_string());
|
||||
|
||||
let db = Self {
|
||||
collection_name,
|
||||
cache: Arc::new(RwLock::new(QdrantCache::default())),
|
||||
};
|
||||
|
||||
db.init_collection().await?;
|
||||
Ok(db)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl VectorStore for QdrantDb {
|
||||
async fn store_vector(&self, chunk_id: &str, vector: &[f32]) -> Result<()> {
|
||||
self.upsert_vector(chunk_id, vector).await
|
||||
}
|
||||
|
||||
async fn search(&self, query_vector: &[f32], limit: usize) -> Result<Vec<SearchResult>> {
|
||||
// Simple cosine similarity search (placeholder)
|
||||
let cache = self.cache.read().await;
|
||||
let mut results: Vec<SearchResult> = Vec::new();
|
||||
|
||||
for (chunk_id, vector) in &cache.vectors {
|
||||
let similarity = cosine_similarity(query_vector, vector);
|
||||
results.push(SearchResult {
|
||||
chunk_id: chunk_id.clone(),
|
||||
score: similarity,
|
||||
});
|
||||
}
|
||||
|
||||
results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
|
||||
results.truncate(limit);
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
}
|
||||
|
||||
/// Cosine of the angle between `a` and `b`.
///
/// Returns `0.0` when the slices are empty, differ in length, or either one
/// has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let comparable = !a.is_empty() && a.len() == b.len();
    if !comparable {
        return 0.0;
    }

    // Euclidean length of a vector.
    let magnitude = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let (norm_a, norm_b) = (magnitude(a), magnitude(b));
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }

    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    dot / (norm_a * norm_b)
}
|
||||
65
src/core/db/redis_db.rs
Normal file
65
src/core/db/redis_db.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::Database;
|
||||
|
||||
pub struct RedisDb {
|
||||
state: Arc<RwLock<RedisState>>,
|
||||
}
|
||||
|
||||
/// In-memory bookkeeping held by [`RedisDb`], split into three lists.
// NOTE(review): the strings are presumably job or video identifiers — confirm.
#[derive(Clone, Debug, Default)]
pub struct RedisState {
    /// Entries currently being processed.
    pub processing: Vec<String>,
    /// Entries that finished.
    pub completed: Vec<String>,
    /// Entries that failed.
    pub failed: Vec<String>,
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Job {
|
||||
pub id: String,
|
||||
pub uuid: String,
|
||||
pub job_type: String,
|
||||
pub status: String,
|
||||
pub progress: f32,
|
||||
pub created_at: String,
|
||||
pub updated_at: String,
|
||||
}
|
||||
|
||||
impl RedisDb {
|
||||
pub async fn push_job(&self, _job: &Job) -> Result<()> {
|
||||
// TODO: Implement Redis client
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_pending_jobs(&self) -> Result<Vec<Job>> {
|
||||
// TODO: Implement Redis client
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
pub async fn update_job_status(
|
||||
&self,
|
||||
_job_id: &str,
|
||||
_status: &str,
|
||||
_progress: f32,
|
||||
) -> Result<()> {
|
||||
// TODO: Implement Redis client
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish_event(&self, _channel: &str, _message: &str) -> Result<()> {
|
||||
// TODO: Implement Redis Pub/Sub
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for RedisDb {
|
||||
async fn init() -> Result<Self> {
|
||||
// TODO: Initialize Redis client
|
||||
Ok(Self {
|
||||
state: Arc::new(RwLock::new(RedisState::default())),
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user