DedupFs + S3Vfs Combination Design
Date: 2026-06-25
Status: Design proposal
Goal: Distributed deduplication storage via MinIO/S3 backend
Executive Summary
Current State
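DedupStore (dedup.rs, 224 lines):
- Dedup storage based on the local file system
- SHA-256 block hashing + reference counting
- Blocks are stored in a local directory (store_path/.dedup/)
Problems:
- ❌ Dedup blocks cannot be shared across nodes
- ❌ No distributed fault tolerance
- ❌ Limited to single-node storage capacity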
Proposed Solution
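DedupS3Store:
- Blocks are stored as MinIO/S3 objects (shared across nodes)
- Reference counts are stored in S3 object metadata
- Manifests are stored as S3 objects (JSON format)
Advantages:
- ✅ Cross-node dedup sharing (MinIO distributed mode)
- ✅ Automatic fault tolerance (MinIO erasure coding)
- ✅ No single-node limit (MinIO scales out)
- ✅ Integrates with the existing S3Vfs (no new HTTP API needed)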
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node A │
│ ├── DedupS3Store │
│ │ ├── store_block() → S3 PUT <hash> │
│ │ ├── get_block() → S3 GET <hash> │
│ │ └── dedup_file() → chunking + S3 PUT + manifest │
│ └───────────────────────────────────────────────────────────────────────┘
│ ↓ │
┌─────────────────────────────────────────────────────────────────────────┐
│ MinIO Cluster (S3-compatible) │
│ ├── Bucket: markbase-dedup │
│ │ ├── Objects: <sha256-hash> (dedup blocks) │
│ │ ├── Metadata: x-amz-meta-ref-count (reference count) │
│ │ └── Manifests: manifests/<file-id>.json │
│ │ │
│ ├── Erasure Coding: EC:2 (automatic fault tolerance) │
│ ├── Replication: Node A → Node B (DR) │
│ └─────────────────────────────────────────────────────────────────────┘
│ ↓ │
┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node B │
│ ├── DedupS3Store │
│ │ ├── get_block() → S3 GET <hash> (shares Node A's blocks) │
│ │ └── restore_file() → S3 GET manifest + S3 GET blocks │
│ └─────────────────────────────────────────────────────────────────────┘
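The sharing shown in the diagram can be modelled without a real cluster. The sketch below is only an illustration under stated assumptions: `SharedBucket` and `Node` are hypothetical stand-ins for the MinIO bucket and a MarkBase node, and `DefaultHasher` replaces SHA-256 so the example needs nothing beyond the standard library.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

/// Stand-in for the MinIO bucket: object key -> bytes (hypothetical type).
type SharedBucket = Arc<Mutex<HashMap<String, Vec<u8>>>>;

/// Stand-in for a MarkBase node holding a handle to the shared bucket.
struct Node {
    bucket: SharedBucket,
}

impl Node {
    fn new(bucket: SharedBucket) -> Self {
        Node { bucket }
    }

    /// Content hash used as the object key. The design uses SHA-256;
    /// DefaultHasher is only a std-only placeholder for this sketch.
    fn hash_block(data: &[u8]) -> String {
        let mut hasher = DefaultHasher::new();
        data.hash(&mut hasher);
        format!("{:016x}", hasher.finish())
    }

    /// Models "S3 PUT <hash>": identical content maps to the same key,
    /// so a second store of the same bytes adds no new object.
    fn store_block(&self, data: &[u8]) -> String {
        let hash = Self::hash_block(data);
        let key = format!("blocks/{}", hash);
        self.bucket
            .lock()
            .unwrap()
            .entry(key)
            .or_insert_with(|| data.to_vec());
        hash
    }

    /// Models "S3 GET <hash>".
    fn get_block(&self, hash: &str) -> Option<Vec<u8>> {
        self.bucket
            .lock()
            .unwrap()
            .get(&format!("blocks/{}", hash))
            .cloned()
    }
}
```

Two `Node` values built from clones of the same `SharedBucket` behave like Node A and Node B: a block stored through one is readable through the other, and storing the same bytes twice leaves a single object in the bucket.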
Implementation Design
DedupS3Store Struct
pub struct DedupS3Store {
s3vfs: S3Vfs, // S3 backend
bucket: String, // Bucket name (markbase-dedup)
block_prefix: String, // Object key prefix (blocks/)
manifest_prefix: String, // Manifest prefix (manifests/)
config: VfsDedupConfig, // block_size, min_file_size
}
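// Serialize/Deserialize are needed because the manifest is stored as JSON.
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct DedupManifest {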
original_size: usize,
block_hashes: Vec<String>,
dedup_ratio: f64,
file_id: String, // UUID for manifest storage
}
Core Methods
| Method | Current (LocalFs) | Proposed (S3Vfs) |
|---|---|---|
| store_block(data) | std::fs::write(store_path/hash, data) | S3Vfs.put_object(blocks/hash, data) |
| get_block(hash) | std::fs::read(store_path/hash) | S3Vfs.get_object(blocks/hash) |
| increment_ref(hash) | std::fs::write(hash.ref, count) | S3Vfs.put_object(blocks/hash, data) + metadata update |
| decrement_ref(hash) | std::fs::write/remove | S3Vfs.delete_object + metadata check |
| dedup_file(source) | Local file read + block store | Local file read + S3 PUT blocks |
| restore_file(manifest) | Local file write + block read | Local file write + S3 GET blocks |
| get_ref_count(hash) | std::fs::read(hash.ref) | S3Vfs.head_object(blocks/hash) → metadata |
S3 Object Layout
Bucket: markbase-dedup
├── blocks/
│ ├── <sha256-hash-1> # Dedup block (4KB)
│ │ └── Metadata: x-amz-meta-ref-count: 5
│ ├── <sha256-hash-2>
│ │ └── Metadata: x-amz-meta-ref-count: 2
│ └── ...
│
├── manifests/
│ ├── <file-id-1>.json # Manifest JSON
│ │ └── Content: {"original_size": 1024, "block_hashes": [...], ...}
│ ├── <file-id-2>.json
│ └── ...
│
└── stats.json # DedupStats (optional)
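The key scheme in this layout can be captured in a few helpers. This is a minimal sketch; the function and constant names are hypothetical and not part of the existing code, and the hex check assumes block keys are lowercase or uppercase hex SHA-256 digests (64 characters).

```rust
const BLOCK_PREFIX: &str = "blocks/";
const MANIFEST_PREFIX: &str = "manifests/";

/// Object key for a dedup block, e.g. "blocks/<sha256-hash>".
fn block_key(hash: &str) -> String {
    format!("{}{}", BLOCK_PREFIX, hash)
}

/// Object key for a manifest, e.g. "manifests/<file-id>.json".
fn manifest_key(file_id: &str) -> String {
    format!("{}{}.json", MANIFEST_PREFIX, file_id)
}

/// True if `s` looks like a hex-encoded SHA-256 digest.
fn is_sha256_hex(s: &str) -> bool {
    s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}

/// Recovers the hash from a listed object key, rejecting keys that are
/// not under blocks/ or whose suffix is not a 64-character hex string.
fn hash_from_block_key(key: &str) -> Option<&str> {
    key.strip_prefix(BLOCK_PREFIX).filter(|h| is_sha256_hex(h))
}
```

Keeping key construction in one place makes it harder for `store_block`, `get_block`, and a later scrub job to disagree about the prefix.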
Reference Count Management
Challenge
S3 objects do not support atomic increment/decrement operations.
Solution 1: Metadata Update (recommended ⭐⭐⭐⭐⭐)
Flow:
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
// 1. GET current metadata
let head = self.s3vfs.head_object(&format!("blocks/{}", hash))?;
let current_ref = head.metadata.get("x-amz-meta-ref-count")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(0);
// 2. PUT with updated metadata
let block_data = self.s3vfs.get_object(&format!("blocks/{}", hash))?;
self.s3vfs.put_object_with_metadata(
&format!("blocks/{}", hash),
&block_data,
[("x-amz-meta-ref-count", (current_ref + 1).to_string())]
)?;
Ok(())
}
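Advantages:
- ✅ Simple to implement
- ✅ Compatible with standard S3
Disadvantages:
- ⚠️ Requires three requests per update (HEAD + GET + PUT)
- ⚠️ Non-atomic (concurrency issue)
- ⚠️ Must read the block data back, because a PUT needs a body; a server-side self-copy (CopyObject with x-amz-metadata-directive: REPLACE) could avoid the download if S3Vfs exposes it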
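To make the cost of the metadata approach concrete, the following sketch counts requests and body bytes for one increment. `CountingStore` is a hypothetical in-memory stand-in, not the S3Vfs API, and the ref count is kept as a plain number next to the body rather than as a real metadata header.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the bucket that counts requests and body bytes.
#[derive(Default)]
struct CountingStore {
    /// key -> (body, ref count kept as object metadata)
    objects: HashMap<String, (Vec<u8>, u64)>,
    requests: u32,
    body_bytes_moved: usize,
}

impl CountingStore {
    /// Models PUT with metadata: the whole body is uploaded every time.
    fn put(&mut self, key: &str, body: &[u8], ref_count: u64) {
        self.requests += 1;
        self.body_bytes_moved += body.len();
        self.objects.insert(key.to_string(), (body.to_vec(), ref_count));
    }

    /// Models HEAD: returns only the metadata, no body transfer.
    fn head(&mut self, key: &str) -> Option<u64> {
        self.requests += 1;
        self.objects.get(key).map(|(_, count)| *count)
    }

    /// Models GET: downloads the body.
    fn get(&mut self, key: &str) -> Option<Vec<u8>> {
        self.requests += 1;
        let body = self.objects.get(key).map(|(body, _)| body.clone())?;
        self.body_bytes_moved += body.len();
        Some(body)
    }
}

/// Same HEAD + GET + PUT sequence as the flow above.
fn increment_ref(store: &mut CountingStore, key: &str) -> Option<u64> {
    let current = store.head(key)?;
    let body = store.get(key)?;
    store.put(key, &body, current + 1);
    Some(current + 1)
}
```

For a 4096-byte block, one increment in this model issues three requests and moves the body twice (one download, one upload), which is the overhead the separate-counter variant tries to avoid.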
Solution 2: Separate Ref Count Object
Flow:
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. GET ref count object (a missing or unparseable object counts as 0)
    let ref_key = format!("refs/{}/count", hash);
    let current = self.s3vfs.get_object(&ref_key)
        .ok()
        .and_then(|data| String::from_utf8(data).ok())
        .and_then(|text| text.trim().parse::<u64>().ok())
        .unwrap_or(0);
    // 2. PUT updated ref count as a decimal string
    self.s3vfs.put_object(&ref_key, (current + 1).to_string().as_bytes())?;
    Ok(())
}
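Advantages:
- ✅ No need to read the block data
- ✅ Smaller objects (just a number)
Disadvantages:
- ⚠️ Requires storing an extra object per block
- ⚠️ Non-atomic (concurrency issue)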
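The separate-counter variant, including the decrement path that the flow above omits, can be sketched against an in-memory map. `RefObjects` is a hypothetical stand-in for the refs/ prefix of the bucket; removing the counter object at zero is an assumption of this sketch, not a decision made in this design.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the refs/ prefix of the bucket.
#[derive(Default)]
struct RefObjects {
    objects: HashMap<String, Vec<u8>>,
}

impl RefObjects {
    fn ref_key(hash: &str) -> String {
        format!("refs/{}/count", hash)
    }

    /// A missing or unparseable counter object is treated as zero.
    fn get_ref_count(&self, hash: &str) -> u64 {
        self.objects
            .get(&Self::ref_key(hash))
            .and_then(|body| std::str::from_utf8(body).ok())
            .and_then(|text| text.trim().parse::<u64>().ok())
            .unwrap_or(0)
    }

    /// Read-modify-write increment; returns the new count.
    fn increment_ref(&mut self, hash: &str) -> u64 {
        let next = self.get_ref_count(hash) + 1;
        self.objects
            .insert(Self::ref_key(hash), next.to_string().into_bytes());
        next
    }

    /// Returns the new count; at zero the counter object is removed and
    /// the caller would be free to delete the block itself.
    fn decrement_ref(&mut self, hash: &str) -> u64 {
        let next = self.get_ref_count(hash).saturating_sub(1);
        if next == 0 {
            self.objects.remove(&Self::ref_key(hash));
        } else {
            self.objects
                .insert(Self::ref_key(hash), next.to_string().into_bytes());
        }
        next
    }
}
```

The counter body is only a few bytes, so each update avoids re-uploading the block, but the read-modify-write sequence is still not atomic.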
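Solution 3: MinIO Extended API (Enterprise Edition)
MinIO Enterprise Edition provides mc admin bucket policy and an object locking API.
Advantages:
- ✅ May provide an atomic operation
Disadvantages:
- ⚠️ MinIO Enterprise Edition only
- ⚠️ The specific API still needs to be researched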
Concurrency Problem
Scenario
Node A and Node B dedup the same file at the same time:
- Node A: increment_ref(hash-abc) → GET count=2 → PUT count=3
- Node B: increment_ref(hash-abc) → GET count=2 → PUT count=3
- Result: count=3 (wrong; it should be count=4)
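The interleaving in this scenario can be replayed deterministically. The sketch below uses a plain map as a hypothetical stand-in for the metadata store and performs both reads before either write, exactly as in the scenario.

```rust
use std::collections::HashMap;

/// Replays the scenario: both nodes read the count before either writes.
/// Returns the final stored count.
fn simulate_lost_update(initial: u64) -> u64 {
    let mut metadata: HashMap<&str, u64> = HashMap::new();
    metadata.insert("hash-abc", initial);

    // Both nodes GET the current count.
    let seen_by_a = metadata["hash-abc"];
    let seen_by_b = metadata["hash-abc"];

    // Both nodes PUT their own idea of "current + 1".
    metadata.insert("hash-abc", seen_by_a + 1);
    metadata.insert("hash-abc", seen_by_b + 1);

    metadata["hash-abc"]
}
```

Starting from 2, the function returns 3 rather than 4: the second write silently overwrites the first, which is the lost update the solutions in this section try to prevent.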
Solution 1: Optimistic Locking
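Use S3 versioning to detect conflicts:
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    let key = format!("blocks/{}", hash);
    // MAX_REF_RETRIES is an assumed constant; it bounds the loop so a
    // persistent failure cannot spin forever.
    for _ in 0..MAX_REF_RETRIES {
        // 1. GET current version + metadata
        let (version_id, current_ref) = self.get_ref_with_version(hash)?;
        // The PUT rewrites the whole object, so the body is needed too.
        let block_data = self.s3vfs.get_object(&key)?;
        // 2. PUT with version check. put_object_if_version is a proposed
        // helper, not an existing S3Vfs call; versioning alone does not
        // reject stale writes, so it would likely need a conditional PUT
        // (for example If-Match on the ETag) where the server supports it.
        let result = self.s3vfs.put_object_if_version(
            &key,
            &block_data,
            current_ref + 1,
            version_id, // Only succeed if version unchanged
        );
        if result.is_ok() {
            return Ok(());
        }
        // Retry on version mismatch
    }
    Err(VfsError::Io(format!("ref count update for {} kept conflicting", hash)))
}
Requirement: MinIO versioning enabled.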
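The retry loop can be exercised against an in-memory store that supports a compare-and-swap on a version number. This is a sketch under assumptions: `VersionedStore` and `put_if_version` are hypothetical and only model the behaviour a conditional write would need to provide; they do not correspond to a known MinIO or S3Vfs call.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone, Copy)]
struct Versioned {
    version: u64,
    ref_count: u64,
}

/// In-memory stand-in for a bucket with conditional writes.
#[derive(Default)]
struct VersionedStore {
    inner: Mutex<HashMap<String, Versioned>>,
}

impl VersionedStore {
    fn create(&self, key: &str, ref_count: u64) {
        let entry = Versioned { version: 0, ref_count };
        self.inner.lock().unwrap().insert(key.to_string(), entry);
    }

    /// Models HEAD: returns the current version and ref count.
    fn head(&self, key: &str) -> Option<Versioned> {
        self.inner.lock().unwrap().get(key).copied()
    }

    /// Succeeds only if the stored version still equals `expected`.
    fn put_if_version(&self, key: &str, expected: u64, new_count: u64) -> bool {
        let mut map = self.inner.lock().unwrap();
        match map.get_mut(key) {
            Some(v) if v.version == expected => {
                v.version += 1;
                v.ref_count = new_count;
                true
            }
            _ => false,
        }
    }
}

/// Optimistic increment: re-read and retry whenever another writer won.
fn increment_ref(store: &VersionedStore, key: &str, max_retries: u32) -> Result<u64, String> {
    for _ in 0..max_retries {
        let current = store.head(key).ok_or_else(|| format!("missing {}", key))?;
        if store.put_if_version(key, current.version, current.ref_count + 1) {
            return Ok(current.ref_count + 1);
        }
    }
    Err(format!("too many conflicts on {}", key))
}

/// Runs `threads` writers that each increment the same key `per_thread` times.
fn concurrent_increments(threads: usize, per_thread: usize) -> u64 {
    let store = Arc::new(VersionedStore::default());
    store.create("blocks/hash-abc", 0);
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    increment_ref(&store, "blocks/hash-abc", u32::MAX).unwrap();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    store.head("blocks/hash-abc").unwrap().ref_count
}
```

Because every successful write bumps the version, a writer that read a stale count is rejected and retries, so no increment is lost even when several threads update the same key.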
Solution 2: Distributed Lock Service
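Use an external distributed lock (such as Redis/Zookeeper):
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. Acquire distributed lock
    let lock = self.lock_service.acquire(&format!("lock:{}", hash))?;
    // 2. Increment ref count, keeping the result so the lock is
    //    released even when the update fails
    let result = self.update_ref_count(hash);
    // 3. Release lock
    lock.release();
    result
}
Disadvantage: requires an additional service (Redis).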
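Solution 3: Accept Non-Atomic (simplified approach)
Given MarkBase's lightweight positioning:
- ⚠️ Accept the risk of non-atomic updates
- ⚠️ Ref counts will occasionally be inaccurate; this does not affect data integrity as long as blocks are never deleted on the basis of a possibly undercounted value
- ⚠️ Repair periodically (scrub job)
Recommendation: use Solution 1 (Metadata Update) in Phase 1, and investigate MinIO versioning in Phase 2.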
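One way the periodic scrub job could work is to recompute every count from the manifests and compare it with what is stored. The sketch below assumes the manifests' `block_hashes` lists and the stored counts have already been fetched; the function names and the tuple layout are hypothetical.

```rust
use std::collections::HashMap;

/// Recomputes ref counts from manifests. Each occurrence of a hash in a
/// manifest counts once, matching store_block being called per block.
fn recount_refs(manifests: &[Vec<String>]) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for block_hashes in manifests {
        for hash in block_hashes {
            *counts.entry(hash.clone()).or_insert(0) += 1;
        }
    }
    counts
}

/// Returns (hash, stored, actual) for every mismatch, sorted by hash.
/// A block referenced by no manifest shows up with actual == 0.
fn find_drift(
    stored: &HashMap<String, u64>,
    actual: &HashMap<String, u64>,
) -> Vec<(String, u64, u64)> {
    let mut drift = Vec::new();
    for (hash, &stored_count) in stored {
        let actual_count = actual.get(hash).copied().unwrap_or(0);
        if actual_count != stored_count {
            drift.push((hash.clone(), stored_count, actual_count));
        }
    }
    // Hashes referenced by manifests but missing from the stored counts.
    for (hash, &actual_count) in actual {
        if !stored.contains_key(hash) {
            drift.push((hash.clone(), 0, actual_count));
        }
    }
    drift.sort();
    drift
}
```

A scrub pass would then rewrite the stored count for each drift entry, and treat entries whose actual count is zero as candidates for garbage collection.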
Implementation Phases
| Phase | Task | Code Lines | Priority | Risk |
|---|---|---|---|---|
| Phase 1 | DedupS3Store struct + basic I/O | ~300 | P0 | Medium |
| Phase 2 | Reference count metadata | ~100 | P0 | Medium |
| Phase 3 | Manifest storage to S3 | ~50 | P1 | Low |
| Phase 4 | CLI integration | ~100 | P1 | Low |
| Phase 5 | Async version (DedupAsyncS3Store) | ~200 | P2 | High |
| Phase 6 | Concurrency fix (versioning) | ~150 | P2 | High |
| Phase 7 | Performance benchmark | ~100 | P2 | Low |
| Total | | ~1000 | | |
DedupS3Store Implementation (Phase 1 Draft)
use super::s3_fs::S3Vfs;
use super::{VfsDedupConfig, VfsError};
use sha2::{Sha256, Digest};
use std::io::{Read, Write}; // for file.read() and file.write_all()
use std::path::Path;
pub struct DedupS3Store {
s3vfs: S3Vfs,
bucket: String,
block_prefix: String,
manifest_prefix: String,
config: VfsDedupConfig,
}
impl DedupS3Store {
pub fn new(
endpoint: &str,
region: &str,
bucket: &str,
access_key: &str,
secret_key: &str,
config: VfsDedupConfig,
) -> Result<Self, VfsError> {
let s3vfs = S3Vfs::new(endpoint, region, bucket, access_key, secret_key)?;
Ok(Self {
s3vfs,
bucket: bucket.to_string(),
block_prefix: "blocks/".to_string(),
manifest_prefix: "manifests/".to_string(),
config,
})
}
pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
if data.len() > self.config.block_size {
return Err(VfsError::Io("Block size exceeds limit".to_string()));
}
let hash = Self::hash_block(data);
let key = format!("{}{}", self.block_prefix, hash);
// Check if block exists
if !self.s3vfs.object_exists(&key)? {
// PUT with initial ref count = 1
self.s3vfs.put_object_with_metadata(
&key,
data,
[("x-amz-meta-ref-count", "1".to_string())] // String value, same type as in increment_ref
)?;
} else {
// Increment ref count
self.increment_ref(&hash)?;
}
Ok(hash)
}
pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
let key = format!("{}{}", self.block_prefix, hash);
self.s3vfs.get_object(&key)
}
pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
let key = format!("{}{}", self.block_prefix, hash);
let head = self.s3vfs.head_object(&key)?;
let current_ref = head.metadata
.get("x-amz-meta-ref-count")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(1);
// Need to GET block data + PUT with new metadata
let block_data = self.get_block(hash)?;
self.s3vfs.put_object_with_metadata(
&key,
&block_data,
[("x-amz-meta-ref-count", (current_ref + 1).to_string())]
)?;
Ok(())
}
pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
let mut file = std::fs::File::open(source)?;
let mut manifest = DedupManifest::new();
let mut buffer = vec![0u8; self.config.block_size];
loop {
let n = file.read(&mut buffer)?;
if n == 0 { break; }
manifest.original_size += n;
let hash = self.store_block(&buffer[..n])?;
manifest.block_hashes.push(hash);
}
// Store manifest to S3
let file_id = uuid::Uuid::new_v4().to_string();
manifest.file_id = file_id.clone(); // clone: file_id is still used for the key below
let manifest_key = format!("{}{}.json", self.manifest_prefix, file_id);
let manifest_json = serde_json::to_string(&manifest)?;
self.s3vfs.put_object(&manifest_key, manifest_json.as_bytes())?;
Ok(manifest)
}
pub fn restore_file(&self, manifest_id: &str, target: &Path) -> Result<(), VfsError> {
let manifest_key = format!("{}{}.json", self.manifest_prefix, manifest_id);
let manifest_json = self.s3vfs.get_object(&manifest_key)?;
let manifest: DedupManifest = serde_json::from_slice(&manifest_json)?;
let mut file = std::fs::File::create(target)?;
for hash in &manifest.block_hashes {
let block = self.get_block(hash)?;
file.write_all(&block)?;
}
Ok(())
}
fn hash_block(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
hex::encode(hasher.finalize())
}
}
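The chunk, store, and restore cycle of the draft can be checked end to end without S3. The following is a std-only sketch under assumptions: `MemStore` is a hypothetical in-memory replacement for the bucket, `DefaultHasher` stands in for SHA-256, and `block_size` must be greater than zero.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

struct Manifest {
    original_size: usize,
    block_hashes: Vec<String>,
}

/// hash -> (block bytes, ref count)
#[derive(Default)]
struct MemStore {
    blocks: HashMap<String, (Vec<u8>, u64)>,
}

/// Placeholder content hash; the design uses SHA-256.
fn hash_block(data: &[u8]) -> String {
    let mut hasher = DefaultHasher::new();
    data.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

impl MemStore {
    /// Stores the block if new, then bumps its ref count.
    fn store_block(&mut self, data: &[u8]) -> String {
        let hash = hash_block(data);
        let entry = self
            .blocks
            .entry(hash.clone())
            .or_insert_with(|| (data.to_vec(), 0));
        entry.1 += 1;
        hash
    }

    /// Splits `data` into fixed-size blocks (the last may be shorter).
    fn dedup_bytes(&mut self, data: &[u8], block_size: usize) -> Manifest {
        let mut manifest = Manifest {
            original_size: data.len(),
            block_hashes: Vec::new(),
        };
        for chunk in data.chunks(block_size) {
            manifest.block_hashes.push(self.store_block(chunk));
        }
        manifest
    }

    /// Rebuilds the original bytes; None if any block is missing.
    fn restore_bytes(&self, manifest: &Manifest) -> Option<Vec<u8>> {
        let mut out = Vec::with_capacity(manifest.original_size);
        for hash in &manifest.block_hashes {
            out.extend_from_slice(&self.blocks.get(hash)?.0);
        }
        Some(out)
    }
}
```

With a 4-byte block size, the input `AAAABBBBAAAACC` yields four manifest entries but only three stored blocks, and the repeated `AAAA` block ends with a ref count of 2.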
Integration with MarkBase VFS
Option 1: Standalone DedupS3Store
The user creates a DedupS3Store manually:
# CLI tool
markbase dedup-upload --s3 --s3-endpoint http://localhost:9000 --file /data/large.iso
markbase dedup-download --s3 --manifest-id <uuid> --output /data/restored.iso
Option 2: DedupVfsBackend (VfsBackend trait)
Create a VfsBackend wrapper that dedups automatically:
pub struct DedupS3Backend {
dedup_store: DedupS3Store,
manifest_dir: PathBuf, // Local cache for manifests
}
impl VfsBackend for DedupS3Backend {
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
// 1. Read manifest from S3
let manifest = self.load_manifest(path)?;
// 2. DedupS3File (read blocks from S3)
Ok(Box::new(DedupS3File::new(self.dedup_store.clone(), manifest)))
}
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
// Read from manifest metadata
let manifest = self.load_manifest(path)?;
Ok(VfsStat {
size: manifest.original_size,
mtime: manifest.mtime,
...
})
}
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
// List manifests from S3
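// manifest_prefix lives on DedupS3Store, not on this backend;
// mapping the listed keys to VfsDirEntry values is elided here
self.dedup_store.s3vfs.list_objects(&self.dedup_store.manifest_prefix)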
}
}
Advantages:
- ✅ Transparent dedup (users do not need to care)
- ✅ Seamless integration with SMB/WebDAV/SFTP
Option 3: Hybrid (LocalFs + DedupS3Store)
pub struct HybridDedupBackend {
local: LocalFs, // Small files (<1MB) stay on local storage
dedup_s3: DedupS3Store, // Large files (>=1MB) are deduped to S3
}
impl VfsBackend for HybridDedupBackend {
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
// Check file size
let stat = self.local.stat(path)?;
if stat.size < self.dedup_s3.config.min_file_size {
// Small file: direct LocalFs
self.local.open_file(path, flags)
} else {
// Large file: dedup to S3
self.dedup_s3.dedup_file(path)?;
self.dedup_s3.open_file_from_manifest(path)
}
}
}
Recommendation: Option 1 (Phase 1), Option 3 (Phase 2).
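The routing rule of the hybrid backend reduces to one comparison, and writing it out makes the boundary case explicit. `Route` and `route_for_size` are hypothetical names; the threshold corresponds to `min_file_size` in `VfsDedupConfig`.

```rust
#[derive(Debug, PartialEq)]
enum Route {
    Local,
    DedupS3,
}

/// Files strictly smaller than the threshold stay local; a file of
/// exactly `min_file_size` bytes is deduped to S3.
fn route_for_size(size: u64, min_file_size: u64) -> Route {
    if size < min_file_size {
        Route::Local
    } else {
        Route::DedupS3
    }
}
```

With a 1 MiB threshold, a file of 1,048,575 bytes is routed to local storage and a file of exactly 1,048,576 bytes is routed to S3.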
Performance Considerations
Network Latency
| Operation | LocalFs | S3Vfs | Overhead |
|---|---|---|---|
| store_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| get_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| dedup_file (100MB) | ~2s (50MB/s) | ~10s (10MB/s) | ~5x |
Mitigations:
- ✅ Async concurrent upload (4-8 concurrent requests)
- ✅ ReadCache (64MB cache)
- ✅ Local cache for hot blocks
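A back-of-the-envelope model shows why concurrency matters for small blocks. The sketch below assumes each block costs exactly one request with a fixed latency, that requests run in full waves of `concurrency`, and that 100MB means 100 MiB; real uploads also pay for the existence check and for bandwidth, so treat the output as a rough estimate only.

```rust
/// Rough time in seconds to push every block of a file, assuming one
/// request per block and `concurrency` requests in flight at once.
fn estimated_secs(file_bytes: u64, block_bytes: u64, latency_ms: f64, concurrency: u32) -> f64 {
    // Number of blocks, rounding the last partial block up.
    let blocks = (file_bytes + block_bytes - 1) / block_bytes;
    // Number of sequential waves of concurrent requests.
    let lanes = concurrency.max(1) as u64;
    let waves = (blocks + lanes - 1) / lanes;
    waves as f64 * latency_ms / 1000.0
}
```

Under these assumptions a 100 MiB file is 25,600 blocks of 4 KiB. At 5 ms per request that is about 128 s sequentially and about 16 s with 8 concurrent requests, so the ~10s figure in the table is only reachable with concurrency, lower latency, or a high dedup hit rate.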
Dedup Ratio Impact
| File Type | Dedup Ratio | Network Traffic Saved |
|---|---|---|
| VM images (similar OS) | ~80% | -80% upload bandwidth |
| Log files (daily) | ~60% | -60% upload bandwidth |
| Unique files (photos) | ~5% | -5% upload bandwidth |
Next Steps
1. Phase 1 Implementation (~300 lines)
   - DedupS3Store struct
   - store_block() / get_block() via S3Vfs
   - increment_ref() with metadata update
2. Phase 2 CLI Integration (~100 lines)
   - markbase dedup-upload --s3
   - markbase dedup-download --manifest-id
3. Phase 3 Performance Test
   - Benchmark dedup_file (100MB)
   - Compare LocalFs vs S3Vfs
Open Questions
- Concurrency: Accept non-atomic ref count vs implement versioning?
- Backend choice: Standalone CLI vs VfsBackend integration?
- MinIO versioning: Should we require MinIO versioning enabled?
- Ref count object: Metadata vs separate object?
- Block cache: Should we cache blocks locally?
Document created: 2026-06-25 Last updated: 2026-06-25