P0 fix: Mutex/RwLock poison recovery for webdav_locks and webdav_version
- Add recover_mutex() helper in webdav_locks.rs - Add recover_rwlock() helper in webdav_version.rs - Replace all .unwrap() calls with recovery pattern - Tests: 288 passed, 0 failed
This commit is contained in:
@@ -7,6 +7,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use xmltree::Element;
|
use xmltree::Element;
|
||||||
|
|
||||||
|
fn recover_mutex<T>(result: std::sync::LockResult<T>) -> T {
|
||||||
|
match result {
|
||||||
|
Ok(guard) => guard,
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Mutex poisoned in webdav_locks, recovering");
|
||||||
|
e.into_inner()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Serializable lock representation for JSON persistence
|
/// Serializable lock representation for JSON persistence
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
struct PersistedLock {
|
struct PersistedLock {
|
||||||
@@ -142,7 +152,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let principal_owned = principal.map(|s| s.to_string());
|
let principal_owned = principal.map(|s| s.to_string());
|
||||||
let owner_owned = owner.map(|o| Box::new(o.clone()));
|
let owner_owned = owner.map(|o| Box::new(o.clone()));
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
cleanup_expired_locks(&mut all, &locks_file);
|
cleanup_expired_locks(&mut all, &locks_file);
|
||||||
let path_str = path2.to_string();
|
let path_str = path2.to_string();
|
||||||
for existing in all.iter() {
|
for existing in all.iter() {
|
||||||
@@ -186,7 +196,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let locks_file = self.locks_file.clone();
|
let locks_file = self.locks_file.clone();
|
||||||
let token_owned = token.to_string();
|
let token_owned = token.to_string();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
let before = all.len();
|
let before = all.len();
|
||||||
all.retain(|l| !(l.path.to_string() == path_str && l.token == token_owned));
|
all.retain(|l| !(l.path.to_string() == path_str && l.token == token_owned));
|
||||||
if all.len() == before {
|
if all.len() == before {
|
||||||
@@ -213,7 +223,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let token_owned = token.to_string();
|
let token_owned = token.to_string();
|
||||||
let locks_file = self.locks_file.clone();
|
let locks_file = self.locks_file.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
let existing = all.iter_mut().find(|l| l.path.to_string() == path_str && l.token == token_owned);
|
let existing = all.iter_mut().find(|l| l.path.to_string() == path_str && l.token == token_owned);
|
||||||
match existing {
|
match existing {
|
||||||
Some(lock) => {
|
Some(lock) => {
|
||||||
@@ -247,7 +257,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let submitted = submitted_tokens.to_vec();
|
let submitted = submitted_tokens.to_vec();
|
||||||
let locks_file = self.locks_file.clone();
|
let locks_file = self.locks_file.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
cleanup_expired_locks(&mut all, &locks_file);
|
cleanup_expired_locks(&mut all, &locks_file);
|
||||||
for existing in all.iter() {
|
for existing in all.iter() {
|
||||||
let ep = existing.path.to_string();
|
let ep = existing.path.to_string();
|
||||||
@@ -269,7 +279,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let path_str = path.to_string();
|
let path_str = path.to_string();
|
||||||
let locks_file = self.locks_file.clone();
|
let locks_file = self.locks_file.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
cleanup_expired_locks(&mut all, &locks_file);
|
cleanup_expired_locks(&mut all, &locks_file);
|
||||||
let mut result: Vec<DavLock> = all
|
let mut result: Vec<DavLock> = all
|
||||||
.iter()
|
.iter()
|
||||||
@@ -289,7 +299,7 @@ impl DavLockSystem for PersistedLs {
|
|||||||
let prefix = path.to_string().trim_end_matches('/').to_string();
|
let prefix = path.to_string().trim_end_matches('/').to_string();
|
||||||
let locks_file = self.locks_file.clone();
|
let locks_file = self.locks_file.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let mut all = locks.lock().unwrap();
|
let mut all = recover_mutex(locks.lock());
|
||||||
let before = all.len();
|
let before = all.len();
|
||||||
all.retain(|l| {
|
all.retain(|l| {
|
||||||
let lp = l.path.to_string().trim_end_matches('/').to_string();
|
let lp = l.path.to_string().trim_end_matches('/').to_string();
|
||||||
|
|||||||
@@ -5,6 +5,16 @@ use std::sync::{Arc, RwLock};
|
|||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
fn recover_rwlock<T>(result: std::sync::LockResult<T>) -> T {
|
||||||
|
match result {
|
||||||
|
Ok(guard) => guard,
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("RwLock poisoned in webdav_version, recovering");
|
||||||
|
e.into_inner()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct VersionInfo {
|
pub struct VersionInfo {
|
||||||
pub version_id: String,
|
pub version_id: String,
|
||||||
@@ -40,7 +50,7 @@ impl WebDavVersioning {
|
|||||||
if index_path.exists() {
|
if index_path.exists() {
|
||||||
if let Ok(json) = std::fs::read_to_string(&index_path) {
|
if let Ok(json) = std::fs::read_to_string(&index_path) {
|
||||||
if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<u8>>>(&json) {
|
if let Ok(map) = serde_json::from_str::<HashMap<String, Vec<u8>>>(&json) {
|
||||||
*db.write().unwrap() = map;
|
*recover_rwlock(db.write()) = map;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -49,7 +59,7 @@ impl WebDavVersioning {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn save_index(&self) -> Result<(), VersionError> {
|
fn save_index(&self) -> Result<(), VersionError> {
|
||||||
let db = self.db.read().unwrap();
|
let db = recover_rwlock(self.db.read());
|
||||||
let json = serde_json::to_string(&*db)?;
|
let json = serde_json::to_string(&*db)?;
|
||||||
std::fs::write(&self.index_path, json)?;
|
std::fs::write(&self.index_path, json)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -89,7 +99,7 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
let key = Self::version_key(file_path, &version_id);
|
let key = Self::version_key(file_path, &version_id);
|
||||||
let value = serde_json::to_vec(&version_info)?;
|
let value = serde_json::to_vec(&version_info)?;
|
||||||
self.db.write().unwrap().insert(key, value);
|
recover_rwlock(self.db.write()).insert(key, value);
|
||||||
|
|
||||||
self.update_version_history(file_path, &version_id)?;
|
self.update_version_history(file_path, &version_id)?;
|
||||||
|
|
||||||
@@ -100,7 +110,7 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
pub fn get_version(&self, file_path: &str, version_id: &str) -> Result<Vec<u8>, VersionError> {
|
pub fn get_version(&self, file_path: &str, version_id: &str) -> Result<Vec<u8>, VersionError> {
|
||||||
let key = Self::version_key(file_path, version_id);
|
let key = Self::version_key(file_path, version_id);
|
||||||
let value = self.db.read().unwrap().get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
|
let value = recover_rwlock(self.db.read()).get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
|
||||||
|
|
||||||
let version_info: VersionInfo = serde_json::from_slice(&value)?;
|
let version_info: VersionInfo = serde_json::from_slice(&value)?;
|
||||||
let version_file = self.version_storage.join(&version_info.version_id);
|
let version_file = self.version_storage.join(&version_info.version_id);
|
||||||
@@ -110,14 +120,14 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
pub fn get_version_info(&self, file_path: &str, version_id: &str) -> Result<VersionInfo, VersionError> {
|
pub fn get_version_info(&self, file_path: &str, version_id: &str) -> Result<VersionInfo, VersionError> {
|
||||||
let key = Self::version_key(file_path, version_id);
|
let key = Self::version_key(file_path, version_id);
|
||||||
let value = self.db.read().unwrap().get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
|
let value = recover_rwlock(self.db.read()).get(&key).cloned().ok_or(VersionError::VersionNotFound)?;
|
||||||
|
|
||||||
serde_json::from_slice(&value).map_err(|e| e.into())
|
serde_json::from_slice(&value).map_err(|e| e.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_version_history(&self, file_path: &str) -> Result<VersionHistory, VersionError> {
|
pub fn get_version_history(&self, file_path: &str) -> Result<VersionHistory, VersionError> {
|
||||||
let history_key = Self::history_key(file_path);
|
let history_key = Self::history_key(file_path);
|
||||||
let value = self.db.read().unwrap().get(&history_key).cloned().ok_or(VersionError::HistoryNotFound)?;
|
let value = recover_rwlock(self.db.read()).get(&history_key).cloned().ok_or(VersionError::HistoryNotFound)?;
|
||||||
|
|
||||||
serde_json::from_slice(&value).map_err(|e| e.into())
|
serde_json::from_slice(&value).map_err(|e| e.into())
|
||||||
}
|
}
|
||||||
@@ -126,7 +136,7 @@ impl WebDavVersioning {
|
|||||||
let prefix = format!("version:{}:", file_path);
|
let prefix = format!("version:{}:", file_path);
|
||||||
let mut versions = Vec::new();
|
let mut versions = Vec::new();
|
||||||
|
|
||||||
let db = self.db.read().unwrap();
|
let db = recover_rwlock(self.db.read());
|
||||||
for (key, value) in db.iter() {
|
for (key, value) in db.iter() {
|
||||||
if key.starts_with(&prefix) {
|
if key.starts_with(&prefix) {
|
||||||
let version_info: VersionInfo = serde_json::from_slice(&value)?;
|
let version_info: VersionInfo = serde_json::from_slice(&value)?;
|
||||||
@@ -161,7 +171,7 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
let key = Self::version_key(file_path, &new_version_id);
|
let key = Self::version_key(file_path, &new_version_id);
|
||||||
let value = serde_json::to_vec(&new_version_info)?;
|
let value = serde_json::to_vec(&new_version_info)?;
|
||||||
self.db.write().unwrap().insert(key, value);
|
recover_rwlock(self.db.write()).insert(key, value);
|
||||||
|
|
||||||
self.update_version_history(file_path, &new_version_id)?;
|
self.update_version_history(file_path, &new_version_id)?;
|
||||||
|
|
||||||
@@ -183,7 +193,7 @@ impl WebDavVersioning {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let key = Self::version_key(file_path, version_id);
|
let key = Self::version_key(file_path, version_id);
|
||||||
self.db.write().unwrap().remove(&key);
|
recover_rwlock(self.db.write()).remove(&key);
|
||||||
|
|
||||||
let current = self.get_current_version(file_path)?;
|
let current = self.get_current_version(file_path)?;
|
||||||
self.update_version_history(file_path, ¤t.version_id)?;
|
self.update_version_history(file_path, ¤t.version_id)?;
|
||||||
@@ -210,7 +220,7 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
let key = Self::version_key(file_path, &version.version_id);
|
let key = Self::version_key(file_path, &version.version_id);
|
||||||
let value = serde_json::to_vec(&updated_version)?;
|
let value = serde_json::to_vec(&updated_version)?;
|
||||||
self.db.write().unwrap().insert(key, value);
|
recover_rwlock(self.db.write()).insert(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -228,7 +238,7 @@ impl WebDavVersioning {
|
|||||||
|
|
||||||
let history_key = Self::history_key(file_path);
|
let history_key = Self::history_key(file_path);
|
||||||
let value = serde_json::to_vec(&history)?;
|
let value = serde_json::to_vec(&history)?;
|
||||||
self.db.write().unwrap().insert(history_key, value);
|
recover_rwlock(self.db.write()).insert(history_key, value);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user