feat: Add HTTP API for progress monitoring

- Add /api/v1/progress/:uuid endpoint for real-time progress queries
- Implement Redis Hash storage for progress persistence
- Increase DB connection pool (5->10)
- Add get_processor_status method to RedisClient
- Update DEVELOPMENT_LOG with HTTP API implementation

Test: curl http://127.0.0.1:3002/api/v1/progress/<uuid>
This commit is contained in:
accusys
2026-03-18 02:14:49 +08:00
parent cb1fcd4846
commit 13e208b569
5 changed files with 2837 additions and 67 deletions

View File

@@ -1,12 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Row};
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
use std::sync::Arc;
use tokio::sync::RwLock;
use super::Database;
use crate::core::chunk::{Chunk, ChunkType};
use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType};
/// Per-video storage flags, one boolean for each storage column on the `videos` table.
///
/// The field names are the same as the `fs_video` .. `qvector_chunk` columns that the
/// video queries select and that `update_storage_status` accepts as `field` values.
/// `Default` produces a value with every flag set to `false`.
// NOTE(review): which pipeline stage each flag stands for (filesystem, PostgreSQL,
// object / vector stores) is inferred from the names only — confirm with the pipeline code.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StorageStatus {
/// Written as `TRUE` by the `register_video` upsert.
pub fs_video: bool,
pub fs_json: bool,
pub psql_chunk: bool,
pub pobject_chunk: bool,
pub mobject_chunk: bool,
pub pvector_chunk: bool,
pub qvector_chunk: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VideoRecord {
@@ -19,6 +30,40 @@ pub struct VideoRecord {
pub height: u32,
pub fps: f64,
pub probe_json: Option<String>,
pub storage: StorageStatus,
pub created_at: String,
}
/// In-memory form of a row in the `pre_chunks` table.
///
/// `store_pre_chunk` writes every field except `id` and `created_at`, which the
/// database fills in (`SERIAL` and `DEFAULT CURRENT_TIMESTAMP` respectively).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreChunk {
/// Row id; ignored on insert.
pub id: i64,
/// References `videos(id)`.
pub file_id: i64,
/// Part of the table's uniqueness key together with `file_id`, `start_frame` and `end_frame`.
pub source_type: String,
pub source_file: Option<String>,
pub chunk_type: String,
// NOTE(review): start/end times are presumably seconds from the start of the video — confirm.
pub start_time: f64,
pub end_time: f64,
pub start_frame: i64,
pub end_frame: i64,
pub fps: f64,
/// Stored in a `JSONB NOT NULL` column.
pub raw_json: serde_json::Value,
pub text_content: Option<String>,
pub processed: bool,
pub chunk_id: Option<String>,
/// Ignored on insert; the column is a `TIMESTAMP`, carried here as text.
pub created_at: String,
}
/// In-memory form of a row in the `frames` table.
///
/// `store_frame` writes every field except `id` and `created_at`; rows are unique per
/// `(file_id, frame_number)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Frame {
/// Row id; ignored on insert.
pub id: i64,
/// References `videos(id)`.
pub file_id: i64,
pub frame_number: i64,
// NOTE(review): presumably seconds from the start of the video — confirm with the producer.
pub timestamp: f64,
pub fps: f64,
/// Optional `JSONB` payloads; their schema is not visible in this file.
pub yolo_objects: Option<serde_json::Value>,
pub ocr_results: Option<serde_json::Value>,
pub face_results: Option<serde_json::Value>,
pub frame_path: Option<String>,
/// Ignored on insert; the column is a `TIMESTAMP`, carried here as text.
pub created_at: String,
}
@@ -30,12 +75,17 @@ pub struct PostgresDb {
/// In-process cache kept alongside the connection pool.
#[derive(Debug, Default)]
pub struct PostgresCache {
/// Cached video records keyed by video uuid; `update_storage_status` removes the
/// entry for a uuid after changing one of its flags.
videos: std::collections::HashMap<String, VideoRecord>,
// Not read anywhere in the visible code, hence the lint allowance.
#[allow(dead_code)]
chunks: std::collections::HashMap<String, Vec<Chunk>>,
}
impl PostgresDb {
pub async fn new(database_url: &str) -> Result<Self> {
let pool = PgPool::connect(database_url).await?;
let pool_options = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(std::time::Duration::from_secs(60));
let pool = pool_options.connect(database_url).await?;
let db = Self {
pool,
@@ -49,8 +99,8 @@ impl PostgresDb {
pub async fn register_video(&self, record: &VideoRecord) -> Result<i64> {
let result = sqlx::query(
r#"
INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE)
ON CONFLICT (uuid) DO UPDATE SET
file_path = EXCLUDED.file_path,
file_name = EXCLUDED.file_name,
@@ -59,6 +109,7 @@ impl PostgresDb {
height = EXCLUDED.height,
fps = EXCLUDED.fps,
probe_json = EXCLUDED.probe_json,
fs_video = TRUE,
updated_at = CURRENT_TIMESTAMP
RETURNING id::bigint
"#
@@ -94,8 +145,8 @@ impl PostgresDb {
}
}
let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos WHERE uuid = $1"
let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>, bool, bool, bool, bool, bool, bool, bool)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos WHERE uuid = $1"
)
.bind(uuid)
.fetch_optional(&self.pool)
@@ -104,7 +155,7 @@ impl PostgresDb {
if let Some(r) = result {
let video = VideoRecord {
id: r.0 as i64,
uuid: r.1,
uuid: r.1.clone(),
file_path: r.2,
file_name: r.3,
duration: r.4,
@@ -112,6 +163,15 @@ impl PostgresDb {
height: r.6 as u32,
fps: r.7,
probe_json: r.8,
storage: StorageStatus {
fs_video: r.9,
fs_json: r.10,
psql_chunk: r.11,
pobject_chunk: r.12,
mobject_chunk: r.13,
pvector_chunk: r.14,
qvector_chunk: r.15,
},
created_at: String::new(),
};
@@ -126,8 +186,8 @@ impl PostgresDb {
}
pub async fn list_videos(&self) -> Result<Vec<VideoRecord>> {
let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json FROM videos ORDER BY id DESC"
let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option<String>, bool, bool, bool, bool, bool, bool, bool)>(
"SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos ORDER BY id DESC"
)
.fetch_all(&self.pool)
.await?;
@@ -144,6 +204,15 @@ impl PostgresDb {
height: r.6 as u32,
fps: r.7,
probe_json: r.8,
storage: StorageStatus {
fs_video: r.9,
fs_json: r.10,
psql_chunk: r.11,
pobject_chunk: r.12,
mobject_chunk: r.13,
pvector_chunk: r.14,
qvector_chunk: r.15,
},
created_at: String::new(),
})
.collect();
@@ -151,6 +220,69 @@ impl PostgresDb {
Ok(videos)
}
/// Sets one storage flag on the `videos` row identified by `uuid`.
///
/// `field` must be one of the seven storage column names; anything else is rejected
/// with an `Invalid storage field` error before any SQL is issued. After the update
/// the cached record for `uuid` is dropped so the next read reloads it.
pub async fn update_storage_status(&self, uuid: &str, field: &str, value: bool) -> Result<()> {
    // Column names cannot be sent as bind parameters, so only names from this
    // fixed allow-list are ever interpolated into the statement text.
    const STORAGE_COLUMNS: [&str; 7] = [
        "fs_video",
        "fs_json",
        "psql_chunk",
        "pobject_chunk",
        "mobject_chunk",
        "pvector_chunk",
        "qvector_chunk",
    ];

    let column = match STORAGE_COLUMNS.iter().find(|name| **name == field) {
        Some(name) => *name,
        None => anyhow::bail!("Invalid storage field: {}", field),
    };

    let statement = format!(
        "UPDATE videos SET {} = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2",
        column
    );
    sqlx::query(&statement)
        .bind(value)
        .bind(uuid)
        .execute(&self.pool)
        .await?;

    // Invalidate cache
    self.cache.write().await.videos.remove(uuid);

    Ok(())
}
/// Returns the storage flags of the video identified by `uuid`, or `None` when
/// `get_video_by_uuid` finds no such video.
pub async fn get_storage_status(&self, uuid: &str) -> Result<Option<StorageStatus>> {
    let video = self.get_video_by_uuid(uuid).await?;
    Ok(video.map(|record| record.storage))
}
/// Counts the chunks stored for `uuid`, returned as `(sentence, time_based)`.
///
/// Each count is its own `COUNT(*)` query filtered on the `chunk_type` label.
pub async fn get_chunk_count(&self, uuid: &str) -> Result<(i64, i64)> {
    let sentences = sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'sentence'",
    )
    .bind(uuid)
    .fetch_one(&self.pool)
    .await?;

    let time_based = sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM chunks WHERE uuid = $1 AND chunk_type = 'time_based'",
    )
    .bind(uuid)
    .fetch_one(&self.pool)
    .await?;

    Ok((sentences, time_based))
}
/// Returns the number of `chunk_vectors` rows recorded for `uuid`.
pub async fn get_vector_count(&self, uuid: &str) -> Result<i64> {
    let total = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM chunk_vectors WHERE uuid = $1")
        .bind(uuid)
        .fetch_one(&self.pool)
        .await?;
    Ok(total)
}
async fn init_schema(&self) -> Result<()> {
sqlx::query(
r#"
@@ -164,6 +296,13 @@ impl PostgresDb {
height INTEGER,
fps DOUBLE PRECISION,
probe_json TEXT,
fs_video BOOLEAN DEFAULT FALSE,
fs_json BOOLEAN DEFAULT FALSE,
psql_chunk BOOLEAN DEFAULT FALSE,
pobject_chunk BOOLEAN DEFAULT FALSE,
mobject_chunk BOOLEAN DEFAULT FALSE,
pvector_chunk BOOLEAN DEFAULT FALSE,
qvector_chunk BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
@@ -176,6 +315,36 @@ impl PostgresDb {
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_video BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS fs_json BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE videos ADD COLUMN IF NOT EXISTS psql_chunk BOOLEAN DEFAULT FALSE")
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS pobject_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS mobject_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS pvector_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
"ALTER TABLE videos ADD COLUMN IF NOT EXISTS qvector_chunk BOOLEAN DEFAULT FALSE",
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS chunks (
@@ -186,9 +355,14 @@ impl PostgresDb {
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION NOT NULL,
end_time DOUBLE PRECISION NOT NULL,
fps DOUBLE PRECISION DEFAULT 24.0,
start_frame BIGINT DEFAULT 0,
end_frame BIGINT DEFAULT 0,
content JSONB NOT NULL,
metadata JSONB,
vector_id VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(uuid, chunk_id)
)
"#,
@@ -208,28 +382,165 @@ impl PostgresDb {
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunks_uuid_type ON chunks(uuid, chunk_type)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunks_content_gin ON chunks USING GIN(content)",
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS chunk_vectors (
id SERIAL PRIMARY KEY,
chunk_id VARCHAR(64) NOT NULL UNIQUE,
uuid VARCHAR(32) NOT NULL,
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION,
end_time DOUBLE PRECISION,
embedding TEXT,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)",
)
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_vectors_uuid ON chunk_vectors(uuid)")
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_chunk_vectors_chunk_id ON chunk_vectors(chunk_id)",
)
.execute(&self.pool)
.await?;
// pre_chunks table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS pre_chunks (
id SERIAL PRIMARY KEY,
file_id INTEGER NOT NULL REFERENCES videos(id),
source_type VARCHAR(32) NOT NULL,
source_file TEXT,
chunk_type VARCHAR(32) NOT NULL,
start_time DOUBLE PRECISION NOT NULL,
end_time DOUBLE PRECISION NOT NULL,
start_frame BIGINT DEFAULT 0,
end_frame BIGINT DEFAULT 0,
fps DOUBLE PRECISION DEFAULT 24.0,
raw_json JSONB NOT NULL,
text_content TEXT,
processed BOOLEAN DEFAULT FALSE,
chunk_id VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_id, source_type, start_frame, end_frame)
)
"#,
)
.execute(&self.pool)
.await?;
// frames table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS frames (
id SERIAL PRIMARY KEY,
file_id INTEGER NOT NULL REFERENCES videos(id),
frame_number BIGINT NOT NULL,
timestamp DOUBLE PRECISION NOT NULL,
fps DOUBLE PRECISION DEFAULT 24.0,
yolo_objects JSONB,
ocr_results JSONB,
face_results JSONB,
frame_path TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_id, frame_number)
)
"#,
)
.execute(&self.pool)
.await?;
// Add file_id columns to existing tables if not exist
sqlx::query(
"ALTER TABLE chunks ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)",
)
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS text_content TEXT")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS frame_count INTEGER DEFAULT 0")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS pre_chunk_ids INTEGER[]")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE chunk_vectors ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)")
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
let content_with_rule = serde_json::json!({
"rule": chunk.rule.as_str(),
"data": chunk.content
});
sqlx::query(
r#"
INSERT INTO chunks (uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16)
ON CONFLICT (uuid, chunk_id) DO UPDATE SET
start_time = EXCLUDED.start_time,
end_time = EXCLUDED.end_time,
fps = EXCLUDED.fps,
start_frame = EXCLUDED.start_frame,
end_frame = EXCLUDED.end_frame,
text_content = EXCLUDED.text_content,
content = EXCLUDED.content,
vector_id = EXCLUDED.vector_id
metadata = EXCLUDED.metadata,
vector_id = EXCLUDED.vector_id,
frame_count = EXCLUDED.frame_count,
pre_chunk_ids = EXCLUDED.pre_chunk_ids,
updated_at = CURRENT_TIMESTAMP
"#
)
.bind(chunk.file_id)
.bind(&chunk.uuid)
.bind(&chunk.chunk_id)
.bind(chunk.chunk_index as i32)
.bind(chunk.chunk_type.as_str())
.bind(chunk.start_time)
.bind(chunk.end_time)
.bind(&chunk.content)
.bind(chunk.fps)
.bind(chunk.start_frame)
.bind(chunk.end_frame)
.bind(&chunk.text_content)
.bind(&content_with_rule)
.bind(&chunk.metadata)
.bind(&chunk.vector_id)
.bind(chunk.frame_count)
.bind(&chunk.pre_chunk_ids)
.execute(&self.pool)
.await?;
@@ -238,7 +549,7 @@ impl PostgresDb {
pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result<Vec<Chunk>> {
let rows = sqlx::query(
"SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, content FROM chunks WHERE uuid = $1 ORDER BY chunk_index"
"SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index"
)
.bind(uuid)
.fetch_all(&self.pool)
@@ -247,32 +558,417 @@ impl PostgresDb {
let chunks: Vec<Chunk> = rows
.into_iter()
.map(|r| {
let chunk_type_str: String = r.get(3);
let chunk_index: i32 = r.get(2);
let chunk_type_str: String = r.get(4);
let chunk_index: i32 = r.get(3);
let chunk_type = match chunk_type_str.as_str() {
"time" => ChunkType::TimeBased,
"sentence" => ChunkType::Sentence,
"cut" => ChunkType::Cut,
"trace" => ChunkType::Trace,
_ => ChunkType::TimeBased,
};
let content: serde_json::Value = r.get(11);
let metadata: Option<serde_json::Value> = r.get(12);
// Get pre_chunk_ids - try direct Vec<i32> decode first
let pre_chunk_ids: Vec<i32> = r.try_get(15).unwrap_or_default();
// Extract rule from content
let (rule, content_data) = if content.get("rule").is_some() {
let rule_str = content
.get("rule")
.and_then(|v| v.as_str())
.unwrap_or("rule_1");
let rule = if rule_str == "rule_2" {
ChunkRule::Rule2
} else {
ChunkRule::Rule1
};
let data = content.get("data").cloned().unwrap_or(content);
(rule, data)
} else {
(ChunkRule::Rule1, content)
};
let file_id: i32 = sqlx::Row::get(&r, "file_id");
let frame_count: i32 = sqlx::Row::get(&r, "frame_count");
Chunk {
file_id,
uuid: r.get("uuid"),
chunk_id: r.get("chunk_id"),
chunk_index: chunk_index as u32,
chunk_type,
rule,
start_time: r.get("start_time"),
end_time: r.get("end_time"),
fps: r.get("fps"),
start_frame: r.get("start_frame"),
end_frame: r.get("end_frame"),
text_content: r.get("text_content"),
content: content_data,
metadata,
vector_id: r.get("vector_id"),
frame_count,
pre_chunk_ids,
}
})
.collect();
Ok(chunks)
}
/// Inserts `pre_chunk` into `pre_chunks` and returns the id of the stored row.
///
/// A row with the same `(file_id, source_type, start_frame, end_frame)` is updated
/// in place instead: only `raw_json`, `text_content`, `processed` and `chunk_id`
/// are overwritten. `pre_chunk.id` and `pre_chunk.created_at` are not sent.
pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result<i64> {
    const UPSERT_PRE_CHUNK: &str = r#"
INSERT INTO pre_chunks (file_id, source_type, source_file, chunk_type, start_time, end_time, start_frame, end_frame, fps, raw_json, text_content, processed, chunk_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (file_id, source_type, start_frame, end_frame) DO UPDATE SET
raw_json = EXCLUDED.raw_json,
text_content = EXCLUDED.text_content,
processed = EXCLUDED.processed,
chunk_id = EXCLUDED.chunk_id
RETURNING id
"#;

    let stored = sqlx::query(UPSERT_PRE_CHUNK)
        .bind(pre_chunk.file_id)
        .bind(&pre_chunk.source_type)
        .bind(&pre_chunk.source_file)
        .bind(&pre_chunk.chunk_type)
        .bind(pre_chunk.start_time)
        .bind(pre_chunk.end_time)
        .bind(pre_chunk.start_frame)
        .bind(pre_chunk.end_frame)
        .bind(pre_chunk.fps)
        .bind(&pre_chunk.raw_json)
        .bind(&pre_chunk.text_content)
        .bind(pre_chunk.processed)
        .bind(&pre_chunk.chunk_id)
        .fetch_one(&self.pool)
        .await?;

    // `id` is a SERIAL (32-bit) column; widen it to the i64 used by the API.
    let row_id = stored.get::<i32, _>(0);
    Ok(row_id as i64)
}
/// Inserts `frame` into `frames`, or refreshes the analysis columns of the existing
/// row for the same `(file_id, frame_number)`.
///
/// On conflict only `yolo_objects`, `ocr_results`, `face_results` and `frame_path`
/// are overwritten; `timestamp` and `fps` keep their stored values.
pub async fn store_frame(&self, frame: &Frame) -> Result<()> {
    const UPSERT_FRAME: &str = r#"
INSERT INTO frames (file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (file_id, frame_number) DO UPDATE SET
yolo_objects = EXCLUDED.yolo_objects,
ocr_results = EXCLUDED.ocr_results,
face_results = EXCLUDED.face_results,
frame_path = EXCLUDED.frame_path
"#;

    let upsert = sqlx::query(UPSERT_FRAME)
        .bind(frame.file_id)
        .bind(frame.frame_number)
        .bind(frame.timestamp)
        .bind(frame.fps)
        .bind(&frame.yolo_objects)
        .bind(&frame.ocr_results)
        .bind(&frame.face_results)
        .bind(&frame.frame_path);

    upsert.execute(&self.pool).await?;
    Ok(())
}
/// Returns the frames of `file_id` whose `timestamp` lies in the inclusive range
/// `[start_time, end_time]`, ordered by frame number.
///
/// `created_at` is a `TIMESTAMP` column; it is cast to text in SQL because sqlx
/// refuses to decode a timestamp straight into a `String`, which previously made
/// every non-empty result fail with a column-decode error.
///
/// # Errors
/// Returns the underlying sqlx error when the query or row decoding fails.
pub async fn get_frames_by_time_range(
    &self,
    file_id: i64,
    start_time: f64,
    end_time: f64,
) -> Result<Vec<Frame>> {
    let rows = sqlx::query_as::<_, (
        i32,
        i32,
        i64,
        f64,
        f64,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<serde_json::Value>,
        Option<String>,
        String,
    )>(
        "SELECT id, file_id, frame_number, timestamp, fps, yolo_objects, ocr_results, face_results, frame_path, created_at::text
FROM frames
WHERE file_id = $1 AND timestamp >= $2 AND timestamp <= $3
ORDER BY frame_number"
    )
    .bind(file_id)
    .bind(start_time)
    .bind(end_time)
    .fetch_all(&self.pool)
    .await?;

    let frames: Vec<Frame> = rows
        .into_iter()
        .map(|r| Frame {
            // `id` and `file_id` are 32-bit columns; widen them to the struct's i64 fields.
            id: i64::from(r.0),
            file_id: i64::from(r.1),
            frame_number: r.2,
            timestamp: r.3,
            fps: r.4,
            yolo_objects: r.5,
            ocr_results: r.6,
            face_results: r.7,
            frame_path: r.8,
            created_at: r.9,
        })
        .collect();

    Ok(frames)
}
/// Returns the chunks of `file_id` that lie entirely inside `[start_time, end_time]`
/// (`start_time >= $2 AND end_time <= $3`), ordered by their start time.
///
/// The stored `content` may be wrapped as `{"rule": .., "data": ..}`; when a `rule`
/// key is present the wrapper is unpacked, otherwise the value is used as-is with
/// `ChunkRule::Rule1`.
pub async fn get_chunks_by_time_range(
    &self,
    file_id: i64,
    start_time: f64,
    end_time: f64,
) -> Result<Vec<Chunk>> {
    let rows = sqlx::query(
        "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids
FROM chunks
WHERE file_id = $1 AND start_time >= $2 AND end_time <= $3
ORDER BY start_time"
    )
    .bind(file_id)
    .bind(start_time)
    .bind(end_time)
    .fetch_all(&self.pool)
    .await?;

    let mut chunks: Vec<Chunk> = Vec::with_capacity(rows.len());
    for row in rows {
        let type_label: String = row.get(4);
        let index: i32 = row.get(3);
        let chunk_type = match type_label.as_str() {
            "sentence" => ChunkType::Sentence,
            "cut" => ChunkType::Cut,
            "trace" => ChunkType::Trace,
            // "time" and every unrecognised label fall back to the time-based kind.
            _ => ChunkType::TimeBased,
        };

        let stored: serde_json::Value = row.get(11);
        let metadata: Option<serde_json::Value> = row.get(12);

        // A NULL or undecodable array column yields an empty id list.
        let pre_chunk_ids: Vec<i32> = row.try_get(15).unwrap_or_default();

        // `Some(true)` only when the wrapper names rule_2; `None` when there is no wrapper.
        let wrapped_rule = stored
            .get("rule")
            .map(|tag| tag.as_str() == Some("rule_2"));
        let (rule, content_data) = match wrapped_rule {
            Some(is_rule_2) => {
                let rule = if is_rule_2 {
                    ChunkRule::Rule2
                } else {
                    ChunkRule::Rule1
                };
                // A wrapper without a "data" key keeps the whole value as the content.
                let payload = stored.get("data").cloned().unwrap_or(stored);
                (rule, payload)
            }
            None => (ChunkRule::Rule1, stored),
        };

        let file_id: i32 = row.get("file_id");
        let frame_count: i32 = row.get("frame_count");

        chunks.push(Chunk {
            file_id,
            uuid: row.get("uuid"),
            chunk_id: row.get("chunk_id"),
            chunk_index: index as u32,
            chunk_type,
            rule,
            start_time: row.get("start_time"),
            end_time: row.get("end_time"),
            fps: row.get("fps"),
            start_frame: row.get("start_frame"),
            end_frame: row.get("end_frame"),
            text_content: row.get("text_content"),
            content: content_data,
            metadata,
            vector_id: row.get("vector_id"),
            frame_count,
            pre_chunk_ids,
        });
    }

    Ok(chunks)
}
/// Looks up the numeric `videos.id` for the video identified by `uuid`.
///
/// `videos.id` is a 32-bit column (the other queries in this file decode it as `i32`),
/// so it is cast to `bigint` in SQL — as `register_video` does — to match the `i64`
/// return type. Decoding the uncast column as `i64` is a type mismatch in sqlx.
///
/// # Errors
/// Returns the sqlx error when no row matches `uuid` (`fetch_one` requires exactly one
/// row), when the query fails, or when the value cannot be decoded.
pub async fn get_file_id_by_uuid(&self, uuid: &str) -> Result<i64> {
    let row = sqlx::query("SELECT id::bigint FROM videos WHERE uuid = $1")
        .bind(uuid)
        .fetch_one(&self.pool)
        .await?;
    // `try_get` reports a decode problem as an error instead of panicking like `get`.
    let id: i64 = row.try_get(0)?;
    Ok(id)
}
pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> {
let vector_json = serde_json::json!(vector);
let embedding_str = vector_json.to_string();
// Clone for use in closure
let chunk_id = chunk_id.to_string();
let uuid = uuid.to_string();
// Use blocking task - this needs to wait for result
let join_result = tokio::task::spawn_blocking(move || {
let output = std::process::Command::new("psql")
.args([
"postgres://accusys@localhost:5432/momentry",
"-c",
&format!(
"INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) VALUES ('{}', '{}', 'sentence', '{}') ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding",
chunk_id, uuid, embedding_str.replace('\'', "''")
)
])
.output();
(chunk_id, output)
})
.await;
match join_result {
Ok((cid, Ok(output))) => {
if !output.status.success() {
let err = String::from_utf8_lossy(&output.stderr);
tracing::error!("psql error for {}: {}", cid, err);
}
}
Ok((cid, Err(e))) => {
tracing::error!("psql output error for {}: {}", cid, e);
}
Err(e) => {
tracing::error!("join error: {}", e);
}
}
Ok(())
}
/// Records `vector_id` on the `chunks` rows whose `chunk_id` matches.
// NOTE(review): `chunks` is unique on (uuid, chunk_id), not on chunk_id alone, so this
// statement can touch more than one row if chunk ids repeat across videos — confirm.
pub async fn update_vector_id(&self, chunk_id: &str, vector_id: &str) -> Result<()> {
    let update = sqlx::query("UPDATE chunks SET vector_id = $1 WHERE chunk_id = $2")
        .bind(vector_id)
        .bind(chunk_id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Placeholder for vector similarity search.
///
/// Both arguments are ignored and the result is always an empty list.
pub async fn search_vector(
    &self,
    _query_vector: &[f32],
    _limit: usize,
) -> Result<Vec<super::SearchResult>> {
    let no_results = Vec::new();
    Ok(no_results)
}
pub async fn search_text(&self, query: &str, chunk_type: Option<&str>) -> Result<Vec<Chunk>> {
let query_pattern = format!("%{}%", query);
let sql = match chunk_type {
Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index",
None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index",
};
let chunks = if let Some(ct) = chunk_type {
sqlx::query_as::<
_,
(
String,
String,
i32,
String,
f64,
f64,
f64,
i64,
i64,
String,
Option<String>,
Option<String>,
),
>(sql)
.bind(&query_pattern)
.bind(ct)
.fetch_all(&self.pool)
.await?
} else {
sqlx::query_as::<
_,
(
String,
String,
i32,
String,
f64,
f64,
f64,
i64,
i64,
String,
Option<String>,
Option<String>,
),
>(sql)
.bind(&query_pattern)
.fetch_all(&self.pool)
.await?
};
let results: Vec<Chunk> = chunks
.into_iter()
.map(|r| {
let chunk_type = match r.3.as_str() {
"time_based" => ChunkType::TimeBased,
"sentence" => ChunkType::Sentence,
"cut" => ChunkType::Cut,
_ => ChunkType::TimeBased,
};
let content_json: String = r.get(6);
let content: serde_json::Value =
serde_json::from_str(&content_json).unwrap_or(serde_json::json!({}));
serde_json::from_str(&r.9).unwrap_or(serde_json::json!({}));
let metadata: Option<serde_json::Value> =
r.10.and_then(|m| serde_json::from_str(&m).ok());
Chunk {
uuid: r.get(0),
chunk_id: r.get(1),
chunk_index: chunk_index as u32,
file_id: 0,
uuid: r.0,
chunk_id: r.1,
chunk_index: r.2 as u32,
chunk_type,
start_time: r.get(4),
end_time: r.get(5),
rule: ChunkRule::Rule1,
start_time: r.4,
end_time: r.5,
fps: r.6,
start_frame: r.7,
end_frame: r.8,
text_content: Some(r.9),
content,
metadata,
vector_id: r.11,
frame_count: 0,
pre_chunk_ids: vec![],
}
})
.collect();
Ok(chunks)
Ok(results)
}
}

262
src/core/db/redis_client.rs Normal file
View File

@@ -0,0 +1,262 @@
use anyhow::{Context, Result};
use futures_util::stream::StreamExt;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, Client};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Redis access layer for job status, per-processor status, progress pub/sub,
/// job sets and the health key, all under the `momentry:` key prefix.
pub struct RedisClient {
// Handle from which every method obtains a connection on demand.
client: Client,
// Shared connection flag exposed through `is_connected`.
state: Arc<RwLock<RedisState>>,
}
/// Connection state shared by a `RedisClient`.
#[derive(Debug, Clone, Default)]
pub struct RedisState {
/// Initialised to `true` by `RedisClient::new`; no visible code changes it afterwards.
/// (`Default` for this struct yields `false`.)
pub connected: bool,
}
impl RedisClient {
    /// Creates a client from the `REDIS_URL` environment variable.
    ///
    /// When `REDIS_URL` is unset the URL is built for `localhost:6379` using
    /// `REDIS_PASSWORD`, falling back to a built-in default password.
    ///
    /// # Errors
    /// Fails when the URL is rejected by `Client::open`. No network connection is
    /// attempted here; connections are opened lazily by the other methods.
    pub fn new() -> Result<Self> {
        let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| {
            // NOTE(review): hard-coded fallback credential — prefer requiring the
            // environment variable in production deployments.
            let password =
                std::env::var("REDIS_PASSWORD").unwrap_or_else(|_| "accusys".to_string());
            format!("redis://:{}@localhost:6379", password)
        });

        let client = Client::open(redis_url.as_str()).context("Failed to connect to Redis")?;

        Ok(Self {
            client,
            state: Arc::new(RwLock::new(RedisState { connected: true })),
        })
    }

    /// Returns the stored connection flag.
    ///
    /// The flag is set to `true` at construction and nothing in this type updates it,
    /// so it does not reflect the live state of the server.
    pub async fn is_connected(&self) -> bool {
        self.state.read().await.connected
    }

    /// Opens a multiplexed async connection (alias of [`Self::get_conn_internal`]).
    pub async fn get_conn(&self) -> Result<MultiplexedConnection> {
        self.get_conn_internal().await
    }

    /// Opens a multiplexed async connection to the configured server.
    pub async fn get_conn_internal(&self) -> Result<MultiplexedConnection> {
        self.client
            .get_multiplexed_async_connection()
            .await
            .context("Failed to get Redis connection")
    }

    /// Removes a text field from a hash snapshot, yielding `""` when it is absent.
    fn take_text(fields: &mut std::collections::HashMap<String, String>, name: &str) -> String {
        fields.remove(name).unwrap_or_default()
    }

    /// Reads an integer field from a hash snapshot: absent means `0`, a present but
    /// non-numeric value is an error.
    fn int_field(fields: &std::collections::HashMap<String, String>, name: &str) -> Result<i32> {
        match fields.get(name) {
            None => Ok(0),
            Some(raw) => raw
                .parse::<i32>()
                .with_context(|| format!("Redis hash field `{}` is not an integer: {:?}", name, raw)),
        }
    }

    /// Reads the job hash `momentry:job:{uuid}`.
    ///
    /// Returns `None` when the hash does not exist or has no `status` field. The hash
    /// is fetched in a single `HGETALL`, so all fields come from one snapshot. Fields
    /// other than `status` that are absent default to `""` / `0` instead of failing
    /// the whole call.
    ///
    /// # Errors
    /// Fails on connection or Redis errors and when a numeric field holds a
    /// non-integer value.
    pub async fn get_job_status(&self, uuid: &str) -> Result<Option<JobStatus>> {
        let mut conn = self.get_conn_internal().await?;
        let key = format!("momentry:job:{}", uuid);

        let mut fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
        let status = match fields.remove("status") {
            Some(status) => status,
            None => return Ok(None),
        };

        Ok(Some(JobStatus {
            status,
            current_processor: Self::take_text(&mut fields, "current_processor"),
            progress_total: Self::int_field(&fields, "progress_total")?,
            progress_current: Self::int_field(&fields, "progress_current")?,
            started_at: Self::take_text(&mut fields, "started_at"),
            updated_at: Self::take_text(&mut fields, "updated_at"),
            error_count: Self::int_field(&fields, "error_count")?,
            last_error: Self::take_text(&mut fields, "last_error"),
        }))
    }

    /// Writes `status` to the hash `momentry:job:{uuid}:processor:{processor}` and
    /// (re)sets the key's expiry to 86400 seconds.
    ///
    /// The write and the expiry are two separate commands.
    pub async fn set_processor_status(
        &self,
        uuid: &str,
        processor: &str,
        status: &ProcessorStatus,
    ) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let key = format!("momentry:job:{}:processor:{}", uuid, processor);

        // Numeric fields are stored as their decimal text.
        let progress = status.progress.to_string();
        let current = status.current.to_string();
        let total = status.total.to_string();

        let _: () = conn
            .hset_multiple(
                &key,
                &[
                    ("status", status.status.as_str()),
                    ("progress", progress.as_str()),
                    ("current", current.as_str()),
                    ("total", total.as_str()),
                    ("started_at", status.started_at.as_str()),
                    ("updated_at", status.updated_at.as_str()),
                    ("message", status.message.as_str()),
                ],
            )
            .await?;

        let _: bool = conn.expire(&key, 86400).await?;

        Ok(())
    }

    /// Reads the hash `momentry:job:{uuid}:processor:{processor}`.
    ///
    /// Returns `None` when the hash does not exist or has no `status` field. The hash
    /// is fetched in a single `HGETALL`; other absent fields default to `""` / `0`.
    ///
    /// # Errors
    /// Fails on connection or Redis errors and when a numeric field holds a
    /// non-integer value.
    pub async fn get_processor_status(
        &self,
        uuid: &str,
        processor: &str,
    ) -> Result<Option<ProcessorStatus>> {
        let mut conn = self.get_conn_internal().await?;
        let key = format!("momentry:job:{}:processor:{}", uuid, processor);

        let mut fields: std::collections::HashMap<String, String> = conn.hgetall(&key).await?;
        let status = match fields.remove("status") {
            Some(status) => status,
            None => return Ok(None),
        };

        Ok(Some(ProcessorStatus {
            status,
            progress: Self::int_field(&fields, "progress")?,
            current: Self::int_field(&fields, "current")?,
            total: Self::int_field(&fields, "total")?,
            started_at: Self::take_text(&mut fields, "started_at"),
            updated_at: Self::take_text(&mut fields, "updated_at"),
            message: Self::take_text(&mut fields, "message"),
        }))
    }

    /// Publishes `message` as JSON on the channel `momentry:progress:{uuid}`.
    pub async fn publish_progress(&self, uuid: &str, message: &ProgressMessage) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let channel = format!("momentry:progress:{}", uuid);
        let json = serde_json::to_string(message)?;

        // The reply (number of receiving subscribers) is not used.
        let _: usize = conn.publish(&channel, json).await?;

        Ok(())
    }

    /// Opens a pub/sub connection subscribed to `momentry:progress:{uuid}`.
    pub async fn subscribe_progress(&self, uuid: &str) -> Result<redis::aio::PubSub> {
        let mut pubsub = self.client.get_async_pubsub().await?;
        let channel = format!("momentry:progress:{}", uuid);
        pubsub.subscribe(channel).await?;
        Ok(pubsub)
    }

    /// Subscribes to the progress channel of `uuid` and calls `callback` for each
    /// message that parses as a [`ProgressMessage`].
    ///
    /// Payloads that are not valid `ProgressMessage` JSON are skipped. The function
    /// returns `Ok(())` when the message stream ends.
    ///
    /// # Errors
    /// Fails when subscribing fails or when a payload cannot be read as a string,
    /// which ends the subscription.
    pub async fn subscribe_and_callback<F>(&self, uuid: &str, mut callback: F) -> Result<()>
    where
        F: FnMut(ProgressMessage) + Send + 'static,
    {
        let mut pubsub = self.subscribe_progress(uuid).await?;
        let mut stream = pubsub.on_message();

        while let Some(msg) = stream.next().await {
            let payload: String = msg.get_payload().map_err(|e| anyhow::anyhow!("{}", e))?;
            if let Ok(progress_msg) = serde_json::from_str::<ProgressMessage>(&payload) {
                callback(progress_msg);
            }
        }

        Ok(())
    }

    /// Adds `uuid` to the set `momentry:jobs:active`.
    pub async fn add_to_active_jobs(&self, uuid: &str) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let _: usize = conn.sadd("momentry:jobs:active", uuid).await?;
        Ok(())
    }

    /// Moves `uuid` from the active set to `momentry:jobs:completed`.
    ///
    /// Whether the member was actually present in the active set is not reported.
    pub async fn move_to_completed_jobs(&self, uuid: &str) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let _: bool = conn
            .smove("momentry:jobs:active", "momentry:jobs:completed", uuid)
            .await?;
        Ok(())
    }

    /// Moves `uuid` from the active set to `momentry:jobs:failed`.
    ///
    /// Whether the member was actually present in the active set is not reported.
    pub async fn move_to_failed_jobs(&self, uuid: &str) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let _: bool = conn
            .smove("momentry:jobs:active", "momentry:jobs:failed", uuid)
            .await?;
        Ok(())
    }

    /// Returns every member of the set `momentry:jobs:active`.
    pub async fn get_active_jobs(&self) -> Result<Vec<String>> {
        let mut conn = self.get_conn_internal().await?;
        let jobs: Vec<String> = conn.smembers("momentry:jobs:active").await?;
        Ok(jobs)
    }

    /// Stores `status` under `momentry:health:momentry_core` with a 60-second expiry.
    pub async fn set_health(&self, status: &str) -> Result<()> {
        let mut conn = self.get_conn_internal().await?;
        let _: () = conn
            .set_ex("momentry:health:momentry_core", status, 60)
            .await?;
        Ok(())
    }

    /// Reads `momentry:health:momentry_core`; `None` when the key is absent.
    pub async fn get_health(&self) -> Result<Option<String>> {
        let mut conn = self.get_conn_internal().await?;
        let health: Option<String> = conn.get("momentry:health:momentry_core").await?;
        Ok(health)
    }
}
impl Default for RedisClient {
    /// Builds a client through [`RedisClient::new`].
    ///
    /// # Panics
    /// Panics when `RedisClient::new` returns an error.
    fn default() -> Self {
        match Self::new() {
            Ok(client) => client,
            Err(err) => panic!("Failed to create Redis client: {:?}", err),
        }
    }
}
/// Job-level status as read by `RedisClient::get_job_status` from the hash
/// `momentry:job:{uuid}`; each field comes from the hash field of the same name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStatus {
pub status: String,
pub current_processor: String,
pub progress_total: i32,
pub progress_current: i32,
// NOTE(review): timestamps are carried as opaque strings; their format is not
// established anywhere in this file.
pub started_at: String,
pub updated_at: String,
pub error_count: i32,
pub last_error: String,
}
/// Status of one processor within a job, stored by `RedisClient::set_processor_status`
/// in the hash `momentry:job:{uuid}:processor:{processor}` and read back by
/// `RedisClient::get_processor_status`. Numeric fields are stored as decimal text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorStatus {
pub status: String,
// NOTE(review): presumably a percentage derived from current/total — confirm with the writer.
pub progress: i32,
pub current: i32,
pub total: i32,
pub started_at: String,
pub updated_at: String,
pub message: String,
}
/// Message exchanged as JSON on the pub/sub channel `momentry:progress:{uuid}`
/// (see `RedisClient::publish_progress` and `RedisClient::subscribe_and_callback`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressMessage {
/// Appears in JSON under the key `type` (`type` is a Rust keyword, hence the rename).
#[serde(rename = "type")]
pub msg_type: String,
pub processor: String,
pub uuid: String,
// NOTE(review): unit (seconds vs. milliseconds since the epoch) is not visible here — confirm.
pub timestamp: i64,
pub data: ProgressData,
}
/// Payload of a `ProgressMessage`; every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressData {
pub message: Option<String>,
pub current: Option<i32>,
pub total: Option<i32>,
}