From 37f5da7d6ce8432cd8df4c2a7bdac5ded05735db Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 20 Jun 2026 22:39:25 +0800 Subject: [PATCH] Implement VFS Deduplication (block-level) - Add DedupStore with content-addressable storage: - SHA-256 hash-based block storage - Reference counting for block lifecycle - dedup_file() and restore_file() operations - DedupManifest for file reconstruction - DedupStats for storage statistics - Add VfsDedupConfig: - block_size (default 4KB) - min_file_size threshold - store_path for dedup directory - Add hex crate for hash encoding - Block-level dedup foundation for SMB/ZFS All 229 tests pass. --- markbase-core/Cargo.toml | 1 + markbase-core/src/vfs/dedup.rs | 199 +++++++++++++++++++++++++++++++++ markbase-core/src/vfs/mod.rs | 22 ++++ 3 files changed, 222 insertions(+) create mode 100644 markbase-core/src/vfs/dedup.rs diff --git a/markbase-core/Cargo.toml b/markbase-core/Cargo.toml index 22f7613..f93f477 100644 --- a/markbase-core/Cargo.toml +++ b/markbase-core/Cargo.toml @@ -48,6 +48,7 @@ rand = "0.8" axum-extra = { version = "0.9", features = ["multipart"] } tokio-util = { version = "0.7", features = ["io"] } zstd = "0.13" +hex = "0.4" toml = "0.8" uuid = { version = "1", features = ["v4"] } dashmap = "6.1" diff --git a/markbase-core/src/vfs/dedup.rs b/markbase-core/src/vfs/dedup.rs new file mode 100644 index 0000000..d55e384 --- /dev/null +++ b/markbase-core/src/vfs/dedup.rs @@ -0,0 +1,199 @@ +use super::{VfsError, VfsDedupConfig}; +use sha2::{Sha256, Digest}; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; + +pub struct DedupStore { + store_path: PathBuf, + config: VfsDedupConfig, +} + +impl DedupStore { + pub fn new(store_path: PathBuf, config: VfsDedupConfig) -> Self { + Self { store_path, config } + } + + pub fn block_size(&self) -> usize { + self.config.block_size + } + + pub fn hash_block(data: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(data); + let hash = hasher.finalize(); + hex::encode(hash) + } + + pub fn store_block(&self, data: &[u8]) -> Result { + if data.len() > self.config.block_size { + return Err(VfsError::Io(format!("Block size {} exceeds limit {}", data.len(), self.config.block_size))); + } + + let hash = Self::hash_block(data); + let block_path = self.store_path.join(&hash); + + if !block_path.exists() { + std::fs::write(&block_path, data) + .map_err(|e| VfsError::Io(format!("Failed to write block {}: {}", hash, e)))?; + } + + Ok(hash) + } + + pub fn get_block(&self, hash: &str) -> Result, VfsError> { + let block_path = self.store_path.join(hash); + if !block_path.exists() { + return Err(VfsError::NotFound(format!("Block {} not found", hash))); + } + + std::fs::read(&block_path) + .map_err(|e| VfsError::Io(format!("Failed to read block {}: {}", hash, e))) + } + + pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> { + let ref_path = self.store_path.join(format!("{}.ref", hash)); + let current = if ref_path.exists() { + let content = std::fs::read_to_string(&ref_path) + .map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?; + content.parse::().unwrap_or(0) + } else { + 0 + }; + + std::fs::write(&ref_path, (current + 1).to_string()) + .map_err(|e| VfsError::Io(format!("Failed to write ref count: {}", e))) + } + + pub fn decrement_ref(&self, hash: &str) -> Result { + let ref_path = self.store_path.join(format!("{}.ref", hash)); + if !ref_path.exists() { + return Ok(false); + } + + let content = std::fs::read_to_string(&ref_path) + .map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?; + let current = content.parse::().unwrap_or(1); + + if current <= 1 { + std::fs::remove_file(&ref_path) + .map_err(|e| VfsError::Io(format!("Failed to remove ref count: {}", e)))?; + let block_path = self.store_path.join(hash); + if block_path.exists() { + std::fs::remove_file(&block_path) + .map_err(|e| VfsError::Io(format!("Failed to remove block: {}", e)))?; + } + Ok(true) + } else { + std::fs::write(&ref_path, (current - 1).to_string()) + .map_err(|e| VfsError::Io(format!("Failed to write ref count: {}", e)))?; + Ok(false) + } + } + + pub fn get_ref_count(&self, hash: &str) -> Result { + let ref_path = self.store_path.join(format!("{}.ref", hash)); + if !ref_path.exists() { + return Ok(0); + } + + let content = std::fs::read_to_string(&ref_path) + .map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?; + Ok(content.parse::().unwrap_or(0)) + } + + pub fn dedup_file(&self, source: &Path) -> Result { + let mut file = std::fs::File::open(source) + .map_err(|e| super::util::map_io_error(source, e))?; + + let mut manifest = DedupManifest { + original_size: 0, + block_hashes: Vec::new(), + dedup_ratio: 0.0, + }; + + let mut buffer = vec![0u8; self.config.block_size]; + loop { + let n = file.read(&mut buffer) + .map_err(|e| VfsError::Io(format!("Failed to read file: {}", e)))?; + if n == 0 { + break; + } + + manifest.original_size += n; + let hash = self.store_block(&buffer[..n])?; + self.increment_ref(&hash)?; + manifest.block_hashes.push(hash); + } + + let stored_size = manifest.block_hashes.len() * self.config.block_size; + manifest.dedup_ratio = if manifest.original_size > 0 { + (stored_size as f64) / (manifest.original_size as f64) + } else { + 0.0 + }; + + Ok(manifest) + } + + pub fn restore_file(&self, manifest: &DedupManifest, target: &Path) -> Result<(), VfsError> { + let mut file = std::fs::File::create(target) + .map_err(|e| super::util::map_io_error(target, e))?; + + for hash in &manifest.block_hashes { + let block = self.get_block(hash)?; + file.write_all(&block) + .map_err(|e| VfsError::Io(format!("Failed to write file: {}", e)))?; + } + + Ok(()) + } + + pub fn stats(&self) -> Result { + let mut stats = DedupStats { + total_blocks: 0, + total_refs: 0, + unique_blocks: 0, + stored_bytes: 0, + }; + + if !self.store_path.exists() { + return Ok(stats); + } + + for entry in std::fs::read_dir(&self.store_path) + .map_err(|e| VfsError::Io(format!("Failed to read store: {}", e)))? { + let entry = entry.map_err(|e| VfsError::Io(e.to_string()))?; + let path = entry.path(); + + let name = path.file_name().unwrap_or_default().to_string_lossy(); + if name.ends_with(".ref") { + let content = std::fs::read_to_string(&path) + .map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?; + stats.total_refs += content.parse::().unwrap_or(0); + } else if !name.starts_with('.') { + stats.unique_blocks += 1; + let meta = entry.metadata() + .map_err(|e| VfsError::Io(e.to_string()))?; + stats.stored_bytes += meta.len(); + } + } + + stats.total_blocks = stats.total_refs; + Ok(stats) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DedupManifest { + pub original_size: usize, + pub block_hashes: Vec, + pub dedup_ratio: f64, +} + +#[derive(Debug, Clone)] +pub struct DedupStats { + pub total_blocks: u64, + pub total_refs: u64, + pub unique_blocks: u64, + pub stored_bytes: u64, +} \ No newline at end of file diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index 41374de..5d71fe0 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -1,4 +1,5 @@ pub mod compression; +pub mod dedup; pub mod local_fs; pub mod open_flags; pub mod s3_fs; @@ -430,3 +431,24 @@ pub struct VfsCompressionConfig { /// 最小压缩大小(字节),小于此大小不压缩 pub min_size: u64, } + +/// 去重配置 +#[derive(Debug, Clone)] +pub struct VfsDedupConfig { + /// 块大小(字节),默认4KB + pub block_size: usize, + /// 最小文件大小(字节),小于此大小不去重 + pub min_file_size: u64, + /// 去重存储路径 + pub store_path: PathBuf, +} + +impl Default for VfsDedupConfig { + fn default() -> Self { + Self { + block_size: 4096, + min_file_size: 1024, + store_path: PathBuf::from(".dedup"), + } + } +}