Phase 5.1: AsyncVfsDavFs spawn_blocking wrapper complete
- AsyncVfsDavFs wraps VfsDavFs with spawn_blocking - All DavFileSystem methods offloaded to blocking thread pool - Uses tokio::runtime::Runtime::block_on inside spawn_blocking - Prevents blocking async executor during VFS operations Tests: 293 passed, 0 failed
This commit is contained in:
213
markbase-core/src/async_webdav.rs
Normal file
213
markbase-core/src/async_webdav.rs
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use super::webdav::VfsDavFs;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use dav_server::davpath::DavPath;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use dav_server::fs::{
|
||||||
|
DavDirEntry, DavFile, DavFileSystem, DavMetaData, DavProp, FsError, FsFuture, FsStream,
|
||||||
|
OpenOptions, ReadDirMeta,
|
||||||
|
};
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use http::StatusCode;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use std::future::Future;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use std::path::PathBuf;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use std::pin::Pin;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use std::sync::Arc;
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
pub struct AsyncVfsDavFs {
|
||||||
|
inner: Arc<VfsDavFs>,
|
||||||
|
runtime: Arc<tokio::runtime::Runtime>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
impl AsyncVfsDavFs {
|
||||||
|
pub fn new(inner: VfsDavFs) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(inner),
|
||||||
|
runtime: Arc::new(tokio::runtime::Runtime::new().unwrap()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
||||||
|
self.runtime.block_on(fut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
impl Clone for AsyncVfsDavFs {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
runtime: self.runtime.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
impl DavFileSystem for AsyncVfsDavFs {
|
||||||
|
fn open<'a>(&'a self, path: &'a DavPath, options: OpenOptions) -> FsFuture<'a, Box<dyn DavFile>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.open(&path, options);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_dir<'a>(&'a self, path: &'a DavPath, meta: ReadDirMeta) -> FsFuture<'a, FsStream<Box<dyn DavDirEntry>>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.read_dir(&path, meta);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn metadata<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, Box<dyn DavMetaData>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.metadata(&path);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.create_dir(&path);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.remove_dir(&path);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_file<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.remove_file(&path);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rename<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let from = from.clone();
|
||||||
|
let to = to.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.rename(&from, &to);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn copy<'a>(&'a self, from: &'a DavPath, to: &'a DavPath) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let from = from.clone();
|
||||||
|
let to = to.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.copy(&from, &to);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_accessed<'a>(&'a self, path: &'a DavPath, tm: SystemTime) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.set_accessed(&path, tm);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_modified<'a>(&'a self, path: &'a DavPath, tm: SystemTime) -> FsFuture<'a, ()> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.set_modified(&path, tm);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_props<'a>(&'a self, path: &'a DavPath, do_content: bool) -> FsFuture<'a, Vec<DavProp>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.get_props(&path, do_content);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_prop<'a>(&'a self, path: &'a DavPath, prop: DavProp) -> FsFuture<'a, Vec<u8>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.get_prop(&path, prop);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn patch_props<'a>(&'a self, path: &'a DavPath, patch: Vec<(bool, DavProp)>) -> FsFuture<'a, Vec<(StatusCode, DavProp)>> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let path = path.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.patch_props(&path, patch);
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn have_props<'a>(&'a self, path: &'a DavPath) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
|
||||||
|
self.inner.have_props(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_quota(&self) -> FsFuture<'_, (u64, Option<u64>)> {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let fut = inner.get_quota();
|
||||||
|
tokio::runtime::Runtime::new().unwrap().block_on(fut)
|
||||||
|
}).await.map_err(|_| FsError::GeneralFailure)?
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -26,6 +26,9 @@ pub mod webdav;
|
|||||||
pub mod webdav_locks;
|
pub mod webdav_locks;
|
||||||
pub mod webdav_version;
|
pub mod webdav_version;
|
||||||
|
|
||||||
|
#[cfg(feature = "async-vfs")]
|
||||||
|
pub mod async_webdav;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod security_audit;
|
mod security_audit;
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use bytes::{Buf, Bytes};
|
|||||||
use dav_server::davpath::DavPath;
|
use dav_server::davpath::DavPath;
|
||||||
use dav_server::fs::{
|
use dav_server::fs::{
|
||||||
DavDirEntry, DavFile, DavFileSystem, DavMetaData, DavProp, FsError, FsFuture, FsStream,
|
DavDirEntry, DavFile, DavFileSystem, DavMetaData, DavProp, FsError, FsFuture, FsStream,
|
||||||
OpenOptions,
|
OpenOptions, ReadDirMeta,
|
||||||
};
|
};
|
||||||
use dav_server::ls::DavLockSystem;
|
use dav_server::ls::DavLockSystem;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
|||||||
Reference in New Issue
Block a user