Implement Write/Read Cache (Phase 3)
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

This commit is contained in:
Warren
2026-06-22 04:42:55 +08:00
parent 9ae0402318
commit ebe976eee4
2 changed files with 335 additions and 0 deletions

View File

@@ -0,0 +1,334 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use bytes::Bytes;
const DEFAULT_READ_CACHE_SIZE: usize = 64 * 1024 * 1024; // 64MB
const DEFAULT_READ_CACHE_BLOCK_SIZE: usize = 64 * 1024; // 64KB blocks
const DEFAULT_WRITE_CACHE_SIZE: usize = 32 * 1024 * 1024; // 32MB
const DEFAULT_CACHE_TTL_SECS: u64 = 300; // 5 minutes
#[derive(Clone)]
pub struct CacheConfig {
pub read_cache_size: usize,
pub read_cache_block_size: usize,
pub write_cache_size: usize,
pub cache_ttl_secs: u64,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
read_cache_size: DEFAULT_READ_CACHE_SIZE,
read_cache_block_size: DEFAULT_READ_CACHE_BLOCK_SIZE,
write_cache_size: DEFAULT_WRITE_CACHE_SIZE,
cache_ttl_secs: DEFAULT_CACHE_TTL_SECS,
}
}
}
pub struct ReadCache {
config: CacheConfig,
blocks: RwLock<HashMap<u64, CachedBlock>>,
total_size: RwLock<usize>,
last_access: RwLock<Instant>,
}
struct CachedBlock {
data: Bytes,
timestamp: Instant,
}
impl ReadCache {
pub fn new(config: CacheConfig) -> Self {
Self {
config,
blocks: RwLock::new(HashMap::new()),
total_size: RwLock::new(0),
last_access: RwLock::new(Instant::now()),
}
}
pub fn get(&self, block_offset: u64) -> Option<Bytes> {
let blocks = self.blocks.read().unwrap();
if let Some(block) = blocks.get(&block_offset) {
if block.timestamp.elapsed() < Duration::from_secs(self.config.cache_ttl_secs) {
*self.last_access.write().unwrap() = Instant::now();
return Some(block.data.clone());
}
}
None
}
pub fn put(&self, block_offset: u64, data: Bytes) {
let mut blocks = self.blocks.write().unwrap();
let mut total_size = self.total_size.write().unwrap();
// Evict old blocks if cache is full
while *total_size + data.len() > self.config.read_cache_size && !blocks.is_empty() {
let oldest_key = blocks
.iter()
.min_by_key(|(_, b)| b.timestamp)
.map(|(k, _)| *k);
if let Some(key) = oldest_key {
if let Some(removed) = blocks.remove(&key) {
*total_size -= removed.data.len();
}
}
}
// Insert new block
if *total_size + data.len() <= self.config.read_cache_size {
let data_clone = data.clone();
blocks.insert(
block_offset,
CachedBlock {
data: data_clone,
timestamp: Instant::now(),
},
);
*total_size += data.len();
}
}
pub fn invalidate(&self, block_offset: u64) {
let mut blocks = self.blocks.write().unwrap();
let mut total_size = self.total_size.write().unwrap();
if let Some(removed) = blocks.remove(&block_offset) {
*total_size -= removed.data.len();
}
}
pub fn invalidate_all(&self) {
let mut blocks = self.blocks.write().unwrap();
blocks.clear();
*self.total_size.write().unwrap() = 0;
}
pub fn stats(&self) -> CacheStats {
let blocks = self.blocks.read().unwrap();
CacheStats {
block_count: blocks.len(),
total_size: *self.total_size.read().unwrap(),
max_size: self.config.read_cache_size,
}
}
pub fn block_offset(offset: u64, block_size: usize) -> u64 {
offset / block_size as u64
}
pub fn block_range(offset: u64, len: u32, block_size: usize) -> Vec<u64> {
let start_block = Self::block_offset(offset, block_size);
let end_block = Self::block_offset(offset + len as u64 - 1, block_size);
(start_block..=end_block).collect()
}
}
pub struct WriteCache {
config: CacheConfig,
pending_writes: Mutex<HashMap<u64, Vec<u8>>>,
total_size: RwLock<usize>,
}
impl WriteCache {
pub fn new(config: CacheConfig) -> Self {
Self {
config,
pending_writes: Mutex::new(HashMap::new()),
total_size: RwLock::new(0),
}
}
pub fn put(&self, offset: u64, data: &[u8]) -> bool {
let mut pending = self.pending_writes.lock().unwrap();
let mut total_size = self.total_size.write().unwrap();
// Check if we have space
if *total_size + data.len() > self.config.write_cache_size {
return false;
}
// Merge with existing write at same offset
if let Some(existing) = pending.get_mut(&offset) {
// Overwrite overlapping range
for (i, byte) in data.iter().enumerate() {
if i < existing.len() {
existing[i] = *byte;
} else {
existing.push(*byte);
}
}
} else {
pending.insert(offset, data.to_vec());
*total_size += data.len();
}
true
}
pub fn get_pending(&self) -> Vec<(u64, Vec<u8>)> {
let pending = self.pending_writes.lock().unwrap();
pending.iter().map(|(k, v)| (*k, v.clone())).collect()
}
pub fn clear(&self) {
let mut pending = self.pending_writes.lock().unwrap();
pending.clear();
*self.total_size.write().unwrap() = 0;
}
pub fn stats(&self) -> CacheStats {
CacheStats {
block_count: self.pending_writes.lock().unwrap().len(),
total_size: *self.total_size.read().unwrap(),
max_size: self.config.write_cache_size,
}
}
}
pub struct CacheStats {
pub block_count: usize,
pub total_size: usize,
pub max_size: usize,
}
pub struct FileManager {
read_cache: Arc<ReadCache>,
write_cache: Arc<WriteCache>,
}
impl FileManager {
pub fn new(config: CacheConfig) -> Self {
Self {
read_cache: Arc::new(ReadCache::new(config.clone())),
write_cache: Arc::new(WriteCache::new(config)),
}
}
pub fn read_cache(&self) -> Arc<ReadCache> {
self.read_cache.clone()
}
pub fn write_cache(&self) -> Arc<WriteCache> {
self.write_cache.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_cache_basic() {
let config = CacheConfig::default();
let cache = ReadCache::new(config);
let block_offset = 0;
let data1 = Bytes::from(vec![1, 2, 3, 4, 5]);
cache.put(block_offset, data1.clone());
let cached = cache.get(block_offset);
assert!(cached.is_some());
assert_eq!(cached.unwrap(), data1);
}
#[test]
fn test_read_cache_ttl_expiry() {
let config = CacheConfig {
cache_ttl_secs: 1,
..Default::default()
};
let cache = ReadCache::new(config);
let block_offset = 0;
let data1 = Bytes::from(vec![1, 2, 3, 4, 5]);
cache.put(block_offset, data1.clone());
// Should be available immediately
assert!(cache.get(block_offset).is_some());
// Wait for TTL
std::thread::sleep(Duration::from_secs(2));
// Should be expired
assert!(cache.get(block_offset).is_none());
}
#[test]
fn test_read_cache_eviction() {
let config = CacheConfig {
read_cache_size: 100,
..Default::default()
};
let cache = ReadCache::new(config);
// Fill cache
cache.put(0, Bytes::from(vec![0u8; 50]));
cache.put(1, Bytes::from(vec![1u8; 50]));
// Should evict oldest block
cache.put(2, Bytes::from(vec![2u8; 50]));
// Block 0 should be evicted
assert!(cache.get(0).is_none());
assert!(cache.get(1).is_some());
assert!(cache.get(2).is_some());
}
#[test]
fn test_write_cache_basic() {
let config = CacheConfig::default();
let cache = WriteCache::new(config);
let data1 = vec![1, 2, 3, 4, 5];
cache.put(0, &data1);
let pending = cache.get_pending();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0], (0, data1));
}
#[test]
fn test_write_cache_merge() {
let config = CacheConfig::default();
let cache = WriteCache::new(config);
cache.put(0, &[1, 2, 3]);
cache.put(0, &[4, 5, 6, 7, 8]);
let pending = cache.get_pending();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].1, vec![4, 5, 6, 7, 8]);
}
#[test]
fn test_write_cache_full() {
let config = CacheConfig {
write_cache_size: 10,
..Default::default()
};
let cache = WriteCache::new(config);
assert!(cache.put(0, &[1, 2, 3, 4, 5]));
assert!(cache.put(5, &[6, 7, 8, 9, 10]));
assert!(!cache.put(10, &[11, 12])); // Cache full
}
#[test]
fn test_block_range() {
let block_size = 1024;
let range = ReadCache::block_range(0, 512, block_size);
assert_eq!(range, vec![0]);
let range = ReadCache::block_range(0, 2048, block_size);
assert_eq!(range, vec![0, 1]);
let range = ReadCache::block_range(1024, 512, block_size);
assert_eq!(range, vec![1]);
}
}

View File

@@ -1,3 +1,4 @@
pub mod cache;
pub mod compression;
pub mod dedup;
pub mod local_fs;