P1: AsyncLocalFs implementation (Phase 2)
- AsyncLocalFile: tokio::fs::File wrapper - AsyncLocalFs: AsyncVfsBackend impl using tokio::fs - Key methods: read_dir, open_file, stat, create_dir, remove_file, rename - 4 async tests passing Phase 2 complete: AsyncLocalFs working Phase 3 pending: AsyncS3Vfs Phase 4 pending: AsyncSmbVfs Phase 5 pending: WebDAV integration Tests: 293 passed, 0 failed
This commit is contained in:
249
markbase-core/src/vfs/async_fs.rs
Normal file
249
markbase-core/src/vfs/async_fs.rs
Normal file
@@ -0,0 +1,249 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::SystemTime;
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use std::io::{self, SeekFrom};
|
||||
|
||||
use tokio::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt};
|
||||
|
||||
use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags};
|
||||
|
||||
/// Async VFS 文件實現(使用 tokio::fs)
|
||||
pub struct AsyncLocalFile {
|
||||
file: fs::File,
|
||||
path: PathBuf,
|
||||
is_write: bool,
|
||||
}
|
||||
|
||||
impl AsyncLocalFile {
|
||||
pub fn new(file: fs::File, path: PathBuf, is_write: bool) -> Self {
|
||||
Self { file, path, is_write }
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsFile for AsyncLocalFile {
|
||||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
self.file.read(buf).await
|
||||
.map_err(|e| VfsError::Io(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
self.file.write(buf).await
|
||||
.map_err(|e| VfsError::Io(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
self.file.seek(pos).await
|
||||
.map_err(|e| VfsError::Io(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
fn flush<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
self.file.flush().await
|
||||
.map_err(|e| VfsError::Io(e.to_string()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Async VFS 后端實現(使用 tokio::fs)
|
||||
pub struct AsyncLocalFs {
|
||||
root: PathBuf,
|
||||
}
|
||||
|
||||
impl AsyncLocalFs {
|
||||
pub fn new() -> Self {
|
||||
Self { root: PathBuf::new() }
|
||||
}
|
||||
|
||||
pub fn with_root(root: PathBuf) -> Self {
|
||||
Self { root }
|
||||
}
|
||||
|
||||
fn map_io_error(path: &Path, e: io::Error) -> VfsError {
|
||||
match e.kind() {
|
||||
io::ErrorKind::NotFound => VfsError::NotFound(path.to_string_lossy().to_string()),
|
||||
io::ErrorKind::PermissionDenied => VfsError::PermissionDenied(path.to_string_lossy().to_string()),
|
||||
io::ErrorKind::AlreadyExists => VfsError::AlreadyExists(path.to_string_lossy().to_string()),
|
||||
_ => VfsError::Io(e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn stat_from_metadata(meta: &std::fs::Metadata) -> VfsStat {
|
||||
VfsStat {
|
||||
size: meta.len(),
|
||||
mode: 0o644,
|
||||
uid: 0,
|
||||
gid: 0,
|
||||
atime: meta.accessed().unwrap_or(SystemTime::UNIX_EPOCH),
|
||||
mtime: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
|
||||
is_dir: meta.is_dir(),
|
||||
is_symlink: meta.file_type().is_symlink(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for AsyncLocalFs {
|
||||
fn clone(&self) -> Self {
|
||||
Self { root: self.root.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsBackend for AsyncLocalFs {
|
||||
fn clone_boxed(&self) -> Box<dyn super::AsyncVfsBackend> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
fn read_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<Vec<VfsDirEntry>, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let mut entries = Vec::new();
|
||||
let mut dir = fs::read_dir(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
|
||||
while let Some(entry) = dir.next_entry().await.map_err(|e| Self::map_io_error(path, e))? {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
let long_name = name.clone();
|
||||
let meta = entry.metadata().await.map_err(|e| Self::map_io_error(path, e))?;
|
||||
let stat = Self::stat_from_metadata(&meta);
|
||||
entries.push(VfsDirEntry { name, long_name, stat });
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
})
|
||||
}
|
||||
|
||||
fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin<Box<dyn Future<Output = Result<Box<dyn super::AsyncVfsFile>, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let mut options = fs::OpenOptions::new();
|
||||
|
||||
if flags.read {
|
||||
options.read(true);
|
||||
}
|
||||
if flags.write {
|
||||
options.write(true);
|
||||
}
|
||||
if flags.create {
|
||||
options.create(true);
|
||||
}
|
||||
if flags.truncate {
|
||||
options.truncate(true);
|
||||
}
|
||||
if flags.append {
|
||||
options.append(true);
|
||||
}
|
||||
|
||||
let file = options.open(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
|
||||
Ok(Box::new(AsyncLocalFile::new(file, path.to_path_buf(), flags.write)) as Box<dyn super::AsyncVfsFile>)
|
||||
})
|
||||
}
|
||||
|
||||
fn stat<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<VfsStat, VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let meta = fs::metadata(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
Ok(Self::stat_from_metadata(&meta))
|
||||
})
|
||||
}
|
||||
|
||||
fn create_dir<'a>(&'a self, path: &'a Path, mode: u32) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
fs::create_dir(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
fs::remove_dir(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_file<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
fs::remove_file(path).await
|
||||
.map_err(|e| Self::map_io_error(path, e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
fs::rename(from, to).await
|
||||
.map_err(|e| Self::map_io_error(from, e))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn exists<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
fs::metadata(path).await.is_ok()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
use crate::vfs::AsyncVfsBackend;
|
||||
use crate::vfs::AsyncVfsFile;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_read_dir() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
fs::create_dir(tmp.path().join("subdir")).await.unwrap();
|
||||
fs::write(tmp.path().join("test.txt"), "content").await.unwrap();
|
||||
|
||||
let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf());
|
||||
let entries = AsyncVfsBackend::read_dir(&vfs, tmp.path()).await.unwrap();
|
||||
|
||||
assert_eq!(entries.len(), 2);
|
||||
assert!(entries.iter().any(|e| e.name == "subdir" && e.stat.is_dir));
|
||||
assert!(entries.iter().any(|e| e.name == "test.txt" && !e.stat.is_dir));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_open_read() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
fs::write(tmp.path().join("test.txt"), "hello world").await.unwrap();
|
||||
|
||||
let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf());
|
||||
let flags = OpenFlags::new().read();
|
||||
let mut file = AsyncVfsBackend::open_file(&vfs, &tmp.path().join("test.txt"), &flags).await.unwrap();
|
||||
|
||||
let mut buf = [0u8; 11];
|
||||
let n = AsyncVfsFile::read(&mut *file, &mut buf).await.unwrap();
|
||||
assert_eq!(n, 11);
|
||||
assert_eq!(&buf, b"hello world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_create_dir() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf());
|
||||
|
||||
AsyncVfsBackend::create_dir(&vfs, &tmp.path().join("newdir"), 0o755).await.unwrap();
|
||||
assert!(AsyncVfsBackend::exists(&vfs, &tmp.path().join("newdir")).await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_remove_file() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
fs::write(tmp.path().join("test.txt"), "content").await.unwrap();
|
||||
|
||||
let vfs = AsyncLocalFs::with_root(tmp.path().to_path_buf());
|
||||
AsyncVfsBackend::remove_file(&vfs, &tmp.path().join("test.txt")).await.unwrap();
|
||||
assert!(!AsyncVfsBackend::exists(&vfs, &tmp.path().join("test.txt")).await);
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,8 @@ pub mod smb_fs;
|
||||
#[cfg(feature = "smb-server")]
|
||||
pub mod smb_server_backend;
|
||||
pub mod util;
|
||||
#[cfg(feature = "async-vfs")]
|
||||
pub mod async_fs;
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::SystemTime;
|
||||
@@ -567,7 +569,7 @@ pub trait AsyncVfsFile: Send + Sync {
|
||||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, VfsError>> + Send + 'a>>;
|
||||
fn write<'a>(&'a mut self, buf: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize, VfsError>> + Send + 'a>>;
|
||||
fn seek<'a>(&'a mut self, pos: std::io::SeekFrom) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, VfsError>> + Send + 'a>>;
|
||||
fn flush(&mut self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), VfsError>> + Send>>;
|
||||
fn flush<'a>(&'a mut self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), VfsError>> + Send + 'a>>;
|
||||
}
|
||||
|
||||
/// Async VFS 后端 trait(用于异步文件系统操作)
|
||||
|
||||
Reference in New Issue
Block a user