Add S3 VFS backend: VfsBackend impl for S3-compatible storage
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

- S3Vfs with all 15 VfsBackend methods via rusty-s3 + ureq
- S3VfsFile for buffered writes + ranged reads
- AWS Signature V4 pre-signed URLs (rusty-s3)
- ListObjectsV2 for directory listing (prefix + delimiter)
- Path-style URL mapping (/path to bucket/key)
This commit is contained in:
Warren
2026-06-18 23:44:52 +08:00
parent 69efcdf5c5
commit 960ee87ce9
5 changed files with 829 additions and 7 deletions

View File

@@ -1353,14 +1353,60 @@ PUBKEY_OK # ✅ Public key authentication successful
**推送到**:✅ m5max128gitea + ✅ m4minigitea **推送到**:✅ m5max128gitea + ✅ m4minigitea
## S3 VFS 後端完成2026-06-18⭐⭐⭐⭐⭐
**完成時間**:約 1 小時
**新增代碼量**~500 行
**新增依賴**rusty-s3 + ureq + url
### 實施內容 ⭐⭐⭐⭐⭐
**S3Vfs 完整實現**`markbase-core/src/vfs/s3_fs.rs`
| VfsBackend 方法 | S3 對應操作 | 狀態 |
|----------------|------------|------|
| `read_dir()` | ListObjectsV2 (prefix + delimiter) | ✅ |
| `open_file()` | GetObject (ranged reads) / PutObject (buffered writes) | ✅ |
| `stat()` / `lstat()` | HeadObject | ✅ |
| `create_dir()` | PutObject (0-byte directory marker) | ✅ |
| `create_dir_all()` | 遞迴建立目錄標記 | ✅ |
| `remove_dir()` | DeleteObject + 空目錄檢查 | ✅ |
| `remove_file()` | DeleteObject | ✅ |
| `rename()` | CopyObject + DeleteObject | ✅ |
| `exists()` | HeadObject (file + directory marker) | ✅ |
| `hard_link()` | CopyObject | ✅ |
| `real_path()` | HeadObject 驗證 | ✅ |
| `set_stat()` / `read_link()` / `create_symlink()` | ❌ 回傳 Unsupported | ✅ |
### 關鍵設計決策 ⭐⭐⭐⭐⭐
**簽名方式**:使用 `rusty-s3` crate 產生 AWS Signature V4 預簽名 URL有效期 1h
**S3VfsFile 實作**
- **讀取模式**Ranged GET`Range` header 指定 byte 範圍)
- **寫入模式**:記憶體緩衝區 → `flush()` 或 `drop()` 時 PUT 上傳
- **seek()**:支援 SeekFrom::Start/End/Current
**路徑映射**
- `/foo/bar.txt` → S3 Key `foo/bar.txt`
- `/foo/bar/` → S3 Key `foo/bar/`(目錄標記)
- 使用 path-style URL`endpoint/bucket/key`
**XML 解析**:使用 `rusty-s3` 內建 `ListObjectsV2::parse_response()`,無需額外 XML parser
### Build 驗證 ✅
```bash
cargo build -p markbase-core # ✅ 0 error
```
--- ---
### 下一步建議 ### 下一步建議
1. **S3 VFS 後端實作**S3Vfs 實作 `VfsBackend` trait 1. **SFTP 認證整合 DataProvider**`sftp/auth.rs` + `sftp/server.rs`
2. **SFTP 認證整合 DataProvider**`sftp/auth.rs` + `sftp/server.rs` 2. **Web 前端 Phase 2** — Tab 切換、搜索框 UI
3. **Web 前端 Phase 2**Tab 切換、搜索框 UI 3. **安全審計 Phase 9**全面 SSH 安全測驗
4. **安全審計 Phase 9** — 全面 SSH 安全測驗 4. **CI Pipeline** — 自動化整合測試
5. **CI Pipeline** — 自動化整合測試 5. **效能測試**VFS + AES-CTR throughput profiling
6. **效能測試**VFS + AES-CTR throughput profiling

