WebDAV async + cache TTL: spawn_blocking for props persistence, 5min TTL eviction
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

P2 improvements:
- patch_props: use tokio::spawn_blocking for blocking VFS writes
- WEBDAV_HANDLER_CACHE: add CachedHandler with Instant timestamp
- TTL check on each request (300s = 5 minutes), recreate if expired
- create_handler_for_user() helper function

Tests: 288 passed, 0 failed
This commit is contained in:
Warren
2026-06-21 16:14:42 +08:00
parent 9acd174388
commit 5000ba7c14
2 changed files with 129 additions and 78 deletions

View File

@@ -13,6 +13,7 @@ use base64::Engine as _;
use serde::Deserialize; use serde::Deserialize;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, LazyLock, Mutex, RwLock}; use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::{Duration, Instant};
use dashmap::DashMap; use dashmap::DashMap;
@@ -2470,9 +2471,53 @@ async fn search_files_handler(Query(query): Query<SearchQuery>) -> impl IntoResp
} }
// ===== WebDAV multi-user handler (Phase 20 + P1 multi-user) ===== // ===== WebDAV multi-user handler (Phase 20 + P1 multi-user) =====
static WEBDAV_HANDLER_CACHE: LazyLock<DashMap<String, dav_server::DavHandler>> = struct CachedHandler {
handler: dav_server::DavHandler,
created_at: Instant,
}
static WEBDAV_HANDLER_CACHE: LazyLock<DashMap<String, CachedHandler>> =
LazyLock::new(|| DashMap::new()); LazyLock::new(|| DashMap::new());
const WEBDAV_CACHE_TTL_SECS: u64 = 300; // 5 minutes
fn create_handler_for_user(
parent: &std::path::Path,
username: &str,
use_s3: bool,
s3_cfg: &crate::s3_config::S3Config,
upload_hook: &Arc<crate::ssh_server::upload_hook::UploadHook>,
versioning: &Arc<crate::webdav_version::WebDavVersioning>,
) -> dav_server::DavHandler {
let user_root = parent.join(username);
let vfs: Box<dyn crate::vfs::VfsBackend> = if use_s3 {
match crate::vfs::s3_fs::S3Vfs::new(
&s3_cfg.s3.endpoint,
&s3_cfg.s3.region,
&format!("webdav-{}", username),
&s3_cfg.keys.default_access_key,
&s3_cfg.keys.default_secret_key,
) {
Ok(s3) => Box::new(s3),
Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()),
}
} else {
Box::new(crate::vfs::local_fs::LocalFs::new())
};
let locks_dir = parent.join(".webdav_locks");
let _ = std::fs::create_dir_all(&locks_dir);
let locks_file = locks_dir.join(format!("{}.json", username));
crate::webdav::create_webdav_handler_persisted(
vfs,
user_root,
Some(upload_hook.clone()),
username.to_string(),
Some(versioning.clone()),
locks_file,
)
}
async fn handle_webdav_multi( async fn handle_webdav_multi(
Extension(parent): Extension<std::path::PathBuf>, Extension(parent): Extension<std::path::PathBuf>,
Extension(upload_hook): Extension<Arc<crate::ssh_server::upload_hook::UploadHook>>, Extension(upload_hook): Extension<Arc<crate::ssh_server::upload_hook::UploadHook>>,
@@ -2517,39 +2562,41 @@ async fn handle_webdav_multi(
None => return unauthorized_response(), None => return unauthorized_response(),
}; };
// 3. Get or create cached DavHandler for this user // 3. Get or create cached DavHandler for this user (with TTL eviction)
let handler = WEBDAV_HANDLER_CACHE let handler = {
.entry(username.clone()) if let Some(cached) = WEBDAV_HANDLER_CACHE.get(&username) {
.or_insert_with(|| { if cached.created_at.elapsed() < Duration::from_secs(WEBDAV_CACHE_TTL_SECS) {
let user_root = parent.join(&username); cached.handler.clone()
let vfs: Box<dyn crate::vfs::VfsBackend> = if use_s3 {
match crate::vfs::s3_fs::S3Vfs::new(
&s3_cfg.s3.endpoint,
&s3_cfg.s3.region,
&format!("webdav-{}", username),
&s3_cfg.keys.default_access_key,
&s3_cfg.keys.default_secret_key,
) {
Ok(s3) => Box::new(s3),
Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()),
}
} else { } else {
Box::new(crate::vfs::local_fs::LocalFs::new()) WEBDAV_HANDLER_CACHE.remove(&username);
}; WEBDAV_HANDLER_CACHE
.entry(username.clone())
let locks_dir = parent.join(".webdav_locks"); .or_insert_with(|| {
let _ = std::fs::create_dir_all(&locks_dir); CachedHandler {
let locks_file = locks_dir.join(format!("{}.json", username)); handler: create_handler_for_user(
crate::webdav::create_webdav_handler_persisted( &parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning,
vfs, ),
user_root, created_at: Instant::now(),
Some(upload_hook), }
username, })
Some(versioning), .handler
locks_file, .clone()
) }
}) } else {
.clone(); WEBDAV_HANDLER_CACHE
.entry(username.clone())
.or_insert_with(|| {
CachedHandler {
handler: create_handler_for_user(
&parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning,
),
created_at: Instant::now(),
}
})
.handler
.clone()
}
};
let dav_resp = handler.handle(req).await; let dav_resp = handler.handle(req).await;

