# DedupFs + S3Vfs Combination Design
**Date**: 2026-06-25

**Status**: Design proposal

**Goal**: Distributed deduplication storage via a MinIO/S3 backend

---
## Executive Summary
### Current State
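**DedupStore** (`dedup.rs`, 224 lines):
- Dedup storage on the **local file system**
- SHA-256 block hashes + reference counting
- Blocks are stored in a local directory (`store_path/.dedup/`)

**Problems**:
- ❌ Dedup blocks cannot be shared across nodes
- ❌ No distributed fault tolerance
- ❌ Limited to the storage of a single node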
### Proposed Solution
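**DedupS3Store**:
- Blocks are stored as **MinIO/S3** objects (shared across nodes)
- Reference counts are stored in S3 object metadata
- Manifests are stored as S3 objects (JSON format)

**Advantages**:
- ✅ Dedup blocks shared across nodes (MinIO is distributed)
- ✅ Automatic fault tolerance (MinIO erasure coding)
- ✅ No single-node limit (MinIO scales out)
- ✅ Integrates with the existing S3Vfs (no new HTTP API needed)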
---
## Architecture
```
┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node A                                                         │
│ └── DedupS3Store                                                        │
│     ├── store_block()  → S3 PUT blocks/<hash>                           │
│     ├── get_block()    → S3 GET blocks/<hash>                           │
│     └── dedup_file()   → chunk + S3 PUT blocks + manifest               │
└─────────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────────┐
│ MinIO Cluster (S3-compatible)                                           │
│ ├── Bucket: markbase-dedup                                              │
│ │   ├── Objects: blocks/<sha256-hash> (dedup blocks)                    │
│ │   ├── Metadata: x-amz-meta-ref-count (reference count)                │
│ │   └── Manifests: manifests/<file-id>.json                             │
│ ├── Erasure Coding: EC:2 (automatic fault tolerance)                    │
│ └── Replication: Node A → Node B (DR)                                   │
└─────────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node B                                                         │
│ └── DedupS3Store                                                        │
│     ├── get_block()    → S3 GET blocks/<hash> (shares Node A's blocks)  │
│     └── restore_file() → S3 GET manifest + S3 GET blocks                │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Implementation Design
### DedupS3Store Struct
```rust
pub struct DedupS3Store {
    s3vfs: S3Vfs,            // S3 backend
    bucket: String,          // Bucket name (markbase-dedup)
    block_prefix: String,    // Object key prefix (blocks/)
    manifest_prefix: String, // Manifest prefix (manifests/)
    config: VfsDedupConfig,  // block_size, min_file_size
}
// Serialized as JSON to manifests/<file_id>.json (requires serde)
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DedupManifest {
    original_size: usize,
    block_hashes: Vec<String>, // SHA-256 hex of each block, in file order
    dedup_ratio: f64,
    file_id: String,           // UUID for manifest storage
}
```
### Core Methods
| Method | Current (LocalFs) | Proposed (S3Vfs) |
|--------|------------------|------------------|
| `store_block(data)` | `std::fs::write(store_path/hash, data)` | `S3Vfs.put_object(blocks/hash, data)` |
| `get_block(hash)` | `std::fs::read(store_path/hash)` | `S3Vfs.get_object(blocks/hash)` |
| `increment_ref(hash)` | `std::fs::write(hash.ref, count)` | `S3Vfs.put_object(blocks/hash, data) + metadata update` |
| `decrement_ref(hash)` | `std::fs::write/remove` | `S3Vfs.delete_object + metadata check` |
| `dedup_file(source)` | Local file read + block store | Local file read + S3 PUT blocks |
| `restore_file(manifest)` | Local file write + block read | Local file write + S3 GET blocks |
| `get_ref_count(hash)` | `std::fs::read(hash.ref)` | `S3Vfs.head_object(blocks/hash) → metadata` |
---
## S3 Object Layout
```
Bucket: markbase-dedup
├── blocks/
│   ├── <sha256-hash-1>      # Dedup block (4 KB)
│   │   └── Metadata: x-amz-meta-ref-count: 5
│   ├── <sha256-hash-2>
│   │   └── Metadata: x-amz-meta-ref-count: 2
│   └── ...
├── manifests/
│   ├── <file-id-1>.json     # Manifest JSON
│   │   └── Content: {"original_size": 1024, "block_hashes": [...], ...}
│   ├── <file-id-2>.json
│   └── ...
└── stats.json               # DedupStats (optional)
```
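
The key layout above can be captured in two small helpers. This is a sketch rather than part of the proposed `DedupS3Store` API: `block_key`, `manifest_key` and the validation rules (lowercase 64-character hex digests, file ids without `/`) are hypothetical additions for illustration.

```rust
/// Object-key prefixes from the layout above.
const BLOCK_PREFIX: &str = "blocks/";
const MANIFEST_PREFIX: &str = "manifests/";

/// A SHA-256 digest rendered as lowercase hex is exactly 64 characters.
fn is_sha256_hex(hash: &str) -> bool {
    hash.len() == 64 && hash.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Hypothetical helper: key of a dedup block, `blocks/<sha256-hex>`.
fn block_key(hash: &str) -> Result<String, String> {
    if !is_sha256_hex(hash) {
        return Err(format!("not a lowercase SHA-256 hex digest: {hash:?}"));
    }
    Ok(format!("{BLOCK_PREFIX}{hash}"))
}

/// Hypothetical helper: key of a manifest, `manifests/<file-id>.json`.
fn manifest_key(file_id: &str) -> Result<String, String> {
    // Reject ids that would escape the prefix or create nested keys.
    if file_id.is_empty() || file_id.contains('/') {
        return Err(format!("invalid file id: {file_id:?}"));
    }
    Ok(format!("{MANIFEST_PREFIX}{file_id}.json"))
}
```

Validating the digest before building the key keeps malformed hashes (for example an uppercase digest from another tool) from silently creating a second copy of the same block.
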
---
## Reference Count Management
### Challenge
S3 objects do not support atomic increment/decrement operations.
### Solution 1: Metadata Update (Recommended ⭐⭐⭐⭐⭐)
**Flow**:
```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    let key = format!("blocks/{}", hash);
    // 1. HEAD: read the current ref count from the object metadata
    let head = self.s3vfs.head_object(&key)?;
    let current_ref = head.metadata.get("x-amz-meta-ref-count")
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(0);
    // 2. GET the block body: S3 cannot change metadata in place,
    //    so the PUT below has to resend the body
    let block_data = self.s3vfs.get_object(&key)?;
    // 3. PUT the same body with the updated metadata
    self.s3vfs.put_object_with_metadata(
        &key,
        &block_data,
        [("x-amz-meta-ref-count", (current_ref + 1).to_string())],
    )?;
    Ok(())
}
```
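**Advantages**:
- ✅ Simple to implement
- ✅ Compatible with standard S3

**Disadvantages**:
- ⚠️ Several requests per increment (HEAD + GET + PUT as written; the HEAD can be dropped because a GET also returns the user metadata)
- ⚠️ Not atomic (concurrency issues)
- ⚠️ The block data must be read back because a PUT needs the body; a server-side CopyObject of the object onto itself with `MetadataDirective: REPLACE` would rewrite the metadata without transferring the body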
---
### Solution 2: Separate Ref Count Object
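**Flow**:
```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    let ref_key = format!("refs/{}/count", hash);
    // 1. GET the ref count object (body: the count as decimal text).
    //    Caveat: `.ok()` turns every error, not just "not found", into 0.
    let current = self
        .s3vfs
        .get_object(&ref_key)
        .ok()
        .and_then(|data| String::from_utf8(data).ok())
        .and_then(|text| text.trim().parse::<u64>().ok())
        .unwrap_or(0);
    // 2. PUT the updated ref count
    self.s3vfs.put_object(&ref_key, (current + 1).to_string().as_bytes())?;
    Ok(())
}
```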
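**Advantages**:
- ✅ No need to read block data
- ✅ Smaller object (just a number)

**Disadvantages**:
- ⚠️ Requires an extra object per block
- ⚠️ Not atomic (concurrency issues)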
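
The Solution 2 flow falls back to 0 whenever the GET fails, so a transient network error could silently reset a count. A minimal sketch of stricter handling for the `refs/<hash>/count` object, assuming a hypothetical `StoreError` type whose `NotFound` variant corresponds to S3's 404 / `NoSuchKey`:

```rust
/// Hypothetical error type standing in for the S3 client's errors.
#[derive(Debug, PartialEq)]
enum StoreError {
    NotFound,
    Network(String),
    Corrupt(String),
}

/// Interpret the result of GET `refs/<hash>/count`.
/// Only a missing object means "no references yet"; every other failure is
/// propagated so it can never reset the count to 0.
fn parse_ref_count(get_result: Result<Vec<u8>, StoreError>) -> Result<u64, StoreError> {
    match get_result {
        Err(StoreError::NotFound) => Ok(0),
        Err(e) => Err(e),
        Ok(bytes) => {
            let text = String::from_utf8(bytes)
                .map_err(|e| StoreError::Corrupt(e.to_string()))?;
            text.trim()
                .parse::<u64>()
                .map_err(|e| StoreError::Corrupt(format!("{text:?}: {e}")))
        }
    }
}

/// Body for the PUT of the updated count (plain decimal text).
fn encode_ref_count(count: u64) -> Vec<u8> {
    count.to_string().into_bytes()
}
```

A corrupt count object is reported rather than treated as 0, which leaves the decision (repair via scrub, or refuse the increment) to the caller.
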
---
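### Solution 3: MinIO Extended API (Enterprise Edition)
MinIO Enterprise offers `mc admin bucket policy` and an object locking API. Object locking is a WORM retention feature, so whether either of them helps with atomic counters is unverified.

**Advantages**:
- ✅ May provide an atomic operation

**Disadvantages**:
- ⚠️ MinIO Enterprise only
- ⚠️ The specific API still needs research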
---
## Concurrency Problem
### Scenario
Node A and Node B dedup the same file at the same time, and both read the count before either writes it:
1. Node A: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
2. Node B: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
3. Result: count=3 (wrong; it should be count=4)
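
The lost update can be replayed deterministically. In this sketch a `HashMap` stands in for the object metadata (an assumption; no S3 calls are involved), and both reads happen before both writes:

```rust
use std::collections::HashMap;

/// Replays the interleaving above: both nodes read, then both write.
fn lost_update_interleaving(initial: u64) -> u64 {
    let mut ref_counts: HashMap<&str, u64> = HashMap::new();
    ref_counts.insert("hash-abc", initial);

    // Both nodes read the current count (HEAD / GET).
    let read_by_a = ref_counts["hash-abc"];
    let read_by_b = ref_counts["hash-abc"];

    // Each node writes read + 1 (PUT); B's write overwrites A's.
    ref_counts.insert("hash-abc", read_by_a + 1);
    ref_counts.insert("hash-abc", read_by_b + 1);

    ref_counts["hash-abc"]
}

/// The serialized order for comparison: B reads only after A has written.
fn serialized(initial: u64) -> u64 {
    let mut count = initial;
    count += 1; // node A: read 2, write 3
    count += 1; // node B: read 3, write 4
    count
}
```
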
### Solution 1: Optimistic Locking
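Use S3 versioning to detect conflicts:
```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    const MAX_RETRIES: u32 = 10;
    let key = format!("blocks/{}", hash);
    for _ in 0..MAX_RETRIES {
        // 1. GET current version + ref count, and the body to rewrite
        let (version_id, current_ref) = self.get_ref_with_version(hash)?;
        let block_data = self.s3vfs.get_object(&key)?;
        // 2. PUT with version check (hypothetical conditional-write helper)
        let result = self.s3vfs.put_object_if_version(
            &key,
            &block_data,
            current_ref + 1,
            &version_id, // Only succeed if version unchanged
        );
        if result.is_ok() {
            return Ok(());
        }
        // Version mismatch: another writer won, so re-read and retry.
        // (A real implementation should retry only on a precondition failure.)
    }
    Err(VfsError::Io(format!(
        "increment_ref({hash}): gave up after {MAX_RETRIES} conflicting attempts"
    )))
}
```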
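**Requirement**: MinIO versioning enabled. Note that S3's PutObject has no "write only if the version ID is unchanged" option, so `put_object_if_version` is a hypothetical helper; an actual compare-and-swap needs conditional-write support from the backend (for example `If-Match` on the ETag), which has to be verified for the MinIO version in use.
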
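
The retry pattern can be checked without S3 using an in-memory model. In this sketch the hypothetical `VersionedStore` makes each call atomic under a mutex, the way a single conditional request would be on a server that supports one, and `increment_ref` retries a bounded number of times; several threads incrementing the same key concurrently should lose no updates.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory store with conditional writes.
/// The version plays the role of an ETag / version id.
#[derive(Default)]
struct VersionedStore {
    objects: Mutex<HashMap<String, (u64, u64)>>, // key -> (version, ref_count)
}

impl VersionedStore {
    /// Read: (version, ref_count); a missing key is version 0, count 0.
    fn get(&self, key: &str) -> (u64, u64) {
        *self.objects.lock().unwrap().get(key).unwrap_or(&(0, 0))
    }

    /// Conditional write: succeeds only if the version still equals `expected`.
    fn put_if_version(&self, key: &str, count: u64, expected: u64) -> Result<(), ()> {
        let mut objects = self.objects.lock().unwrap();
        let current = objects.get(key).map_or(0, |&(v, _)| v);
        if current != expected {
            return Err(()); // another writer got in between
        }
        objects.insert(key.to_string(), (current + 1, count));
        Ok(())
    }
}

/// Optimistic increment with a bounded retry loop.
fn increment_ref(store: &VersionedStore, key: &str, max_retries: u32) -> Result<u64, String> {
    for _ in 0..=max_retries {
        let (version, count) = store.get(key);
        if store.put_if_version(key, count + 1, version).is_ok() {
            return Ok(count + 1);
        }
        // Version mismatch: re-read and retry.
    }
    Err(format!("gave up on {key} after {max_retries} retries"))
}

/// Runs `threads` workers, each incrementing the same key `per_thread` times.
fn concurrent_increments(threads: usize, per_thread: usize) -> u64 {
    let store = Arc::new(VersionedStore::default());
    let workers: Vec<_> = (0..threads)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Generous bound: each failure requires another writer's success.
                    increment_ref(&store, "blocks/abc", 10_000).expect("too much contention");
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker panicked");
    }
    store.get("blocks/abc").1
}
```

The bound matters: with an unbounded `loop`, a persistent error that is not a version conflict (for example a network outage) would spin forever.
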
---
### Solution 2: Distributed Lock Service
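Use an external distributed lock (e.g. Redis/ZooKeeper):
```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. Acquire distributed lock
    let lock = self.lock_service.acquire(&format!("lock:{}", hash))?;
    // 2. Increment ref count; keep the result instead of returning early with `?`
    let result = self.update_ref_count(hash);
    // 3. Release lock on success and failure alike
    lock.release();
    result
}
```
**Disadvantage**: requires an additional service (e.g. Redis), and locks need a lease/TTL so that a crashed holder does not block the hash forever.
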
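
Releasing a lock explicitly is easy to skip on an error path, because `?` returns before `release()` runs. A sketch of the RAII alternative, where a guard releases the lock in `Drop`; `LockService` here is a hypothetical in-memory stand-in, not a Redis or ZooKeeper client, and leases/TTLs are left out.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical in-memory lock service.
#[derive(Default)]
struct LockService {
    held: Mutex<HashSet<String>>,
}

/// Returned by `try_acquire`; dropping it releases the lock.
struct LockGuard<'a> {
    service: &'a LockService,
    name: String,
}

impl LockService {
    fn try_acquire(&self, name: &str) -> Option<LockGuard<'_>> {
        let newly_inserted = self.held.lock().unwrap().insert(name.to_string());
        newly_inserted.then(|| LockGuard { service: self, name: name.to_string() })
    }

    fn is_held(&self, name: &str) -> bool {
        self.held.lock().unwrap().contains(name)
    }
}

impl Drop for LockGuard<'_> {
    fn drop(&mut self) {
        self.service.held.lock().unwrap().remove(&self.name);
    }
}

/// Increment under the lock. The guard is dropped on every return path,
/// including the early return from `?` when `update` fails.
fn increment_ref_locked(
    locks: &LockService,
    hash: &str,
    update: impl FnOnce() -> Result<(), String>,
) -> Result<(), String> {
    let _guard = locks
        .try_acquire(&format!("lock:{hash}"))
        .ok_or_else(|| format!("lock:{hash} is busy"))?;
    update()?;
    Ok(())
}
```
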
---
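### Solution 3: Accept Non-Atomic (Simplified Approach)
Given MarkBase's lightweight positioning:
- ⚠️ Accept the risk of non-atomic updates
- ⚠️ Ref counts are occasionally inaccurate. An overcount only leaks storage, but an undercount (the lost update above) preserves data integrity only if blocks are never deleted merely because their count reached zero
- ⚠️ Periodic repair (scrub job)

**Recommendation**: Phase 1 uses Solution 1 from Reference Count Management (Metadata Update); Phase 2 investigates MinIO versioning.
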
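
A scrub job can recompute reference counts from the manifests (mark) and list blocks that nothing references (sweep). A minimal in-memory sketch, assuming the manifests' `block_hashes` and the stored counts have already been listed from the bucket; `scrub` and `ScrubReport` are hypothetical names.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Result of one scrub pass.
#[derive(Debug, PartialEq)]
struct ScrubReport {
    /// Ref counts recomputed from manifests (the values to write back).
    corrected: BTreeMap<String, u64>,
    /// Blocks whose stored count differed: (hash, stored, actual).
    drifted: Vec<(String, u64, u64)>,
    /// Stored blocks referenced by no manifest.
    orphans: BTreeSet<String>,
}

/// `manifests`: block-hash list of every manifest.
/// `stored`: hash -> ref count as currently recorded in object metadata.
fn scrub(manifests: &[Vec<String>], stored: &BTreeMap<String, u64>) -> ScrubReport {
    // Mark: count every reference from every manifest.
    let mut corrected: BTreeMap<String, u64> = BTreeMap::new();
    for hashes in manifests {
        for hash in hashes {
            *corrected.entry(hash.clone()).or_insert(0) += 1;
        }
    }
    // Compare against what is stored.
    let mut drifted = Vec::new();
    for (hash, &actual) in &corrected {
        let recorded = stored.get(hash).copied().unwrap_or(0);
        if recorded != actual {
            drifted.push((hash.clone(), recorded, actual));
        }
    }
    // Sweep: stored blocks that no manifest references.
    let orphans = stored
        .keys()
        .filter(|hash| !corrected.contains_key(*hash))
        .cloned()
        .collect();
    ScrubReport { corrected, drifted, orphans }
}
```

Blocks uploaded by a `dedup_file` call that has not yet written its manifest also appear as orphans, so orphans should only be deleted after a grace period.
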
---
## Implementation Phases
| Phase | Task | Code Lines | Priority | Risk |
|-------|------|------------|----------|------|
| **Phase 1** | DedupS3Store struct + basic I/O | ~300 | P0 | Medium |
| **Phase 2** | Reference count metadata | ~100 | P0 | Medium |
| **Phase 3** | Manifest storage to S3 | ~50 | P1 | Low |
| **Phase 4** | CLI integration | ~100 | P1 | Low |
| **Phase 5** | Async version (DedupAsyncS3Store) | ~200 | P2 | High |
| **Phase 6** | Concurrency fix (versioning) | ~150 | P2 | High |
| **Phase 7** | Performance benchmark | ~100 | P2 | Low |
| **Total** | | **~1000** | | |
---
## DedupS3Store Implementation (Phase 1 Draft)
```rust
use super::s3_fs::S3Vfs;
use super::{VfsDedupConfig, VfsError};
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
use std::path::Path;

// Assumptions: `DedupManifest` (defined above) derives serde's Serialize and
// Deserialize and has a `new()` constructor; `VfsError` implements
// `From<std::io::Error>` and `From<serde_json::Error>` so `?` works below.
// The S3Vfs methods used here are the proposed API, not existing ones.
pub struct DedupS3Store {
    s3vfs: S3Vfs,
    bucket: String,
    block_prefix: String,
    manifest_prefix: String,
    config: VfsDedupConfig,
}
impl DedupS3Store {
    pub fn new(
        endpoint: &str,
        region: &str,
        bucket: &str,
        access_key: &str,
        secret_key: &str,
        config: VfsDedupConfig,
    ) -> Result<Self, VfsError> {
        let s3vfs = S3Vfs::new(endpoint, region, bucket, access_key, secret_key)?;
        Ok(Self {
            s3vfs,
            bucket: bucket.to_string(),
            block_prefix: "blocks/".to_string(),
            manifest_prefix: "manifests/".to_string(),
            config,
        })
    }
    pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
        if data.len() > self.config.block_size {
            return Err(VfsError::Io(format!(
                "block of {} bytes exceeds block_size {}",
                data.len(),
                self.config.block_size
            )));
        }
        let hash = Self::hash_block(data);
        let key = format!("{}{}", self.block_prefix, hash);
        // Check if the block exists. This check-then-PUT is not atomic: two
        // nodes can both see "missing" and both write ref-count = 1.
        if !self.s3vfs.object_exists(&key)? {
            // New block: PUT with initial ref count = 1
            self.s3vfs.put_object_with_metadata(
                &key,
                data,
                [("x-amz-meta-ref-count", "1".to_string())],
            )?;
        } else {
            // Known block: only increment its ref count
            self.increment_ref(&hash)?;
        }
        Ok(hash)
    }
    pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        self.s3vfs.get_object(&key)
    }
    pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        let head = self.s3vfs.head_object(&key)?;
        // The block exists, so a missing or unparsable count is treated as 1.
        // Many S3 SDKs expose user metadata without the `x-amz-meta-` prefix;
        // the lookup key must match the client actually used.
        let current_ref = head
            .metadata
            .get("x-amz-meta-ref-count")
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(1);
        // Metadata cannot be changed in place: GET the body and PUT it back
        let block_data = self.get_block(hash)?;
        self.s3vfs.put_object_with_metadata(
            &key,
            &block_data,
            [("x-amz-meta-ref-count", (current_ref + 1).to_string())],
        )?;
        Ok(())
    }
    pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
        let mut file = std::fs::File::open(source)?;
        let mut manifest = DedupManifest::new();
        let mut buffer = Vec::with_capacity(self.config.block_size);
        loop {
            // `read` may return short counts; fill each block completely so
            // block boundaries (and therefore hashes) are deterministic.
            buffer.clear();
            let n = file
                .by_ref()
                .take(self.config.block_size as u64)
                .read_to_end(&mut buffer)?;
            if n == 0 {
                break; // EOF
            }
            manifest.original_size += n;
            let hash = self.store_block(&buffer)?;
            manifest.block_hashes.push(hash);
        }
        // Store manifest to S3 under a fresh UUID
        let file_id = uuid::Uuid::new_v4().to_string();
        let manifest_key = format!("{}{}.json", self.manifest_prefix, file_id);
        manifest.file_id = file_id; // moved only after its last use above
        let manifest_json = serde_json::to_string(&manifest)?;
        self.s3vfs.put_object(&manifest_key, manifest_json.as_bytes())?;
        Ok(manifest)
    }
    pub fn restore_file(&self, manifest_id: &str, target: &Path) -> Result<(), VfsError> {
        let manifest_key = format!("{}{}.json", self.manifest_prefix, manifest_id);
        let manifest_json = self.s3vfs.get_object(&manifest_key)?;
        let manifest: DedupManifest = serde_json::from_slice(&manifest_json)?;
        let mut file = std::fs::File::create(target)?;
        for hash in &manifest.block_hashes {
            let block = self.get_block(hash)?;
            // The key is the SHA-256 of the data, so verify before writing
            if Self::hash_block(&block) != *hash {
                return Err(VfsError::Io(format!("block {hash} failed SHA-256 check")));
            }
            file.write_all(&block)?;
        }
        Ok(())
    }
    fn hash_block(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        hex::encode(hasher.finalize())
    }
}
```
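
Fixed-size dedup needs every block except the last to be exactly `block_size` bytes. `Read::read` may return fewer bytes than requested, and a short read shifts every later block boundary so that identical content hashes differently. A standalone sketch of fixed-size chunking that fills each block; `read_blocks` and the `ShortReader` test helper are illustrative names, not part of the proposed API.

```rust
use std::io::{self, Read};

/// Splits a reader into fixed-size blocks. `take(..).read_to_end` keeps
/// calling `read` until the block is full or the input ends.
fn read_blocks<R: Read>(mut reader: R, block_size: usize) -> io::Result<Vec<Vec<u8>>> {
    assert!(block_size > 0, "block_size must be positive");
    let mut blocks = Vec::new();
    loop {
        let mut block = Vec::with_capacity(block_size);
        let n = reader.by_ref().take(block_size as u64).read_to_end(&mut block)?;
        if n == 0 {
            return Ok(blocks); // EOF
        }
        blocks.push(block);
    }
}

/// Test reader returning at most `max_chunk` bytes per `read` call,
/// imitating short reads from pipes or network file systems.
struct ShortReader<'a> {
    data: &'a [u8],
    max_chunk: usize,
}

impl Read for ShortReader<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.max_chunk).min(self.data.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data = &self.data[n..];
        Ok(n)
    }
}
```

With a reader that delivers 3 bytes at a time and `block_size = 4`, the blocks still come out as 4 + 4 + 2 bytes, identical to reading from an in-memory slice.
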
---
## Integration with MarkBase VFS
### Option 1: Standalone DedupS3Store
Users create and use a DedupS3Store explicitly:
```bash
# CLI tool
markbase dedup-upload --s3 --s3-endpoint http://localhost:9000 --file /data/large.iso
markbase dedup-download --s3 --manifest-id <uuid> --output /data/restored.iso
```
---
### Option 2: DedupVfsBackend (VfsBackend trait)
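Create a VfsBackend wrapper that deduplicates automatically:
```rust
use std::path::{Path, PathBuf};
use std::sync::Arc;

pub struct DedupS3Backend {
    dedup_store: Arc<DedupS3Store>, // shared by every open DedupS3File
    manifest_dir: PathBuf,          // Local cache for manifests
}
impl VfsBackend for DedupS3Backend {
    fn open_file(&self, path: &Path, _flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // 1. Read manifest from S3 (or the local manifest cache)
        let manifest = self.load_manifest(path)?;
        // 2. DedupS3File fetches blocks from S3 on demand
        Ok(Box::new(DedupS3File::new(Arc::clone(&self.dedup_store), manifest)))
    }
    fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
        // Read from manifest metadata
        let manifest = self.load_manifest(path)?;
        Ok(VfsStat {
            size: manifest.original_size,
            mtime: manifest.mtime, // assumes an `mtime` field is added to DedupManifest
            ..Default::default()   // assumes VfsStat implements Default
        })
    }
    fn read_dir(&self, _path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
        // List manifests from S3 (flat namespace: `_path` is not used yet).
        // `VfsDirEntry::from_manifest_key` is a hypothetical conversion.
        let keys = self.dedup_store.s3vfs.list_objects(&self.dedup_store.manifest_prefix)?;
        Ok(keys.into_iter().map(VfsDirEntry::from_manifest_key).collect())
    }
}
```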
**Advantages**:
- ✅ Transparent dedup (users do not need to be aware of it)
- ✅ Seamless integration with SMB/WebDAV/SFTP
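
A `DedupS3File` needs random access on top of fixed-size blocks. A sketch of how a byte offset could map onto blocks, assuming every block except the last is exactly `block_size` bytes (as `dedup_file` produces); `read_at` and the `fetch` callback, which stands in for `get_block`, are hypothetical.

```rust
/// Random-access read over a deduplicated file.
/// Offset `pos` lives in block `pos / block_size` at `pos % block_size`.
fn read_at<F>(
    block_hashes: &[String],
    block_size: usize,
    file_size: usize,
    offset: usize,
    buf: &mut [u8],
    mut fetch: F,
) -> Result<usize, String>
where
    F: FnMut(&str) -> Result<Vec<u8>, String>,
{
    if offset >= file_size {
        return Ok(0); // read at or past EOF
    }
    let want = buf.len().min(file_size - offset);
    let mut done = 0;
    while done < want {
        let pos = offset + done;
        let (index, in_block) = (pos / block_size, pos % block_size);
        let hash = block_hashes
            .get(index)
            .ok_or_else(|| format!("manifest has no block {index}"))?;
        let block = fetch(hash)?; // an S3 GET, ideally behind a cache
        let n = (want - done).min(block.len().saturating_sub(in_block));
        if n == 0 {
            return Err(format!("block {index} is shorter than expected"));
        }
        buf[done..done + n].copy_from_slice(&block[in_block..in_block + n]);
        done += n;
    }
    Ok(done)
}
```

A read that straddles a boundary touches only the blocks it overlaps, which is what makes a small block cache effective for SMB/WebDAV access patterns.
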
---
### Option 3: Hybrid (LocalFs + DedupS3Store)
```rust
pub struct HybridDedupBackend {
    local: LocalFs,         // Small files (< min_file_size, e.g. 1 MB) stay local
    dedup_s3: DedupS3Store, // Large files are deduplicated to S3
}
impl VfsBackend for HybridDedupBackend {
    fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // Check file size
        let stat = self.local.stat(path)?;
        if stat.size < self.dedup_s3.config.min_file_size {
            // Small file: direct LocalFs
            self.local.open_file(path, flags)
        } else {
            // Large file: serve it from S3 through its existing manifest.
            // dedup_file() belongs on the write/close path; calling it on every
            // open would re-upload the file and bump every block's ref count.
            let manifest_id = self.manifest_id_for(path)?; // hypothetical path → manifest-id lookup
            self.dedup_s3.open_file_from_manifest(&manifest_id)
        }
    }
}
```
**Recommendation**: Option 1 for Phase 1, Option 3 for Phase 2.

---
## Performance Considerations
### Network Latency
| Operation | LocalFs (est.) | S3Vfs (est.) | Overhead (S3Vfs / LocalFs) |
|-----------|----------------|--------------|----------------------------|
| store_block (4 KB) | ~0.1 ms | ~5-10 ms (HTTP) | ~50-100x |
| get_block (4 KB) | ~0.1 ms | ~5-10 ms (HTTP) | ~50-100x |
| dedup_file (100 MB) | ~2 s (~50 MB/s) | ~10 s (~10 MB/s) | ~5x |
**Mitigations**:
- ✅ Async concurrent upload (4-8 concurrent requests)
- ✅ ReadCache (64 MB cache)
- ✅ Local cache for hot blocks
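
Per-request latency caps throughput when blocks are only 4 KB. A back-of-the-envelope model, assuming each new block costs two sequential requests (an existence check and a PUT, as in the Phase 1 draft) and that latency stays flat as concurrency grows; the function names are illustrative.

```rust
/// Rough upload throughput in MB/s (10^6 bytes/s) for fixed-size blocks when
/// each block needs `requests_per_block` sequential round trips of `rtt_ms`,
/// with `concurrency` blocks in flight. Ignores bandwidth and server limits.
fn estimated_throughput_mb_s(
    block_bytes: f64,
    rtt_ms: f64,
    requests_per_block: f64,
    concurrency: f64,
) -> f64 {
    let seconds_per_block = rtt_ms * requests_per_block / 1000.0;
    block_bytes * concurrency / seconds_per_block / 1_000_000.0
}

/// Blocks that must be in flight to reach `target_mb_s` under the same model.
fn required_concurrency(
    block_bytes: f64,
    rtt_ms: f64,
    requests_per_block: f64,
    target_mb_s: f64,
) -> f64 {
    target_mb_s / estimated_throughput_mb_s(block_bytes, rtt_ms, requests_per_block, 1.0)
}
```

Under this model one stream moves roughly 0.2-0.4 MB/s at 5-10 ms per request, so the ~10 MB/s S3Vfs estimate implies about 25-50 blocks in flight, more than the planned 4-8, or larger blocks. Existing blocks cost more round trips still, since `increment_ref` adds a HEAD, a GET and a PUT.
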
---
### Dedup Ratio Impact
| File Type | Dedup Ratio | Network Traffic Saved |
|-----------|-------------|----------------------|
| VM images (similar OS) | ~80% | -80% upload bandwidth |
| Log files (daily) | ~60% | -60% upload bandwidth |
| Unique files (photos) | ~5% | -5% upload bandwidth |
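
A minimal sketch of how a ratio could be computed from a manifest. The formula (1 - unique blocks / total blocks, within one file) is an assumption, since the document does not define `dedup_ratio`; the second function estimates upload bytes when some blocks already exist in the bucket, for example because another node uploaded them.

```rust
use std::collections::HashSet;

/// Share of blocks that duplicate another block in the same file.
fn dedup_ratio(block_hashes: &[String]) -> f64 {
    if block_hashes.is_empty() {
        return 0.0;
    }
    let unique: HashSet<&String> = block_hashes.iter().collect();
    1.0 - unique.len() as f64 / block_hashes.len() as f64
}

/// Bytes that still have to be uploaded: blocks neither already in the
/// bucket (`known`) nor seen earlier in this file.
fn bytes_to_upload(block_hashes: &[String], block_sizes: &[usize], known: &HashSet<String>) -> usize {
    let mut seen: HashSet<&String> = HashSet::new();
    block_hashes
        .iter()
        .zip(block_sizes)
        .filter(|(hash, _)| !known.contains(*hash) && seen.insert(*hash))
        .map(|(_, &size)| size)
        .sum()
}
```

The upload saving in the table comes from the cross-file case (`known`), which a per-file `dedup_ratio` alone does not capture.
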
---
## Next Steps
1. **Implementation** (table Phases 1-2, ~300 + ~100 lines)
   - `DedupS3Store` struct
   - `store_block()` / `get_block()` via S3Vfs
   - `increment_ref()` with metadata update
2. **CLI Integration** (table Phase 4, ~100 lines)
   - `markbase dedup-upload --s3`
   - `markbase dedup-download --manifest-id`
3. **Performance Test** (table Phase 7)
   - Benchmark dedup_file (100 MB)
   - Compare LocalFs vs S3Vfs
---
## Open Questions
1. **Concurrency**: Accept non-atomic ref count vs implement versioning?
2. **Backend choice**: Standalone CLI vs VfsBackend integration?
3. **MinIO versioning**: Should we require MinIO versioning to be enabled?
4. **Ref count object**: Metadata vs separate object?
5. **Block cache**: Should we cache blocks locally?
---
**Document created**: 2026-06-25

**Last updated**: 2026-06-25