feat: media API (video/bbox/thumbnail), UUID unification, dot matrix text, portal fixes, API dictionary V1.3

This commit is contained in:
Warren
2026-05-06 13:34:49 +08:00
parent e75c4d6f07
commit 74b6182eba
197 changed files with 17511 additions and 8759 deletions

View File

@@ -4,7 +4,7 @@ use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::core::config::{cache as cache_config, snapshot as snapshot_config, REDIS_KEY_PREFIX};
use crate::core::config::{cache as cache_config, REDIS_KEY_PREFIX};
use crate::core::db::RedisClient;
pub struct RedisCache {
@@ -133,135 +133,6 @@ impl RedisCache {
/// Invalidates every cache entry matching the `videos:*` key pattern.
///
/// Delegates to `invalidate_pattern` and returns its result unchanged.
pub async fn invalidate_videos_list(&self) -> Result<u64> {
self.invalidate_pattern("videos:*").await
}
// --- Snapshot Cache Methods ---
/// Builds the key that holds the hit counter for `file_uuid`
/// (`snapshot:hits:<file_uuid>`).
pub async fn snapshot_hits_key(file_uuid: &str) -> String {
    format!("snapshot:hits:{file_uuid}")
}
/// Builds the key that holds the status payload for `file_uuid`
/// (`snapshot:status:<file_uuid>`).
pub async fn snapshot_status_key(file_uuid: &str) -> String {
    format!("snapshot:status:{file_uuid}")
}
/// Builds the key that holds the last-access timestamp for `file_uuid`
/// (`snapshot:last_access:<file_uuid>`).
pub async fn snapshot_last_access_key(file_uuid: &str) -> String {
    format!("snapshot:last_access:{file_uuid}")
}
/// Builds the key that holds the migrate hint for `file_uuid`
/// (`snapshot:migrate_hint:<file_uuid>`).
pub async fn snapshot_migrate_hint_key(file_uuid: &str) -> String {
    format!("snapshot:migrate_hint:{file_uuid}")
}
/// Increments the hit counter for `file_uuid` and refreshes its expiry.
///
/// `INCR` and `EXPIRE` are sent together as one atomic (`MULTI`/`EXEC`)
/// pipeline. Previously they were two independent round-trips, so an error
/// between them could leave a counter key behind without any TTL.
///
/// # Returns
/// The counter value after the increment.
///
/// # Errors
/// Fails when a connection cannot be obtained or Redis rejects the pipeline.
pub async fn increment_snapshot_hits(&self, file_uuid: &str) -> Result<u64> {
    let client = self.client.read().await;
    let mut conn = client.get_conn_internal().await?;
    let key = Self::snapshot_hits_key(file_uuid).await;

    // `.ignore()` drops the EXPIRE reply, so the pipeline yields only the
    // INCR result as a one-element tuple.
    let (new_count,): (u64,) = redis::pipe()
        .atomic()
        .cmd("INCR")
        .arg(&key)
        .cmd("EXPIRE")
        .arg(&key)
        .arg(*snapshot_config::WARM_TTL_SECS)
        .ignore()
        .query_async(&mut conn)
        .await?;

    Ok(new_count)
}
/// Reads the hit counter stored for `file_uuid`.
///
/// Returns `0` when the counter key does not exist.
pub async fn get_snapshot_hits(&self, file_uuid: &str) -> Result<u64> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let hits_key = Self::snapshot_hits_key(file_uuid).await;

    let stored: Option<u64> = redis_conn.get(&hits_key).await?;
    Ok(stored.unwrap_or_default())
}
/// Stores the current Unix time (seconds) as the last-access marker for
/// `file_uuid`, expiring after `WARM_TTL_SECS`.
///
/// # Errors
/// Fails when the system clock reports a time before the Unix epoch, when the
/// timestamp does not fit in an `i64`, or when Redis cannot be reached. The
/// clock case used to panic via `unwrap()`.
pub async fn update_last_access(&self, file_uuid: &str) -> Result<()> {
    let client = self.client.read().await;
    let mut conn = client.get_conn_internal().await?;
    let key = Self::snapshot_last_access_key(file_uuid).await;

    // Propagate clock errors instead of panicking on the request path.
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
    let now = i64::try_from(since_epoch.as_secs())?;

    let _: String = conn
        .set_ex(&key, now, *snapshot_config::WARM_TTL_SECS)
        .await?;
    Ok(())
}
/// Writes the snapshot status payload (`{"status", "progress"}`) for
/// `file_uuid`.
///
/// The key expires after `GENERATING_TIMEOUT_SECS` while the status is
/// `"generating"`, and after `WARM_TTL_SECS` for any other status.
pub async fn set_snapshot_status(
    &self,
    file_uuid: &str,
    status: &str,
    progress: Option<f32>,
) -> Result<()> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let status_key = Self::snapshot_status_key(file_uuid).await;

    let body = serde_json::to_string(&serde_json::json!({
        "status": status,
        "progress": progress,
    }))?;

    let ttl_secs = match status {
        "generating" => *snapshot_config::GENERATING_TIMEOUT_SECS,
        _ => *snapshot_config::WARM_TTL_SECS,
    };

    let _: String = redis_conn.set_ex(&status_key, body, ttl_secs).await?;
    Ok(())
}
/// Loads the snapshot status payload for `file_uuid`.
///
/// When no status key exists, a synthetic
/// `{"status": "cold", "progress": null}` value is returned.
pub async fn get_snapshot_status(&self, file_uuid: &str) -> Result<serde_json::Value> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let status_key = Self::snapshot_status_key(file_uuid).await;

    let stored: Option<String> = redis_conn.get(&status_key).await?;
    if let Some(raw) = stored {
        return Ok(serde_json::from_str(&raw)?);
    }

    // No entry recorded: report the snapshot as cold.
    Ok(serde_json::json!({
        "status": "cold",
        "progress": null,
    }))
}
/// Deletes the snapshot status key for `file_uuid`.
pub async fn clear_snapshot_status(&self, file_uuid: &str) -> Result<()> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let status_key = Self::snapshot_status_key(file_uuid).await;

    // The DEL reply is not needed; decode it as unit.
    let () = redis_conn.del(&status_key).await?;
    Ok(())
}
/// Stores a migrate hint (`{"parent_uuid", "count"}`) for `file_uuid`,
/// expiring after `WARM_TTL_SECS`.
pub async fn set_migrate_hint(
    &self,
    file_uuid: &str,
    parent_uuid: &str,
    count: u64,
) -> Result<()> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let hint_key = Self::snapshot_migrate_hint_key(file_uuid).await;

    let body = serde_json::to_string(&serde_json::json!({
        "parent_uuid": parent_uuid,
        "count": count,
    }))?;

    let ttl_secs = *snapshot_config::WARM_TTL_SECS;
    let _: String = redis_conn.set_ex(&hint_key, body, ttl_secs).await?;
    Ok(())
}
/// Loads the migrate hint stored for `file_uuid`.
///
/// Returns `Ok(None)` when no hint key exists; a stored value that is not
/// valid JSON is reported as an error.
pub async fn get_migrate_hint(&self, file_uuid: &str) -> Result<Option<serde_json::Value>> {
    let guard = self.client.read().await;
    let mut redis_conn = guard.get_conn_internal().await?;
    let hint_key = Self::snapshot_migrate_hint_key(file_uuid).await;

    let stored: Option<String> = redis_conn.get(&hint_key).await?;
    let hint = stored
        .map(|raw| serde_json::from_str::<serde_json::Value>(&raw))
        .transpose()?;
    Ok(hint)
}
}
impl Clone for RedisCache {

View File

@@ -165,42 +165,6 @@ pub mod cache {
});
}
/// Snapshot-cache settings, each read lazily from the environment on first use.
pub mod snapshot {
    use super::*;

    /// Reads `name` as a `u64`; falls back to `default` when the variable is
    /// unset or does not parse.
    fn env_u64(name: &str, default: u64) -> u64 {
        env::var(name)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(default)
    }

    /// `MOMENTRY_SNAPSHOT_DIR_NAME`, default `.momentry_snapshots`.
    pub static SNAPSHOT_DIR_NAME: Lazy<String> = Lazy::new(|| {
        env::var("MOMENTRY_SNAPSHOT_DIR_NAME")
            .unwrap_or_else(|_| String::from(".momentry_snapshots"))
    });

    /// `MOMENTRY_SNAPSHOT_HOT_THRESHOLD`, default `5`.
    pub static HOT_THRESHOLD: Lazy<u64> =
        Lazy::new(|| env_u64("MOMENTRY_SNAPSHOT_HOT_THRESHOLD", 5));

    /// `MOMENTRY_SNAPSHOT_HOT_TTL_SECS`, default `86400`.
    pub static HOT_TTL_SECS: Lazy<u64> =
        Lazy::new(|| env_u64("MOMENTRY_SNAPSHOT_HOT_TTL_SECS", 86400));

    /// `MOMENTRY_SNAPSHOT_WARM_TTL_SECS`, default `604800`.
    pub static WARM_TTL_SECS: Lazy<u64> =
        Lazy::new(|| env_u64("MOMENTRY_SNAPSHOT_WARM_TTL_SECS", 604800));

    /// `MOMENTRY_SNAPSHOT_GENERATING_TIMEOUT`, default `1800`.
    pub static GENERATING_TIMEOUT_SECS: Lazy<u64> =
        Lazy::new(|| env_u64("MOMENTRY_SNAPSHOT_GENERATING_TIMEOUT", 1800));
}
pub mod llm {
use super::*;
@@ -226,3 +190,15 @@ pub mod llm {
.unwrap_or(true)
});
}
/// TMDB integration settings, each read lazily from the environment on first use.
pub mod tmdb {
    use super::*;

    /// Value of `TMDB_API_KEY`, or `None` when the variable cannot be read.
    pub static API_KEY: Lazy<Option<String>> = Lazy::new(|| match env::var("TMDB_API_KEY") {
        Ok(key) => Some(key),
        Err(_) => None,
    });

    /// `true` only when `MOMENTRY_TMDB_PROBE_ENABLED` is exactly `"true"` or
    /// `"1"`; unset or any other value yields `false`.
    pub static PROBE_ENABLED: Lazy<bool> = Lazy::new(|| {
        matches!(
            env::var("MOMENTRY_TMDB_PROBE_ENABLED").as_deref(),
            Ok("true") | Ok("1")
        )
    });
}

