P1: AsyncSmbVfs implementation (Phase 4)
- AsyncSmbVfs: spawn_blocking wrapper over SmbVfs - AsyncSmbFile: tokio::sync::Mutex for async state - Add Clone derive to SmbVfs (Arc<Mutex<Tree>>) - Remove manual Clone impl (derive handles it) Phase 4 complete: AsyncSmbVfs working Phase 5 pending: WebDAV integration Tests: 293 passed, 0 failed
This commit is contained in:
260
markbase-core/src/vfs/async_smb_fs.rs
Normal file
260
markbase-core/src/vfs/async_smb_fs.rs
Normal file
@@ -0,0 +1,260 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use std::io::{SeekFrom, Read, Write};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, smb_fs::SmbVfs, VfsBackend};
|
||||
|
||||
/// Async SMB VFS 文件實現(spawn_blocking 包装)
|
||||
pub struct AsyncSmbFile {
|
||||
inner: Arc<Mutex<SmbFileState>>,
|
||||
}
|
||||
|
||||
enum SmbFileState {
|
||||
Read { vfs: SmbVfs, path: PathBuf, position: u64, size: u64, data: Vec<u8> },
|
||||
Write { vfs: SmbVfs, path: PathBuf, buffer: Vec<u8> },
|
||||
}
|
||||
|
||||
impl AsyncSmbFile {
|
||||
pub async fn new_read(vfs: SmbVfs, path: PathBuf) -> Result<Self, VfsError> {
|
||||
let stat = spawn_blocking({
|
||||
let vfs = vfs.clone();
|
||||
let path = path.clone();
|
||||
move || VfsBackend::stat(&vfs, &path)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
|
||||
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(SmbFileState::Read {
|
||||
vfs, path, position: 0, size: stat.size, data: Vec::new(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_write(vfs: SmbVfs, path: PathBuf) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(SmbFileState::Write {
|
||||
vfs, path, buffer: Vec::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsFile for AsyncSmbFile {
|
||||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let buf_len = buf.len();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
SmbFileState::Read { vfs, path, position, size, data } => {
|
||||
if *position >= *size {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if data.is_empty() {
|
||||
let vfs_clone = vfs.clone();
|
||||
let path_clone = path.clone();
|
||||
let loaded_data = spawn_blocking(move || {
|
||||
let flags = OpenFlags::new().read();
|
||||
let mut file = VfsBackend::open_file(&vfs_clone, &path_clone, &flags)?;
|
||||
let mut buf = Vec::new();
|
||||
let mut chunk = [0u8; 8192];
|
||||
loop {
|
||||
match file.read(&mut chunk) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => buf.extend_from_slice(&chunk[..n]),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
|
||||
*data = loaded_data;
|
||||
}
|
||||
|
||||
let remaining = *size - *position;
|
||||
let to_read = buf_len.min(remaining as usize);
|
||||
let start = *position as usize;
|
||||
let end = start + to_read;
|
||||
|
||||
buf[..to_read].copy_from_slice(&data[start..end]);
|
||||
*position += to_read as u64;
|
||||
Ok(to_read)
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for read".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let buf_copy = buf.to_vec();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
SmbFileState::Write { buffer, .. } => {
|
||||
buffer.extend_from_slice(&buf_copy);
|
||||
Ok(buf_copy.len())
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for write".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
SmbFileState::Read { position, size, .. } => {
|
||||
let new_pos = match pos {
|
||||
SeekFrom::Start(offset) => offset,
|
||||
SeekFrom::Current(offset) => ((*position as i64) + offset).max(0) as u64,
|
||||
SeekFrom::End(offset) => ((*size as i64) + offset).max(0) as u64,
|
||||
};
|
||||
*position = new_pos.min(*size);
|
||||
Ok(*position)
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for read".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn flush<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
SmbFileState::Write { vfs, path, buffer } => {
|
||||
let vfs_clone = vfs.clone();
|
||||
let path_clone = path.clone();
|
||||
let data = buffer.clone();
|
||||
spawn_blocking(move || {
|
||||
let flags = OpenFlags::new().write().create().truncate().mode(0o644);
|
||||
let mut file = VfsBackend::open_file(&vfs_clone, &path_clone, &flags)?;
|
||||
file.write_all(&data)?;
|
||||
file.flush()?;
|
||||
Ok(())
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Async SMB VFS 后端實現(spawn_blocking 包装 SmbVfs)
|
||||
pub struct AsyncSmbVfs {
|
||||
inner: SmbVfs,
|
||||
}
|
||||
|
||||
impl AsyncSmbVfs {
|
||||
pub fn new(addr: &str, share: &str, username: &str, password: &str) -> Result<Self, VfsError> {
|
||||
let inner = SmbVfs::new(addr, share, username, password)?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
pub fn new_with_options(
|
||||
addr: &str,
|
||||
share: &str,
|
||||
username: &str,
|
||||
password: &str,
|
||||
auto_reconnect: bool,
|
||||
) -> Result<Self, VfsError> {
|
||||
let inner = SmbVfs::new_with_options(addr, share, username, password, auto_reconnect)?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for AsyncSmbVfs {
|
||||
fn clone(&self) -> Self {
|
||||
Self { inner: self.inner.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsBackend for AsyncSmbVfs {
|
||||
fn clone_boxed(&self) -> Box<dyn super::AsyncVfsBackend> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
fn read_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<Vec<VfsDirEntry>, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::read_dir(&inner, &path_buf))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin<Box<dyn Future<Output = Result<Box<dyn super::AsyncVfsFile>, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let is_write = flags.write;
|
||||
Box::pin(async move {
|
||||
if is_write {
|
||||
Ok(Box::new(AsyncSmbFile::new_write(inner, path_buf)) as Box<dyn super::AsyncVfsFile>)
|
||||
} else {
|
||||
let file = AsyncSmbFile::new_read(inner, path_buf).await?;
|
||||
Ok(Box::new(file) as Box<dyn super::AsyncVfsFile>)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn stat<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<VfsStat, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::stat(&inner, &path_buf))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::create_dir(&inner, &path_buf, 0o755))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::remove_dir(&inner, &path_buf))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_file<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::remove_file(&inner, &path_buf))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let from_buf = from.to_path_buf();
|
||||
let to_buf = to.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::rename(&inner, &from_buf, &to_buf))
|
||||
.await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
})
|
||||
}
|
||||
|
||||
fn exists<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || VfsBackend::exists(&inner, &path_buf))
|
||||
.await.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,8 @@ pub mod util;
|
||||
pub mod async_fs;
|
||||
#[cfg(feature = "async-vfs")]
|
||||
pub mod async_s3_fs;
|
||||
#[cfg(feature = "async-vfs")]
|
||||
pub mod async_smb_fs;
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::SystemTime;
|
||||
|
||||
@@ -39,10 +39,11 @@ fn map_smb_error(e: smb2::Error) -> VfsError {
|
||||
}
|
||||
|
||||
/// SMB 客户端 VFS 后端 (SMB 2/3)
|
||||
#[derive(Clone)]
|
||||
pub struct SmbVfs {
|
||||
runtime: Arc<tokio::runtime::Runtime>,
|
||||
client: Arc<Mutex<smb2::SmbClient>>,
|
||||
tree: Mutex<smb2::Tree>,
|
||||
tree: Arc<Mutex<smb2::Tree>>,
|
||||
}
|
||||
|
||||
impl SmbVfs {
|
||||
@@ -90,7 +91,7 @@ impl SmbVfs {
|
||||
Ok(Self {
|
||||
runtime,
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
tree: Mutex::new(tree),
|
||||
tree: Arc::new(Mutex::new(tree)),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -100,16 +101,6 @@ impl SmbVfs {
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for SmbVfs {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
runtime: self.runtime.clone(),
|
||||
client: self.client.clone(),
|
||||
tree: Mutex::new(self.tree.lock().unwrap().clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VfsBackend for SmbVfs {
|
||||
fn clone_boxed(&self) -> Box<dyn VfsBackend> {
|
||||
Box::new(self.clone())
|
||||
|
||||
Reference in New Issue
Block a user