P1: AsyncS3Vfs native async implementation using reqwest
- Replace spawn_blocking + ureq with native async reqwest - AsyncS3Vfs uses reqwest::Client for HTTP operations - rusty-s3 for presigned URL generation + XML parsing - AsyncS3File with async read/write/seek/flush - reqwest dependency added under async-vfs feature Tests: 297 passed (293 + 4 new s3_auth tests)
This commit is contained in:
@@ -69,6 +69,7 @@ chacha20poly1305 = "0.10" # Phase 5: ChaCha20-Poly1305 AEAD(备用)
|
||||
nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/O(fs feature包含fcntl)
|
||||
rusty-s3 = "0.10" # S3 API 签名(AWS Signature V4)
|
||||
ureq = "2.12" # 輕量同步 HTTP 客戶端
|
||||
reqwest = { version = "0.12", optional = true } # Async HTTP client for AsyncS3Vfs
|
||||
rayon = "1.10" # Phase 4: 并行加密
|
||||
url = "2" # URL 解析(rusty-s3 依賴)
|
||||
|
||||
@@ -85,7 +86,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
default = [] # 默认不启用可选格式
|
||||
optional-formats = ["unrar", "xz2", "sevenz-rust"] # 争议格式可选启用
|
||||
smb-server = ["dep:smb-server"] # SMB server feature flag
|
||||
async-vfs = [] # Async VfsBackend trait (Phase 1 - design only)
|
||||
async-vfs = ["dep:reqwest"] # Async VfsBackend trait + native async S3
|
||||
|
||||
[dev-dependencies]
|
||||
# tempfile moved to dependencies (needed for archive extraction)
|
||||
|
||||
@@ -1,160 +1,41 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use std::io::{self, SeekFrom, Read, Write};
|
||||
use std::io::{SeekFrom};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::sync::Mutex;
|
||||
use reqwest::Client;
|
||||
use rusty_s3::{Bucket, Credentials, S3Action, actions, UrlStyle};
|
||||
use url::Url;
|
||||
|
||||
use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags, s3_fs::S3Vfs, VfsBackend};
|
||||
use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags};
|
||||
|
||||
/// Async S3 VFS 文件實現(spawn_blocking 包装)
|
||||
pub struct AsyncS3File {
|
||||
inner: Arc<Mutex<S3FileState>>,
|
||||
}
|
||||
|
||||
enum S3FileState {
|
||||
Read { vfs: S3Vfs, key: String, position: u64, size: u64, data: Vec<u8> },
|
||||
Write { vfs: S3Vfs, key: String, buffer: Vec<u8> },
|
||||
}
|
||||
|
||||
impl AsyncS3File {
|
||||
pub async fn new_read(vfs: S3Vfs, key: String) -> Result<Self, VfsError> {
|
||||
let stat = spawn_blocking({
|
||||
let vfs = vfs.clone();
|
||||
let key = key.clone();
|
||||
move || VfsBackend::stat(&vfs, &PathBuf::from(&key))
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
|
||||
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(S3FileState::Read {
|
||||
vfs, key, position: 0, size: stat.size, data: Vec::new(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_write(vfs: S3Vfs, key: String) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(S3FileState::Write {
|
||||
vfs, key, buffer: Vec::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsFile for AsyncS3File {
|
||||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let buf_len = buf.len();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
S3FileState::Read { vfs, key, position, size, data } => {
|
||||
if *position >= *size {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Load data if needed
|
||||
if data.is_empty() {
|
||||
let vfs_clone = vfs.clone();
|
||||
let key_clone = key.clone();
|
||||
let loaded_data = spawn_blocking(move || {
|
||||
let flags = OpenFlags::new().read();
|
||||
let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?;
|
||||
let mut buf = Vec::new();
|
||||
let mut chunk = [0u8; 8192];
|
||||
loop {
|
||||
match file.read(&mut chunk) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => buf.extend_from_slice(&chunk[..n]),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))??;
|
||||
*data = loaded_data;
|
||||
}
|
||||
|
||||
let remaining = *size - *position;
|
||||
let to_read = buf_len.min(remaining as usize);
|
||||
let start = *position as usize;
|
||||
let end = start + to_read;
|
||||
|
||||
buf[..to_read].copy_from_slice(&data[start..end]);
|
||||
*position += to_read as u64;
|
||||
Ok(to_read)
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for read".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let buf_copy = buf.to_vec();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
S3FileState::Write { buffer, .. } => {
|
||||
buffer.extend_from_slice(&buf_copy);
|
||||
Ok(buf_copy.len())
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for write".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
S3FileState::Read { position, size, .. } => {
|
||||
let new_pos = match pos {
|
||||
SeekFrom::Start(offset) => offset,
|
||||
SeekFrom::Current(offset) => {
|
||||
((*position as i64) + offset).max(0) as u64
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
((*size as i64) + offset).max(0) as u64
|
||||
}
|
||||
};
|
||||
*position = new_pos.min(*size);
|
||||
Ok(*position)
|
||||
}
|
||||
_ => Err(VfsError::Io("File not open for read".to_string())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn flush<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
match &mut *state {
|
||||
S3FileState::Write { vfs, key, buffer } => {
|
||||
let vfs_clone = vfs.clone();
|
||||
let key_clone = key.clone();
|
||||
let data = buffer.clone();
|
||||
|
||||
spawn_blocking(move || {
|
||||
let flags = OpenFlags::new().write().create().truncate().mode(0o644);
|
||||
let mut file = VfsBackend::open_file(&vfs_clone, &PathBuf::from(&key_clone), &flags)?;
|
||||
file.write_all(&data)?;
|
||||
file.flush()?;
|
||||
Ok(())
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Async S3 VFS 后端實現(spawn_blocking 包装 S3Vfs)
|
||||
pub struct AsyncS3Vfs {
|
||||
inner: S3Vfs,
|
||||
bucket: Bucket,
|
||||
credentials: Credentials,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
struct AsyncS3FileState {
|
||||
key: String,
|
||||
mode: FileMode,
|
||||
position: u64,
|
||||
size: u64,
|
||||
data: Vec<u8>,
|
||||
write_buffer: Vec<u8>,
|
||||
mtime: std::time::SystemTime,
|
||||
}
|
||||
|
||||
enum FileMode {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
pub struct AsyncS3File {
|
||||
inner: Arc<Mutex<AsyncS3FileState>>,
|
||||
vfs: AsyncS3Vfs,
|
||||
}
|
||||
|
||||
impl AsyncS3Vfs {
|
||||
@@ -165,14 +46,313 @@ impl AsyncS3Vfs {
|
||||
access_key: &str,
|
||||
secret_key: &str,
|
||||
) -> Result<Self, VfsError> {
|
||||
let inner = S3Vfs::new(endpoint, region, bucket_name, access_key, secret_key)?;
|
||||
Ok(Self { inner })
|
||||
let endpoint_url = Url::parse(endpoint.trim_end_matches('/'))
|
||||
.map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?;
|
||||
|
||||
let bucket = Bucket::new(
|
||||
endpoint_url,
|
||||
UrlStyle::Path,
|
||||
bucket_name.to_string(),
|
||||
region.to_string(),
|
||||
).map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?;
|
||||
|
||||
let credentials = Credentials::new(access_key, secret_key);
|
||||
let client = Client::new();
|
||||
|
||||
Ok(Self { bucket, credentials, client })
|
||||
}
|
||||
|
||||
fn path_to_key(path: &Path) -> String {
|
||||
let s = path.to_string_lossy();
|
||||
s.strip_prefix('/').unwrap_or(&s).to_string()
|
||||
}
|
||||
|
||||
async fn head_object(&self, key: &str) -> Result<(u64, std::time::SystemTime, String), VfsError> {
|
||||
let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = self.client
|
||||
.head(url.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?;
|
||||
|
||||
let status = resp.status();
|
||||
if status == 404 {
|
||||
return Err(VfsError::NotFound(key.to_string()));
|
||||
}
|
||||
if !status.is_success() {
|
||||
return Err(VfsError::Io(format!("HeadObject returned {}", status)));
|
||||
}
|
||||
|
||||
let content_len: u64 = resp
|
||||
.headers()
|
||||
.get("Content-Length")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
let last_modified = parse_last_modified(
|
||||
resp.headers()
|
||||
.get("Last-Modified")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
);
|
||||
|
||||
let etag = resp
|
||||
.headers()
|
||||
.get("ETag")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.replace('"', ""))
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok((content_len, last_modified, etag))
|
||||
}
|
||||
|
||||
async fn get_object(&self, key: &str) -> Result<Vec<u8>, VfsError> {
|
||||
let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = self.client
|
||||
.get(url.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| VfsError::Io(format!("S3 GET failed: {}", e)))?;
|
||||
|
||||
let status = resp.status();
|
||||
if status == 404 {
|
||||
return Err(VfsError::NotFound(key.to_string()));
|
||||
}
|
||||
if !status.is_success() {
|
||||
return Err(VfsError::Io(format!("GetObject returned {}", status)));
|
||||
}
|
||||
|
||||
let bytes = resp.bytes().await
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read response body: {}", e)))?;
|
||||
|
||||
Ok(bytes.to_vec())
|
||||
}
|
||||
|
||||
async fn put_object(&self, key: &str, data: &[u8]) -> Result<String, VfsError> {
|
||||
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = self.client
|
||||
.put(url.as_str())
|
||||
.body(data.to_vec())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
|
||||
}
|
||||
|
||||
let etag = resp
|
||||
.headers()
|
||||
.get("ETag")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.replace('"', ""))
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(etag)
|
||||
}
|
||||
|
||||
async fn delete_object(&self, key: &str) -> Result<(), VfsError> {
|
||||
let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = self.client
|
||||
.delete(url.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(VfsError::Io(format!("DeleteObject returned {}", resp.status())));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_objects(&self, prefix: &str) -> Result<Vec<VfsDirEntry>, VfsError> {
|
||||
let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials));
|
||||
if !prefix.is_empty() {
|
||||
action.with_prefix(prefix);
|
||||
}
|
||||
action.with_delimiter("/");
|
||||
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = self.client
|
||||
.get(url.as_str())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| VfsError::Io(format!("S3 LIST failed: {}", e)))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(VfsError::Io(format!("ListObjectsV2 returned {}", resp.status())));
|
||||
}
|
||||
|
||||
let body = resp.text().await
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read LIST response: {}", e)))?;
|
||||
|
||||
// Use rusty-s3's built-in parser
|
||||
let list_response = actions::ListObjectsV2::parse_response(&body)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to parse LIST response: {}", e)))?;
|
||||
|
||||
// Convert to VfsDirEntry
|
||||
let mut entries = Vec::new();
|
||||
for obj in list_response.contents {
|
||||
let name = obj.key.strip_prefix(prefix).unwrap_or(&obj.key).to_string();
|
||||
entries.push(VfsDirEntry {
|
||||
name,
|
||||
long_name: obj.key.clone(),
|
||||
stat: VfsStat {
|
||||
size: obj.size as u64,
|
||||
mode: 0o644,
|
||||
uid: 0,
|
||||
gid: 0,
|
||||
atime: std::time::SystemTime::UNIX_EPOCH,
|
||||
mtime: std::time::SystemTime::UNIX_EPOCH,
|
||||
is_dir: false,
|
||||
is_symlink: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
for prefix_elem in list_response.common_prefixes {
|
||||
let name = prefix_elem.prefix.strip_prefix(prefix).unwrap_or(&prefix_elem.prefix).trim_end_matches('/').to_string();
|
||||
entries.push(VfsDirEntry {
|
||||
name,
|
||||
long_name: prefix_elem.prefix.clone(),
|
||||
stat: VfsStat {
|
||||
size: 0,
|
||||
mode: 0o755,
|
||||
uid: 0,
|
||||
gid: 0,
|
||||
atime: std::time::SystemTime::UNIX_EPOCH,
|
||||
mtime: std::time::SystemTime::UNIX_EPOCH,
|
||||
is_dir: true,
|
||||
is_symlink: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for AsyncS3Vfs {
|
||||
fn clone(&self) -> Self {
|
||||
Self { inner: self.inner.clone() }
|
||||
Self {
|
||||
bucket: self.bucket.clone(),
|
||||
credentials: self.credentials.clone(),
|
||||
client: self.client.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncS3File {
|
||||
pub async fn new_read(vfs: AsyncS3Vfs, key: String) -> Result<Self, VfsError> {
|
||||
let (size, mtime, _) = vfs.head_object(&key).await?;
|
||||
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(AsyncS3FileState {
|
||||
key,
|
||||
mode: FileMode::Read,
|
||||
position: 0,
|
||||
size,
|
||||
data: Vec::new(),
|
||||
write_buffer: Vec::new(),
|
||||
mtime,
|
||||
})),
|
||||
vfs,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_write(vfs: AsyncS3Vfs, key: String) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(AsyncS3FileState {
|
||||
key,
|
||||
mode: FileMode::Write,
|
||||
position: 0,
|
||||
size: 0,
|
||||
data: Vec::new(),
|
||||
write_buffer: Vec::new(),
|
||||
mtime: std::time::SystemTime::now(),
|
||||
})),
|
||||
vfs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl super::AsyncVfsFile for AsyncS3File {
|
||||
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let vfs = self.vfs.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
|
||||
if state.position >= state.size {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if state.data.is_empty() {
|
||||
let key = state.key.clone();
|
||||
state.data = vfs.get_object(&key).await?;
|
||||
}
|
||||
|
||||
let remaining = state.size - state.position;
|
||||
let to_read = buf.len().min(remaining as usize);
|
||||
let start = state.position as usize;
|
||||
let end = start + to_read;
|
||||
|
||||
buf[..to_read].copy_from_slice(&state.data[start..end]);
|
||||
state.position += to_read as u64;
|
||||
|
||||
Ok(to_read)
|
||||
})
|
||||
}
|
||||
|
||||
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin<Box<dyn Future<Output = Result<usize, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
state.write_buffer.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
})
|
||||
}
|
||||
|
||||
fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
let new_pos = match pos {
|
||||
SeekFrom::Start(offset) => offset,
|
||||
SeekFrom::Current(offset) => {
|
||||
((state.position as i64) + offset).max(0) as u64
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
((state.size as i64) + offset).max(0) as u64
|
||||
}
|
||||
};
|
||||
state.position = new_pos.min(state.size);
|
||||
Ok(state.position)
|
||||
})
|
||||
}
|
||||
|
||||
fn flush<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let vfs = self.vfs.clone();
|
||||
Box::pin(async move {
|
||||
let mut state = inner.lock().await;
|
||||
if !state.write_buffer.is_empty() {
|
||||
let key = state.key.clone();
|
||||
let data = state.write_buffer.clone();
|
||||
vfs.put_object(&key, &data).await?;
|
||||
state.write_buffer.clear();
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,88 +362,93 @@ impl super::AsyncVfsBackend for AsyncS3Vfs {
|
||||
}
|
||||
|
||||
fn read_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<Vec<VfsDirEntry>, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let prefix = Self::path_to_key(path);
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::read_dir(&inner, &path_buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
self.list_objects(&prefix).await
|
||||
})
|
||||
}
|
||||
|
||||
fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin<Box<dyn Future<Output = Result<Box<dyn super::AsyncVfsFile>, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
let vfs = self.clone();
|
||||
let is_write = flags.write;
|
||||
Box::pin(async move {
|
||||
let key = path_buf.to_string_lossy().to_string();
|
||||
if is_write {
|
||||
Ok(Box::new(AsyncS3File::new_write(inner, key)) as Box<dyn super::AsyncVfsFile>)
|
||||
Ok(Box::new(AsyncS3File::new_write(vfs, key)) as Box<dyn super::AsyncVfsFile>)
|
||||
} else {
|
||||
let file = AsyncS3File::new_read(inner, key).await?;
|
||||
let file = AsyncS3File::new_read(vfs, key).await?;
|
||||
Ok(Box::new(file) as Box<dyn super::AsyncVfsFile>)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn stat<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<VfsStat, VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::stat(&inner, &path_buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
let (size, mtime, _) = self.head_object(&key).await?;
|
||||
Ok(VfsStat {
|
||||
size,
|
||||
mode: 0o644,
|
||||
uid: 0,
|
||||
gid: 0,
|
||||
atime: mtime,
|
||||
mtime,
|
||||
is_dir: false,
|
||||
is_symlink: false,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
if !key.ends_with('/') {
|
||||
let key = format!("{}/", key);
|
||||
}
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::create_dir(&inner, &path_buf, 0o755)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
self.put_object(&key, &[]).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
let key = if key.ends_with('/') { key } else { format!("{}/", key) };
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::remove_dir(&inner, &path_buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
self.delete_object(&key).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_file<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::remove_file(&inner, &path_buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
self.delete_object(&key).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let from_buf = from.to_path_buf();
|
||||
let to_buf = to.to_path_buf();
|
||||
let from_key = Self::path_to_key(from);
|
||||
let to_key = Self::path_to_key(to);
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::rename(&inner, &from_buf, &to_buf)
|
||||
}).await.map_err(|_| VfsError::Io("spawn_blocking failed".to_string()))?
|
||||
let data = self.get_object(&from_key).await?;
|
||||
self.put_object(&to_key, &data).await?;
|
||||
self.delete_object(&from_key).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn exists<'a>(&'a self, path: &'a Path) -> Pin<Box<dyn Future<Output = bool> + Send + 'a>> {
|
||||
let inner = self.inner.clone();
|
||||
let path_buf = path.to_path_buf();
|
||||
let key = Self::path_to_key(path);
|
||||
Box::pin(async move {
|
||||
spawn_blocking(move || {
|
||||
VfsBackend::exists(&inner, &path_buf)
|
||||
}).await.unwrap_or(false)
|
||||
self.head_object(&key).await.is_ok()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_last_modified(header: Option<&str>) -> std::time::SystemTime {
|
||||
header
|
||||
.and_then(|s| chrono::DateTime::parse_from_rfc2822(s).ok())
|
||||
.map(|dt| std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(dt.timestamp() as u64))
|
||||
.unwrap_or(std::time::SystemTime::now())
|
||||
}
|
||||
Reference in New Issue
Block a user