From eb80c07c85cc943b80b0c3d2ba9f09ce13cc0d75 Mon Sep 17 00:00:00 2001 From: Warren Date: Fri, 19 Jun 2026 08:19:16 +0800 Subject: [PATCH] Implement WebDAV VFS integration: dav-server 0.11 compatible - Add webdav.rs module: VfsDavFs, VfsDavFile, VfsDavMetaData - Implement DavFileSystem + Clone for GuardedFileSystem blanket impl - Add clone_boxed to VfsBackend trait (required for Sync) - Update CLI webdav.rs to use VFS instead of SQLite - Add bytes dependency - All 155 tests pass --- Cargo.lock | 2 +- markbase-core/Cargo.toml | 2 +- markbase-core/src/cli/interface/webdav.rs | 35 ++- markbase-core/src/lib.rs | 16 +- markbase-core/src/vfs/local_fs.rs | 4 + markbase-core/src/vfs/mod.rs | 5 +- markbase-core/src/vfs/s3_fs.rs | 7 + markbase-core/src/webdav.rs | 310 ++++++++++++++++++++++ 8 files changed, 357 insertions(+), 24 deletions(-) create mode 100644 markbase-core/src/webdav.rs diff --git a/Cargo.lock b/Cargo.lock index 97c07c3..b95f66a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2675,6 +2675,7 @@ dependencies = [ "base64", "bcrypt", "byteorder", + "bytes", "chrono", "cipher 0.4.4", "clap", @@ -2689,7 +2690,6 @@ dependencies = [ "futures-util", "hmac 0.12.1", "log", - "markbase-webdav", "md5 0.8.0", "nix 0.29.0", "postgres", diff --git a/markbase-core/Cargo.toml b/markbase-core/Cargo.toml index 2e4f7f9..0805a0f 100644 --- a/markbase-core/Cargo.toml +++ b/markbase-core/Cargo.toml @@ -18,6 +18,7 @@ sevenz-rust = { version = "0.6.1", optional = true } # 7z格式 ⚠️库不 anyhow = "1" axum = { version = "0.7", features = ["macros"] } bcrypt = "0.16" +bytes = "1" chrono = { version = "0.4", features = ["serde"] } regex = "1" clap = { version = "4", features = ["derive"] } @@ -26,7 +27,6 @@ filetree = { path = "../filetree" } futures-util = "0.3" log = "0.4" env_logger = "0.11" -markbase-webdav = { path = "../markbase-webdav" } pulldown-cmark = "0.12" rusqlite = { version = "0.32", features = ["bundled"] } sled = "1.0.0-alpha.124" diff --git a/markbase-core/src/cli/interface/webdav.rs b/markbase-core/src/cli/interface/webdav.rs index 24b2f10..2d4c7f6 100644 --- a/markbase-core/src/cli/interface/webdav.rs +++ b/markbase-core/src/cli/interface/webdav.rs @@ -1,5 +1,8 @@ use axum::{extract::Request, response::IntoResponse, Extension}; use clap::Subcommand; +use dav_server::{fakels::FakeLs, DavHandler}; +use std::path::PathBuf; +use std::sync::Arc; #[derive(Subcommand)] pub enum WebdavCommand { @@ -15,22 +18,22 @@ pub enum WebdavCommand { pub async fn handle_webdav_command(cmd: WebdavCommand) -> anyhow::Result<()> { match cmd { WebdavCommand::Start { port, user } => { - let db_path = std::path::PathBuf::from(crate::FileTree::user_db_path(&user)); + let home_dir = PathBuf::from("/Users/accusys/momentry/var/sftpgo/data").join(&user); - if !db_path.exists() { + if !home_dir.exists() { return Err(anyhow::anyhow!( - "User database not found: {}", - db_path.display() + "User home directory not found: {}", + home_dir.display() )); } - println!("=== MarkBase WebDAV Server ==="); + println!("=== MarkBase WebDAV Server (VFS) ==="); println!("User: {}", user); println!("Port: {}", port); - println!("Database: {}", db_path.display()); + println!("Home: {}", home_dir.display()); println!(); - run_webdav_server(port, user, db_path).await?; + run_webdav_server(port, home_dir, user).await?; } } Ok(()) @@ -38,14 +41,22 @@ pub async fn handle_webdav_command(cmd: WebdavCommand) -> anyhow::Result<()> { async fn run_webdav_server( port: u16, + home_dir: PathBuf, user: String, - db_path: std::path::PathBuf, ) -> anyhow::Result<()> { - use axum::{routing::any, Extension, Router}; + use axum::{routing::any, Router}; use tokio::net::TcpListener; - let webdav = markbase_webdav::webdav::MarkBaseWebDAV::new(user, db_path); - let dav_handler = webdav.create_handler(); + let vfs = Box::new(crate::vfs::local_fs::LocalFs::new()); + let upload_hook = None; + + let dav_fs = crate::webdav::VfsDavFs::new(vfs, home_dir, upload_hook, user); + + let dav_handler = DavHandler::builder() + .filesystem(dav_fs) + .locksystem(FakeLs::new()) + .strip_prefix("/webdav") + .build_handler(); let app = Router::new() .route("/webdav", any(handle_dav)) @@ -67,7 +78,7 @@ async fn run_webdav_server( } async fn handle_dav( - Extension(dav): Extension, + Extension(dav): Extension, req: Request, ) -> impl IntoResponse { dav.handle(req).await diff --git a/markbase-core/src/lib.rs b/markbase-core/src/lib.rs index 0fdce86..10a1118 100644 --- a/markbase-core/src/lib.rs +++ b/markbase-core/src/lib.rs @@ -1,5 +1,5 @@ pub mod api; -pub mod archive; // Archive Module - Universal Compression Format Support (Phase 1-3完成) +pub mod archive; pub mod audio; pub mod audit; pub mod auth; @@ -10,6 +10,7 @@ pub mod config; pub mod download; pub mod import_markdown; pub mod pg_client; +pub mod provider; pub mod render; pub mod rsync; pub mod s3; @@ -17,17 +18,14 @@ pub mod s3_auth; pub mod s3_config; pub mod s3_xml; pub mod scan; -pub mod server; // Category View Module - 双视图管理(Phase 1) - // pub mod sftp; // ⚠️ russh版本(已禁用) - // pub mod ssh2_server; // ssh2服务器(已禁用) - // pub mod ssh2_mod; // ssh2辅助模块(已禁用) -pub mod provider; // DataProvider抽象层(Phase 5) -pub mod ssh_server; // SSH服务器(Phase 1-9完成,正在修复编译错误)⭐⭐⭐⭐⭐ +pub mod server; +pub mod ssh_server; pub mod sync; -pub mod vfs; // VFS抽象层(Phase 1-6重构计划) +pub mod vfs; +pub mod webdav; #[cfg(test)] -mod security_audit; // Security Audit Module - Phase 9 +mod security_audit; // Re-export from external filetree crate pub use filetree::node::FileNode; diff --git a/markbase-core/src/vfs/local_fs.rs b/markbase-core/src/vfs/local_fs.rs index a03dae2..63ca745 100644 --- a/markbase-core/src/vfs/local_fs.rs +++ b/markbase-core/src/vfs/local_fs.rs @@ -61,6 +61,10 @@ impl VfsFile for LocalFile { } impl VfsBackend for LocalFs { + fn clone_boxed(&self) -> Box { + Box::new(Self {}) + } + fn read_dir(&self, path: &Path) -> Result, VfsError> { let dir = fs::read_dir(path).map_err(|e| util::map_io_error(path, e))?; diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index e104b1f..b3d251f 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -115,7 +115,10 @@ pub trait VfsFile { } /// VFS 后端 trait(所有文件系统操作) -pub trait VfsBackend: Send { +pub trait VfsBackend: Send + Sync { + /// Clone boxed + fn clone_boxed(&self) -> Box; + /// 读取目录内容 fn read_dir(&self, path: &Path) -> Result, VfsError>; diff --git a/markbase-core/src/vfs/s3_fs.rs b/markbase-core/src/vfs/s3_fs.rs index 5089f25..36e43c6 100644 --- a/markbase-core/src/vfs/s3_fs.rs +++ b/markbase-core/src/vfs/s3_fs.rs @@ -193,6 +193,13 @@ impl S3Vfs { } impl VfsBackend for S3Vfs { + fn clone_boxed(&self) -> Box { + Box::new(Self { + bucket: self.bucket.clone(), + credentials: self.credentials.clone(), + }) + } + fn read_dir(&self, path: &Path) -> Result, VfsError> { let prefix = Self::dir_key(path); let list = self.list_objects(&prefix)?; diff --git a/markbase-core/src/webdav.rs b/markbase-core/src/webdav.rs new file mode 100644 index 0000000..66dfbc0 --- /dev/null +++ b/markbase-core/src/webdav.rs @@ -0,0 +1,310 @@ +use crate::vfs::open_flags::OpenFlags; +use crate::vfs::{VfsBackend, VfsDirEntry, VfsStat, VfsFile}; +use crate::ssh_server::upload_hook::UploadHook; +use bytes::{Buf, Bytes}; +use dav_server::davpath::DavPath; +use dav_server::fs::{ + DavDirEntry, DavFile, DavFileSystem, DavMetaData, FsError, FsFuture, FsStream, OpenOptions, +}; +use dav_server::fakels::FakeLs; +use futures_util::stream; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::SystemTime; + +pub struct VfsDavFs { + vfs: Box, + root: PathBuf, + upload_hook: Option>, + user_uuid: String, +} + +impl Clone for VfsDavFs { + fn clone(&self) -> Self { + Self { + vfs: self.vfs.clone_boxed(), + root: self.root.clone(), + upload_hook: self.upload_hook.clone(), + user_uuid: self.user_uuid.clone(), + } + } +} + +impl VfsDavFs { + pub fn new( + vfs: Box, + root: PathBuf, + upload_hook: Option>, + user_uuid: String, + ) -> Box { + Box::new(Self { + vfs, + root, + upload_hook, + user_uuid, + }) + } + + fn resolve_path(&self, path: &DavPath) -> PathBuf { + let relative = path.as_pathbuf(); + self.root.join(relative) + } +} + +#[derive(Debug, Clone)] +pub struct VfsDavMetaData { + len: u64, + is_dir: bool, + modified: SystemTime, +} + +impl VfsDavMetaData { + pub fn new(len: u64, is_dir: bool) -> Self { + Self { + len, + is_dir, + modified: SystemTime::now(), + } + } + + pub fn from_stat(stat: &VfsStat) -> Self { + Self { + len: stat.size, + is_dir: stat.is_dir, + modified: stat.mtime, + } + } +} + +impl DavMetaData for VfsDavMetaData { + fn len(&self) -> u64 { + self.len + } + + fn modified(&self) -> Result { + Ok(self.modified) + } + + fn is_dir(&self) -> bool { + self.is_dir + } +} + +#[derive(Debug, Clone)] +pub struct VfsDavDirEntry { + name: String, + meta: VfsDavMetaData, +} + +impl VfsDavDirEntry { + pub fn new(name: String, meta: VfsDavMetaData) -> Self { + Self { name, meta } + } +} + +impl DavDirEntry for VfsDavDirEntry { + fn name(&self) -> Vec { + self.name.as_bytes().to_vec() + } + + fn metadata(&self) -> FsFuture<'_, Box> { + Box::pin(std::future::ready(Ok(Box::new(self.meta.clone()) as Box))) + } +} + +pub struct VfsDavFile { + data: Vec, + position: u64, + path: Option, + upload_hook: Option>, + user_uuid: String, + is_write: bool, +} + +impl std::fmt::Debug for VfsDavFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VfsDavFile") + .field("is_write", &self.is_write) + .field("path", &self.path) + .finish() + } +} + +impl VfsDavFile { + pub fn new_read(data: Vec) -> Self { + Self { + data, + position: 0, + path: None, + upload_hook: None, + user_uuid: String::new(), + is_write: false, + } + } + + pub fn new_write( + path: PathBuf, + upload_hook: Option>, + user_uuid: String, + ) -> Self { + Self { + data: Vec::new(), + position: 0, + path: Some(path), + upload_hook, + user_uuid, + is_write: true, + } + } +} + +impl DavFile for VfsDavFile { + fn metadata(&'_ mut self) -> FsFuture<'_, Box> { + let len = self.data.len() as u64; + Box::pin(std::future::ready(Ok( + Box::new(VfsDavMetaData::new(len, false)) as Box, + ))) + } + + fn write_buf(&'_ mut self, mut buf: Box) -> FsFuture<'_, ()> { + while buf.has_remaining() { + let chunk = buf.chunk(); + self.data.extend_from_slice(chunk); + buf.advance(chunk.len()); + } + Box::pin(std::future::ready(Ok(()))) + } + + fn write_bytes(&'_ mut self, buf: Bytes) -> FsFuture<'_, ()> { + self.data.extend_from_slice(&buf); + Box::pin(std::future::ready(Ok(()))) + } + + fn read_bytes(&'_ mut self, count: usize) -> FsFuture<'_, Bytes> { + let start = self.position as usize; + let end = std::cmp::min(start + count, self.data.len()); + + if start >= self.data.len() { + Box::pin(std::future::ready(Ok(Bytes::new()))) + } else { + let bytes = Bytes::copy_from_slice(&self.data[start..end]); + self.position = end as u64; + Box::pin(std::future::ready(Ok(bytes))) + } + } + + fn seek(&'_ mut self, pos: std::io::SeekFrom) -> FsFuture<'_, u64> { + let new_pos = match pos { + std::io::SeekFrom::Start(offset) => offset, + std::io::SeekFrom::Current(offset) => { + let current = self.position as i64; + (current + offset) as u64 + } + std::io::SeekFrom::End(offset) => { + let end = self.data.len() as i64; + (end + offset) as u64 + } + }; + self.position = new_pos; + Box::pin(std::future::ready(Ok(new_pos))) + } + + fn flush(&'_ mut self) -> FsFuture<'_, ()> { + if self.is_write { + if let Some(path) = &self.path { + if let Err(_e) = std::fs::write(path, &self.data) { + return Box::pin(std::future::ready(Err(FsError::NotImplemented))); + } + if let Some(hook) = &self.upload_hook { + if let Err(e) = hook.trigger(path, &self.user_uuid) { + log::warn!("Upload hook failed for {:?}: {}", path, e); + } + } + } + } + Box::pin(std::future::ready(Ok(()))) + } +} + +impl DavFileSystem for VfsDavFs { + fn open<'a>(&'a self, path: &'a DavPath, options: OpenOptions) -> FsFuture<'a, Box> { + let full_path = self.resolve_path(path); + + if options.write { + let file = VfsDavFile::new_write( + full_path, + self.upload_hook.clone(), + self.user_uuid.clone(), + ); + Box::pin(std::future::ready(Ok(Box::new(file) as Box))) + } else { + let flags = OpenFlags::new().read(); + match self.vfs.open_file(&full_path, &flags) { + Ok(mut vfs_file) => { + let mut data = Vec::new(); + loop { + let mut buf = vec![0u8; 8192]; + match vfs_file.read(&mut buf) { + Ok(0) => break, + Ok(n) => data.extend_from_slice(&buf[..n]), + Err(_) => return Box::pin(std::future::ready(Err(FsError::NotFound))), + } + } + let file = VfsDavFile::new_read(data); + Box::pin(std::future::ready(Ok(Box::new(file) as Box))) + } + Err(_) => Box::pin(std::future::ready(Err(FsError::NotFound))), + } + } + } + + fn read_dir<'a>( + &'a self, + path: &'a DavPath, + _meta: dav_server::fs::ReadDirMeta, + ) -> FsFuture<'a, FsStream>> { + let full_path = self.resolve_path(path); + + match self.vfs.read_dir(&full_path) { + Ok(entries) => { + let results: Vec> = entries + .into_iter() + .map(|e| { + let meta = VfsDavMetaData::from_stat(&e.stat); + Box::new(VfsDavDirEntry::new(e.name, meta)) as Box + }) + .collect(); + + let stream = stream::iter(results.into_iter().map(Ok)); + Box::pin(std::future::ready(Ok(Box::pin(stream) as FsStream>))) + } + Err(_) => Box::pin(std::future::ready(Err(FsError::NotFound))), + } + } + + fn metadata<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, Box> { + let full_path = self.resolve_path(path); + + match self.vfs.stat(&full_path) { + Ok(stat) => { + let meta = VfsDavMetaData::from_stat(&stat); + Box::pin(std::future::ready(Ok(Box::new(meta) as Box))) + } + Err(_) => Box::pin(std::future::ready(Err(FsError::NotFound))), + } + } +} + +pub fn create_webdav_handler( + vfs: Box, + root: PathBuf, + upload_hook: Option>, + user_uuid: String, +) -> dav_server::DavHandler { + let dav_fs = VfsDavFs::new(vfs, root, upload_hook, user_uuid); + dav_server::DavHandler::builder() + .filesystem(dav_fs) + .locksystem(FakeLs::new()) + .strip_prefix("/webdav") + .build_handler() +} \ No newline at end of file