From 5000ba7c14777c1fb3f6fb48a1a56c606eb801e2 Mon Sep 17 00:00:00 2001 From: Warren Date: Sun, 21 Jun 2026 16:14:42 +0800 Subject: [PATCH] WebDAV async + cache TTL: spawn_blocking for props persistence, 5min TTL eviction P2 improvements: - patch_props: use tokio::spawn_blocking for blocking VFS writes - WEBDAV_HANDLER_CACHE: add CachedHandler with Instant timestamp - TTL check on each request (300s = 5 minutes), recreate if expired - create_handler_for_user() helper function Tests: 288 passed, 0 failed --- markbase-core/src/server.rs | 113 +++++++++++++++++++++++++----------- markbase-core/src/webdav.rs | 94 ++++++++++++++++-------------- 2 files changed, 129 insertions(+), 78 deletions(-) diff --git a/markbase-core/src/server.rs b/markbase-core/src/server.rs index 99ca4a0..a4a4d1b 100644 --- a/markbase-core/src/server.rs +++ b/markbase-core/src/server.rs @@ -13,6 +13,7 @@ use base64::Engine as _; use serde::Deserialize; use std::str::FromStr; use std::sync::{Arc, LazyLock, Mutex, RwLock}; +use std::time::{Duration, Instant}; use dashmap::DashMap; @@ -2470,9 +2471,53 @@ async fn search_files_handler(Query(query): Query) -> impl IntoResp } // ===== WebDAV multi-user handler (Phase 20 + P1 multi-user) ===== -static WEBDAV_HANDLER_CACHE: LazyLock> = +struct CachedHandler { + handler: dav_server::DavHandler, + created_at: Instant, +} + +static WEBDAV_HANDLER_CACHE: LazyLock> = LazyLock::new(|| DashMap::new()); +const WEBDAV_CACHE_TTL_SECS: u64 = 300; // 5 minutes + +fn create_handler_for_user( + parent: &std::path::Path, + username: &str, + use_s3: bool, + s3_cfg: &crate::s3_config::S3Config, + upload_hook: &Arc, + versioning: &Arc, +) -> dav_server::DavHandler { + let user_root = parent.join(username); + let vfs: Box = if use_s3 { + match crate::vfs::s3_fs::S3Vfs::new( + &s3_cfg.s3.endpoint, + &s3_cfg.s3.region, + &format!("webdav-{}", username), + &s3_cfg.keys.default_access_key, + &s3_cfg.keys.default_secret_key, + ) { + Ok(s3) => Box::new(s3), + Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()), + } + } else { + Box::new(crate::vfs::local_fs::LocalFs::new()) + }; + + let locks_dir = parent.join(".webdav_locks"); + let _ = std::fs::create_dir_all(&locks_dir); + let locks_file = locks_dir.join(format!("{}.json", username)); + crate::webdav::create_webdav_handler_persisted( + vfs, + user_root, + Some(upload_hook.clone()), + username.to_string(), + Some(versioning.clone()), + locks_file, + ) +} + async fn handle_webdav_multi( Extension(parent): Extension, Extension(upload_hook): Extension>, @@ -2517,39 +2562,41 @@ async fn handle_webdav_multi( None => return unauthorized_response(), }; - // 3. Get or create cached DavHandler for this user - let handler = WEBDAV_HANDLER_CACHE - .entry(username.clone()) - .or_insert_with(|| { - let user_root = parent.join(&username); - let vfs: Box = if use_s3 { - match crate::vfs::s3_fs::S3Vfs::new( - &s3_cfg.s3.endpoint, - &s3_cfg.s3.region, - &format!("webdav-{}", username), - &s3_cfg.keys.default_access_key, - &s3_cfg.keys.default_secret_key, - ) { - Ok(s3) => Box::new(s3), - Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()), - } + // 3. Get or create cached DavHandler for this user (with TTL eviction) + let handler = { + if let Some(cached) = WEBDAV_HANDLER_CACHE.get(&username) { + if cached.created_at.elapsed() < Duration::from_secs(WEBDAV_CACHE_TTL_SECS) { + cached.handler.clone() } else { - Box::new(crate::vfs::local_fs::LocalFs::new()) - }; - - let locks_dir = parent.join(".webdav_locks"); - let _ = std::fs::create_dir_all(&locks_dir); - let locks_file = locks_dir.join(format!("{}.json", username)); - crate::webdav::create_webdav_handler_persisted( - vfs, - user_root, - Some(upload_hook), - username, - Some(versioning), - locks_file, - ) - }) - .clone(); + WEBDAV_HANDLER_CACHE.remove(&username); + WEBDAV_HANDLER_CACHE + .entry(username.clone()) + .or_insert_with(|| { + CachedHandler { + handler: create_handler_for_user( + &parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning, + ), + created_at: Instant::now(), + } + }) + .handler + .clone() + } + } else { + WEBDAV_HANDLER_CACHE + .entry(username.clone()) + .or_insert_with(|| { + CachedHandler { + handler: create_handler_for_user( + &parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning, + ), + created_at: Instant::now(), + } + }) + .handler + .clone() + } + }; let dav_resp = handler.handle(req).await; diff --git a/markbase-core/src/webdav.rs b/markbase-core/src/webdav.rs index bb1a937..a3e2d49 100644 --- a/markbase-core/src/webdav.rs +++ b/markbase-core/src/webdav.rs @@ -776,58 +776,62 @@ impl DavFileSystem for VfsDavFs { let vfs = self.vfs.clone_boxed(); let root = self.root.clone(); Box::pin(async move { - let mut map = match data.write() { - Ok(guard) => guard, - Err(e) => { - log::warn!("props_data RwLock poisoned in patch_props, recovering"); - e.into_inner() - } + let results: Vec<(StatusCode, DavProp)> = { + let mut map = match data.write() { + Ok(guard) => guard, + Err(e) => { + log::warn!("props_data RwLock poisoned in patch_props, recovering"); + e.into_inner() + } + }; + patch + .into_iter() + .map(|(set, prop)| { + let code = if set { + map.entry(key.clone()).or_default().push(prop.clone()); + StatusCode::OK + } else { + if let Some(props) = map.get_mut(&key) { + props.retain(|p| { + p.name != prop.name || p.namespace != prop.namespace + }); + } + StatusCode::NO_CONTENT + }; + (code, prop) + }) + .collect() }; - let results: Vec<(StatusCode, DavProp)> = patch - .into_iter() - .map(|(set, prop)| { - let code = if set { - map.entry(key.clone()).or_default().push(prop.clone()); - StatusCode::OK - } else { - if let Some(props) = map.get_mut(&key) { - props.retain(|p| { - p.name != prop.name || p.namespace != prop.namespace - }); - } - StatusCode::NO_CONTENT - }; - (code, prop) - }) - .collect(); - drop(map); - let map = match data.read() { - Ok(guard) => guard, - Err(e) => { - log::warn!("props_data RwLock poisoned in patch_props persistence, recovering"); - e.into_inner() - } + let entries: HashMap> = { + let map = match data.read() { + Ok(guard) => guard, + Err(e) => { + log::warn!("props_data RwLock poisoned in patch_props persistence, recovering"); + e.into_inner() + } + }; + map.iter() + .map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect())) + .collect() }; - let entries: HashMap> = map - .iter() - .map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect())) - .collect(); if let Ok(json) = serde_json::to_string(&entries) { let path = root.join(".webdav_props.json"); - let flags = OpenFlags::new().write().create().truncate().mode(0o644); - match vfs.open_file(&path, &flags) { - Ok(mut file) => { - if let Err(e) = file.write_all(json.as_bytes()) { - log::warn!("patch_props write_all failed: {:?}", e); + let _ = tokio::task::spawn_blocking(move || { + let flags = OpenFlags::new().write().create().truncate().mode(0o644); + match vfs.open_file(&path, &flags) { + Ok(mut file) => { + if let Err(e) = file.write_all(json.as_bytes()) { + log::warn!("patch_props write_all failed: {:?}", e); + } + if let Err(e) = file.flush() { + log::warn!("patch_props flush failed: {:?}", e); + } } - if let Err(e) = file.flush() { - log::warn!("patch_props flush failed: {:?}", e); + Err(e) => { + log::warn!("patch_props open_file failed: {:?}", e); } } - Err(e) => { - log::warn!("patch_props open_file failed: {:?}", e); - } - } + }).await; } Ok(results) })