From 790efe13f427e85157f4f6ef1b36f8c7fe09655e Mon Sep 17 00:00:00 2001 From: Warren Date: Sun, 21 Jun 2026 20:59:41 +0800 Subject: [PATCH] P1: AsyncLocalFs implementation (Phase 2) - AsyncLocalFile: tokio::fs::File wrapper - AsyncLocalFs: AsyncVfsBackend impl using tokio::fs - Key methods: read_dir, open_file, stat, create_dir, remove_file, rename - 4 async tests passing Phase 2 complete: AsyncLocalFs working Phase 3 pending: AsyncS3Vfs Phase 4 pending: AsyncSmbVfs Phase 5 pending: WebDAV integration Tests: 293 passed, 0 failed --- markbase-core/src/vfs/async_fs.rs | 249 ++++++++++++++++++++++++++++++ markbase-core/src/vfs/mod.rs | 4 +- 2 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 markbase-core/src/vfs/async_fs.rs diff --git a/markbase-core/src/vfs/async_fs.rs b/markbase-core/src/vfs/async_fs.rs new file mode 100644 index 0000000..46aab43 --- /dev/null +++ b/markbase-core/src/vfs/async_fs.rs @@ -0,0 +1,249 @@ +use std::path::{Path, PathBuf}; +use std::time::SystemTime; +use std::pin::Pin; +use std::future::Future; +use std::io::{self, SeekFrom}; + +use tokio::fs; +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt}; + +use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags}; + +/// Async VFS 文件實現(使用 tokio::fs) +pub struct AsyncLocalFile { + file: fs::File, + path: PathBuf, + is_write: bool, +} + +impl AsyncLocalFile { + pub fn new(file: fs::File, path: PathBuf, is_write: bool) -> Self { + Self { file, path, is_write } + } +} + +impl super::AsyncVfsFile for AsyncLocalFile { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.file.read(buf).await + .map_err(|e| VfsError::Io(e.to_string())) + }) + } + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.file.write(buf).await + .map_err(|e| VfsError::Io(e.to_string())) + }) + } + + fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.file.seek(pos).await + .map_err(|e| VfsError::Io(e.to_string())) + }) + } + + fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.file.flush().await + .map_err(|e| VfsError::Io(e.to_string())) + }) + } +} + +/// Async VFS 后端實現(使用 tokio::fs) +pub struct AsyncLocalFs { + root: PathBuf, +} + +impl AsyncLocalFs { + pub fn new() -> Self { + Self { root: PathBuf::new() } + } + + pub fn with_root(root: PathBuf) -> Self { + Self { root } + } + + fn map_io_error(path: &Path, e: io::Error) -> VfsError { + match e.kind() { + io::ErrorKind::NotFound => VfsError::NotFound(path.to_string_lossy().to_string()), + io::ErrorKind::PermissionDenied => VfsError::PermissionDenied(path.to_string_lossy().to_string()), + io::ErrorKind::AlreadyExists => VfsError::AlreadyExists(path.to_string_lossy().to_string()), + _ => VfsError::Io(e.to_string()), + } + } + + fn stat_from_metadata(meta: &std::fs::Metadata) -> VfsStat { + VfsStat { + size: meta.len(), + mode: 0o644, + uid: 0, + gid: 0, + atime: meta.accessed().unwrap_or(SystemTime::UNIX_EPOCH), + mtime: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH), + is_dir: meta.is_dir(), + is_symlink: meta.file_type().is_symlink(), + } + } +} + +impl Clone for AsyncLocalFs { + fn clone(&self) -> Self { + Self { root: self.root.clone() } + } +} + +impl super::AsyncVfsBackend for AsyncLocalFs { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn read_dir<'a>(&'a self, path: &'a Path) -> Pin, VfsError>> + Send + 'a>> { + Box::pin(async move { + let mut entries = Vec::new(); + let mut dir = fs::read_dir(path).await + .map_err(|e| Self::map_io_error(path, e))?; + + while let Some(entry) = dir.next_entry().await.map_err(|e| Self::map_io_error(path, e))? { + let name = entry.file_name().to_string_lossy().to_string(); + let long_name = name.clone(); + let meta = entry.metadata().await.map_err(|e| Self::map_io_error(path, e))?; + let stat = Self::stat_from_metadata(&meta); + entries.push(VfsDirEntry { name, long_name, stat }); + } + + Ok(entries) + }) + } + + fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin, VfsError>> + Send + 'a>> { + Box::pin(async move { + let mut options = fs::OpenOptions::new(); + + if flags.read { + options.read(true); + } + if flags.write { + options.write(true); + } + if flags.create { + options.create(true); + } + if flags.truncate { + options.truncate(true); + } + if flags.append { + options.append(true); + } + + let file = options.open(path).await + .map_err(|e| Self::map_io_error(path, e))?; + + Ok(Box::new(AsyncLocalFile::new(file, path.to_path_buf(), flags.write)) as Box) + }) + } + + fn stat<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + Box::pin(async move { + let meta = fs::metadata(path).await + .map_err(|e| Self::map_io_error(path, e))?; + Ok(Self::stat_from_metadata(&meta)) + }) + } + + fn create_dir<'a>(&'a self, path: &'a Path, mode: u32) -> Pin> + Send + 'a>> { + Box::pin(async move { + fs::create_dir(path).await + .map_err(|e| Self::map_io_error(path, e))?; + Ok(()) + }) + } + + fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + Box::pin(async move { + fs::remove_dir(path).await + .map_err(|e| Self::map_io_error(path, e))?; + Ok(()) + }) + } + + fn remove_file<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + Box::pin(async move { + fs::remove_file(path).await + .map_err(|e| Self::map_io_error(path, e))?; + Ok(()) + }) + } + + fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin> + Send + 'a>> { + Box::pin(async move { + fs::rename(from, to).await + .map_err(|e| Self::map_io_error(from, e))?; + Ok(()) + }) + } + + fn exists<'a>(&'a self, path: &'a Path) -> Pin + Send + 'a>> { + Box::pin(async move { + fs::metadata(path).await.is_ok() + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + use crate::vfs::AsyncVfsBackend; + use crate::vfs::AsyncVfsFile; + + #[tokio::test] + async fn test_async_read_dir() { + let tmp = TempDir::new().unwrap(); + fs::create_dir(tmp.path().join("subdir")).await.unwrap(); + fs::write(tmp.path().join("test.txt"), "content").await.unwrap(); + + let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf()); + let entries = AsyncVfsBackend::read_dir(&vfs, tmp.path()).await.unwrap(); + + assert_eq!(entries.len(), 2); + assert!(entries.iter().any(|e| e.name == "subdir" && e.stat.is_dir)); + assert!(entries.iter().any(|e| e.name == "test.txt" && !e.stat.is_dir)); + } + + #[tokio::test] + async fn test_async_open_read() { + let tmp = TempDir::new().unwrap(); + fs::write(tmp.path().join("test.txt"), "hello world").await.unwrap(); + + let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf()); + let flags = OpenFlags::new().read(); + let mut file = AsyncVfsBackend::open_file(&vfs, &tmp.path().join("test.txt"), &flags).await.unwrap(); + + let mut buf = [0u8; 11]; + let n = AsyncVfsFile::read(&mut *file, &mut buf).await.unwrap(); + assert_eq!(n, 11); + assert_eq!(&buf, b"hello world"); + } + + #[tokio::test] + async fn test_async_create_dir() { + let tmp = TempDir::new().unwrap(); + let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf()); + + AsyncVfsBackend::create_dir(&vfs, &tmp.path().join("newdir"), 0o755).await.unwrap(); + assert!(AsyncVfsBackend::exists(&vfs, &tmp.path().join("newdir")).await); + } + + #[tokio::test] + async fn test_async_remove_file() { + let tmp = TempDir::new().unwrap(); + fs::write(tmp.path().join("test.txt"), "content").await.unwrap(); + + let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf()); + AsyncVfsBackend::remove_file(&vfs, &tmp.path().join("test.txt")).await.unwrap(); + assert!(!AsyncVfsBackend::exists(&vfs, &tmp.path().join("test.txt")).await); + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index ae622a3..000f825 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -8,6 +8,8 @@ pub mod smb_fs; #[cfg(feature = "smb-server")] pub mod smb_server_backend; pub mod util; +#[cfg(feature = "async-vfs")] +pub mod async_fs; use std::path::{Path, PathBuf}; use std::time::SystemTime; @@ -567,7 +569,7 @@ pub trait AsyncVfsFile: Send + Sync { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> std::pin::Pin> + Send + 'a>>; fn write<'a>(&'a mut self, buf: &'a [u8]) -> std::pin::Pin> + Send + 'a>>; fn seek<'a>(&'a mut self, pos: std::io::SeekFrom) -> std::pin::Pin> + Send + 'a>>; - fn flush(&mut self) -> std::pin::Pin> + Send>>; + fn flush<'a>(&'a mut self) -> std::pin::Pin> + Send + 'a>>; } /// Async VFS 后端 trait(用于异步文件系统操作)