diff --git a/markbase-core/src/vfs/async_smb_fs.rs b/markbase-core/src/vfs/async_smb_fs.rs new file mode 100644 index 0000000..6fc7511 --- /dev/null +++ b/markbase-core/src/vfs/async_smb_fs.rs @@ -0,0 +1,260 @@ +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::future::Future; +use std::io::{SeekFrom, Read, Write}; +use std::sync::Arc; + +use tokio::task::spawn_blocking; +use tokio::sync::Mutex; + +use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, smb_fs::SmbVfs, VfsBackend}; + +/// Async SMB VFS 文件實現(spawn_blocking 包装) +pub struct AsyncSmbFile { + inner: Arc>, +} + +enum SmbFileState { + Read { vfs: SmbVfs, path: PathBuf, position: u64, size: u64, data: Vec }, + Write { vfs: SmbVfs, path: PathBuf, buffer: Vec }, +} + +impl AsyncSmbFile { + pub async fn new_read(vfs: SmbVfs, path: PathBuf) -> Result { + let stat = spawn_blocking({ + let vfs = vfs.clone(); + let path = path.clone(); + move || VfsBackend::stat(&vfs, &path) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; + + Ok(Self { + inner: Arc::new(Mutex::new(SmbFileState::Read { + vfs, path, position: 0, size: stat.size, data: Vec::new(), + })), + }) + } + + pub fn new_write(vfs: SmbVfs, path: PathBuf) -> Self { + Self { + inner: Arc::new(Mutex::new(SmbFileState::Write { + vfs, path, buffer: Vec::new(), + })), + } + } +} + +impl super::AsyncVfsFile for AsyncSmbFile { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let buf_len = buf.len(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + SmbFileState::Read { vfs, path, position, size, data } => { + if *position >= *size { + return Ok(0); + } + + if data.is_empty() { + let vfs_clone = vfs.clone(); + let path_clone = path.clone(); + let loaded_data = spawn_blocking(move || { + let flags = OpenFlags::new().read(); + let mut file = VfsBackend::open_file(&vfs_clone, &path_clone, &flags)?; + let mut buf = Vec::new(); + let mut chunk = [0u8; 8192]; + loop { + match file.read(&mut chunk) { + Ok(0) => break, + Ok(n) => buf.extend_from_slice(&chunk[..n]), + Err(e) => return Err(e), + } + } + Ok(buf) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; + *data = loaded_data; + } + + let remaining = *size - *position; + let to_read = buf_len.min(remaining as usize); + let start = *position as usize; + let end = start + to_read; + + buf[..to_read].copy_from_slice(&data[start..end]); + *position += to_read as u64; + Ok(to_read) + } + _ => Err(VfsError::Io("File not open for read".to_string())), + } + }) + } + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let buf_copy = buf.to_vec(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + SmbFileState::Write { buffer, .. } => { + buffer.extend_from_slice(&buf_copy); + Ok(buf_copy.len()) + } + _ => Err(VfsError::Io("File not open for write".to_string())), + } + }) + } + + fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + SmbFileState::Read { position, size, .. } => { + let new_pos = match pos { + SeekFrom::Start(offset) => offset, + SeekFrom::Current(offset) => ((*position as i64) + offset).max(0) as u64, + SeekFrom::End(offset) => ((*size as i64) + offset).max(0) as u64, + }; + *position = new_pos.min(*size); + Ok(*position) + } + _ => Err(VfsError::Io("File not open for read".to_string())), + } + }) + } + + fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + match &mut *state { + SmbFileState::Write { vfs, path, buffer } => { + let vfs_clone = vfs.clone(); + let path_clone = path.clone(); + let data = buffer.clone(); + spawn_blocking(move || { + let flags = OpenFlags::new().write().create().truncate().mode(0o644); + let mut file = VfsBackend::open_file(&vfs_clone, &path_clone, &flags)?; + file.write_all(&data)?; + file.flush()?; + Ok(()) + }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + } + _ => Ok(()), + } + }) + } +} + +/// Async SMB VFS 后端實現(spawn_blocking 包装 SmbVfs) +pub struct AsyncSmbVfs { + inner: SmbVfs, +} + +impl AsyncSmbVfs { + pub fn new(addr: &str, share: &str, username: &str, password: &str) -> Result { + let inner = SmbVfs::new(addr, share, username, password)?; + Ok(Self { inner }) + } + + pub fn new_with_options( + addr: &str, + share: &str, + username: &str, + password: &str, + auto_reconnect: bool, + ) -> Result { + let inner = SmbVfs::new_with_options(addr, share, username, password, auto_reconnect)?; + Ok(Self { inner }) + } +} + +impl Clone for AsyncSmbVfs { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +impl super::AsyncVfsBackend for AsyncSmbVfs { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn read_dir<'a>(&'a self, path: &'a Path) -> Pin, VfsError>> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::read_dir(&inner, &path_buf)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin, VfsError>> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + let is_write = flags.write; + Box::pin(async move { + if is_write { + Ok(Box::new(AsyncSmbFile::new_write(inner, path_buf)) as Box) + } else { + let file = AsyncSmbFile::new_read(inner, path_buf).await?; + Ok(Box::new(file) as Box) + } + }) + } + + fn stat<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::stat(&inner, &path_buf)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::create_dir(&inner, &path_buf, 0o755)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::remove_dir(&inner, &path_buf)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn remove_file<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::remove_file(&inner, &path_buf)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let from_buf = from.to_path_buf(); + let to_buf = to.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::rename(&inner, &from_buf, &to_buf)) + .await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + }) + } + + fn exists<'a>(&'a self, path: &'a Path) -> Pin + Send + 'a>> { + let inner = self.inner.clone(); + let path_buf = path.to_path_buf(); + Box::pin(async move { + spawn_blocking(move || VfsBackend::exists(&inner, &path_buf)) + .await.unwrap_or(false) + }) + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index eff6d9c..c92d8be 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -12,6 +12,8 @@ pub mod util; pub mod async_fs; #[cfg(feature = "async-vfs")] pub mod async_s3_fs; +#[cfg(feature = "async-vfs")] +pub mod async_smb_fs; use std::path::{Path, PathBuf}; use std::time::SystemTime; diff --git a/markbase-core/src/vfs/smb_fs.rs b/markbase-core/src/vfs/smb_fs.rs index 67fcecd..4aadf84 100644 --- a/markbase-core/src/vfs/smb_fs.rs +++ b/markbase-core/src/vfs/smb_fs.rs @@ -39,10 +39,11 @@ fn map_smb_error(e: smb2::Error) -> VfsError { } /// SMB 客户端 VFS 后端 (SMB 2/3) +#[derive(Clone)] pub struct SmbVfs { runtime: Arc, client: Arc>, - tree: Mutex, + tree: Arc>, } impl SmbVfs { @@ -90,7 +91,7 @@ impl SmbVfs { Ok(Self { runtime, client: Arc::new(Mutex::new(client)), - tree: Mutex::new(tree), + tree: Arc::new(Mutex::new(tree)), }) } @@ -100,16 +101,6 @@ impl SmbVfs { } } -impl Clone for SmbVfs { - fn clone(&self) -> Self { - Self { - runtime: self.runtime.clone(), - client: self.client.clone(), - tree: Mutex::new(self.tree.lock().unwrap().clone()), - } - } -} - impl VfsBackend for SmbVfs { fn clone_boxed(&self) -> Box { Box::new(self.clone())