WebDAV async + cache TTL: spawn_blocking for props persistence, 5min TTL eviction
P2 improvements: - patch_props: use tokio::spawn_blocking for blocking VFS writes - WEBDAV_HANDLER_CACHE: add CachedHandler with Instant timestamp - TTL check on each request (300s = 5 minutes), recreate if expired - create_handler_for_user() helper function Tests: 288 passed, 0 failed
This commit is contained in:
@@ -13,6 +13,7 @@ use base64::Engine as _;
|
||||
use serde::Deserialize;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, LazyLock, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use dashmap::DashMap;
|
||||
|
||||
@@ -2470,9 +2471,53 @@ async fn search_files_handler(Query(query): Query<SearchQuery>) -> impl IntoResp
|
||||
}
|
||||
// ===== WebDAV multi-user handler (Phase 20 + P1 multi-user) =====
|
||||
|
||||
static WEBDAV_HANDLER_CACHE: LazyLock<DashMap<String, dav_server::DavHandler>> =
|
||||
struct CachedHandler {
|
||||
handler: dav_server::DavHandler,
|
||||
created_at: Instant,
|
||||
}
|
||||
|
||||
static WEBDAV_HANDLER_CACHE: LazyLock<DashMap<String, CachedHandler>> =
|
||||
LazyLock::new(|| DashMap::new());
|
||||
|
||||
const WEBDAV_CACHE_TTL_SECS: u64 = 300; // 5 minutes
|
||||
|
||||
fn create_handler_for_user(
|
||||
parent: &std::path::Path,
|
||||
username: &str,
|
||||
use_s3: bool,
|
||||
s3_cfg: &crate::s3_config::S3Config,
|
||||
upload_hook: &Arc<crate::ssh_server::upload_hook::UploadHook>,
|
||||
versioning: &Arc<crate::webdav_version::WebDavVersioning>,
|
||||
) -> dav_server::DavHandler {
|
||||
let user_root = parent.join(username);
|
||||
let vfs: Box<dyn crate::vfs::VfsBackend> = if use_s3 {
|
||||
match crate::vfs::s3_fs::S3Vfs::new(
|
||||
&s3_cfg.s3.endpoint,
|
||||
&s3_cfg.s3.region,
|
||||
&format!("webdav-{}", username),
|
||||
&s3_cfg.keys.default_access_key,
|
||||
&s3_cfg.keys.default_secret_key,
|
||||
) {
|
||||
Ok(s3) => Box::new(s3),
|
||||
Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()),
|
||||
}
|
||||
} else {
|
||||
Box::new(crate::vfs::local_fs::LocalFs::new())
|
||||
};
|
||||
|
||||
let locks_dir = parent.join(".webdav_locks");
|
||||
let _ = std::fs::create_dir_all(&locks_dir);
|
||||
let locks_file = locks_dir.join(format!("{}.json", username));
|
||||
crate::webdav::create_webdav_handler_persisted(
|
||||
vfs,
|
||||
user_root,
|
||||
Some(upload_hook.clone()),
|
||||
username.to_string(),
|
||||
Some(versioning.clone()),
|
||||
locks_file,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_webdav_multi(
|
||||
Extension(parent): Extension<std::path::PathBuf>,
|
||||
Extension(upload_hook): Extension<Arc<crate::ssh_server::upload_hook::UploadHook>>,
|
||||
@@ -2517,39 +2562,41 @@ async fn handle_webdav_multi(
|
||||
None => return unauthorized_response(),
|
||||
};
|
||||
|
||||
// 3. Get or create cached DavHandler for this user
|
||||
let handler = WEBDAV_HANDLER_CACHE
|
||||
.entry(username.clone())
|
||||
.or_insert_with(|| {
|
||||
let user_root = parent.join(&username);
|
||||
let vfs: Box<dyn crate::vfs::VfsBackend> = if use_s3 {
|
||||
match crate::vfs::s3_fs::S3Vfs::new(
|
||||
&s3_cfg.s3.endpoint,
|
||||
&s3_cfg.s3.region,
|
||||
&format!("webdav-{}", username),
|
||||
&s3_cfg.keys.default_access_key,
|
||||
&s3_cfg.keys.default_secret_key,
|
||||
) {
|
||||
Ok(s3) => Box::new(s3),
|
||||
Err(_) => Box::new(crate::vfs::local_fs::LocalFs::new()),
|
||||
}
|
||||
// 3. Get or create cached DavHandler for this user (with TTL eviction)
|
||||
let handler = {
|
||||
if let Some(cached) = WEBDAV_HANDLER_CACHE.get(&username) {
|
||||
if cached.created_at.elapsed() < Duration::from_secs(WEBDAV_CACHE_TTL_SECS) {
|
||||
cached.handler.clone()
|
||||
} else {
|
||||
Box::new(crate::vfs::local_fs::LocalFs::new())
|
||||
};
|
||||
|
||||
let locks_dir = parent.join(".webdav_locks");
|
||||
let _ = std::fs::create_dir_all(&locks_dir);
|
||||
let locks_file = locks_dir.join(format!("{}.json", username));
|
||||
crate::webdav::create_webdav_handler_persisted(
|
||||
vfs,
|
||||
user_root,
|
||||
Some(upload_hook),
|
||||
username,
|
||||
Some(versioning),
|
||||
locks_file,
|
||||
)
|
||||
})
|
||||
.clone();
|
||||
WEBDAV_HANDLER_CACHE.remove(&username);
|
||||
WEBDAV_HANDLER_CACHE
|
||||
.entry(username.clone())
|
||||
.or_insert_with(|| {
|
||||
CachedHandler {
|
||||
handler: create_handler_for_user(
|
||||
&parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning,
|
||||
),
|
||||
created_at: Instant::now(),
|
||||
}
|
||||
})
|
||||
.handler
|
||||
.clone()
|
||||
}
|
||||
} else {
|
||||
WEBDAV_HANDLER_CACHE
|
||||
.entry(username.clone())
|
||||
.or_insert_with(|| {
|
||||
CachedHandler {
|
||||
handler: create_handler_for_user(
|
||||
&parent, &username, use_s3, &s3_cfg, &upload_hook, &versioning,
|
||||
),
|
||||
created_at: Instant::now(),
|
||||
}
|
||||
})
|
||||
.handler
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
|
||||
let dav_resp = handler.handle(req).await;
|
||||
|
||||
|
||||
@@ -776,58 +776,62 @@ impl DavFileSystem for VfsDavFs {
|
||||
let vfs = self.vfs.clone_boxed();
|
||||
let root = self.root.clone();
|
||||
Box::pin(async move {
|
||||
let mut map = match data.write() {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
log::warn!("props_data RwLock poisoned in patch_props, recovering");
|
||||
e.into_inner()
|
||||
}
|
||||
let results: Vec<(StatusCode, DavProp)> = {
|
||||
let mut map = match data.write() {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
log::warn!("props_data RwLock poisoned in patch_props, recovering");
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
patch
|
||||
.into_iter()
|
||||
.map(|(set, prop)| {
|
||||
let code = if set {
|
||||
map.entry(key.clone()).or_default().push(prop.clone());
|
||||
StatusCode::OK
|
||||
} else {
|
||||
if let Some(props) = map.get_mut(&key) {
|
||||
props.retain(|p| {
|
||||
p.name != prop.name || p.namespace != prop.namespace
|
||||
});
|
||||
}
|
||||
StatusCode::NO_CONTENT
|
||||
};
|
||||
(code, prop)
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
let results: Vec<(StatusCode, DavProp)> = patch
|
||||
.into_iter()
|
||||
.map(|(set, prop)| {
|
||||
let code = if set {
|
||||
map.entry(key.clone()).or_default().push(prop.clone());
|
||||
StatusCode::OK
|
||||
} else {
|
||||
if let Some(props) = map.get_mut(&key) {
|
||||
props.retain(|p| {
|
||||
p.name != prop.name || p.namespace != prop.namespace
|
||||
});
|
||||
}
|
||||
StatusCode::NO_CONTENT
|
||||
};
|
||||
(code, prop)
|
||||
})
|
||||
.collect();
|
||||
drop(map);
|
||||
let map = match data.read() {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
log::warn!("props_data RwLock poisoned in patch_props persistence, recovering");
|
||||
e.into_inner()
|
||||
}
|
||||
let entries: HashMap<String, Vec<DeadPropEntry>> = {
|
||||
let map = match data.read() {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
log::warn!("props_data RwLock poisoned in patch_props persistence, recovering");
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
map.iter()
|
||||
.map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect()))
|
||||
.collect()
|
||||
};
|
||||
let entries: HashMap<String, Vec<DeadPropEntry>> = map
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.iter().map(DeadPropEntry::from).collect()))
|
||||
.collect();
|
||||
if let Ok(json) = serde_json::to_string(&entries) {
|
||||
let path = root.join(".webdav_props.json");
|
||||
let flags = OpenFlags::new().write().create().truncate().mode(0o644);
|
||||
match vfs.open_file(&path, &flags) {
|
||||
Ok(mut file) => {
|
||||
if let Err(e) = file.write_all(json.as_bytes()) {
|
||||
log::warn!("patch_props write_all failed: {:?}", e);
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
let flags = OpenFlags::new().write().create().truncate().mode(0o644);
|
||||
match vfs.open_file(&path, &flags) {
|
||||
Ok(mut file) => {
|
||||
if let Err(e) = file.write_all(json.as_bytes()) {
|
||||
log::warn!("patch_props write_all failed: {:?}", e);
|
||||
}
|
||||
if let Err(e) = file.flush() {
|
||||
log::warn!("patch_props flush failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
if let Err(e) = file.flush() {
|
||||
log::warn!("patch_props flush failed: {:?}", e);
|
||||
Err(e) => {
|
||||
log::warn!("patch_props open_file failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("patch_props open_file failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
}
|
||||
Ok(results)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user