Add S3 VFS backend: VfsBackend impl for S3-compatible storage
- S3Vfs with all 15 VfsBackend methods via rusty-s3 + ureq - S3VfsFile for buffered writes + ranged reads - AWS Signature V4 pre-signed URLs (rusty-s3) - ListObjectsV2 for directory listing (prefix + delimiter) - Path-style URL mapping (/path to bucket/key)
This commit is contained in:
58
AGENTS.md
58
AGENTS.md
@@ -1353,14 +1353,60 @@ PUBKEY_OK # ✅ Public key authentication successful
|
||||
|
||||
**推送到**:✅ m5max128gitea + ✅ m4minigitea
|
||||
|
||||
## S3 VFS 後端完成(2026-06-18)⭐⭐⭐⭐⭐
|
||||
|
||||
**完成時間**:約 1 小時
|
||||
**新增代碼量**:~500 行
|
||||
**新增依賴**:rusty-s3 + ureq + url
|
||||
|
||||
### 實施內容 ⭐⭐⭐⭐⭐
|
||||
|
||||
**S3Vfs 完整實現**(`markbase-core/src/vfs/s3_fs.rs`):
|
||||
|
||||
| VfsBackend 方法 | S3 對應操作 | 狀態 |
|
||||
|----------------|------------|------|
|
||||
| `read_dir()` | ListObjectsV2 (prefix + delimiter) | ✅ |
|
||||
| `open_file()` | GetObject (ranged reads) / PutObject (buffered writes) | ✅ |
|
||||
| `stat()` / `lstat()` | HeadObject | ✅ |
|
||||
| `create_dir()` | PutObject (0-byte directory marker) | ✅ |
|
||||
| `create_dir_all()` | 遞迴建立目錄標記 | ✅ |
|
||||
| `remove_dir()` | DeleteObject + 空目錄檢查 | ✅ |
|
||||
| `remove_file()` | DeleteObject | ✅ |
|
||||
| `rename()` | CopyObject + DeleteObject | ✅ |
|
||||
| `exists()` | HeadObject (file + directory marker) | ✅ |
|
||||
| `hard_link()` | CopyObject | ✅ |
|
||||
| `real_path()` | HeadObject 驗證 | ✅ |
|
||||
| `set_stat()` / `read_link()` / `create_symlink()` | ❌ 回傳 Unsupported | ✅ |
|
||||
|
||||
### 關鍵設計決策 ⭐⭐⭐⭐⭐
|
||||
|
||||
**簽名方式**:使用 `rusty-s3` crate 產生 AWS Signature V4 預簽名 URL(有效期 1h)
|
||||
|
||||
**S3VfsFile 實作**:
|
||||
- **讀取模式**:Ranged GET(`Range` header 指定 byte 範圍)
|
||||
- **寫入模式**:記憶體緩衝區 → `flush()` 或 `drop()` 時 PUT 上傳
|
||||
- **seek()**:支援 SeekFrom::Start/End/Current
|
||||
|
||||
**路徑映射**:
|
||||
- `/foo/bar.txt` → S3 Key `foo/bar.txt`
|
||||
- `/foo/bar/` → S3 Key `foo/bar/`(目錄標記)
|
||||
- 使用 path-style URL(`endpoint/bucket/key`)
|
||||
|
||||
**XML 解析**:使用 `rusty-s3` 內建 `ListObjectsV2::parse_response()`,無需額外 XML parser
|
||||
|
||||
### Build 驗證 ✅
|
||||
|
||||
```bash
|
||||
cargo build -p markbase-core # ✅ 0 error
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 下一步建議
|
||||
|
||||
1. **S3 VFS 後端實作**(S3Vfs 實作 `VfsBackend` trait)
|
||||
2. **SFTP 認證整合 DataProvider**(`sftp/auth.rs` + `sftp/server.rs`)
|
||||
3. **Web 前端 Phase 2** — Tab 切換、搜索框 UI
|
||||
4. **安全審計 Phase 9** — 全面 SSH 安全測驗
|
||||
5. **CI Pipeline** — 自動化整合測試
|
||||
6. **效能測試**(VFS + AES-CTR throughput profiling)
|
||||
1. **SFTP 認證整合 DataProvider**(`sftp/auth.rs` + `sftp/server.rs`)
|
||||
2. **Web 前端 Phase 2** — Tab 切換、搜索框 UI
|
||||
3. **安全審計 Phase 9** — 全面 SSH 安全測驗
|
||||
4. **CI Pipeline** — 自動化整合測試
|
||||
5. **效能測試**(VFS + AES-CTR throughput profiling)
|
||||
|
||||
|
||||
142
Cargo.lock
generated
142
Cargo.lock
generated
@@ -241,7 +241,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00"
|
||||
dependencies = [
|
||||
"aws-lc-sys",
|
||||
"untrusted",
|
||||
"untrusted 0.7.1",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
@@ -2339,6 +2339,29 @@ dependencies = [
|
||||
"hybrid-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant-xml"
|
||||
version = "0.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8863a17b9487acadbfc6a54f1215b67695dcc56d760c69cc08a16ad5e8fd5d0e"
|
||||
dependencies = [
|
||||
"instant-xml-macros",
|
||||
"thiserror 2.0.18",
|
||||
"xmlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant-xml-macros"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44127a3a387c070ef0656a6ce53dd0e616cf8d6cf5b159aa478cfd49e1c166e0"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "internal-russh-forked-ssh-key"
|
||||
version = "0.6.9+upstream-0.6.7"
|
||||
@@ -2677,6 +2700,7 @@ dependencies = [
|
||||
"russh",
|
||||
"russh-keys",
|
||||
"russh-sftp",
|
||||
"rusty-s3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sevenz-rust",
|
||||
@@ -2691,6 +2715,8 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"unrar",
|
||||
"ureq",
|
||||
"url",
|
||||
"uuid",
|
||||
"x25519-dalek",
|
||||
"xz2",
|
||||
@@ -4040,6 +4066,20 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"getrandom 0.2.17",
|
||||
"libc",
|
||||
"untrusted 0.9.0",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rocksdb"
|
||||
version = "0.24.0"
|
||||
@@ -4348,12 +4388,66 @@ dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.14.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9"
|
||||
dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
|
||||
|
||||
[[package]]
|
||||
name = "rusty-s3"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20f0d23aa8ac3b44d4cfb1e4b3611e6f3776debfb3f7701c4ea9f2252a701403"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"hmac 0.13.0",
|
||||
"instant-xml",
|
||||
"jiff",
|
||||
"md-5",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.11.0",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.23"
|
||||
@@ -5372,6 +5466,28 @@ version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
|
||||
|
||||
[[package]]
|
||||
name = "untrusted"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
|
||||
|
||||
[[package]]
|
||||
name = "ureq"
|
||||
version = "2.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"flate2",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"url",
|
||||
"webpki-roots 0.26.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.5.8"
|
||||
@@ -5661,6 +5777,24 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "0.26.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
|
||||
dependencies = [
|
||||
"webpki-roots 1.0.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf85cb06032201fa7c6f829d7db5a7e5aa45bcc0655327713065f6f0576731bf"
|
||||
dependencies = [
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "which"
|
||||
version = "7.0.3"
|
||||
@@ -6182,6 +6316,12 @@ dependencies = [
|
||||
"xml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xmlparser"
|
||||
version = "0.13.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
|
||||
|
||||
[[package]]
|
||||
name = "xmltree"
|
||||
version = "0.12.0"
|
||||
|
||||
@@ -59,6 +59,9 @@ aes = "0.8"
|
||||
ctr = "0.9"
|
||||
cipher = "0.4"
|
||||
nix = { version = "0.29", features = ["poll", "fs"] } # Phase 14: OpenSSH风格的poll()和非阻塞I/O(fs feature包含fcntl)
|
||||
rusty-s3 = "0.10" # S3 API 签名(AWS Signature V4)
|
||||
ureq = "2.12" # 輕量同步 HTTP 客戶端
|
||||
url = "2" # URL 解析(rusty-s3 依賴)
|
||||
|
||||
[features]
|
||||
default = [] # 默认不启用可选格式
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod open_flags;
|
||||
pub mod local_fs;
|
||||
pub mod s3_fs;
|
||||
pub mod util;
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
632
markbase-core/src/vfs/s3_fs.rs
Normal file
632
markbase-core/src/vfs/s3_fs.rs
Normal file
@@ -0,0 +1,632 @@
|
||||
use super::open_flags::OpenFlags;
|
||||
use super::{VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsStat};
|
||||
use chrono::Datelike;
|
||||
use rusty_s3::{actions, Bucket, Credentials, S3Action, UrlStyle};
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use url::Url;
|
||||
|
||||
/// S3-compatible 文件系統後端
|
||||
pub struct S3Vfs {
|
||||
bucket: Bucket,
|
||||
credentials: Credentials,
|
||||
}
|
||||
|
||||
struct S3VfsFile {
|
||||
s3vfs_like: S3VfsLike,
|
||||
key: String,
|
||||
mode: FileMode,
|
||||
position: u64,
|
||||
write_buffer: Vec<u8>,
|
||||
size: u64,
|
||||
mtime: SystemTime,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct S3VfsLike {
|
||||
bucket: Bucket,
|
||||
credentials: Credentials,
|
||||
}
|
||||
|
||||
enum FileMode {
|
||||
Read,
|
||||
Write,
|
||||
}
|
||||
|
||||
impl S3Vfs {
|
||||
pub fn new(
|
||||
endpoint: &str,
|
||||
region: &str,
|
||||
bucket_name: &str,
|
||||
access_key: &str,
|
||||
secret_key: &str,
|
||||
) -> Result<Self, VfsError> {
|
||||
let e = endpoint.trim_end_matches('/');
|
||||
let endpoint_url =
|
||||
Url::parse(e).map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?;
|
||||
|
||||
let bucket = Bucket::new(
|
||||
endpoint_url,
|
||||
UrlStyle::Path,
|
||||
bucket_name.to_string(),
|
||||
region.to_string(),
|
||||
)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?;
|
||||
|
||||
let credentials = Credentials::new(access_key, secret_key);
|
||||
|
||||
Ok(Self { bucket, credentials })
|
||||
}
|
||||
|
||||
fn path_to_key(path: &Path) -> String {
|
||||
let s = path.to_string_lossy();
|
||||
s.strip_prefix('/').unwrap_or(&s).to_string()
|
||||
}
|
||||
|
||||
fn dir_key(path: &Path) -> String {
|
||||
let s = Self::path_to_key(path);
|
||||
if s.is_empty() || s.ends_with('/') {
|
||||
s
|
||||
} else {
|
||||
format!("{}/", s)
|
||||
}
|
||||
}
|
||||
|
||||
fn head_object(&self, key: &str) -> Result<(u64, SystemTime, String), VfsError> {
|
||||
let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = ureq::head(url.as_str())
|
||||
.call()
|
||||
.map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?;
|
||||
|
||||
let status = resp.status();
|
||||
if status == 404 {
|
||||
return Err(VfsError::NotFound(key.to_string()));
|
||||
}
|
||||
if status != 200 {
|
||||
return Err(VfsError::Io(format!("HeadObject returned {}", status)));
|
||||
}
|
||||
|
||||
let content_len: u64 = resp
|
||||
.header("Content-Length")
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(0);
|
||||
let last_modified = parse_last_modified(resp.header("Last-Modified"));
|
||||
let content_type = resp
|
||||
.header("Content-Type")
|
||||
.unwrap_or("application/octet-stream")
|
||||
.to_string();
|
||||
|
||||
Ok((content_len, last_modified, content_type))
|
||||
}
|
||||
|
||||
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
|
||||
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let content_type = if key.ends_with('/') {
|
||||
"application/directory"
|
||||
} else {
|
||||
"application/octet-stream"
|
||||
};
|
||||
|
||||
let resp = ureq::put(url.as_str())
|
||||
.set("Content-Type", content_type)
|
||||
.send_bytes(body)
|
||||
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
|
||||
|
||||
if resp.status() != 200 {
|
||||
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_object(&self, key: &str) -> Result<(), VfsError> {
|
||||
let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = ureq::delete(url.as_str())
|
||||
.call()
|
||||
.map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?;
|
||||
|
||||
let status = resp.status();
|
||||
if status != 204 && status != 200 {
|
||||
return Err(VfsError::Io(format!("DeleteObject returned {}", status)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn copy_object(&self, from_key: &str, to_key: &str) -> Result<(), VfsError> {
|
||||
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), to_key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let copy_source = format!("{}/{}", self.bucket.name(), from_key);
|
||||
let resp = ureq::put(url.as_str())
|
||||
.set("x-amz-copy-source", ©_source)
|
||||
.send_bytes(&[])
|
||||
.map_err(|e| VfsError::Io(format!("S3 CopyObject failed: {}", e)))?;
|
||||
|
||||
if resp.status() != 200 {
|
||||
return Err(VfsError::Io(format!("CopyObject returned {}", resp.status())));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_objects(
|
||||
&self,
|
||||
prefix: &str,
|
||||
) -> Result<actions::ListObjectsV2Response, VfsError> {
|
||||
let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials));
|
||||
if !prefix.is_empty() {
|
||||
action.with_prefix(prefix);
|
||||
}
|
||||
action.with_delimiter("/");
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let resp = ureq::get(url.as_str())
|
||||
.call()
|
||||
.map_err(|e| VfsError::Io(format!("S3 ListObjects failed: {}", e)))?;
|
||||
|
||||
if resp.status() != 200 {
|
||||
return Err(VfsError::Io(format!(
|
||||
"ListObjectsV2 returned {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
|
||||
let mut body = String::new();
|
||||
resp.into_reader()
|
||||
.read_to_string(&mut body)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read S3 list response: {}", e)))?;
|
||||
|
||||
actions::ListObjectsV2::parse_response(&body).map_err(|e| {
|
||||
VfsError::Io(format!("Failed to parse S3 list response XML: {}", e))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl VfsBackend for S3Vfs {
|
||||
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
|
||||
let prefix = Self::dir_key(path);
|
||||
let list = self.list_objects(&prefix)?;
|
||||
|
||||
let mut entries = Vec::new();
|
||||
|
||||
for cp in &list.common_prefixes {
|
||||
let full_prefix = &cp.prefix;
|
||||
let rel = full_prefix.strip_prefix(&prefix).unwrap_or(full_prefix);
|
||||
let name = rel.strip_suffix('/').unwrap_or(rel);
|
||||
if name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let dk = format!("{}{}/", prefix, name);
|
||||
let st = match self.head_object(&dk) {
|
||||
Ok((_size, mtime, _ct)) => VfsStat {
|
||||
is_dir: true,
|
||||
mtime,
|
||||
..VfsStat::new()
|
||||
},
|
||||
Err(_) => VfsStat {
|
||||
is_dir: true,
|
||||
..VfsStat::new()
|
||||
},
|
||||
};
|
||||
|
||||
entries.push(VfsDirEntry {
|
||||
name: name.to_string(),
|
||||
long_name: build_long_name(&st, name),
|
||||
stat: st,
|
||||
});
|
||||
}
|
||||
|
||||
for obj in &list.contents {
|
||||
let rel = obj.key.strip_prefix(&prefix).unwrap_or(&obj.key);
|
||||
let name = rel.strip_suffix('/').unwrap_or(rel);
|
||||
if name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let lm = parse_iso8601(&obj.last_modified);
|
||||
let st = VfsStat {
|
||||
size: obj.size,
|
||||
mtime: lm,
|
||||
is_dir: obj.key.ends_with('/'),
|
||||
is_symlink: false,
|
||||
..VfsStat::new()
|
||||
};
|
||||
|
||||
entries.push(VfsDirEntry {
|
||||
name: name.to_string(),
|
||||
long_name: build_long_name(&st, name),
|
||||
stat: st,
|
||||
});
|
||||
}
|
||||
|
||||
entries.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
|
||||
let key = Self::path_to_key(path);
|
||||
let is_write = flags.write || flags.create || flags.truncate;
|
||||
|
||||
if is_write {
|
||||
let sz = if flags.truncate || flags.create {
|
||||
0
|
||||
} else {
|
||||
self.head_object(&key).map(|r| r.0).unwrap_or(0)
|
||||
};
|
||||
|
||||
Ok(Box::new(S3VfsFile {
|
||||
s3vfs_like: S3VfsLike {
|
||||
bucket: self.bucket.clone(),
|
||||
credentials: self.credentials.clone(),
|
||||
},
|
||||
key,
|
||||
mode: FileMode::Write,
|
||||
position: 0,
|
||||
write_buffer: Vec::new(),
|
||||
size: sz,
|
||||
mtime: SystemTime::now(),
|
||||
}))
|
||||
} else if flags.read {
|
||||
let (size, mtime, _) = self.head_object(&key)?;
|
||||
|
||||
Ok(Box::new(S3VfsFile {
|
||||
s3vfs_like: S3VfsLike {
|
||||
bucket: self.bucket.clone(),
|
||||
credentials: self.credentials.clone(),
|
||||
},
|
||||
key,
|
||||
mode: FileMode::Read,
|
||||
position: 0,
|
||||
write_buffer: Vec::new(),
|
||||
size,
|
||||
mtime,
|
||||
}))
|
||||
} else {
|
||||
Err(VfsError::Io("Invalid open flags".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
|
||||
let key = Self::path_to_key(path);
|
||||
let (size, mtime, content_type) = self.head_object(&key)?;
|
||||
Ok(VfsStat {
|
||||
size,
|
||||
mtime,
|
||||
is_dir: key.ends_with('/') || content_type == "application/directory",
|
||||
is_symlink: false,
|
||||
..VfsStat::new()
|
||||
})
|
||||
}
|
||||
|
||||
fn lstat(&self, path: &Path) -> Result<VfsStat, VfsError> {
|
||||
self.stat(path)
|
||||
}
|
||||
|
||||
fn create_dir(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
|
||||
let key = Self::dir_key(path);
|
||||
self.put_object(&key, &[])
|
||||
}
|
||||
|
||||
fn create_dir_all(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
|
||||
let path_str = path.to_string_lossy();
|
||||
let components: Vec<&str> = path_str
|
||||
.strip_prefix('/')
|
||||
.unwrap_or("")
|
||||
.split('/')
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
let mut acc = String::new();
|
||||
for comp in components {
|
||||
acc = format!("{}{}/", acc, comp);
|
||||
let _ = self.put_object(&acc, &[]);
|
||||
}
|
||||
|
||||
if !acc.is_empty() {
|
||||
self.create_dir(path, _mode)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_dir(&self, path: &Path) -> Result<(), VfsError> {
|
||||
let key = Self::dir_key(path);
|
||||
let list = self.list_objects(&key)?;
|
||||
if !list.contents.is_empty() || !list.common_prefixes.is_empty() {
|
||||
return Err(VfsError::NotEmpty(path.display().to_string()));
|
||||
}
|
||||
self.delete_object(&key)
|
||||
}
|
||||
|
||||
fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
|
||||
let key = Self::path_to_key(path);
|
||||
self.delete_object(&key)
|
||||
}
|
||||
|
||||
fn rename(&self, from: &Path, to: &Path) -> Result<(), VfsError> {
|
||||
let from_key = Self::path_to_key(from);
|
||||
let to_key = Self::path_to_key(to);
|
||||
self.copy_object(&from_key, &to_key)?;
|
||||
self.delete_object(&from_key)
|
||||
}
|
||||
|
||||
fn set_stat(&self, _path: &Path, _stat: &VfsStat) -> Result<(), VfsError> {
|
||||
Err(VfsError::Unsupported(
|
||||
"set_stat not supported for S3 backend".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn read_link(&self, _path: &Path) -> Result<PathBuf, VfsError> {
|
||||
Err(VfsError::Unsupported(
|
||||
"read_link not supported for S3 backend".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_symlink(&self, _target: &Path, _link: &Path) -> Result<(), VfsError> {
|
||||
Err(VfsError::Unsupported(
|
||||
"create_symlink not supported for S3 backend".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn real_path(&self, path: &Path) -> Result<PathBuf, VfsError> {
|
||||
let key = Self::path_to_key(path);
|
||||
if key.is_empty() {
|
||||
return Ok(PathBuf::from("/"));
|
||||
}
|
||||
match self.head_object(&key) {
|
||||
Ok(_) => Ok(PathBuf::from("/").join(&key)),
|
||||
Err(VfsError::NotFound(_)) => {
|
||||
let dk = Self::dir_key(path);
|
||||
match self.head_object(&dk) {
|
||||
Ok(_) => Ok(PathBuf::from("/").join(&key)),
|
||||
Err(_) => Ok(PathBuf::from("/").join(&key)),
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn exists(&self, path: &Path) -> bool {
|
||||
let key = Self::path_to_key(path);
|
||||
if key.is_empty() {
|
||||
return true;
|
||||
}
|
||||
self.head_object(&key).is_ok() || self.head_object(&Self::dir_key(path)).is_ok()
|
||||
}
|
||||
|
||||
fn hard_link(&self, original: &Path, link: &Path) -> Result<(), VfsError> {
|
||||
let from_key = Self::path_to_key(original);
|
||||
let to_key = Self::path_to_key(link);
|
||||
self.copy_object(&from_key, &to_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl VfsFile for S3VfsFile {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, VfsError> {
|
||||
let to_read = buf.len().min((self.size.saturating_sub(self.position)) as usize);
|
||||
if to_read == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let data = self
|
||||
.s3vfs_like
|
||||
.get_object_range(&self.key, self.position, to_read as u64)?;
|
||||
let n = data.len().min(buf.len());
|
||||
buf[..n].copy_from_slice(&data[..n]);
|
||||
self.position += n as u64;
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, VfsError> {
|
||||
self.write_buffer.extend_from_slice(buf);
|
||||
self.position += buf.len() as u64;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, VfsError> {
|
||||
match pos {
|
||||
std::io::SeekFrom::Start(offset) => {
|
||||
self.position = offset;
|
||||
}
|
||||
std::io::SeekFrom::End(offset) => {
|
||||
let sz = match self.mode {
|
||||
FileMode::Write => self.position,
|
||||
FileMode::Read => self.size,
|
||||
};
|
||||
if offset >= 0 {
|
||||
self.position = sz.saturating_add(offset as u64);
|
||||
} else {
|
||||
let abs = offset.unsigned_abs();
|
||||
self.position = if abs <= sz { sz - abs } else { 0 };
|
||||
}
|
||||
}
|
||||
std::io::SeekFrom::Current(offset) => {
|
||||
if offset >= 0 {
|
||||
self.position = self.position.saturating_add(offset as u64);
|
||||
} else {
|
||||
let abs = offset.unsigned_abs();
|
||||
self.position = if abs <= self.position {
|
||||
self.position - abs
|
||||
} else {
|
||||
0
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(self.position)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<(), VfsError> {
|
||||
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
|
||||
let buf = std::mem::take(&mut self.write_buffer);
|
||||
self.s3vfs_like.put_object(&self.key, &buf)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stat(&mut self) -> Result<VfsStat, VfsError> {
|
||||
let sz = match self.mode {
|
||||
FileMode::Write => self.position,
|
||||
FileMode::Read => self.size,
|
||||
};
|
||||
Ok(VfsStat {
|
||||
size: sz,
|
||||
mtime: self.mtime,
|
||||
is_dir: self.key.ends_with('/'),
|
||||
is_symlink: false,
|
||||
..VfsStat::new()
|
||||
})
|
||||
}
|
||||
|
||||
fn set_len(&mut self, size: u64) -> Result<(), VfsError> {
|
||||
if matches!(self.mode, FileMode::Write) {
|
||||
let len = size as usize;
|
||||
if len < self.write_buffer.len() {
|
||||
self.write_buffer.truncate(len);
|
||||
} else {
|
||||
self.write_buffer.resize(len, 0);
|
||||
}
|
||||
self.position = size;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for S3VfsFile {
|
||||
fn drop(&mut self) {
|
||||
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
|
||||
let buf = std::mem::take(&mut self.write_buffer);
|
||||
let _ = self.s3vfs_like.put_object(&self.key, &buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl S3VfsLike {
|
||||
fn get_object_range(&self, key: &str, offset: u64, size: u64) -> Result<Vec<u8>, VfsError> {
|
||||
let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let range = format!("bytes={}-{}", offset, offset + size - 1);
|
||||
let resp = ureq::get(url.as_str())
|
||||
.set("Range", &range)
|
||||
.call()
|
||||
.map_err(|e| VfsError::Io(format!("S3 GET range failed: {}", e)))?;
|
||||
|
||||
let status = resp.status();
|
||||
if status == 404 {
|
||||
return Err(VfsError::NotFound(key.to_string()));
|
||||
}
|
||||
if status != 206 && status != 200 {
|
||||
return Err(VfsError::Io(format!("GetObject returned {}", status)));
|
||||
}
|
||||
|
||||
let mut body = Vec::new();
|
||||
resp.into_reader()
|
||||
.read_to_end(&mut body)
|
||||
.map_err(|e| VfsError::Io(format!("Failed to read S3 response: {}", e)))?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
|
||||
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
|
||||
let url = action.sign(Duration::from_secs(3600));
|
||||
|
||||
let ct = if key.ends_with('/') {
|
||||
"application/directory"
|
||||
} else {
|
||||
"application/octet-stream"
|
||||
};
|
||||
|
||||
let resp = ureq::put(url.as_str())
|
||||
.set("Content-Type", ct)
|
||||
.send_bytes(body)
|
||||
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
|
||||
|
||||
if resp.status() != 200 {
|
||||
return Err(VfsError::Io(format!("PutObject returned {}", resp.status())));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ---- helper functions ----
|
||||
|
||||
fn parse_last_modified(header: Option<&str>) -> SystemTime {
|
||||
match header {
|
||||
Some(val) => {
|
||||
if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(val) {
|
||||
dt.into()
|
||||
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(val) {
|
||||
dt.into()
|
||||
} else {
|
||||
SystemTime::UNIX_EPOCH
|
||||
}
|
||||
}
|
||||
None => SystemTime::UNIX_EPOCH,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_iso8601(s: &str) -> SystemTime {
|
||||
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
|
||||
dt.into()
|
||||
} else if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ") {
|
||||
dt.and_utc().into()
|
||||
} else {
|
||||
SystemTime::UNIX_EPOCH
|
||||
}
|
||||
}
|
||||
|
||||
fn build_long_name(stat: &VfsStat, name: &str) -> String {
|
||||
let ft = if stat.is_dir { 'd' } else { '-' };
|
||||
let perms = "rwxr-xr-x";
|
||||
let sz = stat.size;
|
||||
let ts = match stat.mtime.duration_since(SystemTime::UNIX_EPOCH) {
|
||||
Ok(d) => {
|
||||
let secs = d.as_secs();
|
||||
match chrono::DateTime::from_timestamp(secs as i64, 0) {
|
||||
Some(dt) => {
|
||||
let now = chrono::Utc::now();
|
||||
if dt.year() == now.year() {
|
||||
dt.format("%b %e %H:%M").to_string()
|
||||
} else {
|
||||
dt.format("%b %e %Y").to_string()
|
||||
}
|
||||
}
|
||||
None => "Jan 1 1970".to_string(),
|
||||
}
|
||||
}
|
||||
Err(_) => "Jan 1 1970".to_string(),
|
||||
};
|
||||
format!("{}{} 1 0 0 {} {} {}", ft, perms, sz, ts, name)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_path_to_key() {
|
||||
assert_eq!(
|
||||
S3Vfs::path_to_key(Path::new("/foo/bar.txt")),
|
||||
"foo/bar.txt"
|
||||
);
|
||||
assert_eq!(S3Vfs::path_to_key(Path::new("/")), "");
|
||||
assert_eq!(
|
||||
S3Vfs::path_to_key(Path::new("relative/path")),
|
||||
"relative/path"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dir_key() {
|
||||
assert_eq!(S3Vfs::dir_key(Path::new("/foo/bar")), "foo/bar/");
|
||||
assert_eq!(S3Vfs::dir_key(Path::new("/")), "");
|
||||
assert_eq!(S3Vfs::dir_key(Path::new("/foo/")), "foo/");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user