From 4d1fe2d26f97ada556b495debca566fe3b10b587 Mon Sep 17 00:00:00 2001 From: Accusys Date: Thu, 14 May 2026 20:24:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20file=20dedup=20=E2=80=94=20content=5Fha?= =?UTF-8?q?sh=20SHA256=20+=20/files/lookup=20API=20+=20auto-rename=20on=20?= =?UTF-8?q?name=20collision?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- release/migrate_add_content_hash.sql | 6 + src/api/server.rs | 199 +++++++++++++++++++++++---- src/core/ingestion.rs | 86 ++++++++---- src/core/storage/content_hash.rs | 18 +++ src/core/storage/mod.rs | 2 + 5 files changed, 260 insertions(+), 51 deletions(-) create mode 100644 release/migrate_add_content_hash.sql create mode 100644 src/core/storage/content_hash.rs diff --git a/release/migrate_add_content_hash.sql b/release/migrate_add_content_hash.sql new file mode 100644 index 0000000..85103d3 --- /dev/null +++ b/release/migrate_add_content_hash.sql @@ -0,0 +1,6 @@ +-- Migration: Add content_hash column for file deduplication +-- Date: 2026-05-14 +-- Usage: psql -U accusys -d momentry -f migrate_add_content_hash.sql + +ALTER TABLE videos ADD COLUMN IF NOT EXISTS content_hash TEXT; +CREATE INDEX IF NOT EXISTS idx_videos_content_hash ON videos(content_hash) WHERE content_hash IS NOT NULL; diff --git a/src/api/server.rs b/src/api/server.rs index f05ccf5..992ae5f 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -7,6 +7,7 @@ use axum::{ }; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use sha2::{Digest, Sha256}; use sqlx::Row; use std::time::Instant; @@ -161,6 +162,93 @@ struct CacheToggleResponse { // Missing structs added +#[derive(Debug, Deserialize)] +#[derive(Serialize)] +struct FileLookupMatch { + file_uuid: String, + file_name: String, + file_type: Option, + status: String, + content_hash: Option, + file_size: Option, + duration: Option, + width: Option, + height: Option, +} + +#[derive(Serialize)] +struct FileLookupResponse { + file_name: String, + exists: bool, + matches: Vec, + next_name: String, +} + +async fn lookup_file_by_name( + State(state): State, + Query(params): Query>, +) -> Result, StatusCode> { + let base = params.get("file_name").map(|s| s.trim().to_string()).unwrap_or_default(); + if base.is_empty() { + return Ok(Json(FileLookupResponse { + file_name: String::new(), + exists: false, + matches: vec![], + next_name: String::new(), + })); + } + let table = schema::table_name("videos"); + let dot_pos = base.rfind('.'); + let (stem, ext) = match dot_pos { + Some(p) => (base[..p].to_string(), base[p..].to_string()), + None => (base.clone(), String::new()), + }; + let pattern = format!("{}%%", &stem); + + let query_sql = format!("SELECT file_uuid, file_name, file_type, status, content_hash, duration, width, height FROM {} WHERE file_name = $1 OR file_name LIKE $2 ORDER BY file_name", table); + let rows = sqlx::query(&query_sql) + .bind(&base) + .bind(&pattern) + .fetch_all(state.db.pool()) + .await + .map_err(|e| { tracing::error!("lookup query error: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; + + let exists = rows.iter().any(|r| r.get::("file_name") == base); + let matches: Vec = rows.iter().map(|r| { + FileLookupMatch { + file_uuid: r.get("file_uuid"), + file_name: r.get("file_name"), + file_type: r.get("file_type"), + status: r.get("status"), + content_hash: r.get("content_hash"), + file_size: None, + duration: r.get("duration"), + width: r.get("width"), + height: r.get("height"), + } + }).collect(); + + let max_n: usize = rows.iter().filter_map(|r| { + let n: String = r.get("file_name"); + if n == base { return Some(0usize); } + let rest = n.strip_prefix(&stem).and_then(|r| r.strip_suffix(&ext))?; + let inner = rest.trim().strip_prefix('(').and_then(|r| r.strip_suffix(')'))?; + inner.parse::().ok() + }).max().unwrap_or(0); + let next_name = if max_n == 0 && !exists { + base.clone() + } else { + format!("{} ({}){}", stem, max_n + 1, ext) + }; + + Ok(Json(FileLookupResponse { + file_name: base, + exists, + matches, + next_name, + })) +} + #[derive(Debug, Deserialize)] struct RegisterFileRequest { file_path: String, @@ -684,6 +772,45 @@ fn generate_visual_search_hash( format!("{:x}", hasher.finalize())[..16].to_string() } +/// Compute SHA256 for dedup. Returns hex string. +fn sha256_file(path: &std::path::Path) -> Option { + crate::core::storage::content_hash::compute_sha256(path).ok() +} + +/// Resolve name conflict: if file_name collides with existing but content differs, +/// append ` (N)` suffix. Returns the resolved file_name. +async fn resolve_filename( + db: &PostgresDb, + file_name: &str, + content_hash: &str, +) -> String { + let table = schema::table_name("videos"); + let base = file_name.to_string(); + let dot_pos = base.rfind('.'); + let (stem, ext) = match dot_pos { + Some(p) => (base[..p].to_string(), base[p..].to_string()), + None => (base.clone(), String::new()), + }; + let mut candidate = base.clone(); + let mut attempt = 0usize; + loop { + // Check if candidate name exists with a DIFFERENT hash (same content = OK) + let conflict: Option = sqlx::query_scalar( + &format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table) + ) + .bind(&candidate) + .bind(content_hash) + .fetch_optional(db.pool()) + .await + .unwrap_or(None); + if conflict.is_none() { + return candidate; + } + attempt += 1; + candidate = format!("{} ({}){}", stem, attempt, ext); + } +} + /// 註冊單一檔案(內部函數,不處理 pattern) async fn register_single_file( state: &AppState, @@ -743,11 +870,50 @@ async fn register_single_file( } }; + // Step 1: Compute SHA256 of full file + let content_hash = sha256_file(&path).unwrap_or_default(); + + // Step 2: Hash check — same content = already registered (regardless of name) + let videos_table = schema::table_name("videos"); + if !content_hash.is_empty() { + if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>( + &format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table) + ) + .bind(&content_hash) + .fetch_optional(db.pool()) + .await + { + tracing::info!("[REGISTER] Content hash collision → already registered: {}", existing_uuid); + let existing_name: Option = sqlx::query_scalar( + &format!("SELECT file_name FROM {} WHERE file_uuid = $1", videos_table) + ).bind(&existing_uuid).fetch_optional(db.pool()).await.unwrap_or(None); + return RegisterFileResponse { + success: true, + file_uuid: existing_uuid, + file_name: existing_name.unwrap_or(file_name), + file_path: canonical_path, + file_type: None, + duration: 0.0, + width: 0, + height: 0, + fps: 0.0, + total_frames: 0, + registration_time: None, + already_exists: true, + message: "Content already registered (identical file)".to_string(), + }; + } + } + + // Step 3: Name check — same name but different content → auto-rename + let final_name = resolve_filename(&db, &file_name, &content_hash).await; + + // Step 4: Compute UUID (using final resolved name) let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) - .bind(&file_name) + .bind(&final_name) .fetch_optional(db.pool()) .await .unwrap_or(None) @@ -759,30 +925,10 @@ async fn register_single_file( &mac_address, &birthday, &canonical_path, - &file_name, + &final_name, ); - // Check if already exists - if let Ok(Some(_)) = db.get_video_by_uuid(&file_uuid).await { - tracing::info!("[REGISTER] File already registered: {}", file_uuid); - return RegisterFileResponse { - success: true, - file_uuid, - file_name, - file_path: canonical_path, - file_type: None, - duration: 0.0, - width: 0, - height: 0, - fps: 0.0, - total_frames: 0, - registration_time: None, - already_exists: true, - message: "File already registered".to_string(), - }; - } - - // Probe + // Step 5: Probe let probe_result = match crate::core::probe::probe_video(&canonical_path) { Ok(r) => r, Err(e) => { @@ -857,12 +1003,12 @@ async fn register_single_file( let probe_json = serde_json::to_value(&probe_result).ok(); let status = "pending"; let _ = sqlx::query(&format!( - "INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status", + "INSERT INTO {} (file_uuid, file_path, file_name, file_type, duration, width, height, fps, probe_json, status, content_hash, registration_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (file_uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, status = EXCLUDED.status, content_hash = EXCLUDED.content_hash", videos_table )) - .bind(&file_uuid).bind(&canonical_path).bind(&file_name).bind(&final_file_type) + .bind(&file_uuid).bind(&canonical_path).bind(&final_name).bind(&final_file_type) .bind(duration).bind(width as i32).bind(height as i32).bind(fps) - .bind(&probe_json).bind(status) + .bind(&probe_json).bind(status).bind(&content_hash) .execute(db.pool()).await; // 若是 video 類型,同步執行 CUT + Scene 分類 @@ -2614,6 +2760,7 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { let protected_routes = Router::new() .route("/api/v1/files/register", post(register_file)) + .route("/api/v1/files/lookup", get(lookup_file_by_name)) .route("/api/v1/unregister", post(unregister)) .route("/api/v1/files/scan", get(scan_files)) .route("/api/v1/file/:file_uuid/probe", get(probe_by_uuid)) diff --git a/src/core/ingestion.rs b/src/core/ingestion.rs index 18c6674..00c63ec 100644 --- a/src/core/ingestion.rs +++ b/src/core/ingestion.rs @@ -18,6 +18,34 @@ impl IngestionService { Self { db } } + /// Resolve name collision: if file_name exists with different content, append ` (N)` suffix. + async fn resolve_filename(&self, file_name: &str, content_hash: &str) -> String { + let table = schema::table_name("videos"); + let base = file_name.to_string(); + let dot_pos = base.rfind('.'); + let (stem, ext) = match dot_pos { + Some(p) => (base[..p].to_string(), base[p..].to_string()), + None => (base.clone(), String::new()), + }; + let mut candidate = base.clone(); + let mut attempt = 0usize; + loop { + let conflict: Option = sqlx::query_scalar( + &format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table) + ) + .bind(&candidate) + .bind(content_hash) + .fetch_optional(self.db.pool()) + .await + .unwrap_or(None); + if conflict.is_none() { + return candidate; + } + attempt += 1; + candidate = format!("{} ({}){}", stem, attempt, ext); + } + } + pub async fn ingest(&self, file_path: &str) -> Result> { let path = Path::new(file_path); @@ -32,16 +60,33 @@ impl IngestionService { .to_string_lossy() .to_string(); - // Stable UUID based on MAC + Birthday + Filename. - // Moving the file (path change) keeps the SAME identity. + // 1. Compute SHA256 for dedup + let content_hash = crate::core::storage::content_hash::compute_sha256(&canonical_path).ok().unwrap_or_default(); - // 1. Look for existing Birthday (Identity Anchor) - // If the file (by name) was registered before, use its original birth time. + // 2. Hash check — same content = already registered + let videos_table = schema::table_name("videos"); + if !content_hash.is_empty() { + if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>( + &format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table) + ) + .bind(&content_hash) + .fetch_optional(self.db.pool()) + .await + { + info!("Content already registered: {} ({})", filename, existing_uuid); + return Ok(Some(existing_uuid)); + } + } + + // 3. Resolve name conflict (same name, different content → auto-rename) + let final_name = self.resolve_filename(&filename, &content_hash).await; + + // 4. Compute UUID with resolved name let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) - .bind(&filename) + .bind(&final_name) .fetch_optional(self.db.pool()) .await .ok() @@ -54,30 +99,15 @@ impl IngestionService { .map(|p| p.to_string_lossy().to_string()) .unwrap_or_default(); - // 2. Compute UUID let uuid = uuid_utils::compute_birth_uuid( &uuid_utils::get_mac_address(), &birthday, &canonical_path.to_string_lossy(), - &filename, + &final_name, ); - let parent = canonical_path - .parent() - .map(|p| p.to_string_lossy().to_string()) - .unwrap_or_default(); - let username = uuid_utils::extract_username_from_path(&parent); - if let Ok(Some(_)) = self.db.get_video_by_uuid(&uuid).await { - info!( - "Video already registered: {} ({})", - path.file_name().unwrap_or_default().to_string_lossy(), - uuid - ); - return Ok(None); - } - info!("Starting ingestion for: {} ({})", path.display(), uuid); let probe_result = probe::probe_video(file_path) @@ -195,6 +225,16 @@ impl IngestionService { .await .with_context(|| "Failed to register video in database")?; + // Store content_hash for dedup + if !content_hash.is_empty() { + let vt = schema::table_name("videos"); + let _ = sqlx::query(&format!("UPDATE {} SET content_hash = $1 WHERE file_uuid = $2", vt)) + .bind(&content_hash) + .bind(&uuid) + .execute(self.db.pool()) + .await; + } + self.db .set_registration_time(&uuid) .await @@ -205,10 +245,6 @@ impl IngestionService { .await .with_context(|| "Failed to set birth_registration")?; - info!( - "Successfully registered video: {} (UUID: {}, Birth UUID: {})", - record.file_name, uuid, uuid - ); Ok(Some(uuid)) } } diff --git a/src/core/storage/content_hash.rs b/src/core/storage/content_hash.rs new file mode 100644 index 0000000..72a11bc --- /dev/null +++ b/src/core/storage/content_hash.rs @@ -0,0 +1,18 @@ +use sha2::{Digest, Sha256}; +use std::io::Read; +use std::path::Path; +use anyhow::Result; + +/// Compute SHA256 of the entire file content +pub fn compute_sha256(path: &Path) -> Result { + let mut file = std::fs::File::open(path)?; + let mut hasher = Sha256::new(); + let mut buf = [0u8; 65536]; + loop { + let n = file.read(&mut buf)?; + if n == 0 { break; } + hasher.update(&buf[..n]); + } + let hash = format!("{:x}", hasher.finalize()); + Ok(hash) +} diff --git a/src/core/storage/mod.rs b/src/core/storage/mod.rs index 2626c2b..084cc04 100644 --- a/src/core/storage/mod.rs +++ b/src/core/storage/mod.rs @@ -1,7 +1,9 @@ +pub mod content_hash; pub mod file_manager; pub mod output_dir; pub mod uuid; +pub use content_hash::compute_sha256; pub use file_manager::FileManager; pub use output_dir::OutputDir; pub use uuid::compute_uuid;