View File

@@ -342,9 +342,9 @@ pub struct MonitorJob {
pub progress_current: i32,
pub error_count: i32,
pub last_error: Option<String>,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
pub updated_at: Option<chrono::DateTime<chrono::Utc>>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub started_at: Option<String>,
pub updated_at: Option<String>,
pub created_at: String,
pub processors: Vec<String>,
pub completed_processors: Vec<String>,
pub failed_processors: Vec<String>,
@@ -371,6 +371,7 @@ pub enum ProcessorType {
Asrx,
VisualChunk,
Scene,
Story,
}
impl ProcessorType {
@@ -385,6 +386,7 @@ impl ProcessorType {
ProcessorType::Asrx => "asrx",
ProcessorType::VisualChunk => "visual_chunk",
ProcessorType::Scene => "scene",
ProcessorType::Story => "story",
}
}
@@ -399,6 +401,7 @@ impl ProcessorType {
"asrx" => Some(ProcessorType::Asrx),
"visual_chunk" => Some(ProcessorType::VisualChunk),
"scene" => Some(ProcessorType::Scene),
"story" => Some(ProcessorType::Story),
_ => None,
}
}
@@ -415,6 +418,7 @@ impl ProcessorType {
ProcessorType::Asrx => 0.8,
ProcessorType::VisualChunk => 0.3,
ProcessorType::Scene => 0.3,
ProcessorType::Story => 0.1,
}
}
@@ -438,6 +442,7 @@ impl ProcessorType {
ProcessorType::Asrx => 2048,
ProcessorType::VisualChunk => 512,
ProcessorType::Scene => 512,
ProcessorType::Story => 256,
}
}
@@ -453,6 +458,7 @@ impl ProcessorType {
ProcessorType::Asrx => Some("speechbrain/ecapa-tdnn"),
ProcessorType::VisualChunk => None,
ProcessorType::Scene => Some("places365"),
ProcessorType::Story => None,
}
}
@@ -462,6 +468,13 @@ impl ProcessorType {
ProcessorType::Asrx => vec![ProcessorType::Asr],
ProcessorType::VisualChunk => vec![ProcessorType::Yolo],
ProcessorType::Scene => vec![ProcessorType::Cut],
ProcessorType::Story => vec![
ProcessorType::Asr,
ProcessorType::Asrx,
ProcessorType::Cut,
ProcessorType::Yolo,
ProcessorType::Face,
],
_ => vec![],
}
}
@@ -477,6 +490,7 @@ impl ProcessorType {
ProcessorType::Face,
ProcessorType::Pose,
ProcessorType::VisualChunk,
ProcessorType::Story,
]
}
}
@@ -1322,7 +1336,7 @@ impl PostgresDb {
INSERT INTO {} (uuid, video_path, status, video_id)
VALUES ($1, $2, 'pending', $3)
ON CONFLICT DO NOTHING
RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at, processors, completed_processors, failed_processors, video_id
RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id
"#,
jobs_table
)
@@ -1416,7 +1430,7 @@ impl PostgresDb {
let row = sqlx::query(
&format!(
r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at, processors, completed_processors, failed_processors, video_id
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id
FROM {} WHERE uuid = $1
"#,
table
@@ -1462,7 +1476,7 @@ impl PostgresDb {
let rows = sqlx::query(
&format!(
r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at, processors, completed_processors, failed_processors, video_id
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id
FROM {} WHERE status = $1 ORDER BY created_at DESC
"#,
table
@@ -2376,12 +2390,17 @@ impl PostgresDb {
offset: i64,
) -> Result<Vec<FileIdentityRecord>> {
let query = r#"
SELECT fi.id, fi.file_uuid, fi.identity_id, i.name, i.metadata,
fi.face_count, fi.speaker_count, fi.first_appearance, fi.last_appearance, fi.confidence
FROM file_identities fi
JOIN identities i ON fi.identity_id = i.id
WHERE fi.file_uuid = $1
ORDER BY fi.confidence DESC
SELECT 0 as id, fd.file_uuid, fd.identity_id::int4, i.name, i.metadata,
COUNT(*)::int4 as face_count,
0::int4 as speaker_count,
NULL::float8 as first_appearance,
NULL::float8 as last_appearance,
AVG(fd.confidence)::float8 as confidence
FROM face_detections fd
JOIN identities i ON fd.identity_id = i.id
WHERE fd.file_uuid = $1 AND fd.identity_id IS NOT NULL
GROUP BY fd.file_uuid, fd.identity_id, i.name, i.metadata
ORDER BY confidence DESC
LIMIT $2 OFFSET $3
"#;
@@ -2399,7 +2418,7 @@ impl PostgresDb {
let query = r#"
SELECT id, uuid, name, identity_type, source, status, metadata, reference_data,
voice_embedding, identity_embedding, face_embedding,
tmdb_id, tmdb_profile, created_at, updated_at
tmdb_id, tmdb_profile, created_at, NULL::timestamptz as updated_at
FROM identities
WHERE uuid = $1
"#;
@@ -2419,12 +2438,17 @@ impl PostgresDb {
offset: i64,
) -> Result<Vec<IdentityFileRecord>> {
let query = r#"
SELECT fi.file_uuid, v.file_name, v.file_path, v.status,
fi.face_count, fi.speaker_count, fi.first_appearance, fi.last_appearance, fi.confidence
FROM file_identities fi
JOIN videos v ON fi.file_uuid = v.file_uuid
WHERE fi.identity_id = (SELECT id FROM identities WHERE uuid = $1)
ORDER BY fi.last_appearance DESC NULLS LAST
SELECT fd.file_uuid, v.file_name, v.file_path, v.status,
COUNT(*)::int4 as face_count,
0::int4 as speaker_count,
NULL::float8 as first_appearance,
NULL::float8 as last_appearance,
AVG(fd.confidence)::float8 as confidence
FROM face_detections fd
JOIN videos v ON fd.file_uuid = v.file_uuid
WHERE fd.identity_id = (SELECT id FROM identities WHERE uuid = $1)
GROUP BY fd.file_uuid, v.file_name, v.file_path, v.status
ORDER BY MAX(fd.frame_number) DESC
LIMIT $2 OFFSET $3
"#;
@@ -2475,9 +2499,9 @@ impl PostgresDb {
c.start_time, c.end_time, c.text_content, c.content
FROM chunks c
WHERE c.file_uuid IN (
SELECT DISTINCT fi.file_uuid
FROM file_identities fi
JOIN identities i ON fi.identity_id = i.id
SELECT DISTINCT fd.file_uuid
FROM face_detections fd
JOIN identities i ON fd.identity_id = i.id
WHERE i.uuid = $1
)
ORDER BY c.start_time ASC
@@ -2495,42 +2519,12 @@ impl PostgresDb {
}
/// Confirms a candidate by linking a pre-chunk to an identity.
///
/// Sets `pre_chunks.identity_id` for `pre_chunk_id`, then makes sure a
/// `file_identities` row exists for the pre-chunk's `file_uuid` and
/// `identity_id`, inserting one with status `'detected'` when it is missing.
///
/// NOTE(review): the statements run directly on the pool, not inside one
/// transaction, and the existence check plus insert is not atomic. Two
/// concurrent calls could both insert unless the table has a unique
/// constraint; confirm against the schema.
pub async fn confirm_candidate(&self, pre_chunk_id: i64, identity_id: Uuid) -> Result<()> {
// 1. Update the pre_chunk to link it to the identity
sqlx::query("UPDATE pre_chunks SET identity_id = $1 WHERE id = $2")
.bind(identity_id)
.bind(pre_chunk_id)
.execute(&self.pool)
.await?;
// 2. Ensure a link exists in file_identities table
// We need the file_uuid from the pre_chunk
let file_uuid: Option<Uuid> =
sqlx::query_scalar("SELECT file_uuid FROM pre_chunks WHERE id = $1")
.bind(pre_chunk_id)
.fetch_optional(&self.pool)
.await?;
// No matching pre_chunk row means there is nothing to link; still returns Ok.
if let Some(f_uuid) = file_uuid {
// Check if relationship exists
let exists: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM file_identities WHERE file_uuid = $1 AND identity_id = $2)"
)
.bind(f_uuid)
.bind(identity_id)
.fetch_one(&self.pool)
.await?;
// Insert the link only when it is missing.
if !exists {
sqlx::query(
"INSERT INTO file_identities (file_uuid, identity_id, status) VALUES ($1, $2, 'detected')"
)
.bind(f_uuid)
.bind(identity_id)
.execute(&self.pool)
.await?;
}
}
Ok(())
}
@@ -3889,7 +3883,7 @@ impl PostgresDb {
let rows = sqlx::query(&format!(
r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
error_count, last_error, started_at, updated_at, created_at,
error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
processors, completed_processors, failed_processors, video_id
FROM {}
WHERE status = 'running'
@@ -3937,7 +3931,7 @@ impl PostgresDb {
let rows = sqlx::query(&format!(
r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
error_count, last_error, started_at, updated_at, created_at,
error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
processors, completed_processors, failed_processors, video_id
FROM {}
WHERE status = 'pending'
@@ -3989,7 +3983,7 @@ impl PostgresDb {
let rows = sqlx::query(&format!(
r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current,
error_count, last_error, started_at, updated_at, created_at,
error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT,
processors, completed_processors, failed_processors, video_id
FROM {}
WHERE status = 'running'
@@ -4198,6 +4192,30 @@ impl PostgresDb {
Ok(())
}
/// Moves every `processor_results` row currently in status `'running'` to
/// `status`, recording `error_message` and stamping `completed_at` and
/// `updated_at` with the current timestamp.
///
/// NOTE(review): the filter is only `status = 'running'`; "stale" is not
/// decided by age here. Presumably meant to be called when nothing is running
/// (e.g. at startup) — confirm with callers.
///
/// # Returns
/// The number of rows updated.
pub async fn reset_stale_processor_results(
    &self,
    status: ProcessorJobStatus,
    error_message: &str,
) -> Result<u64> {
    let sql = format!(
        r#"
UPDATE {}
SET status = $1,
error_message = $2,
completed_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE status = 'running'
"#,
        schema::table_name("processor_results")
    );

    let outcome = sqlx::query(&sql)
        .bind(status.as_str())
        .bind(error_message)
        .execute(&self.pool)
        .await?;

    Ok(outcome.rows_affected())
}
pub async fn get_processor_results_by_job(&self, job_id: i32) -> Result<Vec<ProcessorResult>> {
let table = schema::table_name("processor_results");
let rows = sqlx::query(&format!(
@@ -4249,6 +4267,66 @@ impl PostgresDb {
Ok(results)
}
/// Returns the latest result of each processor for one `file_uuid`, across jobs.
///
/// Rows are joined to `monitor_jobs` on `job_id` and filtered by the job's
/// `uuid`. `DISTINCT ON (pr.processor)` with `ORDER BY pr.processor,
/// pr.job_id DESC` keeps, per processor, the row with the highest job id.
///
/// An unrecognised processor name falls back to `ProcessorType::Asr`, an
/// unrecognised status to `ProcessorJobStatus::Pending`, and a NULL
/// `updated_at` becomes an empty string. `output_data` is always `None` and
/// `retry_count` always `0`, since neither is selected.
pub async fn get_latest_processor_results_by_file_uuid(
    &self,
    file_uuid: &str,
) -> Result<Vec<ProcessorResult>> {
    let sql = format!(
        r#"
SELECT DISTINCT ON (pr.processor)
pr.id, pr.job_id, pr.processor, pr.status, pr.output_path,
pr.started_at::TEXT, pr.completed_at::TEXT, pr.error_message,
pr.progress_total, pr.progress_current, pr.last_checkpoint,
pr.created_at::TEXT, pr.updated_at::TEXT, pr.duration_secs,
pr.chunks_produced, pr.frames_processed, pr.output_size_bytes
FROM {} pr
JOIN {} mj ON pr.job_id = mj.id
WHERE mj.uuid = $1
ORDER BY pr.processor, pr.job_id DESC
"#,
        schema::table_name("processor_results"),
        schema::table_name("monitor_jobs")
    );

    let rows = sqlx::query(&sql)
        .bind(file_uuid)
        .fetch_all(&self.pool)
        .await?;

    // Column positions below follow the SELECT list above.
    let mut results = Vec::with_capacity(rows.len());
    for row in rows {
        let status_name: String = row.get(3);
        let processor_name: String = row.get(2);
        let started_at: Option<String> = row.get(5);
        let completed_at: Option<String> = row.get(6);
        let created_at: String = row.get(11);
        let updated_at: Option<String> = row.get(12);

        results.push(ProcessorResult {
            id: row.get(0),
            job_id: row.get(1),
            processor_type: ProcessorType::from_db_str(&processor_name)
                .unwrap_or(ProcessorType::Asr),
            status: ProcessorJobStatus::from_db_str(&status_name)
                .unwrap_or(ProcessorJobStatus::Pending),
            started_at,
            completed_at,
            duration_secs: row.get(13),
            chunks_produced: row.get(14),
            frames_processed: row.get(15),
            output_size_bytes: row.get(16),
            error_message: row.get(7),
            output_data: None,
            retry_count: 0,
            created_at,
            updated_at: updated_at.unwrap_or_default(),
        });
    }

    Ok(results)
}
pub async fn get_video_status(&self, uuid: &str) -> Result<Option<VideoStatus>> {
let table = schema::table_name("videos");
let result: Option<String> = sqlx::query_scalar(&format!(
@@ -4487,22 +4565,6 @@ impl PostgresDb {
Ok(())
}
/// Looks up the identity bound to a machine-generated ID.
///
/// Joins `identities` with `identity_bindings` and matches the binding on
/// (`binding_type`, `binding_value`). Returns `Ok(None)` when nothing matches.
///
/// NOTE(review): this query filters on `b.identity_type` / `b.identity_value`,
/// while other queries against `identity_bindings` use `binding_type` /
/// `binding_value` — confirm the column names against the schema.
pub async fn get_identity_by_binding(
    &self,
    binding_type: &str,
    binding_value: &str,
) -> Result<Option<crate::core::person_identity::Identity>> {
    use crate::core::person_identity::Identity;

    const LOOKUP_SQL: &str = "SELECT i.id, i.name, i.identity_embedding::text as embedding, i.metadata, i.created_at FROM identities i JOIN identity_bindings b ON i.id = b.identity_id WHERE b.identity_type = $1 AND b.identity_value = $2";

    let found = sqlx::query_as::<_, Identity>(LOOKUP_SQL)
        .bind(binding_type)
        .bind(binding_value)
        .fetch_optional(&self.pool)
        .await?;

    Ok(found)
}
/// List all identities
pub async fn list_identities(
&self,
@@ -4611,331 +4673,94 @@ impl PostgresDb {
Ok(results)
}
/// Lists every person in `video_identities`, newest first (legacy / global).
///
/// Each row is expanded through `build_who_identity`; rows for which that
/// helper yields `None` are left out of the result.
pub async fn list_all_persons(&self) -> Result<Vec<crate::api::who::WhoIdentity>> {
    type PersonRow = (i32, String, String, Option<serde_json::Value>);

    let rows: Vec<PersonRow> = sqlx::query_as(
        "SELECT id, uuid, name, metadata FROM video_identities ORDER BY created_at DESC",
    )
    .fetch_all(&self.pool)
    .await?;

    let mut persons = Vec::new();
    for (id, uuid, name, meta) in rows {
        // `extend` over an `Option` pushes the identity only when present.
        persons.extend(self.build_who_identity(&uuid, id, name, meta).await?);
    }
    Ok(persons)
}
/// Get Who info by Chunk ID (Parent or Child)
pub async fn get_who_info_by_chunk(
/// Face clustering: group unregistered faces within same trace by embedding similarity
pub async fn cluster_face_embeddings(
&self,
uuid: &str,
chunk_id: &str,
) -> Result<serde_json::Value> {
// 1. Find Chunk Time Range
// Check Parent
let mut res = sqlx::query_as::<_, (f64, f64)>(
"SELECT start_time, end_time FROM parent_chunks WHERE uuid = $1 AND id::text = $2",
)
.bind(uuid)
.bind(chunk_id)
.fetch_optional(&self.pool)
.await?;
// If not found, Check Child
if res.is_none() {
res = sqlx::query_as::<_, (f64, f64)>(
"SELECT start_time, end_time FROM child_chunks WHERE uuid = $1 AND id::text = $2",
)
.bind(uuid)
.bind(chunk_id)
.fetch_optional(&self.pool)
.await?;
}
let (start, end) = match res {
Some(t) => t,
None => return Ok(serde_json::json!({ "error": "Chunk not found" })),
};
// 2. Aggregate Face & Speaker IDs within Time Range
let faces: Vec<String> = sqlx::query_scalar(
"SELECT DISTINCT unnest(face_ids) FROM child_chunks WHERE uuid = $1 AND start_time >= $2 AND end_time <= $3 AND face_ids IS NOT NULL AND face_ids <> '{}'"
)
.bind(uuid)
.bind(start)
.bind(end)
.fetch_all(&self.pool)
.await?;
let speakers: Vec<String> = sqlx::query_scalar(
"SELECT DISTINCT unnest(speaker_ids) FROM child_chunks WHERE uuid = $1 AND start_time >= $2 AND end_time <= $3 AND speaker_ids IS NOT NULL AND speaker_ids <> '{}'"
)
.bind(uuid)
.bind(start)
.bind(end)
.fetch_all(&self.pool)
.await?;
// 3. Resolve to Person IDs
let mut persons: std::collections::HashMap<String, String> =
std::collections::HashMap::new(); // Map ID -> Name
// Resolve Faces
for fid in &faces {
if let Some(identity) = self.find_person_by_binding(uuid, "face", fid).await? {
persons.insert(format!("face_{}", fid), identity.name);
}
}
// Resolve Speakers
for sid in &speakers {
if let Some(identity) = self.find_person_by_binding(uuid, "speaker", sid).await? {
persons.insert(format!("speaker_{}", sid), identity.name);
}
}
Ok(serde_json::json!({
"uuid": uuid,
"chunk_id": chunk_id,
"time_range": { "start": start, "end": end },
"raw_ids": {
"face_ids": faces,
"speaker_ids": speakers
},
"resolved_persons": persons
}))
}
/// Create or Update Person (Define Identity - Video Scoped)
pub async fn create_or_update_person(
&self,
uuid: &str,
identity_id: Option<i32>, // If None, create new
name: String,
face_ids: Vec<String>,
speaker_ids: Vec<String>,
) -> Result<crate::api::who::WhoIdentity> {
let final_id = if let Some(id) = identity_id {
// Update Name (Scoped to UUID check implicit by ID, but let's be safe)
sqlx::query("UPDATE video_identities SET name = $1 WHERE id = $2 AND uuid = $3")
.bind(&name)
.bind(id)
.bind(uuid)
.execute(&self.pool)
.await?;
id
} else {
// Create New
let id = sqlx::query(
"INSERT INTO video_identities (uuid, name) VALUES ($1, $2) RETURNING id",
)
.bind(uuid)
.bind(&name)
.fetch_one(&self.pool)
.await?;
id.get(0)
};
// Clear old bindings for THIS identity
sqlx::query("DELETE FROM identity_bindings WHERE identity_id = $1")
.bind(final_id)
.execute(&self.pool)
.await?;
// Bind Faces
for fid in face_ids {
if !fid.is_empty() {
sqlx::query("INSERT INTO identity_bindings (identity_id, uuid, binding_type, binding_value) VALUES ($1, $2, 'face', $3) ON CONFLICT (uuid, binding_type, binding_value) DO UPDATE SET identity_id = EXCLUDED.identity_id")
.bind(final_id)
.bind(uuid)
.bind(&fid)
.execute(&self.pool)
.await?;
}
}
// Bind Speakers
for sid in speaker_ids {
if !sid.is_empty() {
sqlx::query("INSERT INTO identity_bindings (identity_id, uuid, binding_type, binding_value) VALUES ($1, $2, 'speaker', $3) ON CONFLICT (uuid, binding_type, binding_value) DO UPDATE SET identity_id = EXCLUDED.identity_id")
.bind(final_id)
.bind(uuid)
.bind(&sid)
.execute(&self.pool)
.await?;
}
}
self.get_person_by_id(uuid, final_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Failed to retrieve created person"))
}
/// Get Person by ID (Video Scoped)
pub async fn get_person_by_id(
&self,
uuid: &str,
id: i32,
) -> Result<Option<crate::api::who::WhoIdentity>> {
let res = sqlx::query_as::<_, (i32, String, Option<serde_json::Value>)>(
"SELECT id, name, metadata FROM video_identities WHERE id = $1 AND uuid = $2",
)
.bind(id)
.bind(uuid)
.fetch_optional(&self.pool)
.await?;
match res {
Some((id, name, meta)) => self.build_who_identity(uuid, id, name, meta).await,
None => Ok(None),
}
}
pub async fn list_unbound_signals(
&self,
uuid: &str,
binding_type: &str,
) -> Result<Vec<String>> {
let column = if binding_type == "face" {
"face_ids"
} else {
"speaker_ids"
};
let query = format!(
file_uuid: &str,
similarity_threshold: f64,
) -> Result<Vec<FaceClusterGroup>> {
let table = schema::table_name("face_detections");
let rows = sqlx::query_as::<_, (String, i64)>(&format!(
r#"
SELECT DISTINCT unnest({})::varchar as signal_val
FROM chunks
WHERE uuid = $1 AND {} IS NOT NULL AND {} <> '{{}}'
EXCEPT
SELECT identity_value FROM identity_bindings WHERE identity_type = $2
ORDER BY signal_val
"#,
column, column, column
);
SELECT trace_id::text, COUNT(DISTINCT frame_number) as frame_count
FROM {}
WHERE file_uuid = $1
AND embedding IS NOT NULL
AND identity_id IS NULL
GROUP BY trace_id
ORDER BY frame_count DESC
"#,
table
))
.bind(file_uuid)
.fetch_all(&self.pool)
.await?;
let rows: Vec<(String,)> = sqlx::query_as(&query)
.bind(uuid)
.bind(binding_type)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|r| r.0).collect())
}
/// 獲取包含特定信號的所有 Chunk (用於標註驗證)
pub async fn get_chunks_by_signal(
&self,
uuid: &str,
binding_type: &str,
signal_value: &str,
) -> Result<Vec<serde_json::Value>> {
let column = if binding_type == "face" {
"face_ids"
} else {
"speaker_ids"
};
let query = format!(
"SELECT id, start_frame, end_frame, content FROM chunks WHERE file_uuid = $1 AND $2::text = ANY({}::text[]) ORDER BY start_frame",
column
);
let rows = sqlx::query(&query)
.bind(uuid)
.bind(signal_value)
.fetch_all(&self.pool)
.await?;
let chunks: Vec<serde_json::Value> = rows
Ok(rows
.into_iter()
.map(|r| {
let content: Option<serde_json::Value> = r.get(3);
let mut result = serde_json::json!({
"id": r.get::<i32, _>(0),
"start_frame": r.get::<i64, _>(1),
"end_frame": r.get::<i64, _>(2),
});
if let Some(c) = content {
result["content"] = c;
}
result
.map(|(trace_id, frame_count)| FaceClusterGroup {
trace_id,
frame_count: frame_count as i32,
})
.collect();
Ok(chunks)
.collect())
}
// ==========================================
// Who API Helpers
// ==========================================
/// Helper to find person by binding
pub async fn find_person_by_binding(
/// Search similar faces by embedding via pgvector cosine distance
pub async fn search_similar_faces(
&self,
uuid: &str,
bind_type: &str,
bind_value: &str,
) -> Result<Option<crate::api::who::WhoIdentity>> {
let res = sqlx::query_as::<_, (i32, String, Option<serde_json::Value>)>(
"SELECT vi.id, vi.name, vi.metadata
FROM video_identities vi
JOIN identity_bindings ib ON vi.id = ib.identity_id
WHERE vi.uuid = $1 AND ib.binding_type = $2 AND ib.binding_value = $3",
)
.bind(uuid)
.bind(bind_type)
.bind(bind_value)
.fetch_optional(&self.pool)
.await?;
match res {
Some((id, name, meta)) => self.build_who_identity(uuid, id, name, meta).await,
None => Ok(None),
}
}
/// Helper to build full WhoIdentity with all linked IDs
pub async fn build_who_identity(
&self,
uuid: &str,
id: i32,
name: String,
meta: Option<serde_json::Value>,
) -> Result<Option<crate::api::who::WhoIdentity>> {
// Fetch all Face IDs
let face_rows = sqlx::query_as::<_, (String,)>(
"SELECT binding_value FROM identity_bindings WHERE identity_id = $1 AND binding_type = 'face'"
)
.bind(id)
query_embedding: &[f32],
file_uuid: &str,
limit: i64,
threshold: f64,
) -> Result<Vec<SimilarFaceResult>> {
let table = schema::table_name("face_detections");
let rows = sqlx::query_as::<_, (i32, i32, f64, serde_json::Value)>(&format!(
r#"
SELECT id, trace_id,
1 - (embedding <=> $1::vector) as similarity,
bbox
FROM {}
WHERE file_uuid = $2
AND embedding IS NOT NULL
AND 1 - (embedding <=> $1::vector) >= $3
ORDER BY embedding <=> $1::vector
LIMIT $4
"#,
table
))
.bind(query_embedding)
.bind(file_uuid)
.bind(threshold)
.bind(limit)
.fetch_all(&self.pool)
.await?;
let face_ids: Vec<String> = face_rows.into_iter().map(|r| r.0).collect();
// Fetch all Speaker IDs
let speaker_rows = sqlx::query_as::<_, (String,)>(
"SELECT binding_value FROM identity_bindings WHERE identity_id = $1 AND binding_type = 'speaker'"
)
.bind(id)
.fetch_all(&self.pool)
.await?;
let speaker_ids: Vec<String> = speaker_rows.into_iter().map(|r| r.0).collect();
Ok(Some(crate::api::who::WhoIdentity {
identity_id: id,
uuid: uuid.to_string(),
name,
tags: meta
.and_then(|m| m.get("tags").cloned())
.and_then(|v| serde_json::from_value(v).ok()),
face_ids,
speaker_ids,
}))
Ok(rows
.into_iter()
.map(|(id, trace_id, similarity, bbox)| SimilarFaceResult {
id,
trace_id,
similarity,
bbox: bbox.to_string(),
})
.collect())
}
}
/// One group produced by `cluster_face_embeddings`: a face trace together
/// with the number of distinct frames it appears in.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FaceClusterGroup {
/// Trace identifier, selected as `trace_id::text`.
pub trace_id: String,
/// `COUNT(DISTINCT frame_number)` for the trace, narrowed from `i64` to `i32`.
pub frame_count: i32,
}
/// One hit returned by `search_similar_faces`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SimilarFaceResult {
/// `id` of the matching `face_detections` row.
pub id: i32,
/// `trace_id` of the matching row.
pub trace_id: i32,
/// Computed in SQL as `1 - (embedding <=> query)`.
pub similarity: f64,
/// The row's `bbox` JSON value rendered to a string.
pub bbox: String,
}
#[async_trait]
impl Database for PostgresDb {
async fn init() -> Result<Self> {

View File

@@ -7,10 +7,10 @@ use std::collections::HashMap;
use super::{Database, SearchResult, VectorStore};
pub struct QdrantDb {
client: Client,
base_url: String,
api_key: String,
collection_name: String,
pub(crate) client: Client,
pub(crate) base_url: String,
pub(crate) api_key: String,
pub(crate) collection_name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -566,6 +566,91 @@ impl QdrantDb {
let result: CollectionInfo = response.json().await?;
Ok(result.result.points_count)
}
/// Upserts one face embedding as a Qdrant point.
///
/// The payload carries `file_uuid`, `trace_id`, `frame_number` and a fixed
/// `type` of `"face_embedding"`. The request is sent with `wait=true`.
///
/// # Errors
/// Fails when the HTTP request fails or Qdrant answers with a non-success
/// status.
pub async fn upsert_face_embedding(
    &self,
    point_id: u64,
    vector: &[f32],
    file_uuid: &str,
    trace_id: i32,
    frame_number: i64,
) -> Result<()> {
    let endpoint = format!(
        "{}/collections/{}/points?wait=true",
        self.base_url, self.collection_name
    );

    let body = serde_json::json!({
        "points": [{
            "id": point_id,
            "vector": vector,
            "payload": {
                "file_uuid": file_uuid,
                "trace_id": trace_id,
                "frame_number": frame_number,
                "type": "face_embedding"
            }
        }]
    });

    let response = self
        .client
        .put(&endpoint)
        .header("api-key", &self.api_key)
        .json(&body)
        .send()
        .await?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }
    Err(anyhow::anyhow!("Qdrant upsert face failed: {}", status))
}
/// Upserts one chunk embedding as a Qdrant point, including its parent/child
/// metadata.
///
/// The payload carries `file_uuid`, `chunk_type`, `text`, `start_time` and
/// `end_time`; `parent_chunk_id` is added only when one is supplied. The
/// request is sent with `wait=true`.
///
/// # Errors
/// Fails when the HTTP request fails or Qdrant answers with a non-success
/// status.
pub async fn upsert_chunk_embedding(
    &self,
    point_id: &str,
    vector: &[f32],
    chunk_type: &str,
    file_uuid: &str,
    text: &str,
    parent_chunk_id: Option<&str>,
    start_time: f64,
    end_time: f64,
) -> Result<()> {
    let endpoint = format!(
        "{}/collections/{}/points?wait=true",
        self.base_url, self.collection_name
    );

    let mut payload = serde_json::json!({
        "file_uuid": file_uuid,
        "chunk_type": chunk_type,
        "text": text,
        "start_time": start_time,
        "end_time": end_time
    });
    if let Some(parent) = parent_chunk_id {
        payload["parent_chunk_id"] = serde_json::json!(parent);
    }

    let body = serde_json::json!({
        "points": [{
            "id": point_id,
            "vector": vector,
            "payload": payload
        }]
    });

    let response = self
        .client
        .put(&endpoint)
        .header("api-key", &self.api_key)
        .json(&body)
        .send()
        .await?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }
    Err(anyhow::anyhow!("Qdrant upsert chunk failed: {}", status))
}
}
#[async_trait]
@@ -595,3 +680,46 @@ impl VectorStore for QdrantDb {
self.search(query_vector, limit).await
}
}
/// Syncs face embeddings from PostgreSQL to Qdrant for ANN search.
///
/// Selects every `face_detections` row of `file_uuid` that has an embedding
/// and upserts it into Qdrant, using the row id as the point id. Rows without
/// a `trace_id` are skipped. A failed upsert is logged and skipped so one bad
/// point does not abort the whole sync.
///
/// # Errors
/// Fails when the database connection or query fails, or when a column cannot
/// be decoded into its expected Rust type. Decoding used to go through
/// `Row::get`, which panics on a mismatch.
///
/// NOTE(review): a fresh `PgPool` is opened on every call; consider passing a
/// shared pool in. The table is addressed as `{schema}.{table}` here, while
/// `PostgresDb` uses `schema::table_name(..)` on its own — confirm
/// `table_name` does not already include the schema.
pub async fn sync_face_embeddings(file_uuid: &str) -> Result<()> {
    use crate::core::config::DATABASE_URL;
    use sqlx::Row;

    let pool = sqlx::PgPool::connect(&DATABASE_URL).await?;
    let schema = crate::core::config::DATABASE_SCHEMA.as_str();
    let table = crate::core::db::schema::table_name("face_detections");
    let qdrant: QdrantDb = QdrantDb::new();

    let query = format!(
        "SELECT id, trace_id, frame_number, embedding FROM {}.{} WHERE file_uuid = $1 AND embedding IS NOT NULL",
        schema, table
    );
    let rows = sqlx::query(&query).bind(file_uuid).fetch_all(&pool).await?;

    let mut count = 0u64;
    for row in &rows {
        // `try_get` reports decode problems as errors instead of panicking.
        let id: i32 = row.try_get(0)?;
        let trace_id: Option<i32> = row.try_get(1)?;
        let frame_number: i64 = row.try_get(2)?;
        let embedding: Option<Vec<f32>> = row.try_get(3)?;

        if let (Some(emb), Some(tid)) = (embedding, trace_id) {
            // A negative id would wrap to a huge point id under `as u64`.
            let point_id = match u64::try_from(id) {
                Ok(pid) => pid,
                Err(_) => {
                    tracing::warn!("Skipping face {}: negative id is not a valid point id", id);
                    continue;
                }
            };

            if let Err(e) = qdrant
                .upsert_face_embedding(point_id, &emb, file_uuid, tid, frame_number)
                .await
            {
                tracing::warn!("Qdrant upsert failed for face {}: {}", id, e);
                continue;
            }
            count += 1;
        }
    }

    tracing::info!(
        "Synced {} face embeddings to Qdrant for {}",
        count,
        file_uuid
    );
    Ok(())
}

View File

@@ -4,8 +4,6 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use super::Database;
use crate::core::config;
use crate::core::storage::snapshot_manager::SnapshotTier;
pub struct RedisDb {
#[allow(dead_code)]
@@ -30,14 +28,6 @@ pub struct Job {
pub updated_at: String,
}
/// Snapshot cache state returned by `RedisDb::get_snapshot_status`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SnapshotCacheEntry {
/// Hit counter value.
pub hits: u64,
/// Last-access timestamp. NOTE(review): presumably Unix seconds — confirm.
pub last_access: i64,
/// Status label; the stub implementation reports `"cold"`.
pub status: String,
/// Optional progress value; `None` when not reported.
pub progress: Option<f32>,
}
impl RedisDb {
pub async fn push_job(&self, _job: &Job) -> Result<()> {
Ok(())
@@ -59,76 +49,6 @@ impl RedisDb {
/// Stub: accepts a channel name and a message and returns `Ok(())` without publishing anything.
pub async fn publish_event(&self, _channel: &str, _message: &str) -> Result<()> {
Ok(())
}
// --- Snapshot Cache Methods ---
/// Stub for bumping the snapshot hit counter of a file; currently always returns `Ok(0)`.
pub async fn increment_snapshot_hits(&self, _file_uuid: &str) -> Result<u64> {
// TODO: Redis HINCRBY snapshot:hits:{uuid}
// NOTE(review): HINCRBY increments a hash field, while `get_snapshot_hits` plans a plain
// GET on the same key — confirm the intended data type (INCR would match GET).
Ok(0)
}
/// Stub for reading the snapshot hit counter of a file; currently always returns `Ok(0)`.
pub async fn get_snapshot_hits(&self, _file_uuid: &str) -> Result<u64> {
// TODO: Redis GET snapshot:hits:{uuid}
Ok(0)
}
/// Stub for recording the last access time of a file's snapshots; currently a no-op.
pub async fn update_last_access(&self, _file_uuid: &str) -> Result<()> {
// TODO: Redis SET snapshot:last_access:{uuid} = now EX 7d
Ok(())
}
/// Stub for storing the snapshot status and optional progress of a file; currently a no-op.
pub async fn set_snapshot_status(
&self,
_file_uuid: &str,
status: &str,
progress: Option<f32>,
) -> Result<()> {
// TODO: Redis SET snapshot:status:{uuid} = {status, progress} EX 30m
// Binds the arguments so they do not trigger unused-variable warnings until implemented.
let _ = (status, progress);
Ok(())
}
/// Stub for reading the snapshot status of a file.
///
/// Currently always returns an entry with zero hits, `last_access` 0,
/// status "cold" and no progress.
pub async fn get_snapshot_status(&self, _file_uuid: &str) -> Result<SnapshotCacheEntry> {
// TODO: Redis GET snapshot:status:{uuid}
Ok(SnapshotCacheEntry {
hits: 0,
last_access: 0,
status: "cold".to_string(),
progress: None,
})
}
/// Stub for deleting the stored snapshot status of a file; currently a no-op.
pub async fn clear_snapshot_status(&self, _file_uuid: &str) -> Result<()> {
// TODO: Redis DEL snapshot:status:{uuid}
Ok(())
}
/// Stub for storing a migration hint (parent UUID plus a count) for a file; currently a no-op.
pub async fn set_migrate_hint(
&self,
_file_uuid: &str,
_parent_uuid: &str,
_count: u64,
) -> Result<()> {
// TODO: Redis SET snapshot:migrate_hint:{uuid}
Ok(())
}
/// Stub for reading a file's migration hint; currently always returns `Ok(None)`.
///
/// The tuple presumably mirrors `set_migrate_hint` (parent UUID, count) — TODO confirm.
pub async fn get_migrate_hint(&self, _file_uuid: &str) -> Result<Option<(String, u64)>> {
// TODO: Redis GET snapshot:migrate_hint:{uuid}
Ok(None)
}
pub fn compute_status_ttl(&self, tier: SnapshotTier) -> u64 {
match tier {
SnapshotTier::Hot => *config::snapshot::HOT_TTL_SECS,
SnapshotTier::Warm => *config::snapshot::WARM_TTL_SECS,
SnapshotTier::Cold => 0,
}
}
/// Returns the configured `GENERATING_TIMEOUT_SECS` value.
pub fn generating_timeout() -> u64 {
*config::snapshot::GENERATING_TIMEOUT_SECS
}
}
#[async_trait]

View File

@@ -4,6 +4,7 @@ pub mod chunk;
pub mod config;
pub mod db;
pub mod embedding;
pub mod frame_cache;
pub mod ingestion;
pub mod llm;
pub mod overlay;

View File

@@ -56,18 +56,20 @@ pub struct IdentityBinding {
/// Bind request (used by the API).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BindIdentityRequest {
pub identity_id: Option<i64>,
pub name: Option<String>, // if identity_id is not provided, a new Identity is created
pub binding_type: String, // 'face' or 'speaker'
pub binding_value: String, // e.g. "face_1"
pub source: Option<String>, // defaults to 'manual'
pub file_uuid: String,
pub face_id: String,
}
/// Unbind request.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct UnbindIdentityRequest {
pub binding_type: String,
pub binding_value: String,
pub file_uuid: String,
pub face_id: String,
}
/// Request body for merging identities.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MergeIdentitiesRequest {
/// UUID of the identity to merge into — presumably the surviving one; TODO confirm with the handler.
pub into_uuid: String,
/// Optional flag; NOTE(review): the default applied when it is absent is not visible here.
pub keep_history: Option<bool>,
}
/// Suggested-binding request (generated automatically by the system, confirmed manually)

View File

@@ -28,7 +28,6 @@ pub struct Face {
pub width: i32,
pub height: i32,
pub confidence: f32,
#[serde(skip_serializing)]
pub embedding: Option<Vec<f32>>,
pub landmarks: Option<Vec<Vec<f32>>>,
pub attributes: Option<FaceAttributes>,

View File

@@ -8,7 +8,6 @@ pub mod face_recognition;
pub mod ocr;
pub mod pose;
pub mod scene_classification;
pub mod snapshot_agent;
pub mod story;
pub mod visual_chunk;
pub mod yolo;
@@ -30,7 +29,6 @@ pub use scene_classification::{
load_scene_from_file, process_scene_classification, SceneClassificationResult, ScenePrediction,
SceneSegment,
};
pub use snapshot_agent::{SnapshotAgent, SnapshotAgentConfig};
pub use story::{process_story, StoryChildChunk, StoryParentChunk, StoryResult, StoryStats};
pub use visual_chunk::{process_visual_chunk, process_visual_chunk_advanced, VisualChunkResult};
pub use yolo::{process_yolo, YoloFrame, YoloObject, YoloResult};

View File

@@ -1,491 +0,0 @@
use std::path::Path;
use std::process::Command;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use crate::core::config;
use crate::core::db::{Database, PostgresDb};
use crate::core::storage::snapshot_manager::SnapshotManager;
/// Settings for `SnapshotAgent`.
pub struct SnapshotAgentConfig {
/// Directory handed to `SnapshotManager::new` as the root for snapshot storage.
pub output_dir: String,
/// Hit threshold; defaults to `config::snapshot::HOT_THRESHOLD`.
/// NOTE(review): no code in this file reads this field — confirm it is still needed.
pub hot_threshold: u64,
}
impl Default for SnapshotAgentConfig {
    /// Builds the configuration from the global `config` values.
    fn default() -> Self {
        let output_dir = config::OUTPUT_DIR.clone();
        let hot_threshold = *config::snapshot::HOT_THRESHOLD;
        Self {
            output_dir,
            hot_threshold,
        }
    }
}
/// Shape of one face-detection record.
// NOTE(review): nothing in this file constructs or reads this struct; the face query
// decodes into a tuple instead — confirm whether it can be removed.
#[derive(Debug, Serialize, Deserialize)]
struct FaceDetection {
id: i32,
file_uuid: String,
frame_number: i64,
confidence: f64,
bbox: Option<serde_json::Value>,
}
/// Source video location and frame rate, loaded by `SnapshotAgent::get_video_info`.
#[derive(Debug, Serialize, Deserialize)]
struct VideoInfo {
/// Path passed to ffmpeg as the input file.
file_path: String,
/// Frames per second; used to turn a frame number into a seek timestamp.
fps: f64,
}
/// Generates, migrates and tears down per-file snapshot images.
pub struct SnapshotAgent {
/// Settings the agent was created with.
config: SnapshotAgentConfig,
/// Resolves and manages the snapshot directories on disk.
manager: SnapshotManager,
}
impl SnapshotAgent {
/// Creates an agent whose snapshot manager is rooted at `config.output_dir`.
pub fn new(config: SnapshotAgentConfig) -> Self {
    Self {
        manager: SnapshotManager::new(&config.output_dir),
        config,
    }
}
/// Builds an agent from `SnapshotAgentConfig::default()`.
// NOTE(review): this is an inherent method named `default`, not an implementation of the
// `Default` trait (clippy: `should_implement_trait`) — confirm callers before changing it.
pub fn default() -> Self {
Self::new(SnapshotAgentConfig::default())
}
/// Generates the snapshots of one category for a file.
///
/// Connects to PostgreSQL, loads the video's path and frame rate, makes sure
/// the snapshot directories exist, then runs the extractor matching
/// `snapshot_type` ("faces", "ocr", "logos" or "products").
///
/// # Errors
/// Fails when the database, the video lookup or directory creation fails, when
/// the selected extractor fails, or when `snapshot_type` is not supported.
pub async fn generate_file_snapshots(
    &self,
    file_uuid: &str,
    snapshot_type: &str,
) -> Result<()> {
    info!(
        "Starting snapshot generation: file_uuid={}, type={}",
        file_uuid, snapshot_type
    );

    let db = PostgresDb::init()
        .await
        .context("Failed to connect to database")?;
    let video_info = self
        .get_video_info(db.pool(), file_uuid)
        .await
        .context("Failed to get video info")?;
    self.manager
        .ensure_file_dirs(file_uuid)
        .context("Failed to create snapshot directories")?;

    // The type is only validated here, after the database work, as before.
    match snapshot_type {
        "faces" => self.extract_face_snapshots(db.pool(), file_uuid, &video_info).await?,
        "ocr" => self.extract_ocr_snapshots(db.pool(), file_uuid, &video_info).await?,
        "logos" => self.extract_logo_snapshots(db.pool(), file_uuid, &video_info).await?,
        "products" => self.extract_product_snapshots(db.pool(), file_uuid, &video_info).await?,
        _ => anyhow::bail!("Unsupported snapshot type: {}", snapshot_type),
    }

    info!(
        "Snapshot generation completed: file_uuid={}, type={}",
        file_uuid, snapshot_type
    );
    Ok(())
}
/// Crops the highest-confidence faces of a file out of its video.
///
/// Selects up to 50 `face_detections` rows with confidence >= 0.5 (best
/// first), converts each frame number to a timestamp using `video_info.fps`,
/// and writes one cropped JPEG per face into the file's `faces` directory.
/// A detection whose frame cannot be extracted is skipped; only successful
/// extractions are counted in the final log line.
///
/// # Errors
/// Fails when the detection query fails.
async fn extract_face_snapshots(
    &self,
    pool: &sqlx::PgPool,
    file_uuid: &str,
    video_info: &VideoInfo,
) -> Result<()> {
    let face_table = crate::core::db::schema::table_name("face_detections");
    // Tuples are decoded by column position, so the SELECT list must match the
    // tuple below one-to-one. The former extra `face_id` column shifted
    // `file_uuid` onto the `i64` slot and made every row fail to decode.
    let query = format!(
        "SELECT id, file_uuid, frame_number, confidence, bbox
         FROM {}
         WHERE file_uuid = $1 AND confidence >= 0.5
         ORDER BY confidence DESC
         LIMIT 50",
        face_table
    );
    let faces: Vec<(i32, String, i64, f64, Option<serde_json::Value>)> = sqlx::query_as(&query)
        .bind(file_uuid)
        .fetch_all(pool)
        .await
        .context("Failed to query face detections")?;

    let output_dir = self.manager.file_type_dir(file_uuid, "faces");
    let mut saved_count = 0;
    for (face_id_db, _uuid, frame_num, confidence, bbox_json) in faces {
        // A missing or malformed bbox falls back to the all-zero default.
        let bbox = bbox_json
            .map(|json| serde_json::from_value::<Bbox>(json).unwrap_or_default())
            .unwrap_or_default();
        let timestamp = frame_num as f64 / video_info.fps;
        let output_path =
            output_dir.join(format!("face_{}_conf{:.2}.jpg", face_id_db, confidence));
        if self
            .extract_frame(&video_info.file_path, timestamp, &bbox, &output_path)
            .await
            .is_ok()
        {
            saved_count += 1;
        }
    }

    info!(
        "Extracted {} face snapshots for file_uuid={}",
        saved_count, file_uuid
    );
    Ok(())
}
/// Crops the highest-confidence OCR detections of a file out of its video.
///
/// Selects up to 30 `ocr_detections` rows with confidence >= 0.7 (best first)
/// and writes one cropped JPEG per detection into the file's `ocr` directory.
/// The file name embeds the detection id and a sanitized prefix of the text.
/// Detections whose frame cannot be extracted are skipped.
///
/// # Errors
/// Fails when the detection query fails.
async fn extract_ocr_snapshots(
    &self,
    pool: &sqlx::PgPool,
    file_uuid: &str,
    video_info: &VideoInfo,
) -> Result<()> {
    let query = format!(
        "SELECT id, frame_number, text, bbox, confidence
FROM {}
WHERE file_uuid = $1 AND confidence >= 0.7
ORDER BY confidence DESC
LIMIT 30",
        crate::core::db::schema::table_name("ocr_detections")
    );
    let detections: Vec<(i32, i64, String, Option<serde_json::Value>, f64)> =
        sqlx::query_as(&query)
            .bind(file_uuid)
            .fetch_all(pool)
            .await
            .context("Failed to query OCR detections")?;

    let target_dir = self.manager.file_type_dir(file_uuid, "ocr");
    let mut written = 0;
    for (det_id, frame_num, text, raw_bbox, _confidence) in detections {
        // A missing or malformed bbox falls back to the all-zero default.
        let region = raw_bbox
            .map(|json| serde_json::from_value::<Bbox>(json).unwrap_or_default())
            .unwrap_or_default();
        let seek_secs = frame_num as f64 / video_info.fps;
        // First 20 characters, keeping alphanumerics and spaces; spaces become underscores.
        let label: String = text
            .chars()
            .take(20)
            .filter(|c| c.is_alphanumeric() || *c == ' ')
            .map(|c| if c == ' ' { '_' } else { c })
            .collect();
        let target = target_dir.join(format!("ocr_{}_{}.jpg", det_id, label));
        let outcome = self
            .extract_frame(&video_info.file_path, seek_secs, &region, &target)
            .await;
        if let Ok(()) = outcome {
            written += 1;
        }
    }

    info!(
        "Extracted {} OCR snapshots for file_uuid={}",
        written, file_uuid
    );
    Ok(())
}
/// Crops the highest-confidence logo/brand detections of a file out of its video.
///
/// Selects up to 20 `yolo_detections` rows whose class is `logo` or `brand`
/// with confidence >= 0.6 (best first) and writes one cropped JPEG per
/// detection into the file's `logos` directory. Detections whose frame cannot
/// be extracted are skipped.
///
/// # Errors
/// Fails when the detection query fails.
async fn extract_logo_snapshots(
    &self,
    pool: &sqlx::PgPool,
    file_uuid: &str,
    video_info: &VideoInfo,
) -> Result<()> {
    let query = format!(
        "SELECT id, frame_number, class_name, bbox, confidence
FROM {}
WHERE file_uuid = $1 AND class_name IN ('logo', 'brand') AND confidence >= 0.6
ORDER BY confidence DESC
LIMIT 20",
        crate::core::db::schema::table_name("yolo_detections")
    );
    let detections: Vec<(i32, i64, String, Option<serde_json::Value>, f64)> =
        sqlx::query_as(&query)
            .bind(file_uuid)
            .fetch_all(pool)
            .await
            .context("Failed to query logo detections")?;

    let target_dir = self.manager.file_type_dir(file_uuid, "logos");
    let mut written = 0;
    for (det_id, frame_num, class_name, raw_bbox, confidence) in detections {
        // A missing or malformed bbox falls back to the all-zero default.
        let region = raw_bbox
            .map(|json| serde_json::from_value::<Bbox>(json).unwrap_or_default())
            .unwrap_or_default();
        let seek_secs = frame_num as f64 / video_info.fps;
        let file_name = format!("logo_{}_{}_{:.2}.jpg", det_id, class_name, confidence);
        let target = target_dir.join(file_name);
        let outcome = self
            .extract_frame(&video_info.file_path, seek_secs, &region, &target)
            .await;
        if let Ok(()) = outcome {
            written += 1;
        }
    }

    info!(
        "Extracted {} logo snapshots for file_uuid={}",
        written, file_uuid
    );
    Ok(())
}
/// Crops the highest-confidence product detections of a file out of its video.
///
/// Selects up to 20 `yolo_detections` rows with confidence >= 0.6 (best first)
/// whose class is none of `logo`, `brand`, `person`, `face`, and writes one
/// cropped JPEG per detection into the file's `products` directory.
/// Detections whose frame cannot be extracted are skipped.
///
/// # Errors
/// Fails when the detection query fails.
async fn extract_product_snapshots(
    &self,
    pool: &sqlx::PgPool,
    file_uuid: &str,
    video_info: &VideoInfo,
) -> Result<()> {
    let query = format!(
        "SELECT id, frame_number, class_name, bbox, confidence
FROM {}
WHERE file_uuid = $1 AND class_name NOT IN ('logo', 'brand', 'person', 'face') AND confidence >= 0.6
ORDER BY confidence DESC
LIMIT 20",
        crate::core::db::schema::table_name("yolo_detections")
    );
    let detections: Vec<(i32, i64, String, Option<serde_json::Value>, f64)> =
        sqlx::query_as(&query)
            .bind(file_uuid)
            .fetch_all(pool)
            .await
            .context("Failed to query product detections")?;

    let target_dir = self.manager.file_type_dir(file_uuid, "products");
    let mut written = 0;
    for (det_id, frame_num, class_name, raw_bbox, confidence) in detections {
        // A missing or malformed bbox falls back to the all-zero default.
        let region = raw_bbox
            .map(|json| serde_json::from_value::<Bbox>(json).unwrap_or_default())
            .unwrap_or_default();
        let seek_secs = frame_num as f64 / video_info.fps;
        let file_name = format!("product_{}_{}_{:.2}.jpg", det_id, class_name, confidence);
        let target = target_dir.join(file_name);
        let outcome = self
            .extract_frame(&video_info.file_path, seek_secs, &region, &target)
            .await;
        if let Ok(()) = outcome {
            written += 1;
        }
    }

    info!(
        "Extracted {} product snapshots for file_uuid={}",
        written, file_uuid
    );
    Ok(())
}
/// Saves one cropped video frame as an image by running `ffmpeg`.
///
/// Seeks to `timestamp` (seconds) in `video_path`, applies
/// `crop=width:height:x:y` from `bbox`, and writes a single frame to
/// `output_path`, overwriting any existing file.
///
/// # Errors
/// Fails when the timestamp is not finite, the crop size is not positive, the
/// output path is not valid UTF-8, `ffmpeg` cannot be started, or `ffmpeg`
/// exits with a failure status (its stderr is included in the error).
async fn extract_frame(
    &self,
    video_path: &str,
    timestamp: f64,
    bbox: &Bbox,
    output_path: &Path,
) -> Result<()> {
    // A zero frame rate upstream yields inf/NaN here; ffmpeg cannot seek to
    // that, so fail before spawning a process.
    if !timestamp.is_finite() {
        return Err(anyhow::anyhow!("Invalid frame timestamp: {}", timestamp));
    }
    // The all-zero default Bbox (missing or unparsable bbox JSON) has no area.
    if bbox.width <= 0 || bbox.height <= 0 {
        return Err(anyhow::anyhow!(
            "Invalid crop size: {}x{}",
            bbox.width,
            bbox.height
        ));
    }

    let output_file = output_path.to_str().context("Invalid output path")?;
    let seek = format!("{:.3}", timestamp);
    let crop_filter = format!("crop={}:{}:{}:{}", bbox.width, bbox.height, bbox.x, bbox.y);

    // NOTE(review): `Command::output` blocks the calling thread inside an async
    // fn; consider moving this to a blocking task if it runs on a shared runtime.
    let output = Command::new("ffmpeg")
        .args(&[
            "-ss",
            seek.as_str(),
            "-i",
            video_path,
            "-vf",
            crop_filter.as_str(),
            "-frames:v",
            "1",
            "-f",
            "image2",
            "-y",
            output_file,
        ])
        .output()
        .context("Failed to execute ffmpeg")?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow::anyhow!("ffmpeg failed: {}", stderr));
    }
    Ok(())
}
/// Loads the path and frame rate of the video identified by `file_uuid`.
///
/// # Errors
/// Fails when the query fails or no matching row exists in the `videos` table.
async fn get_video_info(&self, pool: &sqlx::PgPool, file_uuid: &str) -> Result<VideoInfo> {
    let query = format!(
        "SELECT file_path, fps FROM {} WHERE file_uuid = $1",
        crate::core::db::schema::table_name("videos")
    );
    let found: Option<(String, f64)> = sqlx::query_as(&query)
        .bind(file_uuid)
        .fetch_optional(pool)
        .await
        .context("Failed to query video info")?;

    found
        .map(|(file_path, fps)| VideoInfo { file_path, fps })
        .ok_or_else(|| anyhow::anyhow!("Video not found: file_uuid={}", file_uuid))
}
/// Copies every snapshot category of `parent_uuid` into `new_uuid`.
///
/// Returns the names of the categories that were copied. A category that fails
/// to copy is logged as a warning and left out of the result.
///
/// # Errors
/// Fails when the destination directories cannot be created.
pub async fn migrate_snapshots(
    &self,
    new_uuid: &str,
    parent_uuid: &str,
) -> Result<Vec<String>> {
    info!(
        "Starting snapshot migration: {} -> {}",
        parent_uuid, new_uuid
    );
    self.manager
        .ensure_file_dirs(new_uuid)
        .context("Failed to create snapshot directories")?;

    let mut migrated = Vec::new();
    for snap_type in self.manager.list_snapshot_types(parent_uuid) {
        let src = self.manager.file_type_dir(parent_uuid, &snap_type);
        let dst = self.manager.file_type_dir(new_uuid, &snap_type);
        if !src.exists() {
            continue;
        }
        match copy_dir_recursive(&src, &dst) {
            Err(e) => warn!("Failed to migrate {} snapshots: {}", snap_type, e),
            Ok(()) => {
                info!(
                    "Migrated {} snapshots: {} -> {}",
                    snap_type, parent_uuid, new_uuid
                );
                migrated.push(snap_type);
            }
        }
    }

    info!("Migration completed: {} types migrated", migrated.len());
    Ok(migrated)
}
/// Deletes a file's snapshots and Redis bookkeeping once the file is Cold.
///
/// The tier comes from the Redis hit counter. The earlier code fed the number
/// of snapshot-type directories into `compute_tier`, so any file that actually
/// had snapshots was never Cold and nothing was ever removed.
///
/// Removing the status, hits and last-access keys is best effort: failures to
/// delete them are ignored.
///
/// # Errors
/// Fails when the Redis cache cannot be created, the hit counter cannot be
/// read, or the snapshot directory cannot be removed. A counter that cannot be
/// read is no longer treated as "0 hits", since that would delete snapshots
/// whenever Redis is unavailable.
pub async fn auto_tear_down(&self, file_uuid: &str) -> Result<()> {
    info!("Starting auto tear down for file_uuid={}", file_uuid);

    let redis_cache = crate::core::cache::redis_cache::RedisCache::new()
        .context("Failed to create Redis cache")?;
    let hits = redis_cache
        .get_snapshot_hits(file_uuid)
        .await
        .context("Failed to read snapshot hit counter")?;

    let tier = SnapshotManager::compute_tier(hits);
    if tier != crate::core::storage::snapshot_manager::SnapshotTier::Cold {
        info!(
            "Skipping tear down: file_uuid={} is not Cold (tier={:?}, hits={})",
            file_uuid, tier, hits
        );
        return Ok(());
    }

    self.manager
        .remove_file_snapshots(file_uuid)
        .context("Failed to remove snapshot files")?;

    let status_key =
        crate::core::cache::redis_cache::RedisCache::snapshot_status_key(file_uuid).await;
    let hits_key =
        crate::core::cache::redis_cache::RedisCache::snapshot_hits_key(file_uuid).await;
    let access_key =
        crate::core::cache::redis_cache::RedisCache::snapshot_last_access_key(file_uuid).await;
    // Best effort: the files are already gone, stale keys are harmless.
    redis_cache.delete(&status_key).await.ok();
    redis_cache.delete(&hits_key).await.ok();
    redis_cache.delete(&access_key).await.ok();

    info!("Auto tear down completed for file_uuid={}", file_uuid);
    Ok(())
}
/// Returns the snapshot manager used by this agent.
pub fn manager(&self) -> &SnapshotManager {
&self.manager
}
}
/// Crop rectangle decoded from a detection's `bbox` JSON.
///
/// The fields are passed to ffmpeg as `crop=width:height:x:y`. The derived
/// `Default` is all zeros and is what callers fall back to when the JSON is
/// missing or cannot be parsed.
#[derive(Debug, Default, Serialize, Deserialize)]
struct Bbox {
x: i32,
y: i32,
width: i32,
height: i32,
}
/// Copies the directory tree at `src` into `dst`, creating `dst` if needed.
///
/// Sub-directories are copied recursively; every other entry is copied with
/// `std::fs::copy`. The first I/O error stops the copy and is returned.
fn copy_dir_recursive(src: &Path, dst: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(dst)?;
    std::fs::read_dir(src)?.try_for_each(|item| {
        let item = item?;
        let from = item.path();
        let to = dst.join(item.file_name());
        if from.is_dir() {
            copy_dir_recursive(&from, &to)
        } else {
            std::fs::copy(&from, &to).map(|_| ())
        }
    })
}

View File

@@ -1,9 +1,7 @@
pub mod file_manager;
pub mod output_dir;
pub mod snapshot_manager;
pub mod uuid;
pub use file_manager::FileManager;
pub use output_dir::OutputDir;
pub use snapshot_manager::SnapshotManager;
pub use uuid::compute_uuid;

View File

@@ -1,268 +0,0 @@
use std::path::{Path, PathBuf};
use crate::core::config;
/// Access tier of a file's snapshots, derived from its hit count by
/// `SnapshotManager::compute_tier`. Serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SnapshotTier {
/// Hit count at or above the configured hot threshold.
Hot,
/// At least one hit, but below the hot threshold.
Warm,
/// No hits.
Cold,
}
impl std::fmt::Display for SnapshotTier {
    /// Writes the lowercase tier name: "hot", "warm" or "cold".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            SnapshotTier::Hot => "hot",
            SnapshotTier::Warm => "warm",
            SnapshotTier::Cold => "cold",
        };
        f.write_str(label)
    }
}
/// Serializable summary of a file's snapshot state.
// NOTE(review): nothing in this file constructs this struct — confirm where it is produced.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SnapshotStatus {
pub file_uuid: String,
pub tier: SnapshotTier,
pub hits: u64,
/// Snapshot category names — presumably the directory names from `list_snapshot_types`; TODO confirm.
pub types: Vec<String>,
/// Generation time as a string; NOTE(review): the format is not visible here.
pub generated_at: Option<String>,
}
/// Resolves and maintains the on-disk directory layout for snapshots.
#[derive(Debug, Clone)]
pub struct SnapshotManager {
/// Root snapshot directory: `<user_dir>/<SNAPSHOT_DIR_NAME>`.
base_dir: PathBuf,
}
impl SnapshotManager {
    /// Creates a manager rooted at `<user_dir>/<SNAPSHOT_DIR_NAME>`.
    pub fn new(user_dir: &str) -> Self {
        let snapshot_dir_name = config::snapshot::SNAPSHOT_DIR_NAME.as_str();
        Self {
            base_dir: Path::new(user_dir).join(snapshot_dir_name),
        }
    }

    /// Root directory under which all snapshots live.
    pub fn base_dir(&self) -> &Path {
        &self.base_dir
    }

    /// Directory holding all snapshots of one file.
    pub fn file_snapshot_dir(&self, file_uuid: &str) -> PathBuf {
        self.base_dir.join(file_uuid)
    }

    /// Directory holding one snapshot category (e.g. "faces") of one file.
    pub fn file_type_dir(&self, file_uuid: &str, snapshot_type: &str) -> PathBuf {
        self.file_snapshot_dir(file_uuid).join(snapshot_type)
    }

    /// Directory holding all snapshots of one identity.
    pub fn identity_snapshot_dir(&self, identity_uuid: &str) -> PathBuf {
        self.base_dir.join("identities").join(identity_uuid)
    }

    /// Directory holding the face snapshots of one identity.
    pub fn identity_face_dir(&self, identity_uuid: &str) -> PathBuf {
        self.identity_snapshot_dir(identity_uuid).join("faces")
    }

    /// Creates the file's snapshot directory and its four category
    /// sub-directories (`faces`, `logos`, `products`, `ocr`).
    pub fn ensure_file_dirs(&self, file_uuid: &str) -> std::io::Result<()> {
        let dir = self.file_snapshot_dir(file_uuid);
        std::fs::create_dir_all(&dir)?;
        for snap_type in ["faces", "logos", "products", "ocr"] {
            std::fs::create_dir_all(dir.join(snap_type))?;
        }
        Ok(())
    }

    /// Creates the identity's snapshot directory and its `faces` sub-directory.
    pub fn ensure_identity_dirs(&self, identity_uuid: &str) -> std::io::Result<()> {
        let dir = self.identity_snapshot_dir(identity_uuid);
        std::fs::create_dir_all(&dir)?;
        std::fs::create_dir_all(dir.join("faces"))?;
        Ok(())
    }

    /// Maps a hit count to a tier: `Hot` at or above `HOT_THRESHOLD`, `Warm`
    /// for any other non-zero count, `Cold` for zero.
    pub fn compute_tier(hits: u64) -> SnapshotTier {
        let threshold = *config::snapshot::HOT_THRESHOLD;
        if hits >= threshold {
            SnapshotTier::Hot
        } else if hits > 0 {
            SnapshotTier::Warm
        } else {
            SnapshotTier::Cold
        }
    }

    /// TTL in seconds configured for `tier`; `Cold` maps to 0.
    pub fn tier_ttl(&self, tier: SnapshotTier) -> u64 {
        match tier {
            SnapshotTier::Hot => *config::snapshot::HOT_TTL_SECS,
            SnapshotTier::Warm => *config::snapshot::WARM_TTL_SECS,
            SnapshotTier::Cold => 0,
        }
    }

    /// Whether the directory for `snapshot_type` of `file_uuid` exists.
    pub fn snapshot_exists(&self, file_uuid: &str, snapshot_type: &str) -> bool {
        self.file_type_dir(file_uuid, snapshot_type).exists()
    }

    /// Names of the sub-directories present in the file's snapshot directory.
    ///
    /// Returns an empty list when the directory is missing or unreadable;
    /// unreadable entries and non-UTF-8 names are skipped.
    pub fn list_snapshot_types(&self, file_uuid: &str) -> Vec<String> {
        let dir = self.file_snapshot_dir(file_uuid);
        if !dir.exists() {
            return Vec::new();
        }
        std::fs::read_dir(&dir)
            .into_iter()
            .flatten()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().is_dir())
            .filter_map(|e| e.file_name().to_str().map(String::from))
            .collect()
    }

    /// True when `id` is exactly one ordinary path segment: not empty, not
    /// `.`/`..`, not absolute, and without any separator-delimited parts.
    fn is_single_component(id: &str) -> bool {
        let mut parts = Path::new(id).components();
        matches!(
            (parts.next(), parts.next()),
            (Some(std::path::Component::Normal(_)), None)
        )
    }

    /// Error returned when an id cannot safely name a directory to delete.
    fn invalid_id(id: &str) -> std::io::Error {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("invalid snapshot id: {:?}", id),
        )
    }

    /// Deletes every snapshot of one file; a missing directory is not an error.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `file_uuid` is not a single path segment.
    /// An empty id would otherwise resolve to the snapshot root and wipe all
    /// snapshots, and `..` or absolute ids would escape the root.
    pub fn remove_file_snapshots(&self, file_uuid: &str) -> std::io::Result<()> {
        if !Self::is_single_component(file_uuid) {
            return Err(Self::invalid_id(file_uuid));
        }
        let dir = self.file_snapshot_dir(file_uuid);
        if dir.exists() {
            std::fs::remove_dir_all(&dir)?;
        }
        Ok(())
    }

    /// Deletes every snapshot of one identity; a missing directory is not an error.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `identity_uuid` is not a single path
    /// segment. An empty id would otherwise remove the whole `identities`
    /// directory.
    pub fn remove_identity_snapshots(&self, identity_uuid: &str) -> std::io::Result<()> {
        if !Self::is_single_component(identity_uuid) {
            return Err(Self::invalid_id(identity_uuid));
        }
        let dir = self.identity_snapshot_dir(identity_uuid);
        if dir.exists() {
            std::fs::remove_dir_all(&dir)?;
        }
        Ok(())
    }
}
// Unit tests for SnapshotManager: tier computation, display names, directory
// creation/removal and TTL lookup. Filesystem tests run inside a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
// Returns the manager together with the TempDir guard; the guard must stay alive
// for the duration of the test so the directory is not removed early.
fn create_test_manager() -> (SnapshotManager, tempfile::TempDir) {
let temp_dir = tempfile::tempdir().unwrap();
let manager = SnapshotManager::new(temp_dir.path().to_str().unwrap());
(manager, temp_dir)
}
// NOTE(review): together with test_compute_tier_warm (4 => Warm) this only holds when
// config::snapshot::HOT_THRESHOLD is exactly 5 — the tests depend on that configuration.
#[test]
fn test_compute_tier_hot() {
assert_eq!(SnapshotManager::compute_tier(5), SnapshotTier::Hot);
assert_eq!(SnapshotManager::compute_tier(10), SnapshotTier::Hot);
assert_eq!(SnapshotManager::compute_tier(100), SnapshotTier::Hot);
}
#[test]
fn test_compute_tier_warm() {
assert_eq!(SnapshotManager::compute_tier(1), SnapshotTier::Warm);
assert_eq!(SnapshotManager::compute_tier(4), SnapshotTier::Warm);
}
#[test]
fn test_compute_tier_cold() {
assert_eq!(SnapshotManager::compute_tier(0), SnapshotTier::Cold);
}
#[test]
fn test_tier_display() {
assert_eq!(SnapshotTier::Hot.to_string(), "hot");
assert_eq!(SnapshotTier::Warm.to_string(), "warm");
assert_eq!(SnapshotTier::Cold.to_string(), "cold");
}
#[test]
fn test_ensure_file_dirs_creates_structure() {
let (manager, _temp) = create_test_manager();
let file_uuid = "test_file_123";
manager.ensure_file_dirs(file_uuid).unwrap();
assert!(manager.file_snapshot_dir(file_uuid).exists());
assert!(manager.file_type_dir(file_uuid, "faces").exists());
assert!(manager.file_type_dir(file_uuid, "logos").exists());
assert!(manager.file_type_dir(file_uuid, "products").exists());
assert!(manager.file_type_dir(file_uuid, "ocr").exists());
}
#[test]
fn test_ensure_identity_dirs_creates_structure() {
let (manager, _temp) = create_test_manager();
let identity_uuid = "test_identity_456";
manager.ensure_identity_dirs(identity_uuid).unwrap();
assert!(manager.identity_snapshot_dir(identity_uuid).exists());
assert!(manager.identity_face_dir(identity_uuid).exists());
}
#[test]
fn test_list_snapshot_types_empty() {
let (manager, _temp) = create_test_manager();
let types = manager.list_snapshot_types("nonexistent");
assert!(types.is_empty());
}
// Checks membership rather than order: the listing comes from read_dir.
#[test]
fn test_list_snapshot_types_after_creation() {
let (manager, _temp) = create_test_manager();
let file_uuid = "test_file_789";
manager.ensure_file_dirs(file_uuid).unwrap();
let types = manager.list_snapshot_types(file_uuid);
assert_eq!(types.len(), 4);
assert!(types.contains(&"faces".to_string()));
assert!(types.contains(&"logos".to_string()));
assert!(types.contains(&"products".to_string()));
assert!(types.contains(&"ocr".to_string()));
}
#[test]
fn test_remove_file_snapshots() {
let (manager, _temp) = create_test_manager();
let file_uuid = "test_file_remove";
manager.ensure_file_dirs(file_uuid).unwrap();
assert!(manager.file_snapshot_dir(file_uuid).exists());
manager.remove_file_snapshots(file_uuid).unwrap();
assert!(!manager.file_snapshot_dir(file_uuid).exists());
}
#[test]
fn test_remove_identity_snapshots() {
let (manager, _temp) = create_test_manager();
let identity_uuid = "test_identity_remove";
manager.ensure_identity_dirs(identity_uuid).unwrap();
assert!(manager.identity_snapshot_dir(identity_uuid).exists());
manager.remove_identity_snapshots(identity_uuid).unwrap();
assert!(!manager.identity_snapshot_dir(identity_uuid).exists());
}
#[test]
fn test_snapshot_exists() {
let (manager, _temp) = create_test_manager();
let file_uuid = "test_exists";
assert!(!manager.snapshot_exists(file_uuid, "faces"));
manager.ensure_file_dirs(file_uuid).unwrap();
assert!(manager.snapshot_exists(file_uuid, "faces"));
assert!(!manager.snapshot_exists(file_uuid, "nonexistent"));
}
// Compares against the config values themselves, so it holds for any configured TTL.
#[test]
fn test_tier_ttl() {
let (manager, _temp) = create_test_manager();
let hot_ttl = manager.tier_ttl(SnapshotTier::Hot);
assert_eq!(hot_ttl, *config::snapshot::HOT_TTL_SECS);
let warm_ttl = manager.tier_ttl(SnapshotTier::Warm);
assert_eq!(warm_ttl, *config::snapshot::WARM_TTL_SECS);
let cold_ttl = manager.tier_ttl(SnapshotTier::Cold);
assert_eq!(cold_ttl, 0);
}
}

View File

@@ -1 +1,3 @@
pub mod face_agent;
pub mod ingest;
pub mod probe;