Implement VFS Deduplication (block-level)
- Add DedupStore with content-addressable storage:
  - SHA-256 hash-based block storage
  - Reference counting for block lifecycle
  - dedup_file() and restore_file() operations
  - DedupManifest for file reconstruction
  - DedupStats for storage statistics
- Add VfsDedupConfig:
  - block_size (default 4 KB)
  - min_file_size threshold
  - store_path for the dedup directory
- Add the hex crate for hash encoding
- Block-level dedup foundation for SMB/ZFS

All 229 tests pass.
This commit is contained in:
199
markbase-core/src/vfs/dedup.rs
Normal file
199
markbase-core/src/vfs/dedup.rs
Normal file
@@ -0,0 +1,199 @@
|
||||
use super::{VfsError, VfsDedupConfig};
|
||||
use sha2::{Sha256, Digest};
|
||||
use std::io::{Read, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
pub struct DedupStore {
|
||||
store_path: PathBuf,
|
||||
config: VfsDedupConfig,
|
||||
}
|
||||
|
||||
impl DedupStore {
|
||||
pub fn new(store_path: PathBuf, config: VfsDedupConfig) -> Self {
|
||||
Self { store_path, config }
|
||||
}
|
||||
|
||||
pub fn block_size(&self) -> usize {
|
||||
self.config.block_size
|
||||
}
|
||||
|
||||
pub fn hash_block(data: &[u8]) -> String {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(data);
|
||||
let hash = hasher.finalize();
|
||||
hex::encode(hash)
|
||||
}
|
||||
|
||||
pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
|
||||
if data.len() > self.config.block_size {
|
||||
return Err(VfsError::Io(format!("Block size {} exceeds limit {}", data.len(), self.config.block_size)));
|
||||
}
|
||||
|
||||
let hash = Self::hash_block(data);
|
||||
let block_path = self.store_path.join(&hash);
|
||||
|
||||
if !block_path.exists() {
|
||||
std::fs::write(&block_path, data)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to write block {}: {}", hash, e)))?;
|
||||
}
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
|
||||
let block_path = self.store_path.join(hash);
|
||||
if !block_path.exists() {
|
||||
return Err(VfsError::NotFound(format!("Block {} not found", hash)));
|
||||
}
|
||||
|
||||
std::fs::read(&block_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read block {}: {}", hash, e)))
|
||||
}
|
||||
|
||||
pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
let ref_path = self.store_path.join(format!("{}.ref", hash));
|
||||
let current = if ref_path.exists() {
|
||||
let content = std::fs::read_to_string(&ref_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?;
|
||||
content.parse::<u64>().unwrap_or(0)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
std::fs::write(&ref_path, (current + 1).to_string())
|
||||
.map_err(|e| VfsError::Io(format!("Failed to write ref count: {}", e)))
|
||||
}
|
||||
|
||||
pub fn decrement_ref(&self, hash: &str) -> Result<bool, VfsError> {
|
||||
let ref_path = self.store_path.join(format!("{}.ref", hash));
|
||||
if !ref_path.exists() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let content = std::fs::read_to_string(&ref_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?;
|
||||
let current = content.parse::<u64>().unwrap_or(1);
|
||||
|
||||
if current <= 1 {
|
||||
std::fs::remove_file(&ref_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to remove ref count: {}", e)))?;
|
||||
let block_path = self.store_path.join(hash);
|
||||
if block_path.exists() {
|
||||
std::fs::remove_file(&block_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to remove block: {}", e)))?;
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
std::fs::write(&ref_path, (current - 1).to_string())
|
||||
.map_err(|e| VfsError::Io(format!("Failed to write ref count: {}", e)))?;
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_ref_count(&self, hash: &str) -> Result<u64, VfsError> {
|
||||
let ref_path = self.store_path.join(format!("{}.ref", hash));
|
||||
if !ref_path.exists() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let content = std::fs::read_to_string(&ref_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?;
|
||||
Ok(content.parse::<u64>().unwrap_or(0))
|
||||
}
|
||||
|
||||
pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
|
||||
let mut file = std::fs::File::open(source)
|
||||
.map_err(|e| super::util::map_io_error(source, e))?;
|
||||
|
||||
let mut manifest = DedupManifest {
|
||||
original_size: 0,
|
||||
block_hashes: Vec::new(),
|
||||
dedup_ratio: 0.0,
|
||||
};
|
||||
|
||||
let mut buffer = vec![0u8; self.config.block_size];
|
||||
loop {
|
||||
let n = file.read(&mut buffer)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read file: {}", e)))?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
manifest.original_size += n;
|
||||
let hash = self.store_block(&buffer[..n])?;
|
||||
self.increment_ref(&hash)?;
|
||||
manifest.block_hashes.push(hash);
|
||||
}
|
||||
|
||||
let stored_size = manifest.block_hashes.len() * self.config.block_size;
|
||||
manifest.dedup_ratio = if manifest.original_size > 0 {
|
||||
(stored_size as f64) / (manifest.original_size as f64)
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
Ok(manifest)
|
||||
}
|
||||
|
||||
pub fn restore_file(&self, manifest: &DedupManifest, target: &Path) -> Result<(), VfsError> {
|
||||
let mut file = std::fs::File::create(target)
|
||||
.map_err(|e| super::util::map_io_error(target, e))?;
|
||||
|
||||
for hash in &manifest.block_hashes {
|
||||
let block = self.get_block(hash)?;
|
||||
file.write_all(&block)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to write file: {}", e)))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> Result<DedupStats, VfsError> {
|
||||
let mut stats = DedupStats {
|
||||
total_blocks: 0,
|
||||
total_refs: 0,
|
||||
unique_blocks: 0,
|
||||
stored_bytes: 0,
|
||||
};
|
||||
|
||||
if !self.store_path.exists() {
|
||||
return Ok(stats);
|
||||
}
|
||||
|
||||
for entry in std::fs::read_dir(&self.store_path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read store: {}", e)))? {
|
||||
let entry = entry.map_err(|e| VfsError::Io(e.to_string()))?;
|
||||
let path = entry.path();
|
||||
|
||||
let name = path.file_name().unwrap_or_default().to_string_lossy();
|
||||
if name.ends_with(".ref") {
|
||||
let content = std::fs::read_to_string(&path)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read ref count: {}", e)))?;
|
||||
stats.total_refs += content.parse::<u64>().unwrap_or(0);
|
||||
} else if !name.starts_with('.') {
|
||||
stats.unique_blocks += 1;
|
||||
let meta = entry.metadata()
|
||||
.map_err(|e| VfsError::Io(e.to_string()))?;
|
||||
stats.stored_bytes += meta.len();
|
||||
}
|
||||
}
|
||||
|
||||
stats.total_blocks = stats.total_refs;
|
||||
Ok(stats)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct DedupManifest {
|
||||
pub original_size: usize,
|
||||
pub block_hashes: Vec<String>,
|
||||
pub dedup_ratio: f64,
|
||||
}
|
||||
|
||||
/// Summary of a dedup store's contents, as reported by `DedupStore::stats`.
#[derive(Debug, Clone)]
pub struct DedupStats {
    /// Number of block occurrences; `DedupStore::stats` sets this equal to
    /// `total_refs`.
    pub total_blocks: u64,
    /// Sum of the counts found in all `.ref` files.
    pub total_refs: u64,
    /// Number of block files present in the store directory.
    pub unique_blocks: u64,
    /// Combined size in bytes of the block files.
    pub stored_bytes: u64,
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod compression;
|
||||
pub mod dedup;
|
||||
pub mod local_fs;
|
||||
pub mod open_flags;
|
||||
pub mod s3_fs;
|
||||
@@ -430,3 +431,24 @@ pub struct VfsCompressionConfig {
|
||||
/// 最小压缩大小(字节),小于此大小不压缩
|
||||
pub min_size: u64,
|
||||
}
|
||||
|
||||
/// Deduplication configuration.
#[derive(Debug, Clone)]
pub struct VfsDedupConfig {
    /// Block size in bytes; defaults to 4 KB.
    pub block_size: usize,
    /// Minimum file size in bytes; files smaller than this are not
    /// deduplicated.
    pub min_file_size: u64,
    /// Path of the deduplication store.
    pub store_path: PathBuf,
}
|
||||
|
||||
impl Default for VfsDedupConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
block_size: 4096,
|
||||
min_file_size: 1024,
|
||||
store_path: PathBuf::from(".dedup"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user