View File

@@ -776,58 +776,62 @@ impl DavFileSystem for VfsDavFs {
let vfs = self.vfs.clone_boxed(); let vfs = self.vfs.clone_boxed();
let root = self.root.clone(); let root = self.root.clone();
Box::pin(async move { Box::pin(async move {
let mut map = match data.write() { let results: Vec<(StatusCode, DavProp)> = {
Ok(guard) => guard, let mut map = match data.write() {
Err(e) => { Ok(guard) => guard,
log::warn!("props_data RwLock poisoned in patch_props, recovering"); Err(e) => {
e.into_inner() log::warn!("props_data RwLock poisoned in patch_props, recovering");
} e.into_inner()
}
};
patch
.into_iter()
.map(|(set, prop)| {
let code = if set {
map.entry(key.clone()).or_default().push(prop.clone());
StatusCode::OK
} else {
if let Some(props) = map.get_mut(&key) {
props.retain(|p| {
p.name != prop.name || p.namespace != prop.namespace
});
}
StatusCode::NO_CONTENT
};
(code, prop)
})
.collect()
}; };
let results: Vec<(StatusCode, DavProp)> = patch let entries: HashMap<String, Vec<DeadPropEntry>> = {
.into_iter() let map = match data.read() {
.map(|(set, prop)| { Ok(guard) => guard,
let code = if set { Err(e) => {
map.entry(key.clone()).or_default().push(prop.clone()); log::warn!("props_data RwLock poisoned in patch_props persistence, recovering");
StatusCode::OK e.into_inner()
} else { }
if let Some(props) = map.get_mut(&key) { };
props.retain(|p| { map.iter()
p.name != prop.name || p.namespace != prop.namespace .map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect()))
}); .collect()
}
StatusCode::NO_CONTENT
};
(code, prop)
})
.collect();
drop(map);
let map = match data.read() {
Ok(guard) => guard,
Err(e) => {
log::warn!("props_data RwLock poisoned in patch_props persistence, recovering");
e.into_inner()
}
}; };
let entries: HashMap<String, Vec<DeadPropEntry>> = map
.iter()
.map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect()))
.collect();
if let Ok(json) = serde_json::to_string(&entries) { if let Ok(json) = serde_json::to_string(&entries) {
let path = root.join(".webdav_props.json"); let path = root.join(".webdav_props.json");
let flags = OpenFlags::new().write().create().truncate().mode(0o644); let _ = tokio::task::spawn_blocking(move || {
match vfs.open_file(&path, &flags) { let flags = OpenFlags::new().write().create().truncate().mode(0o644);
Ok(mut file) => { match vfs.open_file(&path, &flags) {
if let Err(e) = file.write_all(json.as_bytes()) { Ok(mut file) => {
log::warn!("patch_props write_all failed: {:?}", e); if let Err(e) = file.write_all(json.as_bytes()) {
log::warn!("patch_props write_all failed: {:?}", e);
}
if let Err(e) = file.flush() {
log::warn!("patch_props flush failed: {:?}", e);
}
} }
if let Err(e) = file.flush() { Err(e) => {
log::warn!("patch_props flush failed: {:?}", e); log::warn!("patch_props open_file failed: {:?}", e);
} }
} }
Err(e) => { }).await;
log::warn!("patch_props open_file failed: {:?}", e);
}
}
} }
Ok(results) Ok(results)
}) })