From ebe976eee45d4cc71244d7a263c4105d31c83afe Mon Sep 17 00:00:00 2001
From: Warren
Date: Mon, 22 Jun 2026 04:42:55 +0800
Subject: [PATCH] Implement Write/Read Cache (Phase 3)

---
 markbase-core/src/vfs/cache.rs | 434 +++++++++++++++++++++++++++++++++
 markbase-core/src/vfs/mod.rs   |   1 +
 2 files changed, 435 insertions(+)
 create mode 100644 markbase-core/src/vfs/cache.rs

diff --git a/markbase-core/src/vfs/cache.rs b/markbase-core/src/vfs/cache.rs
new file mode 100644
index 0000000..eb5ff89
--- /dev/null
+++ b/markbase-core/src/vfs/cache.rs
@@ -0,0 +1,434 @@
+//! In-memory read and write caches for the VFS layer.
+//!
+//! [`ReadCache`] keeps recently read blocks keyed by block index and evicts the
+//! oldest insertion first once its byte budget is exceeded. [`WriteCache`]
+//! buffers pending writes keyed by byte offset until the caller drains them.
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, RwLock};
+use std::time::{Duration, Instant};
+
+use bytes::Bytes;
+
+const DEFAULT_READ_CACHE_SIZE: usize = 64 * 1024 * 1024; // 64MB
+const DEFAULT_READ_CACHE_BLOCK_SIZE: usize = 64 * 1024; // 64KB blocks
+const DEFAULT_WRITE_CACHE_SIZE: usize = 32 * 1024 * 1024; // 32MB
+const DEFAULT_CACHE_TTL_SECS: u64 = 300; // 5 minutes
+
+/// Size limits and expiry settings shared by [`ReadCache`] and [`WriteCache`].
+#[derive(Debug, Clone)]
+pub struct CacheConfig {
+    /// Maximum number of payload bytes the read cache may hold.
+    pub read_cache_size: usize,
+    /// Block size in bytes. The caches do not read it themselves; callers pass
+    /// a block size to [`ReadCache::block_offset`] explicitly.
+    pub read_cache_block_size: usize,
+    /// Maximum number of payload bytes the write cache may hold.
+    pub write_cache_size: usize,
+    /// Age in seconds after which a cached read block is no longer served.
+    pub cache_ttl_secs: u64,
+}
+
+impl Default for CacheConfig {
+    fn default() -> Self {
+        Self {
+            read_cache_size: DEFAULT_READ_CACHE_SIZE,
+            read_cache_block_size: DEFAULT_READ_CACHE_BLOCK_SIZE,
+            write_cache_size: DEFAULT_WRITE_CACHE_SIZE,
+            cache_ttl_secs: DEFAULT_CACHE_TTL_SECS,
+        }
+    }
+}
+
+/// Size-bounded cache of read blocks with a time-to-live.
+///
+/// Locks are always taken in the order `blocks`, then `total_size`.
+pub struct ReadCache {
+    config: CacheConfig,
+    blocks: RwLock<HashMap<u64, CachedBlock>>,
+    total_size: RwLock<usize>,
+    last_access: RwLock<Instant>,
+}
+
+struct CachedBlock {
+    data: Bytes,
+    timestamp: Instant,
+}
+
+impl ReadCache {
+    /// Creates an empty cache governed by `config`.
+    pub fn new(config: CacheConfig) -> Self {
+        Self {
+            config,
+            blocks: RwLock::new(HashMap::new()),
+            total_size: RwLock::new(0),
+            last_access: RwLock::new(Instant::now()),
+        }
+    }
+
+    /// Returns the block stored under `block_offset`, or `None` when it is
+    /// absent or at least as old as the configured TTL.
+    ///
+    /// Expired blocks stay in the map; they carry the oldest timestamps, so
+    /// [`ReadCache::put`] evicts them first when it needs space.
+    pub fn get(&self, block_offset: u64) -> Option<Bytes> {
+        let ttl = Duration::from_secs(self.config.cache_ttl_secs);
+        let blocks = self.blocks.read().unwrap();
+        let block = blocks.get(&block_offset)?;
+        if block.timestamp.elapsed() >= ttl {
+            return None;
+        }
+        *self.last_access.write().unwrap() = Instant::now();
+        Some(block.data.clone())
+    }
+
+    /// Stores `data` under `block_offset`, evicting the oldest blocks until it
+    /// fits within `read_cache_size`.
+    ///
+    /// A block larger than the whole cache is ignored instead of flushing every
+    /// other entry for nothing. Re-inserting an existing key replaces the old
+    /// block and releases its bytes from the size accounting.
+    pub fn put(&self, block_offset: u64, data: Bytes) {
+        let new_len = data.len();
+        if new_len > self.config.read_cache_size {
+            return;
+        }
+
+        let mut blocks = self.blocks.write().unwrap();
+        let mut total_size = self.total_size.write().unwrap();
+
+        // Drop the entry being replaced first so its bytes are not counted twice.
+        if let Some(previous) = blocks.remove(&block_offset) {
+            *total_size -= previous.data.len();
+        }
+
+        // Evict oldest-first until the new block fits. Timestamp ties are broken
+        // by key so the victim does not depend on hash iteration order.
+        while *total_size + new_len > self.config.read_cache_size {
+            let oldest_key = blocks
+                .iter()
+                .min_by_key(|(key, block)| (block.timestamp, **key))
+                .map(|(key, _)| *key);
+            match oldest_key.and_then(|key| blocks.remove(&key)) {
+                Some(removed) => *total_size -= removed.data.len(),
+                None => break, // nothing left to evict
+            }
+        }
+
+        blocks.insert(
+            block_offset,
+            CachedBlock {
+                data,
+                timestamp: Instant::now(),
+            },
+        );
+        *total_size += new_len;
+    }
+
+    /// Removes the block stored under `block_offset`, if any.
+    pub fn invalidate(&self, block_offset: u64) {
+        let mut blocks = self.blocks.write().unwrap();
+        let mut total_size = self.total_size.write().unwrap();
+        if let Some(removed) = blocks.remove(&block_offset) {
+            *total_size -= removed.data.len();
+        }
+    }
+
+    /// Removes every cached block.
+    pub fn invalidate_all(&self) {
+        let mut blocks = self.blocks.write().unwrap();
+        blocks.clear();
+        *self.total_size.write().unwrap() = 0;
+    }
+
+    /// Returns the current block count and byte usage alongside the limit.
+    pub fn stats(&self) -> CacheStats {
+        let blocks = self.blocks.read().unwrap();
+        CacheStats {
+            block_count: blocks.len(),
+            total_size: *self.total_size.read().unwrap(),
+            max_size: self.config.read_cache_size,
+        }
+    }
+
+    /// Returns the index of the block that contains byte `offset`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `block_size` is zero.
+    pub fn block_offset(offset: u64, block_size: usize) -> u64 {
+        assert!(block_size > 0, "block_size must be non-zero");
+        offset / block_size as u64
+    }
+
+    /// Returns the indices of every block touched by `len` bytes starting at
+    /// `offset`. A zero-length range touches no blocks.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `len` is non-zero and `block_size` is zero.
+    pub fn block_range(offset: u64, len: u32, block_size: usize) -> Vec<u64> {
+        if len == 0 {
+            return Vec::new();
+        }
+        let start_block = Self::block_offset(offset, block_size);
+        // `len >= 1` here, so `len - 1` cannot underflow; saturate rather than
+        // overflow when the range runs up to `u64::MAX`.
+        let last_byte = offset.saturating_add(u64::from(len) - 1);
+        let end_block = Self::block_offset(last_byte, block_size);
+        (start_block..=end_block).collect()
+    }
+}
+
+/// Size-bounded buffer of writes that have not been flushed yet.
+///
+/// Locks are always taken in the order `pending_writes`, then `total_size`.
+pub struct WriteCache {
+    config: CacheConfig,
+    pending_writes: Mutex<HashMap<u64, Vec<u8>>>,
+    total_size: RwLock<usize>,
+}
+
+impl WriteCache {
+    /// Creates an empty write cache governed by `config`.
+    pub fn new(config: CacheConfig) -> Self {
+        Self {
+            config,
+            pending_writes: Mutex::new(HashMap::new()),
+            total_size: RwLock::new(0),
+        }
+    }
+
+    /// Buffers `data` as a pending write at byte `offset`.
+    ///
+    /// A write to an offset that already has a pending buffer overwrites that
+    /// buffer from its start and extends it when `data` is longer. Buffers at
+    /// different offsets are kept separate even if their byte ranges overlap.
+    ///
+    /// Returns `false` and leaves the cache untouched when the additional
+    /// bytes would push the total past `write_cache_size`.
+    #[must_use]
+    pub fn put(&self, offset: u64, data: &[u8]) -> bool {
+        let mut pending = self.pending_writes.lock().unwrap();
+        let mut total_size = self.total_size.write().unwrap();
+
+        // Only bytes beyond an existing buffer at this offset need new space.
+        let existing_len = pending.get(&offset).map_or(0, Vec::len);
+        let growth = data.len().saturating_sub(existing_len);
+        if *total_size + growth > self.config.write_cache_size {
+            return false;
+        }
+
+        if let Some(existing) = pending.get_mut(&offset) {
+            let overlap = existing_len.min(data.len());
+            existing[..overlap].copy_from_slice(&data[..overlap]);
+            existing.extend_from_slice(&data[overlap..]);
+        } else {
+            pending.insert(offset, data.to_vec());
+        }
+        *total_size += growth;
+
+        true
+    }
+
+    /// Returns a copy of every pending write as `(offset, bytes)` pairs, in
+    /// unspecified order.
+    pub fn get_pending(&self) -> Vec<(u64, Vec<u8>)> {
+        let pending = self.pending_writes.lock().unwrap();
+        pending.iter().map(|(k, v)| (*k, v.clone())).collect()
+    }
+
+    /// Discards every pending write.
+    pub fn clear(&self) {
+        let mut pending = self.pending_writes.lock().unwrap();
+        pending.clear();
+        *self.total_size.write().unwrap() = 0;
+    }
+
+    /// Returns the current entry count and byte usage alongside the limit.
+    pub fn stats(&self) -> CacheStats {
+        CacheStats {
+            block_count: self.pending_writes.lock().unwrap().len(),
+            total_size: *self.total_size.read().unwrap(),
+            max_size: self.config.write_cache_size,
+        }
+    }
+}
+
+/// Point-in-time usage numbers reported by the caches.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct CacheStats {
+    /// Number of entries currently held.
+    pub block_count: usize,
+    /// Payload bytes currently accounted for.
+    pub total_size: usize,
+    /// Configured byte limit of the cache.
+    pub max_size: usize,
+}
+
+/// Owns one read cache and one write cache built from the same configuration.
+pub struct FileManager {
+    read_cache: Arc<ReadCache>,
+    write_cache: Arc<WriteCache>,
+}
+
+impl FileManager {
+    /// Creates a manager whose two caches are both configured from `config`.
+    pub fn new(config: CacheConfig) -> Self {
+        Self {
+            read_cache: Arc::new(ReadCache::new(config.clone())),
+            write_cache: Arc::new(WriteCache::new(config)),
+        }
+    }
+
+    /// Returns a shared handle to the read cache.
+    pub fn read_cache(&self) -> Arc<ReadCache> {
+        Arc::clone(&self.read_cache)
+    }
+
+    /// Returns a shared handle to the write cache.
+    pub fn write_cache(&self) -> Arc<WriteCache> {
+        Arc::clone(&self.write_cache)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_read_cache_basic() {
+        let config = CacheConfig::default();
+        let cache = ReadCache::new(config);
+
+        let block_offset = 0;
+        let data1 = Bytes::from(vec![1, 2, 3, 4, 5]);
+
+        cache.put(block_offset, data1.clone());
+
+        let cached = cache.get(block_offset);
+        assert!(cached.is_some());
+        assert_eq!(cached.unwrap(), data1);
+    }
+
+    #[test]
+    fn test_read_cache_ttl_expiry() {
+        // A zero TTL expires every block immediately, so no sleep is needed.
+        let config = CacheConfig {
+            cache_ttl_secs: 0,
+            ..Default::default()
+        };
+        let cache = ReadCache::new(config);
+
+        cache.put(0, Bytes::from(vec![1, 2, 3, 4, 5]));
+
+        assert!(cache.get(0).is_none());
+    }
+
+    #[test]
+    fn test_read_cache_eviction() {
+        let config = CacheConfig {
+            read_cache_size: 100,
+            ..Default::default()
+        };
+        let cache = ReadCache::new(config);
+
+        // Fill cache
+        cache.put(0, Bytes::from(vec![0u8; 50]));
+        cache.put(1, Bytes::from(vec![1u8; 50]));
+
+        // Should evict oldest block
+        cache.put(2, Bytes::from(vec![2u8; 50]));
+
+        // Block 0 should be evicted
+        assert!(cache.get(0).is_none());
+        assert!(cache.get(1).is_some());
+        assert!(cache.get(2).is_some());
+    }
+
+    #[test]
+    fn test_read_cache_replace_keeps_accounting() {
+        let cache = ReadCache::new(CacheConfig::default());
+
+        cache.put(0, Bytes::from(vec![0u8; 50]));
+        cache.put(0, Bytes::from(vec![1u8; 20]));
+
+        let stats = cache.stats();
+        assert_eq!(stats.block_count, 1);
+        assert_eq!(stats.total_size, 20);
+    }
+
+    #[test]
+    fn test_read_cache_oversized_block_is_ignored() {
+        let config = CacheConfig {
+            read_cache_size: 100,
+            ..Default::default()
+        };
+        let cache = ReadCache::new(config);
+
+        cache.put(0, Bytes::from(vec![0u8; 50]));
+        cache.put(1, Bytes::from(vec![1u8; 101]));
+
+        assert!(cache.get(0).is_some());
+        assert!(cache.get(1).is_none());
+    }
+
+    #[test]
+    fn test_write_cache_basic() {
+        let config = CacheConfig::default();
+        let cache = WriteCache::new(config);
+
+        let data1 = vec![1, 2, 3, 4, 5];
+        assert!(cache.put(0, &data1));
+
+        let pending = cache.get_pending();
+        assert_eq!(pending.len(), 1);
+        assert_eq!(pending[0], (0, data1));
+    }
+
+    #[test]
+    fn test_write_cache_merge() {
+        let config = CacheConfig::default();
+        let cache = WriteCache::new(config);
+
+        assert!(cache.put(0, &[1, 2, 3]));
+        assert!(cache.put(0, &[4, 5, 6, 7, 8]));
+
+        let pending = cache.get_pending();
+        assert_eq!(pending.len(), 1);
+        assert_eq!(pending[0].1, vec![4, 5, 6, 7, 8]);
+        assert_eq!(cache.stats().total_size, 5);
+    }
+
+    #[test]
+    fn test_write_cache_full() {
+        let config = CacheConfig {
+            write_cache_size: 10,
+            ..Default::default()
+        };
+        let cache = WriteCache::new(config);
+
+        assert!(cache.put(0, &[1, 2, 3, 4, 5]));
+        assert!(cache.put(5, &[6, 7, 8, 9, 10]));
+        assert!(!cache.put(10, &[11, 12])); // Cache full
+        // Rewriting buffered bytes needs no extra space, so it still succeeds.
+        assert!(cache.put(0, &[9, 9, 9, 9, 9]));
+    }
+
+    #[test]
+    fn test_block_range() {
+        let block_size = 1024;
+
+        let range = ReadCache::block_range(0, 512, block_size);
+        assert_eq!(range, vec![0]);
+
+        let range = ReadCache::block_range(0, 2048, block_size);
+        assert_eq!(range, vec![0, 1]);
+
+        let range = ReadCache::block_range(1024, 512, block_size);
+        assert_eq!(range, vec![1]);
+
+        let range = ReadCache::block_range(0, 0, block_size);
+        assert!(range.is_empty());
+    }
+}
diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs
index f49c374..ca6d3d1 100644
--- a/markbase-core/src/vfs/mod.rs
+++ b/markbase-core/src/vfs/mod.rs
@@ -1,3 +1,4 @@
+pub mod cache;
 pub mod compression;
 pub mod dedup;
 pub mod local_fs;