P0 fix: Mutex/RwLock poison recovery for webdav_locks and webdav_version
Some checks failed
Test / build (push) Has been cancelled
Test / test (push) Has been cancelled

- Add recover_mutex() helper in webdav_locks.rs
- Add recover_rwlock() helper in webdav_version.rs
- Replace all .unwrap() calls with recovery pattern
- Tests: 288 passed, 0 failed
This commit is contained in:
Warren
2026-06-21 18:11:48 +08:00
parent 1408646424
commit b71510b2e8
2 changed files with 37 additions and 17 deletions

View File

@@ -7,6 +7,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use uuid::Uuid; use uuid::Uuid;
use xmltree::Element; use xmltree::Element;
fn recover_mutex<T>(result: std::sync::LockResult<T>) -> T {
match result {
Ok(guard) => guard,
Err(e) => {
log::warn!("Mutex poisoned in webdav_locks, recovering");
e.into_inner()
}
}
}
/// Serializable lock representation for JSON persistence /// Serializable lock representation for JSON persistence
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedLock { struct PersistedLock {
@@ -142,7 +152,7 @@ impl DavLockSystem for PersistedLs {
let principal_owned = principal.map(|s| s.to_string()); let principal_owned = principal.map(|s| s.to_string());
let owner_owned = owner.map(|o| Box::new(o.clone())); let owner_owned = owner.map(|o| Box::new(o.clone()));
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
cleanup_expired_locks(&mut all, &locks_file); cleanup_expired_locks(&mut all, &locks_file);
let path_str = path2.to_string(); let path_str = path2.to_string();
for existing in all.iter() { for existing in all.iter() {
@@ -186,7 +196,7 @@ impl DavLockSystem for PersistedLs {
let locks_file = self.locks_file.clone(); let locks_file = self.locks_file.clone();
let token_owned = token.to_string(); let token_owned = token.to_string();
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
let before = all.len(); let before = all.len();
all.retain(|l| !(l.path.to_string() == path_str && l.token == token_owned)); all.retain(|l| !(l.path.to_string() == path_str && l.token == token_owned));
if all.len() == before { if all.len() == before {
@@ -213,7 +223,7 @@ impl DavLockSystem for PersistedLs {
let token_owned = token.to_string(); let token_owned = token.to_string();
let locks_file = self.locks_file.clone(); let locks_file = self.locks_file.clone();
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
let existing = all.iter_mut().find(|l| l.path.to_string() == path_str && l.token == token_owned); let existing = all.iter_mut().find(|l| l.path.to_string() == path_str && l.token == token_owned);
match existing { match existing {
Some(lock) => { Some(lock) => {
@@ -247,7 +257,7 @@ impl DavLockSystem for PersistedLs {
let submitted = submitted_tokens.to_vec(); let submitted = submitted_tokens.to_vec();
let locks_file = self.locks_file.clone(); let locks_file = self.locks_file.clone();
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
cleanup_expired_locks(&mut all, &locks_file); cleanup_expired_locks(&mut all, &locks_file);
for existing in all.iter() { for existing in all.iter() {
let ep = existing.path.to_string(); let ep = existing.path.to_string();
@@ -269,7 +279,7 @@ impl DavLockSystem for PersistedLs {
let path_str = path.to_string(); let path_str = path.to_string();
let locks_file = self.locks_file.clone(); let locks_file = self.locks_file.clone();
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
cleanup_expired_locks(&mut all, &locks_file); cleanup_expired_locks(&mut all, &locks_file);
let mut result: Vec<DavLock> = all let mut result: Vec<DavLock> = all
.iter() .iter()
@@ -289,7 +299,7 @@ impl DavLockSystem for PersistedLs {
let prefix = path.to_string().trim_end_matches('/').to_string(); let prefix = path.to_string().trim_end_matches('/').to_string();
let locks_file = self.locks_file.clone(); let locks_file = self.locks_file.clone();
Box::pin(async move { Box::pin(async move {
let mut all = locks.lock().unwrap(); let mut all = recover_mutex(locks.lock());
let before = all.len(); let before = all.len();
all.retain(|l| { all.retain(|l| {
let lp = l.path.to_string().trim_end_matches('/').to_string(); let lp = l.path.to_string().trim_end_matches('/').to_string();

View File

@@ -5,6 +5,16 @@ use std::sync::{Arc, RwLock};
use std::time::SystemTime; use std::time::SystemTime;
use uuid::Uuid; use uuid::Uuid;
fn recover_rwlock<T>(result: std::sync::LockResult<T>) -> T {
match result {
Ok(guard) => guard,
Err(e) => {
log::warn!("RwLock poisoned in webdav_version, recovering");
e.into_inner()
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionInfo { pub struct VersionInfo {
pub version_id: String, pub version_id: String,
@@ -40,7 +50,7 @@ impl WebDavVersioning {
if index_path.exists() { if index_path.exists() {
if let Ok(json) = std::fs::read_to_string(&index_path) { if let Ok(json) = std::fs::read_to_string(&index_path) {
if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<u8>>>(&json) { if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<u8>>>(&json) {
*db.write().unwrap() = map; *recover_rwlock(db.write()) = map;
} }
} }
} }
@@ -49,7 +59,7 @@ impl WebDavVersioning {
} }
fn save_index(&self) -> Result<(), VersionError> { fn save_index(&self) -> Result<(), VersionError> {
let db = self.db.read().unwrap(); let db = recover_rwlock(self.db.read());
let json = serde_json::to_string(&*db)?; let json = serde_json::to_string(&*db)?;
std::fs::write(&self.index_path, json)?; std::fs::write(&self.index_path, json)?;
Ok(()) Ok(())
@@ -89,7 +99,7 @@ impl WebDavVersioning {
let key = Self::version_key(file_path, &version_id); let key = Self::version_key(file_path, &version_id);
let value = serde_json::to_vec(&version_info)?; let value = serde_json::to_vec(&version_info)?;
self.db.write().unwrap().insert(key, value); recover_rwlock(self.db.write()).insert(key, value);
self.update_version_history(file_path, &version_id)?; self.update_version_history(file_path, &version_id)?;
@@ -100,7 +110,7 @@ impl WebDavVersioning {
pub fn get_version(&self, file_path: &str, version_id: &str) -> Result<Vec<u8>, VersionError> { pub fn get_version(&self, file_path: &str, version_id: &str) -> Result<Vec<u8>, VersionError> {
let key = Self::version_key(file_path, version_id); let key = Self::version_key(file_path, version_id);
let value = self.db.read().unwrap().get(&key).cloned().ok_or(VersionError::VersionNotFound)?; let value = recover_rwlock(self.db.read()).get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
let version_info: VersionInfo = serde_json::from_slice(&value)?; let version_info: VersionInfo = serde_json::from_slice(&value)?;
let version_file = self.version_storage.join(&version_info.version_id); let version_file = self.version_storage.join(&version_info.version_id);
@@ -110,14 +120,14 @@ impl WebDavVersioning {
pub fn get_version_info(&self, file_path: &str, version_id: &str) -> Result<VersionInfo, VersionError> { pub fn get_version_info(&self, file_path: &str, version_id: &str) -> Result<VersionInfo, VersionError> {
let key = Self::version_key(file_path, version_id); let key = Self::version_key(file_path, version_id);
let value = self.db.read().unwrap().get(&key).cloned().ok_or(VersionError::VersionNotFound)?; let value = recover_rwlock(self.db.read()).get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
serde_json::from_slice(&value).map_err(|e| e.into()) serde_json::from_slice(&value).map_err(|e| e.into())
} }
pub fn get_version_history(&self, file_path: &str) -> Result<VersionHistory, VersionError> { pub fn get_version_history(&self, file_path: &str) -> Result<VersionHistory, VersionError> {
let history_key = Self::history_key(file_path); let history_key = Self::history_key(file_path);
let value = self.db.read().unwrap().get(&history_key).cloned().ok_or(VersionError::HistoryNotFound)?; let value = recover_rwlock(self.db.read()).get(&history_key).cloned().ok_or(VersionError::HistoryNotFound)?;
serde_json::from_slice(&value).map_err(|e| e.into()) serde_json::from_slice(&value).map_err(|e| e.into())
} }
@@ -126,7 +136,7 @@ impl WebDavVersioning {
let prefix = format!("version:{}:", file_path); let prefix = format!("version:{}:", file_path);
let mut versions = Vec::new(); let mut versions = Vec::new();
let db = self.db.read().unwrap(); let db = recover_rwlock(self.db.read());
for (key, value) in db.iter() { for (key, value) in db.iter() {
if key.starts_with(&prefix) { if key.starts_with(&prefix) {
let version_info: VersionInfo = serde_json::from_slice(&value)?; let version_info: VersionInfo = serde_json::from_slice(&value)?;
@@ -161,7 +171,7 @@ impl WebDavVersioning {
let key = Self::version_key(file_path, &new_version_id); let key = Self::version_key(file_path, &new_version_id);
let value = serde_json::to_vec(&new_version_info)?; let value = serde_json::to_vec(&new_version_info)?;
self.db.write().unwrap().insert(key, value); recover_rwlock(self.db.write()).insert(key, value);
self.update_version_history(file_path, &new_version_id)?; self.update_version_history(file_path, &new_version_id)?;
@@ -183,7 +193,7 @@ impl WebDavVersioning {
} }
let key = Self::version_key(file_path, version_id); let key = Self::version_key(file_path, version_id);
self.db.write().unwrap().remove(&key); recover_rwlock(self.db.write()).remove(&key);
let current = self.get_current_version(file_path)?; let current = self.get_current_version(file_path)?;
self.update_version_history(file_path, &current.version_id)?; self.update_version_history(file_path, &current.version_id)?;
@@ -210,7 +220,7 @@ impl WebDavVersioning {
let key = Self::version_key(file_path, &version.version_id); let key = Self::version_key(file_path, &version.version_id);
let value = serde_json::to_vec(&updated_version)?; let value = serde_json::to_vec(&updated_version)?;
self.db.write().unwrap().insert(key, value); recover_rwlock(self.db.write()).insert(key, value);
} }
Ok(()) Ok(())
@@ -228,7 +238,7 @@ impl WebDavVersioning {
let history_key = Self::history_key(file_path); let history_key = Self::history_key(file_path);
let value = serde_json::to_vec(&history)?; let value = serde_json::to_vec(&history)?;
self.db.write().unwrap().insert(history_key, value); recover_rwlock(self.db.write()).insert(history_key, value);
Ok(()) Ok(())
} }