diff --git a/markbase-core/Cargo.toml b/markbase-core/Cargo.toml index aaef5b8..8440085 100644 --- a/markbase-core/Cargo.toml +++ b/markbase-core/Cargo.toml @@ -69,6 +69,7 @@ chacha20poly1305 = "0.10" # Phase 5: ChaCha20-Poly1305 AEAD(备用) nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/O(fs feature包含fcntl) rusty-s3 = "0.10" # S3 API 签名(AWS Signature V4) ureq = "2.12" # 輕量同步 HTTP 客戶端 +reqwest = { version = "0.12", optional = true } # Async HTTP client for AsyncS3Vfs rayon = "1.10" # Phase 4: 并行加密 url = "2" # URL 解析(rusty-s3 依賴) @@ -85,7 +86,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } default = [] # 默认不启用可选格式 optional-formats = ["unrar", "xz2", "sevenz-rust"] # 争议格式可选启用 smb-server = ["dep:smb-server"] # SMB server feature flag -async-vfs = [] # Async VfsBackend trait (Phase 1 - design only) +async-vfs = ["dep:reqwest"] # Async VfsBackend trait + native async S3 [dev-dependencies] # tempfile moved to dependencies (needed for archive extraction) diff --git a/markbase-core/src/vfs/async_s3_fs.rs b/markbase-core/src/vfs/async_s3_fs.rs index c4d5df6..fed937d 100644 --- a/markbase-core/src/vfs/async_s3_fs.rs +++ b/markbase-core/src/vfs/async_s3_fs.rs @@ -1,160 +1,41 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::future::Future; -use std::io::{self, SeekFrom, Read, Write}; +use std::io::{SeekFrom}; use std::sync::Arc; +use std::time::Duration; -use tokio::task::spawn_blocking; use tokio::sync::Mutex; +use reqwest::Client; +use rusty_s3::{Bucket, Credentials, S3Action, actions, UrlStyle}; +use url::Url; -use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, s3_fs::S3Vfs, VfsBackend}; +use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags}; -/// Async S3 VFS 文件實現(spawn_blocking 包装) -pub struct AsyncS3File { - inner: Arc>, -} - -enum S3FileState { - Read { vfs: S3Vfs, key: String, position: u64, size: u64, data: Vec }, - Write { vfs: S3Vfs, key: String, buffer: Vec }, -} - -impl AsyncS3File { - pub async fn new_read(vfs: S3Vfs, key: String) -> Result { - let stat = spawn_blocking({ - let vfs = vfs.clone(); - let key = key.clone(); - move || VfsBackend::stat(&vfs, &PathBuf::from(&key)) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; - - Ok(Self { - inner: Arc::new(Mutex::new(S3FileState::Read { - vfs, key, position: 0, size: stat.size, data: Vec::new(), - })), - }) - } - - pub fn new_write(vfs: S3Vfs, key: String) -> Self { - Self { - inner: Arc::new(Mutex::new(S3FileState::Write { - vfs, key, buffer: Vec::new(), - })), - } - } -} - -impl super::AsyncVfsFile for AsyncS3File { - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let buf_len = buf.len(); - Box::pin(async move { - let mut state = inner.lock().await; - match &mut *state { - S3FileState::Read { vfs, key, position, size, data } => { - if *position >= *size { - return Ok(0); - } - - // Load data if needed - if data.is_empty() { - let vfs_clone = vfs.clone(); - let key_clone = key.clone(); - let loaded_data = spawn_blocking(move || { - let flags = OpenFlags::new().read(); - let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?; - let mut buf = Vec::new(); - let mut chunk = [0u8; 8192]; - loop { - match file.read(&mut chunk) { - Ok(0) => break, - Ok(n) => buf.extend_from_slice(&chunk[..n]), - Err(e) => return Err(e), - } - } - Ok(buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??; - *data = loaded_data; - } - - let remaining = *size - *position; - let to_read = buf_len.min(remaining as usize); - let start = *position as usize; - let end = start + to_read; - - buf[..to_read].copy_from_slice(&data[start..end]); - *position += to_read as u64; - Ok(to_read) - } - _ => Err(VfsError::Io("File not open for read".to_string())), - } - }) - } - - fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let buf_copy = buf.to_vec(); - Box::pin(async move { - let mut state = inner.lock().await; - match &mut *state { - S3FileState::Write { buffer, .. } => { - buffer.extend_from_slice(&buf_copy); - Ok(buf_copy.len()) - } - _ => Err(VfsError::Io("File not open for write".to_string())), - } - }) - } - - fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - Box::pin(async move { - let mut state = inner.lock().await; - match &mut *state { - S3FileState::Read { position, size, .. } => { - let new_pos = match pos { - SeekFrom::Start(offset) => offset, - SeekFrom::Current(offset) => { - ((*position as i64) + offset).max(0) as u64 - } - SeekFrom::End(offset) => { - ((*size as i64) + offset).max(0) as u64 - } - }; - *position = new_pos.min(*size); - Ok(*position) - } - _ => Err(VfsError::Io("File not open for read".to_string())), - } - }) - } - - fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - Box::pin(async move { - let mut state = inner.lock().await; - match &mut *state { - S3FileState::Write { vfs, key, buffer } => { - let vfs_clone = vfs.clone(); - let key_clone = key.clone(); - let data = buffer.clone(); - - spawn_blocking(move || { - let flags = OpenFlags::new().write().create().truncate().mode(0o644); - let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?; - file.write_all(&data)?; - file.flush()?; - Ok(()) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? - } - _ => Ok(()), - } - }) - } -} - -/// Async S3 VFS 后端實現(spawn_blocking 包装 S3Vfs) pub struct AsyncS3Vfs { - inner: S3Vfs, + bucket: Bucket, + credentials: Credentials, + client: Client, +} + +struct AsyncS3FileState { + key: String, + mode: FileMode, + position: u64, + size: u64, + data: Vec, + write_buffer: Vec, + mtime: std::time::SystemTime, +} + +enum FileMode { + Read, + Write, +} + +pub struct AsyncS3File { + inner: Arc>, + vfs: AsyncS3Vfs, } impl AsyncS3Vfs { @@ -165,14 +46,313 @@ impl AsyncS3Vfs { access_key: &str, secret_key: &str, ) -> Result { - let inner = S3Vfs::new(endpoint, region, bucket_name, access_key, secret_key)?; - Ok(Self { inner }) + let endpoint_url = Url::parse(endpoint.trim_end_matches('/')) + .map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?; + + let bucket = Bucket::new( + endpoint_url, + UrlStyle::Path, + bucket_name.to_string(), + region.to_string(), + ).map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?; + + let credentials = Credentials::new(access_key, secret_key); + let client = Client::new(); + + Ok(Self { bucket, credentials, client }) + } + + fn path_to_key(path: &Path) -> String { + let s = path.to_string_lossy(); + s.strip_prefix('/').unwrap_or(&s).to_string() + } + + async fn head_object(&self, key: &str) -> Result<(u64, std::time::SystemTime, String), VfsError> { + let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key); + let url = action.sign(Duration::from_secs(3600)); + + let resp = self.client + .head(url.as_str()) + .send() + .await + .map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?; + + let status = resp.status(); + if status == 404 { + return Err(VfsError::NotFound(key.to_string())); + } + if !status.is_success() { + return Err(VfsError::Io(format!("HeadObject returned {}", status))); + } + + let content_len: u64 = resp + .headers() + .get("Content-Length") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse().ok()) + .unwrap_or(0); + + let last_modified = parse_last_modified( + resp.headers() + .get("Last-Modified") + .and_then(|v| v.to_str().ok()) + ); + + let etag = resp + .headers() + .get("ETag") + .and_then(|v| v.to_str().ok()) + .map(|s| s.replace('"', "")) + .unwrap_or_default(); + + Ok((content_len, last_modified, etag)) + } + + async fn get_object(&self, key: &str) -> Result, VfsError> { + let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key); + let url = action.sign(Duration::from_secs(3600)); + + let resp = self.client + .get(url.as_str()) + .send() + .await + .map_err(|e| VfsError::Io(format!("S3 GET failed: {}", e)))?; + + let status = resp.status(); + if status == 404 { + return Err(VfsError::NotFound(key.to_string())); + } + if !status.is_success() { + return Err(VfsError::Io(format!("GetObject returned {}", status))); + } + + let bytes = resp.bytes().await + .map_err(|e| VfsError::Io(format!("Failed to read response body: {}", e)))?; + + Ok(bytes.to_vec()) + } + + async fn put_object(&self, key: &str, data: &[u8]) -> Result { + let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key); + let url = action.sign(Duration::from_secs(3600)); + + let resp = self.client + .put(url.as_str()) + .body(data.to_vec()) + .send() + .await + .map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?; + + if !resp.status().is_success() { + return Err(VfsError::Io(format!("PutObject returned {}", resp.status()))); + } + + let etag = resp + .headers() + .get("ETag") + .and_then(|v| v.to_str().ok()) + .map(|s| s.replace('"', "")) + .unwrap_or_default(); + + Ok(etag) + } + + async fn delete_object(&self, key: &str) -> Result<(), VfsError> { + let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key); + let url = action.sign(Duration::from_secs(3600)); + + let resp = self.client + .delete(url.as_str()) + .send() + .await + .map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?; + + if !resp.status().is_success() { + return Err(VfsError::Io(format!("DeleteObject returned {}", resp.status()))); + } + + Ok(()) + } + + async fn list_objects(&self, prefix: &str) -> Result, VfsError> { + let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials)); + if !prefix.is_empty() { + action.with_prefix(prefix); + } + action.with_delimiter("/"); + + let url = action.sign(Duration::from_secs(3600)); + + let resp = self.client + .get(url.as_str()) + .send() + .await + .map_err(|e| VfsError::Io(format!("S3 LIST failed: {}", e)))?; + + if !resp.status().is_success() { + return Err(VfsError::Io(format!("ListObjectsV2 returned {}", resp.status()))); + } + + let body = resp.text().await + .map_err(|e| VfsError::Io(format!("Failed to read LIST response: {}", e)))?; + + // Use rusty-s3's built-in parser + let list_response = actions::ListObjectsV2::parse_response(&body) + .map_err(|e| VfsError::Io(format!("Failed to parse LIST response: {}", e)))?; + + // Convert to VfsDirEntry + let mut entries = Vec::new(); + for obj in list_response.contents { + let name = obj.key.strip_prefix(prefix).unwrap_or(&obj.key).to_string(); + entries.push(VfsDirEntry { + name, + long_name: obj.key.clone(), + stat: VfsStat { + size: obj.size as u64, + mode: 0o644, + uid: 0, + gid: 0, + atime: std::time::SystemTime::UNIX_EPOCH, + mtime: std::time::SystemTime::UNIX_EPOCH, + is_dir: false, + is_symlink: false, + }, + }); + } + for prefix_elem in list_response.common_prefixes { + let name = prefix_elem.prefix.strip_prefix(prefix).unwrap_or(&prefix_elem.prefix).trim_end_matches('/').to_string(); + entries.push(VfsDirEntry { + name, + long_name: prefix_elem.prefix.clone(), + stat: VfsStat { + size: 0, + mode: 0o755, + uid: 0, + gid: 0, + atime: std::time::SystemTime::UNIX_EPOCH, + mtime: std::time::SystemTime::UNIX_EPOCH, + is_dir: true, + is_symlink: false, + }, + }); + } + + Ok(entries) } } impl Clone for AsyncS3Vfs { fn clone(&self) -> Self { - Self { inner: self.inner.clone() } + Self { + bucket: self.bucket.clone(), + credentials: self.credentials.clone(), + client: self.client.clone(), + } + } +} + +impl AsyncS3File { + pub async fn new_read(vfs: AsyncS3Vfs, key: String) -> Result { + let (size, mtime, _) = vfs.head_object(&key).await?; + + Ok(Self { + inner: Arc::new(Mutex::new(AsyncS3FileState { + key, + mode: FileMode::Read, + position: 0, + size, + data: Vec::new(), + write_buffer: Vec::new(), + mtime, + })), + vfs, + }) + } + + pub fn new_write(vfs: AsyncS3Vfs, key: String) -> Self { + Self { + inner: Arc::new(Mutex::new(AsyncS3FileState { + key, + mode: FileMode::Write, + position: 0, + size: 0, + data: Vec::new(), + write_buffer: Vec::new(), + mtime: std::time::SystemTime::now(), + })), + vfs, + } + } +} + +impl super::AsyncVfsFile for AsyncS3File { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let vfs = self.vfs.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + + if state.position >= state.size { + return Ok(0); + } + + if state.data.is_empty() { + let key = state.key.clone(); + state.data = vfs.get_object(&key).await?; + } + + let remaining = state.size - state.position; + let to_read = buf.len().min(remaining as usize); + let start = state.position as usize; + let end = start + to_read; + + buf[..to_read].copy_from_slice(&state.data[start..end]); + state.position += to_read as u64; + + Ok(to_read) + }) + } + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + state.write_buffer.extend_from_slice(buf); + Ok(buf.len()) + }) + } + + fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + let new_pos = match pos { + SeekFrom::Start(offset) => offset, + SeekFrom::Current(offset) => { + ((state.position as i64) + offset).max(0) as u64 + } + SeekFrom::End(offset) => { + ((state.size as i64) + offset).max(0) as u64 + } + }; + state.position = new_pos.min(state.size); + Ok(state.position) + }) + } + + fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { + let inner = self.inner.clone(); + let vfs = self.vfs.clone(); + Box::pin(async move { + let mut state = inner.lock().await; + if !state.write_buffer.is_empty() { + let key = state.key.clone(); + let data = state.write_buffer.clone(); + vfs.put_object(&key, &data).await?; + state.write_buffer.clear(); + } + Ok(()) + }) } } @@ -182,88 +362,93 @@ impl super::AsyncVfsBackend for AsyncS3Vfs { } fn read_dir<'a>(&'a self, path: &'a Path) -> Pin, VfsError>> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let prefix = Self::path_to_key(path); Box::pin(async move { - spawn_blocking(move || { - VfsBackend::read_dir(&inner, &path_buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + self.list_objects(&prefix).await }) } fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin, VfsError>> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); + let vfs = self.clone(); let is_write = flags.write; Box::pin(async move { - let key = path_buf.to_string_lossy().to_string(); if is_write { - Ok(Box::new(AsyncS3File::new_write(inner, key)) as Box) + Ok(Box::new(AsyncS3File::new_write(vfs, key)) as Box) } else { - let file = AsyncS3File::new_read(inner, key).await?; + let file = AsyncS3File::new_read(vfs, key).await?; Ok(Box::new(file) as Box) } }) } fn stat<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); Box::pin(async move { - spawn_blocking(move || { - VfsBackend::stat(&inner, &path_buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + let (size, mtime, _) = self.head_object(&key).await?; + Ok(VfsStat { + size, + mode: 0o644, + uid: 0, + gid: 0, + atime: mtime, + mtime, + is_dir: false, + is_symlink: false, + }) }) } fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); + if !key.ends_with('/') { + let key = format!("{}/", key); + } Box::pin(async move { - spawn_blocking(move || { - VfsBackend::create_dir(&inner, &path_buf, 0o755) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + self.put_object(&key, &[]).await?; + Ok(()) }) } fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); + let key = if key.ends_with('/') { key } else { format!("{}/", key) }; Box::pin(async move { - spawn_blocking(move || { - VfsBackend::remove_dir(&inner, &path_buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + self.delete_object(&key).await?; + Ok(()) }) } fn remove_file<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); Box::pin(async move { - spawn_blocking(move || { - VfsBackend::remove_file(&inner, &path_buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + self.delete_object(&key).await?; + Ok(()) }) } fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin> + Send + 'a>> { - let inner = self.inner.clone(); - let from_buf = from.to_path_buf(); - let to_buf = to.to_path_buf(); + let from_key = Self::path_to_key(from); + let to_key = Self::path_to_key(to); Box::pin(async move { - spawn_blocking(move || { - VfsBackend::rename(&inner, &from_buf, &to_buf) - }).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))? + let data = self.get_object(&from_key).await?; + self.put_object(&to_key, &data).await?; + self.delete_object(&from_key).await?; + Ok(()) }) } fn exists<'a>(&'a self, path: &'a Path) -> Pin + Send + 'a>> { - let inner = self.inner.clone(); - let path_buf = path.to_path_buf(); + let key = Self::path_to_key(path); Box::pin(async move { - spawn_blocking(move || { - VfsBackend::exists(&inner, &path_buf) - }).await.unwrap_or(false) + self.head_object(&key).await.is_ok() }) } +} + +fn parse_last_modified(header: Option<&str>) -> std::time::SystemTime { + header + .and_then(|s| chrono::DateTime::parse_from_rfc2822(s).ok()) + .map(|dt| std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(dt.timestamp() as u64)) + .unwrap_or(std::time::SystemTime::now()) } \ No newline at end of file