- Fix trailing whitespace in kex.rs and s3.rs - Add missing KexProposal import in kex_complete.rs - Auto-fix clippy warnings across all crates - All 153 tests pass
636 lines
20 KiB
Rust
636 lines
20 KiB
Rust
use super::open_flags::OpenFlags;
|
|
use super::{VfsBackend, VfsDirEntry, VfsError, VfsFile, VfsStat};
|
|
use chrono::Datelike;
|
|
use rusty_s3::{actions, Bucket, Credentials, S3Action, UrlStyle};
|
|
use std::io::Read;
|
|
use std::path::{Path, PathBuf};
|
|
use std::time::{Duration, SystemTime};
|
|
use url::Url;
|
|
|
|
/// S3-compatible 文件系統後端
|
|
pub struct S3Vfs {
|
|
bucket: Bucket,
|
|
credentials: Credentials,
|
|
}
|
|
|
|
struct S3VfsFile {
|
|
s3vfs_like: S3VfsLike,
|
|
key: String,
|
|
mode: FileMode,
|
|
position: u64,
|
|
write_buffer: Vec<u8>,
|
|
size: u64,
|
|
mtime: SystemTime,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct S3VfsLike {
|
|
bucket: Bucket,
|
|
credentials: Credentials,
|
|
}
|
|
|
|
/// Direction an `S3VfsFile` handle was opened in.
enum FileMode {
    /// The handle serves reads from an existing object.
    Read,
    /// The handle buffers writes and uploads them later.
    Write,
}
|
|
|
|
impl S3Vfs {
|
|
pub fn new(
|
|
endpoint: &str,
|
|
region: &str,
|
|
bucket_name: &str,
|
|
access_key: &str,
|
|
secret_key: &str,
|
|
) -> Result<Self, VfsError> {
|
|
let e = endpoint.trim_end_matches('/');
|
|
let endpoint_url =
|
|
Url::parse(e).map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?;
|
|
|
|
let bucket = Bucket::new(
|
|
endpoint_url,
|
|
UrlStyle::Path,
|
|
bucket_name.to_string(),
|
|
region.to_string(),
|
|
)
|
|
.map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?;
|
|
|
|
let credentials = Credentials::new(access_key, secret_key);
|
|
|
|
Ok(Self {
|
|
bucket,
|
|
credentials,
|
|
})
|
|
}
|
|
|
|
/// Converts a VFS path into an S3 object key.
///
/// Every leading `/` is removed, so `/foo`, `//foo` and `foo` all map to the
/// key `foo`, and any path consisting only of slashes maps to the empty key
/// (the bucket root). Non-UTF-8 path bytes are replaced lossily.
fn path_to_key(path: &Path) -> String {
    // Stripping only a single slash would turn "//foo" into the key "/foo"
    // and "//" into "/", neither of which names what the caller meant.
    path.to_string_lossy().trim_start_matches('/').to_string()
}
|
|
|
|
fn dir_key(path: &Path) -> String {
|
|
let s = Self::path_to_key(path);
|
|
if s.is_empty() || s.ends_with('/') {
|
|
s
|
|
} else {
|
|
format!("{}/", s)
|
|
}
|
|
}
|
|
|
|
fn head_object(&self, key: &str) -> Result<(u64, SystemTime, String), VfsError> {
|
|
let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let resp = ureq::head(url.as_str())
|
|
.call()
|
|
.map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?;
|
|
|
|
let status = resp.status();
|
|
if status == 404 {
|
|
return Err(VfsError::NotFound(key.to_string()));
|
|
}
|
|
if status != 200 {
|
|
return Err(VfsError::Io(format!("HeadObject returned {}", status)));
|
|
}
|
|
|
|
let content_len: u64 = resp
|
|
.header("Content-Length")
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(0);
|
|
let last_modified = parse_last_modified(resp.header("Last-Modified"));
|
|
let content_type = resp
|
|
.header("Content-Type")
|
|
.unwrap_or("application/octet-stream")
|
|
.to_string();
|
|
|
|
Ok((content_len, last_modified, content_type))
|
|
}
|
|
|
|
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
|
|
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let content_type = if key.ends_with('/') {
|
|
"application/directory"
|
|
} else {
|
|
"application/octet-stream"
|
|
};
|
|
|
|
let resp = ureq::put(url.as_str())
|
|
.set("Content-Type", content_type)
|
|
.send_bytes(body)
|
|
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
|
|
|
|
if resp.status() != 200 {
|
|
return Err(VfsError::Io(format!(
|
|
"PutObject returned {}",
|
|
resp.status()
|
|
)));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn delete_object(&self, key: &str) -> Result<(), VfsError> {
|
|
let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let resp = ureq::delete(url.as_str())
|
|
.call()
|
|
.map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?;
|
|
|
|
let status = resp.status();
|
|
if status != 204 && status != 200 {
|
|
return Err(VfsError::Io(format!("DeleteObject returned {}", status)));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn copy_object(&self, from_key: &str, to_key: &str) -> Result<(), VfsError> {
|
|
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), to_key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let copy_source = format!("{}/{}", self.bucket.name(), from_key);
|
|
let resp = ureq::put(url.as_str())
|
|
.set("x-amz-copy-source", ©_source)
|
|
.send_bytes(&[])
|
|
.map_err(|e| VfsError::Io(format!("S3 CopyObject failed: {}", e)))?;
|
|
|
|
if resp.status() != 200 {
|
|
return Err(VfsError::Io(format!(
|
|
"CopyObject returned {}",
|
|
resp.status()
|
|
)));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn list_objects(&self, prefix: &str) -> Result<actions::ListObjectsV2Response, VfsError> {
|
|
let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials));
|
|
if !prefix.is_empty() {
|
|
action.with_prefix(prefix);
|
|
}
|
|
action.with_delimiter("/");
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let resp = ureq::get(url.as_str())
|
|
.call()
|
|
.map_err(|e| VfsError::Io(format!("S3 ListObjects failed: {}", e)))?;
|
|
|
|
if resp.status() != 200 {
|
|
return Err(VfsError::Io(format!(
|
|
"ListObjectsV2 returned {}",
|
|
resp.status()
|
|
)));
|
|
}
|
|
|
|
let mut body = String::new();
|
|
resp.into_reader()
|
|
.read_to_string(&mut body)
|
|
.map_err(|e| VfsError::Io(format!("Failed to read S3 list response: {}", e)))?;
|
|
|
|
actions::ListObjectsV2::parse_response(&body)
|
|
.map_err(|e| VfsError::Io(format!("Failed to parse S3 list response XML: {}", e)))
|
|
}
|
|
}
|
|
|
|
impl VfsBackend for S3Vfs {
|
|
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
|
|
let prefix = Self::dir_key(path);
|
|
let list = self.list_objects(&prefix)?;
|
|
|
|
let mut entries = Vec::new();
|
|
|
|
for cp in &list.common_prefixes {
|
|
let full_prefix = &cp.prefix;
|
|
let rel = full_prefix.strip_prefix(&prefix).unwrap_or(full_prefix);
|
|
let name = rel.strip_suffix('/').unwrap_or(rel);
|
|
if name.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let dk = format!("{}{}/", prefix, name);
|
|
let st = match self.head_object(&dk) {
|
|
Ok((_size, mtime, _ct)) => VfsStat {
|
|
is_dir: true,
|
|
mtime,
|
|
..VfsStat::new()
|
|
},
|
|
Err(_) => VfsStat {
|
|
is_dir: true,
|
|
..VfsStat::new()
|
|
},
|
|
};
|
|
|
|
entries.push(VfsDirEntry {
|
|
name: name.to_string(),
|
|
long_name: build_long_name(&st, name),
|
|
stat: st,
|
|
});
|
|
}
|
|
|
|
for obj in &list.contents {
|
|
let rel = obj.key.strip_prefix(&prefix).unwrap_or(&obj.key);
|
|
let name = rel.strip_suffix('/').unwrap_or(rel);
|
|
if name.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let lm = parse_iso8601(&obj.last_modified);
|
|
let st = VfsStat {
|
|
size: obj.size,
|
|
mtime: lm,
|
|
is_dir: obj.key.ends_with('/'),
|
|
is_symlink: false,
|
|
..VfsStat::new()
|
|
};
|
|
|
|
entries.push(VfsDirEntry {
|
|
name: name.to_string(),
|
|
long_name: build_long_name(&st, name),
|
|
stat: st,
|
|
});
|
|
}
|
|
|
|
entries.sort_by(|a, b| a.name.cmp(&b.name));
|
|
Ok(entries)
|
|
}
|
|
|
|
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
|
|
let key = Self::path_to_key(path);
|
|
let is_write = flags.write || flags.create || flags.truncate;
|
|
|
|
if is_write {
|
|
let sz = if flags.truncate || flags.create {
|
|
0
|
|
} else {
|
|
self.head_object(&key).map(|r| r.0).unwrap_or(0)
|
|
};
|
|
|
|
Ok(Box::new(S3VfsFile {
|
|
s3vfs_like: S3VfsLike {
|
|
bucket: self.bucket.clone(),
|
|
credentials: self.credentials.clone(),
|
|
},
|
|
key,
|
|
mode: FileMode::Write,
|
|
position: 0,
|
|
write_buffer: Vec::new(),
|
|
size: sz,
|
|
mtime: SystemTime::now(),
|
|
}))
|
|
} else if flags.read {
|
|
let (size, mtime, _) = self.head_object(&key)?;
|
|
|
|
Ok(Box::new(S3VfsFile {
|
|
s3vfs_like: S3VfsLike {
|
|
bucket: self.bucket.clone(),
|
|
credentials: self.credentials.clone(),
|
|
},
|
|
key,
|
|
mode: FileMode::Read,
|
|
position: 0,
|
|
write_buffer: Vec::new(),
|
|
size,
|
|
mtime,
|
|
}))
|
|
} else {
|
|
Err(VfsError::Io("Invalid open flags".to_string()))
|
|
}
|
|
}
|
|
|
|
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
|
|
let key = Self::path_to_key(path);
|
|
let (size, mtime, content_type) = self.head_object(&key)?;
|
|
Ok(VfsStat {
|
|
size,
|
|
mtime,
|
|
is_dir: key.ends_with('/') || content_type == "application/directory",
|
|
is_symlink: false,
|
|
..VfsStat::new()
|
|
})
|
|
}
|
|
|
|
fn lstat(&self, path: &Path) -> Result<VfsStat, VfsError> {
|
|
self.stat(path)
|
|
}
|
|
|
|
fn create_dir(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
|
|
let key = Self::dir_key(path);
|
|
self.put_object(&key, &[])
|
|
}
|
|
|
|
fn create_dir_all(&self, path: &Path, _mode: u32) -> Result<(), VfsError> {
|
|
let path_str = path.to_string_lossy();
|
|
let components: Vec<&str> = path_str
|
|
.strip_prefix('/')
|
|
.unwrap_or("")
|
|
.split('/')
|
|
.filter(|s| !s.is_empty())
|
|
.collect();
|
|
|
|
let mut acc = String::new();
|
|
for comp in components {
|
|
acc = format!("{}{}/", acc, comp);
|
|
let _ = self.put_object(&acc, &[]);
|
|
}
|
|
|
|
if !acc.is_empty() {
|
|
self.create_dir(path, _mode)
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn remove_dir(&self, path: &Path) -> Result<(), VfsError> {
|
|
let key = Self::dir_key(path);
|
|
let list = self.list_objects(&key)?;
|
|
if !list.contents.is_empty() || !list.common_prefixes.is_empty() {
|
|
return Err(VfsError::NotEmpty(path.display().to_string()));
|
|
}
|
|
self.delete_object(&key)
|
|
}
|
|
|
|
fn remove_file(&self, path: &Path) -> Result<(), VfsError> {
|
|
let key = Self::path_to_key(path);
|
|
self.delete_object(&key)
|
|
}
|
|
|
|
fn rename(&self, from: &Path, to: &Path) -> Result<(), VfsError> {
|
|
let from_key = Self::path_to_key(from);
|
|
let to_key = Self::path_to_key(to);
|
|
self.copy_object(&from_key, &to_key)?;
|
|
self.delete_object(&from_key)
|
|
}
|
|
|
|
fn set_stat(&self, _path: &Path, _stat: &VfsStat) -> Result<(), VfsError> {
|
|
Err(VfsError::Unsupported(
|
|
"set_stat not supported for S3 backend".to_string(),
|
|
))
|
|
}
|
|
|
|
fn read_link(&self, _path: &Path) -> Result<PathBuf, VfsError> {
|
|
Err(VfsError::Unsupported(
|
|
"read_link not supported for S3 backend".to_string(),
|
|
))
|
|
}
|
|
|
|
fn create_symlink(&self, _target: &Path, _link: &Path) -> Result<(), VfsError> {
|
|
Err(VfsError::Unsupported(
|
|
"create_symlink not supported for S3 backend".to_string(),
|
|
))
|
|
}
|
|
|
|
fn real_path(&self, path: &Path) -> Result<PathBuf, VfsError> {
|
|
let key = Self::path_to_key(path);
|
|
if key.is_empty() {
|
|
return Ok(PathBuf::from("/"));
|
|
}
|
|
match self.head_object(&key) {
|
|
Ok(_) => Ok(PathBuf::from("/").join(&key)),
|
|
Err(VfsError::NotFound(_)) => {
|
|
let dk = Self::dir_key(path);
|
|
match self.head_object(&dk) {
|
|
Ok(_) => Ok(PathBuf::from("/").join(&key)),
|
|
Err(_) => Ok(PathBuf::from("/").join(&key)),
|
|
}
|
|
}
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
|
|
fn exists(&self, path: &Path) -> bool {
|
|
let key = Self::path_to_key(path);
|
|
if key.is_empty() {
|
|
return true;
|
|
}
|
|
self.head_object(&key).is_ok() || self.head_object(&Self::dir_key(path)).is_ok()
|
|
}
|
|
|
|
fn hard_link(&self, original: &Path, link: &Path) -> Result<(), VfsError> {
|
|
let from_key = Self::path_to_key(original);
|
|
let to_key = Self::path_to_key(link);
|
|
self.copy_object(&from_key, &to_key)
|
|
}
|
|
}
|
|
|
|
impl VfsFile for S3VfsFile {
|
|
/// Reads up to `buf.len()` bytes from the current position into `buf`.
///
/// Returns the number of bytes copied and advances the position by that
/// amount. Returns `Ok(0)` when `buf` is empty or the position is at or past
/// the recorded object size.
///
/// # Errors
/// Propagates any error from the ranged GET request.
fn read(&mut self, buf: &mut [u8]) -> Result<usize, VfsError> {
    let remaining = self.size.saturating_sub(self.position);
    // Compare in u64: casting `remaining` down to usize first would truncate
    // on 32-bit targets and could report end-of-file in the middle of a
    // large object.
    let wanted = u64::try_from(buf.len()).unwrap_or(u64::MAX).min(remaining);
    if wanted == 0 {
        return Ok(0);
    }

    let data = self
        .s3vfs_like
        .get_object_range(&self.key, self.position, wanted)?;

    // The server may return fewer (or more) bytes than requested; never copy
    // past the end of either buffer.
    let n = data.len().min(buf.len());
    buf[..n].copy_from_slice(&data[..n]);
    self.position += n as u64;
    Ok(n)
}
|
|
|
|
/// Appends `buf` to the pending write buffer and advances the position.
///
/// Nothing is uploaded here. The bytes are always appended to the end of the
/// buffer, regardless of the current position.
///
/// Always returns `Ok(buf.len())`.
fn write(&mut self, buf: &[u8]) -> Result<usize, VfsError> {
    let written = buf.len();
    self.write_buffer.extend_from_slice(buf);
    self.position += written as u64;
    Ok(written)
}
|
|
|
|
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, VfsError> {
|
|
match pos {
|
|
std::io::SeekFrom::Start(offset) => {
|
|
self.position = offset;
|
|
}
|
|
std::io::SeekFrom::End(offset) => {
|
|
let sz = match self.mode {
|
|
FileMode::Write => self.position,
|
|
FileMode::Read => self.size,
|
|
};
|
|
if offset >= 0 {
|
|
self.position = sz.saturating_add(offset as u64);
|
|
} else {
|
|
let abs = offset.unsigned_abs();
|
|
self.position = sz.saturating_sub(abs);
|
|
}
|
|
}
|
|
std::io::SeekFrom::Current(offset) => {
|
|
if offset >= 0 {
|
|
self.position = self.position.saturating_add(offset as u64);
|
|
} else {
|
|
let abs = offset.unsigned_abs();
|
|
self.position = self.position.saturating_sub(abs);
|
|
}
|
|
}
|
|
}
|
|
Ok(self.position)
|
|
}
|
|
|
|
fn flush(&mut self) -> Result<(), VfsError> {
|
|
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
|
|
let buf = std::mem::take(&mut self.write_buffer);
|
|
self.s3vfs_like.put_object(&self.key, &buf)?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn stat(&mut self) -> Result<VfsStat, VfsError> {
|
|
let sz = match self.mode {
|
|
FileMode::Write => self.position,
|
|
FileMode::Read => self.size,
|
|
};
|
|
Ok(VfsStat {
|
|
size: sz,
|
|
mtime: self.mtime,
|
|
is_dir: self.key.ends_with('/'),
|
|
is_symlink: false,
|
|
..VfsStat::new()
|
|
})
|
|
}
|
|
|
|
fn set_len(&mut self, size: u64) -> Result<(), VfsError> {
|
|
if matches!(self.mode, FileMode::Write) {
|
|
let len = size as usize;
|
|
if len < self.write_buffer.len() {
|
|
self.write_buffer.truncate(len);
|
|
} else {
|
|
self.write_buffer.resize(len, 0);
|
|
}
|
|
self.position = size;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Drop for S3VfsFile {
|
|
fn drop(&mut self) {
|
|
if matches!(self.mode, FileMode::Write) && !self.write_buffer.is_empty() {
|
|
let buf = std::mem::take(&mut self.write_buffer);
|
|
let _ = self.s3vfs_like.put_object(&self.key, &buf);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl S3VfsLike {
|
|
fn get_object_range(&self, key: &str, offset: u64, size: u64) -> Result<Vec<u8>, VfsError> {
|
|
let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let range = format!("bytes={}-{}", offset, offset + size - 1);
|
|
let resp = ureq::get(url.as_str())
|
|
.set("Range", &range)
|
|
.call()
|
|
.map_err(|e| VfsError::Io(format!("S3 GET range failed: {}", e)))?;
|
|
|
|
let status = resp.status();
|
|
if status == 404 {
|
|
return Err(VfsError::NotFound(key.to_string()));
|
|
}
|
|
if status != 206 && status != 200 {
|
|
return Err(VfsError::Io(format!("GetObject returned {}", status)));
|
|
}
|
|
|
|
let mut body = Vec::new();
|
|
resp.into_reader()
|
|
.read_to_end(&mut body)
|
|
.map_err(|e| VfsError::Io(format!("Failed to read S3 response: {}", e)))?;
|
|
Ok(body)
|
|
}
|
|
|
|
fn put_object(&self, key: &str, body: &[u8]) -> Result<(), VfsError> {
|
|
let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key);
|
|
let url = action.sign(Duration::from_secs(3600));
|
|
|
|
let ct = if key.ends_with('/') {
|
|
"application/directory"
|
|
} else {
|
|
"application/octet-stream"
|
|
};
|
|
|
|
let resp = ureq::put(url.as_str())
|
|
.set("Content-Type", ct)
|
|
.send_bytes(body)
|
|
.map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?;
|
|
|
|
if resp.status() != 200 {
|
|
return Err(VfsError::Io(format!(
|
|
"PutObject returned {}",
|
|
resp.status()
|
|
)));
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// ---- helper functions ----
|
|
|
|
fn parse_last_modified(header: Option<&str>) -> SystemTime {
|
|
match header {
|
|
Some(val) => {
|
|
if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(val) {
|
|
dt.into()
|
|
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(val) {
|
|
dt.into()
|
|
} else {
|
|
SystemTime::UNIX_EPOCH
|
|
}
|
|
}
|
|
None => SystemTime::UNIX_EPOCH,
|
|
}
|
|
}
|
|
|
|
fn parse_iso8601(s: &str) -> SystemTime {
|
|
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
|
|
dt.into()
|
|
} else if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ") {
|
|
dt.and_utc().into()
|
|
} else {
|
|
SystemTime::UNIX_EPOCH
|
|
}
|
|
}
|
|
|
|
fn build_long_name(stat: &VfsStat, name: &str) -> String {
|
|
let ft = if stat.is_dir { 'd' } else { '-' };
|
|
let perms = "rwxr-xr-x";
|
|
let sz = stat.size;
|
|
let ts = match stat.mtime.duration_since(SystemTime::UNIX_EPOCH) {
|
|
Ok(d) => {
|
|
let secs = d.as_secs();
|
|
match chrono::DateTime::from_timestamp(secs as i64, 0) {
|
|
Some(dt) => {
|
|
let now = chrono::Utc::now();
|
|
if dt.year() == now.year() {
|
|
dt.format("%b %e %H:%M").to_string()
|
|
} else {
|
|
dt.format("%b %e %Y").to_string()
|
|
}
|
|
}
|
|
None => "Jan 1 1970".to_string(),
|
|
}
|
|
}
|
|
Err(_) => "Jan 1 1970".to_string(),
|
|
};
|
|
format!("{}{} 1 0 0 {} {} {}", ft, perms, sz, ts, name)
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A leading slash is dropped; relative paths are kept as they are.
    #[test]
    fn test_path_to_key() {
        let cases = [
            ("/foo/bar.txt", "foo/bar.txt"),
            ("/", ""),
            ("relative/path", "relative/path"),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(S3Vfs::path_to_key(Path::new(input)), expected);
        }
    }

    /// Directory keys end in a slash, except for the empty root key.
    #[test]
    fn test_dir_key() {
        let cases = [("/foo/bar", "foo/bar/"), ("/", ""), ("/foo/", "foo/")];
        for &(input, expected) in cases.iter() {
            assert_eq!(S3Vfs::dir_key(Path::new(input)), expected);
        }
    }
}
|