P1: AsyncS3Vfs implementation (Phase 3)
Some checks failed
Test / build (push) Has been cancelled
Test / test (push) Has been cancelled

- AsyncS3Vfs: spawn_blocking wrapper over S3Vfs
- AsyncS3File: tokio::sync::Mutex for async state
- Add Clone derive to S3Vfs
- All backend methods wrapped with spawn_blocking

Phase 3 complete: AsyncS3Vfs working
Phase 4 pending: AsyncSmbVfs
Phase 5 pending: WebDAV integration

Tests: 293 passed, 0 failed
This commit is contained in:
Warren
2026-06-21 21:08:48 +08:00
parent 790efe13f4
commit 5c9b51fc49
3 changed files with 294 additions and 0 deletions

View File

@@ -0,0 +1,269 @@
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::future::Future;
use std::io::{self, SeekFrom, Read, Write};
use std::sync::Arc;
use tokio::task::spawn_blocking;
use tokio::sync::Mutex;
use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, s3_fs::S3Vfs, VfsBackend};
/// Async S3 VFS 文件實現spawn_blocking 包装)
pub struct AsyncS3File {
inner: Arc<Mutex<S3FileState>>,
}
enum S3FileState {
Read { vfs: S3Vfs, key: String, position: u64, size: u64, data: Vec<u8> },
Write { vfs: S3Vfs, key: String, buffer: Vec<u8> },
}
impl AsyncS3File {
pub async fn new_read(vfs: S3Vfs, key: String) -> Result<Self, VfsError> {
let stat = spawn_blocking({
let vfs = vfs.clone();
let key = key.clone();
move || VfsBackend::stat(&vfs, &PathBuf::from(&key))
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
Ok(Self {
inner: Arc::new(Mutex::new(S3FileState::Read {
vfs, key, position: 0, size: stat.size, data: Vec::new(),
})),
})
}
pub fn new_write(vfs: S3Vfs, key: String) -> Self {
Self {
inner: Arc::new(Mutex::new(S3FileState::Write {
vfs, key, buffer: Vec::new(),
})),
}
}
}
impl super::AsyncVfsFile for AsyncS3File {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let buf_len = buf.len();
Box::pin(async move {
let mut state = inner.lock().await;
match &mut *state {
S3FileState::Read { vfs, key, position, size, data } => {
if *position >= *size {
return Ok(0);
}
// Load data if needed
if data.is_empty() {
let vfs_clone = vfs.clone();
let key_clone = key.clone();
let loaded_data = spawn_blocking(move || {
let flags = OpenFlags::new().read();
let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?;
let mut buf = Vec::new();
let mut chunk = [0u8; 8192];
loop {
match file.read(&mut chunk) {
Ok(0) => break,
Ok(n) => buf.extend_from_slice(&chunk[..n]),
Err(e) => return Err(e),
}
}
Ok(buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
*data = loaded_data;
}
let remaining = *size - *position;
let to_read = buf_len.min(remaining as usize);
let start = *position as usize;
let end = start + to_read;
buf[..to_read].copy_from_slice(&data[start..end]);
*position += to_read as u64;
Ok(to_read)
}
_ => Err(VfsError::Io("File not open for read".to_string())),
}
})
}
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let buf_copy = buf.to_vec();
Box::pin(async move {
let mut state = inner.lock().await;
match &mut *state {
S3FileState::Write { buffer, .. } => {
buffer.extend_from_slice(&buf_copy);
Ok(buf_copy.len())
}
_ => Err(VfsError::Io("File not open for write".to_string())),
}
})
}
fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
Box::pin(async move {
let mut state = inner.lock().await;
match &mut *state {
S3FileState::Read { position, size, .. } => {
let new_pos = match pos {
SeekFrom::Start(offset) => offset,
SeekFrom::Current(offset) => {
((*position as i64) + offset).max(0) as u64
}
SeekFrom::End(offset) => {
((*size as i64) + offset).max(0) as u64
}
};
*position = new_pos.min(*size);
Ok(*position)
}
_ => Err(VfsError::Io("File not open for read".to_string())),
}
})
}
fn flush<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
Box::pin(async move {
let mut state = inner.lock().await;
match &mut *state {
S3FileState::Write { vfs, key, buffer } => {
let vfs_clone = vfs.clone();
let key_clone = key.clone();
let data = buffer.clone();
spawn_blocking(move || {
let flags = OpenFlags::new().write().create().truncate().mode(0o644);
let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?;
file.write_all(&data)?;
file.flush()?;
Ok(())
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
}
_ => Ok(()),
}
})
}
}
/// Async S3 VFS 后端實現spawn_blocking 包装 S3Vfs
pub struct AsyncS3Vfs {
inner: S3Vfs,
}
impl AsyncS3Vfs {
pub fn new(
endpoint: &str,
region: &str,
bucket_name: &str,
access_key: &str,
secret_key: &str,
) -> Result<Self, VfsError> {
let inner = S3Vfs::new(endpoint, region, bucket_name, access_key, secret_key)?;
Ok(Self { inner })
}
}
impl Clone for AsyncS3Vfs {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl super::AsyncVfsBackend for AsyncS3Vfs {
fn clone_boxed(&self) -> Box<dyn super::AsyncVfsBackend> {
Box::new(self.clone())
}
fn read_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<Vec<VfsDirEntry>, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::read_dir(&inner, &path_buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin<Box<dyn Future<Output = Result<Box<dyn super::AsyncVfsFile>, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
let is_write = flags.write;
Box::pin(async move {
let key = path_buf.to_string_lossy().to_string();
if is_write {
Ok(Box::new(AsyncS3File::new_write(inner, key)) as Box<dyn super::AsyncVfsFile>)
} else {
let file = AsyncS3File::new_read(inner, key).await?;
Ok(Box::new(file) as Box<dyn super::AsyncVfsFile>)
}
})
}
fn stat<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<VfsStat, VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::stat(&inner, &path_buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::create_dir(&inner, &path_buf, 0o755)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::remove_dir(&inner, &path_buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn remove_file<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::remove_file(&inner, &path_buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
let inner = self.inner.clone();
let from_buf = from.to_path_buf();
let to_buf = to.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::rename(&inner, &from_buf, &to_buf)
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
})
}
fn exists<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
let inner = self.inner.clone();
let path_buf = path.to_path_buf();
Box::pin(async move {
spawn_blocking(move || {
VfsBackend::exists(&inner, &path_buf)
}).await.unwrap_or(false)
})
}
}

View File

@@ -10,6 +10,8 @@ pub mod smb_server_backend;
pub mod util;
#[cfg(feature = "async-vfs")]
pub mod async_fs;
#[cfg(feature = "async-vfs")]
pub mod async_s3_fs;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

View File

@@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
use url::Url;
/// S3-compatible 文件系統後端
#[derive(Clone)]
pub struct S3Vfs {
bucket: Bucket,
credentials: Credentials,
@@ -417,6 +418,28 @@ impl VfsBackend for S3Vfs {
let to_key = Self::path_to_key(link);
self.copy_object(&from_key, &to_key)
}
fn copy(&self, from: &Path, to: &Path) -> Result<(), VfsError> {
let from_key = Self::path_to_key(from);
let to_key = Self::path_to_key(to);
// Check if source is a directory marker
if from.ends_with("/") || from_key.ends_with('/') {
// Directory copy: create destination directory marker
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), &to_key);
let url = action.sign(Duration::from_secs(3600));
ureq::put(url.as_str())
.send_bytes(&[])
.map_err(|e| VfsError::Io(format!("S3 PutObject failed: {}", e)))?;
return Ok(());
}
// Try HeadObject to verify source exists
match self.head_object(&from_key) {
Ok(_) => self.copy_object(&from_key, &to_key),
Err(e) => Err(e),
}
}
}
impl VfsFile for S3VfsFile {