Add S3 VFS backend: VfsBackend impl for S3-compatible storage
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

- S3Vfs with all 15 VfsBackend methods via rusty-s3 + ureq
- S3VfsFile for buffered writes + ranged reads
- AWS Signature V4 pre-signed URLs (rusty-s3)
- ListObjectsV2 for directory listing (prefix + delimiter)
- Path-style URL mapping (/path to bucket/key)
This commit is contained in:
Warren
2026-06-18 23:44:52 +08:00
parent 69efcdf5c5
commit 960ee87ce9
5 changed files with 829 additions and 7 deletions

View File

@@ -59,6 +59,9 @@ aes = "0.8"
ctr = "0.9"
cipher = "0.4"
nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/Ofs feature包含fcntl
rusty-s3 = "0.10" # S3 API 签名AWS Signature V4
ureq = "2.12" # 輕量同步 HTTP 客戶端
url = "2" # URL 解析rusty-s3 依賴)
[features]
default = [] # 默认不启用可选格式

View File

@@ -1,5 +1,6 @@
pub mod open_flags;
pub mod local_fs;
pub mod s3_fs;
pub mod util;
use std::path::{Path, PathBuf};

View File

@@ -0,0 +1,632 @@
use super::open_flags::OpenFlags;
use super::{VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsStat};
use chrono::Datelike;
use rusty_s3::{actions, Bucket, Credentials, S3Action, UrlStyle};
use std::io::Read;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
use url::Url;
/// S3-compatible 文件系統後端
pub struct S3Vfs {
bucket: Bucket,
credentials: Credentials,
}
struct S3VfsFile {
s3vfs_like: S3VfsLike,
key: String,
mode: FileMode,
position: u64,
write_buffer: Vec<u8>,
size: u64,
mtime: SystemTime,
}
#[derive(Clone)]
struct S3VfsLike {
bucket: Bucket,
credentials: Credentials,
}
enum FileMode {
Read,
Write,
}
impl S3Vfs {
pub fn new(
endpoint: &str,
region: &str,
bucket_name: &str,
access_key: &str,
secret_key: &str,
) -> Result<Self, VfsError> {
let e = endpoint.trim_end_matches('/');
let endpoint_url =
Url::parse(e).map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?;
let bucket = Bucket::new(
endpoint_url,
UrlStyle::Path,
bucket_name.to_string(),
region.to_string(),
)
.map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?;
let credentials = Credentials::new(access_key, secret_key);
Ok(Self { bucket, credentials })
}
fn path_to_key(path: &Path) -> String {
let s = path.to_string_lossy();
s.strip_prefix('/').unwrap_or(&s).to_string()
}
fn dir_key(path: &Path) -> String {
let s = Self::path_to_key(path);
if s.is_empty() || s.ends_with('/') {
s
} else {
format!("{}/", s)
}
}
fn head_object(&self, key: &str) -> Result<(u64, SystemTime, String), VfsError> {
let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::head(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?;
let status = resp.status();
if status == 404 {
return Err(VfsError::NotFound(key.to_string()));
}
if status != 200 {
return Err(VfsError::Io(format!("HeadObject returned {}", status)));
}
let content_len: u64 = resp
.header("Content-Length")
.and_then(|v| v.parse().ok())
.unwrap_or(0);
let last_modified = parse_last_modified(resp.header("Last-Modified"));
let content_type = resp
.header("Content-Type")
.unwrap_or("application/octet-stream")
.to_string();
Ok((content_len, last_modified, content_type))
}
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let content_type = if key.ends_with('/') {
"application/directory"
} else {
"application/octet-stream"
};
let resp = ureq::put(url.as_str())
.set("Content-Type", content_type)
.send_bytes(body)
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
}
Ok(())
}
fn delete_object(&self, key: &str) -> Result<(), VfsError> {
let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::delete(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?;
let status = resp.status();
if status != 204 && status != 200 {
return Err(VfsError::Io(format!("DeleteObject returned {}", status)));
}
Ok(())
}
fn copy_object(&self, from_key: &str, to_key: &str) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), to_key);
let url = action.sign(Duration::from_secs(3600));
let copy_source = format!("{}/{}", self.bucket.name(), from_key);
let resp = ureq::put(url.as_str())
.set("x-amz-copy-source", &copy_source)
.send_bytes(&[])
.map_err(|e| VfsError::Io(format!("S3 CopyObject failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("CopyObject returned {}", resp.status())));
}
Ok(())
}
fn list_objects(
&self,
prefix: &str,
) -> Result<actions::ListObjectsV2Response, VfsError> {
let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials));
if !prefix.is_empty() {
action.with_prefix(prefix);
}
action.with_delimiter("/");
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::get(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 ListObjects failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!(
"ListObjectsV2 returned {}",
resp.status()
)));
}
let mut body = String::new();
resp.into_reader()
.read_to_string(&mut body)
.map_err(|e| VfsError::Io(format!("Failed to read S3 list response: {}", e)))?;
actions::ListObjectsV2::parse_response(&body).map_err(|e| {
VfsError::Io(format!("Failed to parse S3 list response XML: {}", e))
})
}
}
impl VfsBackend for S3Vfs {
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
let prefix = Self::dir_key(path);
let list = self.list_objects(&prefix)?;
let mut entries = Vec::new();
for cp in &list.common_prefixes {
let full_prefix = &cp.prefix;
let rel = full_prefix.strip_prefix(&prefix).unwrap_or(full_prefix);
let name = rel.strip_suffix('/').unwrap_or(rel);
if name.is_empty() {
continue;
}
let dk = format!("{}{}/", prefix, name);
let st = match self.head_object(&dk) {
Ok((_size, mtime, _ct)) => VfsStat {
is_dir: true,
mtime,
..VfsStat::new()
},
Err(_) => VfsStat {
is_dir: true,
..VfsStat::new()
},
};
entries.push(VfsDirEntry {
name: name.to_string(),
long_name: build_long_name(&st, name),
stat: st,
});
}
for obj in &list.contents {
let rel = obj.key.strip_prefix(&prefix).unwrap_or(&obj.key);
let name = rel.strip_suffix('/').unwrap_or(rel);
if name.is_empty() {
continue;
}
let lm = parse_iso8601(&obj.last_modified);
let st = VfsStat {
size: obj.size,
mtime: lm,
is_dir: obj.key.ends_with('/'),
is_symlink: false,
..VfsStat::new()
};
entries.push(VfsDirEntry {
name: name.to_string(),
long_name: build_long_name(&st, name),
stat: st,
});
}
entries.sort_by(|a, b| a.name.cmp(&b.name));
Ok(entries)
}
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
let key = Self::path_to_key(path);
let is_write = flags.write || flags.create || flags.truncate;
if is_write {
let sz = if flags.truncate || flags.create {
0
} else {
self.head_object(&key).map(|r| r.0).unwrap_or(0)
};
Ok(Box::new(S3VfsFile {
s3vfs_like: S3VfsLike {
bucket: self.bucket.clone(),
credentials: self.credentials.clone(),
},
key,
mode: FileMode::Write,
position: 0,
write_buffer: Vec::new(),
size: sz,
mtime: SystemTime::now(),
}))
} else if flags.read {
let (size, mtime, _) = self.head_object(&key)?;
Ok(Box::new(S3VfsFile {
s3vfs_like: S3VfsLike {
bucket: self.bucket.clone(),
credentials: self.credentials.clone(),
},
key,
mode: FileMode::Read,
position: 0,
write_buffer: Vec::new(),
size,
mtime,
}))
} else {
Err(VfsError::Io("Invalid open flags".to_string()))
}
}
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
let key = Self::path_to_key(path);
let (size, mtime, content_type) = self.head_object(&key)?;
Ok(VfsStat {
size,
mtime,
is_dir: key.ends_with('/') || content_type == "application/directory",
is_symlink: false,
..VfsStat::new()
})
}
fn lstat(&self, path: &Path) -> Result<VfsStat, VfsError> {
self.stat(path)
}
fn create_dir(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
let key = Self::dir_key(path);
self.put_object(&key, &[])
}
fn create_dir_all(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
let path_str = path.to_string_lossy();
let components: Vec<&str> = path_str
.strip_prefix('/')
.unwrap_or("")
.split('/')
.filter(|s| !s.is_empty())
.collect();
let mut acc = String::new();
for comp in components {
acc = format!("{}{}/", acc, comp);
let _ = self.put_object(&acc, &[]);
}
if !acc.is_empty() {
self.create_dir(path, _mode)
} else {
Ok(())
}
}
fn remove_dir(&self, path: &Path) -> Result<(), VfsError> {
let key = Self::dir_key(path);
let list = self.list_objects(&key)?;
if !list.contents.is_empty() || !list.common_prefixes.is_empty() {
return Err(VfsError::NotEmpty(path.display().to_string()));
}
self.delete_object(&key)
}
fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
let key = Self::path_to_key(path);
self.delete_object(&key)
}
fn rename(&self, from: &Path, to: &Path) -> Result<(), VfsError> {
let from_key = Self::path_to_key(from);
let to_key = Self::path_to_key(to);
self.copy_object(&from_key, &to_key)?;
self.delete_object(&from_key)
}
fn set_stat(&self, _path: &Path, _stat: &VfsStat) -> Result<(), VfsError> {
Err(VfsError::Unsupported(
"set_stat not supported for S3 backend".to_string(),
))
}
fn read_link(&self, _path: &Path) -> Result<PathBuf, VfsError> {
Err(VfsError::Unsupported(
"read_link not supported for S3 backend".to_string(),
))
}
fn create_symlink(&self, _target: &Path, _link: &Path) -> Result<(), VfsError> {
Err(VfsError::Unsupported(
"create_symlink not supported for S3 backend".to_string(),
))
}
fn real_path(&self, path: &Path) -> Result<PathBuf, VfsError> {
let key = Self::path_to_key(path);
if key.is_empty() {
return Ok(PathBuf::from("/"));
}
match self.head_object(&key) {
Ok(_) => Ok(PathBuf::from("/").join(&key)),
Err(VfsError::NotFound(_)) => {
let dk = Self::dir_key(path);
match self.head_object(&dk) {
Ok(_) => Ok(PathBuf::from("/").join(&key)),
Err(_) => Ok(PathBuf::from("/").join(&key)),
}
}
Err(e) => Err(e),
}
}
fn exists(&self, path: &Path) -> bool {
let key = Self::path_to_key(path);
if key.is_empty() {
return true;
}
self.head_object(&key).is_ok() || self.head_object(&Self::dir_key(path)).is_ok()
}
fn hard_link(&self, original: &Path, link: &Path) -> Result<(), VfsError> {
let from_key = Self::path_to_key(original);
let to_key = Self::path_to_key(link);
self.copy_object(&from_key, &to_key)
}
}
impl VfsFile for S3VfsFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, VfsError> {
let to_read = buf.len().min((self.size.saturating_sub(self.position)) as usize);
if to_read == 0 {
return Ok(0);
}
let data = self
.s3vfs_like
.get_object_range(&self.key, self.position, to_read as u64)?;
let n = data.len().min(buf.len());
buf[..n].copy_from_slice(&data[..n]);
self.position += n as u64;
Ok(n)
}
fn write(&mut self, buf: &[u8]) -> Result<usize, VfsError> {
self.write_buffer.extend_from_slice(buf);
self.position += buf.len() as u64;
Ok(buf.len())
}
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, VfsError> {
match pos {
std::io::SeekFrom::Start(offset) => {
self.position = offset;
}
std::io::SeekFrom::End(offset) => {
let sz = match self.mode {
FileMode::Write => self.position,
FileMode::Read => self.size,
};
if offset >= 0 {
self.position = sz.saturating_add(offset as u64);
} else {
let abs = offset.unsigned_abs();
self.position = if abs <= sz { sz - abs } else { 0 };
}
}
std::io::SeekFrom::Current(offset) => {
if offset >= 0 {
self.position = self.position.saturating_add(offset as u64);
} else {
let abs = offset.unsigned_abs();
self.position = if abs <= self.position {
self.position - abs
} else {
0
};
}
}
}
Ok(self.position)
}
fn flush(&mut self) -> Result<(), VfsError> {
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
let buf = std::mem::take(&mut self.write_buffer);
self.s3vfs_like.put_object(&self.key, &buf)?;
}
Ok(())
}
fn stat(&mut self) -> Result<VfsStat, VfsError> {
let sz = match self.mode {
FileMode::Write => self.position,
FileMode::Read => self.size,
};
Ok(VfsStat {
size: sz,
mtime: self.mtime,
is_dir: self.key.ends_with('/'),
is_symlink: false,
..VfsStat::new()
})
}
fn set_len(&mut self, size: u64) -> Result<(), VfsError> {
if matches!(self.mode, FileMode::Write) {
let len = size as usize;
if len < self.write_buffer.len() {
self.write_buffer.truncate(len);
} else {
self.write_buffer.resize(len, 0);
}
self.position = size;
}
Ok(())
}
}
impl Drop for S3VfsFile {
fn drop(&mut self) {
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
let buf = std::mem::take(&mut self.write_buffer);
let _ = self.s3vfs_like.put_object(&self.key, &buf);
}
}
}
impl S3VfsLike {
fn get_object_range(&self, key: &str, offset: u64, size: u64) -> Result<Vec<u8>, VfsError> {
let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let range = format!("bytes={}-{}", offset, offset + size - 1);
let resp = ureq::get(url.as_str())
.set("Range", &range)
.call()
.map_err(|e| VfsError::Io(format!("S3 GET range failed: {}", e)))?;
let status = resp.status();
if status == 404 {
return Err(VfsError::NotFound(key.to_string()));
}
if status != 206 && status != 200 {
return Err(VfsError::Io(format!("GetObject returned {}", status)));
}
let mut body = Vec::new();
resp.into_reader()
.read_to_end(&mut body)
.map_err(|e| VfsError::Io(format!("Failed to read S3 response: {}", e)))?;
Ok(body)
}
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let ct = if key.ends_with('/') {
"application/directory"
} else {
"application/octet-stream"
};
let resp = ureq::put(url.as_str())
.set("Content-Type", ct)
.send_bytes(body)
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
}
Ok(())
}
}
// ---- helper functions ----
fn parse_last_modified(header: Option<&str>) -> SystemTime {
match header {
Some(val) => {
if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(val) {
dt.into()
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(val) {
dt.into()
} else {
SystemTime::UNIX_EPOCH
}
}
None => SystemTime::UNIX_EPOCH,
}
}
fn parse_iso8601(s: &str) -> SystemTime {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
dt.into()
} else if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ") {
dt.and_utc().into()
} else {
SystemTime::UNIX_EPOCH
}
}
fn build_long_name(stat: &VfsStat, name: &str) -> String {
let ft = if stat.is_dir { 'd' } else { '-' };
let perms = "rwxr-xr-x";
let sz = stat.size;
let ts = match stat.mtime.duration_since(SystemTime::UNIX_EPOCH) {
Ok(d) => {
let secs = d.as_secs();
match chrono::DateTime::from_timestamp(secs as i64, 0) {
Some(dt) => {
let now = chrono::Utc::now();
if dt.year() == now.year() {
dt.format("%b %e %H:%M").to_string()
} else {
dt.format("%b %e %Y").to_string()
}
}
None => "Jan 1 1970".to_string(),
}
}
Err(_) => "Jan 1 1970".to_string(),
};
format!("{}{} 1 0 0 {} {} {}", ft, perms, sz, ts, name)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_path_to_key() {
assert_eq!(
S3Vfs::path_to_key(Path::new("/foo/bar.txt")),
"foo/bar.txt"
);
assert_eq!(S3Vfs::path_to_key(Path::new("/")), "");
assert_eq!(
S3Vfs::path_to_key(Path::new("relative/path")),
"relative/path"
);
}
#[test]
fn test_dir_key() {
assert_eq!(S3Vfs::dir_key(Path::new("/foo/bar")), "foo/bar/");
assert_eq!(S3Vfs::dir_key(Path::new("/")), "");
assert_eq!(S3Vfs::dir_key(Path::new("/foo/")), "foo/");
}
}