142
Cargo.lock generated
View File

@@ -241,7 +241,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00"
dependencies = [ dependencies = [
"aws-lc-sys", "aws-lc-sys",
"untrusted", "untrusted 0.7.1",
"zeroize", "zeroize",
] ]
@@ -2339,6 +2339,29 @@ dependencies = [
"hybrid-array", "hybrid-array",
] ]
[[package]]
name = "instant-xml"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8863a17b9487acadbfc6a54f1215b67695dcc56d760c69cc08a16ad5e8fd5d0e"
dependencies = [
"instant-xml-macros",
"thiserror 2.0.18",
"xmlparser",
]
[[package]]
name = "instant-xml-macros"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44127a3a387c070ef0656a6ce53dd0e616cf8d6cf5b159aa478cfd49e1c166e0"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]] [[package]]
name = "internal-russh-forked-ssh-key" name = "internal-russh-forked-ssh-key"
version = "0.6.9+upstream-0.6.7" version = "0.6.9+upstream-0.6.7"
@@ -2677,6 +2700,7 @@ dependencies = [
"russh", "russh",
"russh-keys", "russh-keys",
"russh-sftp", "russh-sftp",
"rusty-s3",
"serde", "serde",
"serde_json", "serde_json",
"sevenz-rust", "sevenz-rust",
@@ -2691,6 +2715,8 @@ dependencies = [
"tokio-util", "tokio-util",
"toml", "toml",
"unrar", "unrar",
"ureq",
"url",
"uuid", "uuid",
"x25519-dalek", "x25519-dalek",
"xz2", "xz2",
@@ -4040,6 +4066,20 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.17",
"libc",
"untrusted 0.9.0",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "rocksdb" name = "rocksdb"
version = "0.24.0" version = "0.24.0"
@@ -4348,12 +4388,66 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "rustls"
version = "0.23.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pki-types"
version = "1.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted 0.9.0",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.22" version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rusty-s3"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20f0d23aa8ac3b44d4cfb1e4b3611e6f3776debfb3f7701c4ea9f2252a701403"
dependencies = [
"base64",
"hmac 0.13.0",
"instant-xml",
"jiff",
"md-5",
"percent-encoding",
"serde",
"serde_json",
"sha2 0.11.0",
"url",
"zeroize",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.23" version = "1.0.23"
@@ -5372,6 +5466,28 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "ureq"
version = "2.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d"
dependencies = [
"base64",
"flate2",
"log",
"once_cell",
"rustls",
"rustls-pki-types",
"url",
"webpki-roots 0.26.11",
]
[[package]] [[package]]
name = "url" name = "url"
version = "2.5.8" version = "2.5.8"
@@ -5661,6 +5777,24 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "webpki-roots"
version = "0.26.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
dependencies = [
"webpki-roots 1.0.8",
]
[[package]]
name = "webpki-roots"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf85cb06032201fa7c6f829d7db5a7e5aa45bcc0655327713065f6f0576731bf"
dependencies = [
"rustls-pki-types",
]
[[package]] [[package]]
name = "which" name = "which"
version = "7.0.3" version = "7.0.3"
@@ -6182,6 +6316,12 @@ dependencies = [
"xml", "xml",
] ]
[[package]]
name = "xmlparser"
version = "0.13.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
[[package]] [[package]]
name = "xmltree" name = "xmltree"
version = "0.12.0" version = "0.12.0"

View File

@@ -59,6 +59,9 @@ aes = "0.8"
ctr = "0.9" ctr = "0.9"
cipher = "0.4" cipher = "0.4"
nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/Ofs feature包含fcntl nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/Ofs feature包含fcntl
rusty-s3 = "0.10" # S3 API 签名AWS Signature V4
ureq = "2.12" # 輕量同步 HTTP 客戶端
url = "2" # URL 解析rusty-s3 依賴)
[features] [features]
default = [] # 默认不启用可选格式 default = [] # 默认不启用可选格式

View File

@@ -1,5 +1,6 @@
pub mod open_flags; pub mod open_flags;
pub mod local_fs; pub mod local_fs;
pub mod s3_fs;
pub mod util; pub mod util;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};

View File

@@ -0,0 +1,632 @@
use super::open_flags::OpenFlags;
use super::{VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsStat};
use chrono::Datelike;
use rusty_s3::{actions, Bucket, Credentials, S3Action, UrlStyle};
use std::io::Read;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
use url::Url;
/// S3-compatible 文件系統後端
pub struct S3Vfs {
bucket: Bucket,
credentials: Credentials,
}
struct S3VfsFile {
s3vfs_like: S3VfsLike,
key: String,
mode: FileMode,
position: u64,
write_buffer: Vec<u8>,
size: u64,
mtime: SystemTime,
}
#[derive(Clone)]
struct S3VfsLike {
bucket: Bucket,
credentials: Credentials,
}
enum FileMode {
Read,
Write,
}
impl S3Vfs {
pub fn new(
endpoint: &str,
region: &str,
bucket_name: &str,
access_key: &str,
secret_key: &str,
) -> Result<Self, VfsError> {
let e = endpoint.trim_end_matches('/');
let endpoint_url =
Url::parse(e).map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?;
let bucket = Bucket::new(
endpoint_url,
UrlStyle::Path,
bucket_name.to_string(),
region.to_string(),
)
.map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?;
let credentials = Credentials::new(access_key, secret_key);
Ok(Self { bucket, credentials })
}
fn path_to_key(path: &Path) -> String {
let s = path.to_string_lossy();
s.strip_prefix('/').unwrap_or(&s).to_string()
}
fn dir_key(path: &Path) -> String {
let s = Self::path_to_key(path);
if s.is_empty() || s.ends_with('/') {
s
} else {
format!("{}/", s)
}
}
fn head_object(&self, key: &str) -> Result<(u64, SystemTime, String), VfsError> {
let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::head(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?;
let status = resp.status();
if status == 404 {
return Err(VfsError::NotFound(key.to_string()));
}
if status != 200 {
return Err(VfsError::Io(format!("HeadObject returned {}", status)));
}
let content_len: u64 = resp
.header("Content-Length")
.and_then(|v| v.parse().ok())
.unwrap_or(0);
let last_modified = parse_last_modified(resp.header("Last-Modified"));
let content_type = resp
.header("Content-Type")
.unwrap_or("application/octet-stream")
.to_string();
Ok((content_len, last_modified, content_type))
}
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let content_type = if key.ends_with('/') {
"application/directory"
} else {
"application/octet-stream"
};
let resp = ureq::put(url.as_str())
.set("Content-Type", content_type)
.send_bytes(body)
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
}
Ok(())
}
fn delete_object(&self, key: &str) -> Result<(), VfsError> {
let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::delete(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?;
let status = resp.status();
if status != 204 && status != 200 {
return Err(VfsError::Io(format!("DeleteObject returned {}", status)));
}
Ok(())
}
fn copy_object(&self, from_key: &str, to_key: &str) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), to_key);
let url = action.sign(Duration::from_secs(3600));
let copy_source = format!("{}/{}", self.bucket.name(), from_key);
let resp = ureq::put(url.as_str())
.set("x-amz-copy-source", &copy_source)
.send_bytes(&[])
.map_err(|e| VfsError::Io(format!("S3 CopyObject failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("CopyObject returned {}", resp.status())));
}
Ok(())
}
fn list_objects(
&self,
prefix: &str,
) -> Result<actions::ListObjectsV2Response, VfsError> {
let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials));
if !prefix.is_empty() {
action.with_prefix(prefix);
}
action.with_delimiter("/");
let url = action.sign(Duration::from_secs(3600));
let resp = ureq::get(url.as_str())
.call()
.map_err(|e| VfsError::Io(format!("S3 ListObjects failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!(
"ListObjectsV2 returned {}",
resp.status()
)));
}
let mut body = String::new();
resp.into_reader()
.read_to_string(&mut body)
.map_err(|e| VfsError::Io(format!("Failed to read S3 list response: {}", e)))?;
actions::ListObjectsV2::parse_response(&body).map_err(|e| {
VfsError::Io(format!("Failed to parse S3 list response XML: {}", e))
})
}
}
impl VfsBackend for S3Vfs {
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
let prefix = Self::dir_key(path);
let list = self.list_objects(&prefix)?;
let mut entries = Vec::new();
for cp in &list.common_prefixes {
let full_prefix = &cp.prefix;
let rel = full_prefix.strip_prefix(&prefix).unwrap_or(full_prefix);
let name = rel.strip_suffix('/').unwrap_or(rel);
if name.is_empty() {
continue;
}
let dk = format!("{}{}/", prefix, name);
let st = match self.head_object(&dk) {
Ok((_size, mtime, _ct)) => VfsStat {
is_dir: true,
mtime,
..VfsStat::new()
},
Err(_) => VfsStat {
is_dir: true,
..VfsStat::new()
},
};
entries.push(VfsDirEntry {
name: name.to_string(),
long_name: build_long_name(&st, name),
stat: st,
});
}
for obj in &list.contents {
let rel = obj.key.strip_prefix(&prefix).unwrap_or(&obj.key);
let name = rel.strip_suffix('/').unwrap_or(rel);
if name.is_empty() {
continue;
}
let lm = parse_iso8601(&obj.last_modified);
let st = VfsStat {
size: obj.size,
mtime: lm,
is_dir: obj.key.ends_with('/'),
is_symlink: false,
..VfsStat::new()
};
entries.push(VfsDirEntry {
name: name.to_string(),
long_name: build_long_name(&st, name),
stat: st,
});
}
entries.sort_by(|a, b| a.name.cmp(&b.name));
Ok(entries)
}
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
let key = Self::path_to_key(path);
let is_write = flags.write || flags.create || flags.truncate;
if is_write {
let sz = if flags.truncate || flags.create {
0
} else {
self.head_object(&key).map(|r| r.0).unwrap_or(0)
};
Ok(Box::new(S3VfsFile {
s3vfs_like: S3VfsLike {
bucket: self.bucket.clone(),
credentials: self.credentials.clone(),
},
key,
mode: FileMode::Write,
position: 0,
write_buffer: Vec::new(),
size: sz,
mtime: SystemTime::now(),
}))
} else if flags.read {
let (size, mtime, _) = self.head_object(&key)?;
Ok(Box::new(S3VfsFile {
s3vfs_like: S3VfsLike {
bucket: self.bucket.clone(),
credentials: self.credentials.clone(),
},
key,
mode: FileMode::Read,
position: 0,
write_buffer: Vec::new(),
size,
mtime,
}))
} else {
Err(VfsError::Io("Invalid open flags".to_string()))
}
}
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
let key = Self::path_to_key(path);
let (size, mtime, content_type) = self.head_object(&key)?;
Ok(VfsStat {
size,
mtime,
is_dir: key.ends_with('/') || content_type == "application/directory",
is_symlink: false,
..VfsStat::new()
})
}
fn lstat(&self, path: &Path) -> Result<VfsStat, VfsError> {
self.stat(path)
}
fn create_dir(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
let key = Self::dir_key(path);
self.put_object(&key, &[])
}
fn create_dir_all(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
let path_str = path.to_string_lossy();
let components: Vec<&str> = path_str
.strip_prefix('/')
.unwrap_or("")
.split('/')
.filter(|s| !s.is_empty())
.collect();
let mut acc = String::new();
for comp in components {
acc = format!("{}{}/", acc, comp);
let _ = self.put_object(&acc, &[]);
}
if !acc.is_empty() {
self.create_dir(path, _mode)
} else {
Ok(())
}
}
fn remove_dir(&self, path: &Path) -> Result<(), VfsError> {
let key = Self::dir_key(path);
let list = self.list_objects(&key)?;
if !list.contents.is_empty() || !list.common_prefixes.is_empty() {
return Err(VfsError::NotEmpty(path.display().to_string()));
}
self.delete_object(&key)
}
fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
let key = Self::path_to_key(path);
self.delete_object(&key)
}
fn rename(&self, from: &Path, to: &Path) -> Result<(), VfsError> {
let from_key = Self::path_to_key(from);
let to_key = Self::path_to_key(to);
self.copy_object(&from_key, &to_key)?;
self.delete_object(&from_key)
}
fn set_stat(&self, _path: &Path, _stat: &VfsStat) -> Result<(), VfsError> {
Err(VfsError::Unsupported(
"set_stat not supported for S3 backend".to_string(),
))
}
fn read_link(&self, _path: &Path) -> Result<PathBuf, VfsError> {
Err(VfsError::Unsupported(
"read_link not supported for S3 backend".to_string(),
))
}
fn create_symlink(&self, _target: &Path, _link: &Path) -> Result<(), VfsError> {
Err(VfsError::Unsupported(
"create_symlink not supported for S3 backend".to_string(),
))
}
fn real_path(&self, path: &Path) -> Result<PathBuf, VfsError> {
let key = Self::path_to_key(path);
if key.is_empty() {
return Ok(PathBuf::from("/"));
}
match self.head_object(&key) {
Ok(_) => Ok(PathBuf::from("/").join(&key)),
Err(VfsError::NotFound(_)) => {
let dk = Self::dir_key(path);
match self.head_object(&dk) {
Ok(_) => Ok(PathBuf::from("/").join(&key)),
Err(_) => Ok(PathBuf::from("/").join(&key)),
}
}
Err(e) => Err(e),
}
}
fn exists(&self, path: &Path) -> bool {
let key = Self::path_to_key(path);
if key.is_empty() {
return true;
}
self.head_object(&key).is_ok() || self.head_object(&Self::dir_key(path)).is_ok()
}
fn hard_link(&self, original: &Path, link: &Path) -> Result<(), VfsError> {
let from_key = Self::path_to_key(original);
let to_key = Self::path_to_key(link);
self.copy_object(&from_key, &to_key)
}
}
impl VfsFile for S3VfsFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, VfsError> {
let to_read = buf.len().min((self.size.saturating_sub(self.position)) as usize);
if to_read == 0 {
return Ok(0);
}
let data = self
.s3vfs_like
.get_object_range(&self.key, self.position, to_read as u64)?;
let n = data.len().min(buf.len());
buf[..n].copy_from_slice(&data[..n]);
self.position += n as u64;
Ok(n)
}
fn write(&mut self, buf: &[u8]) -> Result<usize, VfsError> {
self.write_buffer.extend_from_slice(buf);
self.position += buf.len() as u64;
Ok(buf.len())
}
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, VfsError> {
match pos {
std::io::SeekFrom::Start(offset) => {
self.position = offset;
}
std::io::SeekFrom::End(offset) => {
let sz = match self.mode {
FileMode::Write => self.position,
FileMode::Read => self.size,
};
if offset >= 0 {
self.position = sz.saturating_add(offset as u64);
} else {
let abs = offset.unsigned_abs();
self.position = if abs <= sz { sz - abs } else { 0 };
}
}
std::io::SeekFrom::Current(offset) => {
if offset >= 0 {
self.position = self.position.saturating_add(offset as u64);
} else {
let abs = offset.unsigned_abs();
self.position = if abs <= self.position {
self.position - abs
} else {
0
};
}
}
}
Ok(self.position)
}
fn flush(&mut self) -> Result<(), VfsError> {
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
let buf = std::mem::take(&mut self.write_buffer);
self.s3vfs_like.put_object(&self.key, &buf)?;
}
Ok(())
}
fn stat(&mut self) -> Result<VfsStat, VfsError> {
let sz = match self.mode {
FileMode::Write => self.position,
FileMode::Read => self.size,
};
Ok(VfsStat {
size: sz,
mtime: self.mtime,
is_dir: self.key.ends_with('/'),
is_symlink: false,
..VfsStat::new()
})
}
fn set_len(&mut self, size: u64) -> Result<(), VfsError> {
if matches!(self.mode, FileMode::Write) {
let len = size as usize;
if len < self.write_buffer.len() {
self.write_buffer.truncate(len);
} else {
self.write_buffer.resize(len, 0);
}
self.position = size;
}
Ok(())
}
}
impl Drop for S3VfsFile {
fn drop(&mut self) {
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
let buf = std::mem::take(&mut self.write_buffer);
let _ = self.s3vfs_like.put_object(&self.key, &buf);
}
}
}
impl S3VfsLike {
fn get_object_range(&self, key: &str, offset: u64, size: u64) -> Result<Vec<u8>, VfsError> {
let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let range = format!("bytes={}-{}", offset, offset + size - 1);
let resp = ureq::get(url.as_str())
.set("Range", &range)
.call()
.map_err(|e| VfsError::Io(format!("S3 GET range failed: {}", e)))?;
let status = resp.status();
if status == 404 {
return Err(VfsError::NotFound(key.to_string()));
}
if status != 206 && status != 200 {
return Err(VfsError::Io(format!("GetObject returned {}", status)));
}
let mut body = Vec::new();
resp.into_reader()
.read_to_end(&mut body)
.map_err(|e| VfsError::Io(format!("Failed to read S3 response: {}", e)))?;
Ok(body)
}
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
let url = action.sign(Duration::from_secs(3600));
let ct = if key.ends_with('/') {
"application/directory"
} else {
"application/octet-stream"
};
let resp = ureq::put(url.as_str())
.set("Content-Type", ct)
.send_bytes(body)
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
if resp.status() != 200 {
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
}
Ok(())
}
}
// ---- helper functions ----
fn parse_last_modified(header: Option<&str>) -> SystemTime {
match header {
Some(val) => {
if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(val) {
dt.into()
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(val) {
dt.into()
} else {
SystemTime::UNIX_EPOCH
}
}
None => SystemTime::UNIX_EPOCH,
}
}
fn parse_iso8601(s: &str) -> SystemTime {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
dt.into()
} else if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ") {
dt.and_utc().into()
} else {
SystemTime::UNIX_EPOCH
}
}
fn build_long_name(stat: &VfsStat, name: &str) -> String {
let ft = if stat.is_dir { 'd' } else { '-' };
let perms = "rwxr-xr-x";
let sz = stat.size;
let ts = match stat.mtime.duration_since(SystemTime::UNIX_EPOCH) {
Ok(d) => {
let secs = d.as_secs();
match chrono::DateTime::from_timestamp(secs as i64, 0) {
Some(dt) => {
let now = chrono::Utc::now();
if dt.year() == now.year() {
dt.format("%b %e %H:%M").to_string()
} else {
dt.format("%b %e %Y").to_string()
}
}
None => "Jan 1 1970".to_string(),
}
}
Err(_) => "Jan 1 1970".to_string(),
};
format!("{}{} 1 0 0 {} {} {}", ft, perms, sz, ts, name)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_path_to_key() {
assert_eq!(
S3Vfs::path_to_key(Path::new("/foo/bar.txt")),
"foo/bar.txt"
);
assert_eq!(S3Vfs::path_to_key(Path::new("/")), "");
assert_eq!(
S3Vfs::path_to_key(Path::new("relative/path")),
"relative/path"
);
}
#[test]
fn test_dir_key() {
assert_eq!(S3Vfs::dir_key(Path::new("/foo/bar")), "foo/bar/");
assert_eq!(S3Vfs::dir_key(Path::new("/")), "");
assert_eq!(S3Vfs::dir_key(Path::new("/foo/")), "foo/");
}
}