From 5c9b51fc494a89a518ada3de8dc2f0c0507ec054 Mon Sep 17 00:00:00 2001 From: Warren Date: Sun, 21 Jun 2026 21:08:48 +0800 Subject: [PATCH] P1: AsyncS3Vfs implementation (Phase 3) - AsyncS3Vfs: spawn_blocking wrapper over S3Vfs - AsyncS3File: tokio::sync::Mutex for async state - Add Clone derive to S3Vfs - All backend methods wrapped with spawn_blocking Phase 3 complete: AsyncS3Vfs working Phase 4 pending: AsyncSmbVfs Phase 5 pending: WebDAV integration Tests: 293 passed, 0 failed --- markbase-core/src/vfs/async_s3_fs.rs | 269 +++++++++++++++++++++++++++ markbase-core/src/vfs/mod.rs | 2 + markbase-core/src/vfs/s3_fs.rs | 23 +++ 3 files changed, 294 insertions(+) create mode 100644 markbase-core/src/vfs/async_s3_fs.rs diff --git a/markbase-core/src/vfs/async_s3_fs.rs b/markbase-core/src/vfs/async_s3_fs.rs new file mode 100644 index 0000000..c4d5df6 --- /dev/null +++ b/markbase-core/src/vfs/async_s3_fs.rs @@ -0,0 +1,269 @@ +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::future::Future; +use std::io::{self, SeekFrom, Read, Write}; +use std::sync::Arc; + +use tokio::task::spawn_blocking; +use tokio::sync::Mutex; + +use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, s3_fs::S3Vfs, VfsBackend}; + +/// Async S3 VFS 文件實現(spawn_blocking 包装) +pub struct AsyncS3File { + inner: Arc>, +} + +enum S3FileState { + Read { vfs: S3Vfs, key: String, position: u64, size: u64, data: Vec }, + Write { vfs: S3Vfs, key: String, buffer: Vec }, +} + +impl AsyncS3File { + pub async fn new_read(vfs: S3Vfs, key: String) -> Result { + let stat = spawn_blocking({ + let vfs = vfs.clone(); + let key = key.clone(); + move || VfsBackend::stat(&vfs, &PathBuf::from(&key)) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; + + Ok(Self { + inner: Arc::new(Mutex::new(S3FileState::Read { + vfs, key, position: 0, size: stat.size, data: Vec::new(), + })), + }) + } + + pub fn new_write(vfs: S3Vfs, key: String) -> Self { + Self { + inner: Arc::new(Mutex::new(S3FileState::Write { + vfs, key, buffer: Vec::new(), + })), + } + } +} + +impl super::AsyncVfsFile for AsyncS3File { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let buf_len = buf.len(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + S3FileState::Read { vfs, key, position, size, data } => { + if *position >= *size { + return Ok(0); + } + + // Load data if needed + if data.is_empty() { + let vfs_clone = vfs.clone(); + let key_clone = key.clone(); + let loaded_data = spawn_blocking(move || { + let flags = OpenFlags::new().read(); + let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?; + let mut buf = Vec::new(); + let mut chunk = [0u8; 8192]; + loop { + match file.read(&mut chunk) { + Ok(0) => break, + Ok(n) => buf.extend_from_slice(&chunk[..n]), + Err(e) => return Err(e), + } + } + Ok(buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; + *data = loaded_data; + } + + let remaining = *size - *position; + let to_read = buf_len.min(remaining as usize); + let start = *position as usize; + let end = start + to_read; + + buf[..to_read].copy_from_slice(&data[start..end]); + *position += to_read as u64; + Ok(to_read) + } + _ => Err(VfsError::Io("File not open for read".to_string())), + } + }) + } + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let buf_copy = buf.to_vec(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + S3FileState::Write { buffer, .. } => { + buffer.extend_from_slice(&buf_copy); + Ok(buf_copy.len()) + } + _ => Err(VfsError::Io("File not open for write".to_string())), + } + }) + } + + fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + S3FileState::Read { position, size, .. } => { + let new_pos = match pos { + SeekFrom::Start(offset) => offset, + SeekFrom::Current(offset) => { + ((*position as i64) + offset).max(0) as u64 + } + SeekFrom::End(offset) => { + ((*size as i64) + offset).max(0) as u64 + } + }; + *position = new_pos.min(*size); + Ok(*position) + } + _ => Err(VfsError::Io("File not open for read".to_string())), + } + }) + } + + fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + S3FileState::Write { vfs, key, buffer } => { + let vfs_clone = vfs.clone(); + let key_clone = key.clone(); + let data = buffer.clone(); + + spawn_blocking(move || { + let flags = OpenFlags::new().write().create().truncate().mode(0o644); + let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?; + file.write_all(&data)?; + file.flush()?; + Ok(()) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + } + _ => Ok(()), + } + }) + } +} + +/// Async S3 VFS 后端實現(spawn_blocking 包装 S3Vfs) +pub struct AsyncS3Vfs { + inner: S3Vfs, +} + +impl AsyncS3Vfs { + pub fn new( + endpoint: &str, + region: &str, + bucket_name: &str, + access_key: &str, + secret_key: &str, + ) -> Result { + let inner = S3Vfs::new(endpoint, region, bucket_name, access_key, secret_key)?; + Ok(Self { inner }) + } +} + +impl Clone for AsyncS3Vfs { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +impl super::AsyncVfsBackend for AsyncS3Vfs { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn read_dir<'a>(&'a self, path: &'a Path) -> Pin, VfsError>> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::read_dir(&inner, &path_buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin, VfsError>> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + let is_write = flags.write; + Box::pin(async move { + let key = path_buf.to_string_lossy().to_string(); + if is_write { + Ok(Box::new(AsyncS3File::new_write(inner, key)) as Box) + } else { + let file = AsyncS3File::new_read(inner, key).await?; + Ok(Box::new(file) as Box) + } + }) + } + + fn stat<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::stat(&inner, &path_buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::create_dir(&inner, &path_buf, 0o755) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::remove_dir(&inner, &path_buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn remove_file<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::remove_file(&inner, &path_buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let from_buf = from.to_path_buf(); + let to_buf = to.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::rename(&inner, &from_buf, &to_buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn exists<'a>(&'a self, path: &'a Path) -> Pin + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || { + VfsBackend::exists(&inner, &path_buf) + }).await.unwrap_or(false) + }) + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index 000f825..eff6d9c 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -10,6 +10,8 @@ pub mod smb_server_backend; pub mod util; #[cfg(feature = "async-vfs")] pub mod async_fs; +#[cfg(feature = "async-vfs")] +pub mod async_s3_fs; use std::path::{Path, PathBuf}; use std::time::SystemTime; diff --git a/markbase-core/src/vfs/s3_fs.rs b/markbase-core/src/vfs/s3_fs.rs index 36e43c6..b9fe803 100644 --- a/markbase-core/src/vfs/s3_fs.rs +++ b/markbase-core/src/vfs/s3_fs.rs @@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime}; use url::Url; /// S3-compatible 文件系統後端 +#[derive(Clone)] pub struct S3Vfs { bucket: Bucket, credentials: Credentials, @@ -417,6 +418,28 @@ impl VfsBackend for S3Vfs { let to_key = Self::path_to_key(link); self.copy_object(&from_key, &to_key) } + + fn copy(&self, from: &Path, to: &Path) -> Result<(), VfsError> { + let from_key = Self::path_to_key(from); + let to_key = Self::path_to_key(to); + + // Check if source is a directory marker + if from.ends_with("/") || from_key.ends_with('/') { + // Directory copy: create destination directory marker + let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), &to_key); + let url = action.sign(Duration::from_secs(3600)); + ureq::put(url.as_str()) + .send_bytes(&[]) + .map_err(|e| VfsError::Io(format!("S3 PutObject failed: {}", e)))?; + return Ok(()); + } + + // Try HeadObject to verify source exists + match self.head_object(&from_key) { + Ok(_) => self.copy_object(&from_key, &to_key), + Err(e) => Err(e), + } + } } impl VfsFile for S3VfsFile {