|
|
|
|
@@ -12,6 +12,8 @@ use futures_util::StreamExt;
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use serde_json::Value;
|
|
|
|
|
use sha2::{Digest, Sha256};
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::io::Write;
|
|
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
|
use tokio_util::io::ReaderStream;
|
|
|
|
|
|
|
|
|
|
@@ -556,3 +558,292 @@ fn parse_range_header(range: &str, file_size: i64) -> Option<(u64, u64)> {
|
|
|
|
|
|
|
|
|
|
Some((start, end))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ===== Multipart Upload Support =====
|
|
|
|
|
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::RwLock;
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
|
pub struct MultipartUpload {
|
|
|
|
|
pub upload_id: String,
|
|
|
|
|
pub bucket: String,
|
|
|
|
|
pub key: String,
|
|
|
|
|
pub parts: Vec<UploadedPart>,
|
|
|
|
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
|
pub struct UploadedPart {
|
|
|
|
|
pub part_number: u32,
|
|
|
|
|
pub etag: String,
|
|
|
|
|
pub size: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static MULTIPART_UPLOADS: once_cell::sync::Lazy<Arc<RwLock<HashMap<String, MultipartUpload>>>> =
|
|
|
|
|
once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
|
|
|
|
|
|
|
|
|
|
pub async fn initiate_multipart_upload(
|
|
|
|
|
Path((bucket, key)): Path<(String, String)>,
|
|
|
|
|
State(_state): State<crate::server::AppState>,
|
|
|
|
|
) -> impl IntoResponse {
|
|
|
|
|
let upload_id = Uuid::new_v4().to_string();
|
|
|
|
|
|
|
|
|
|
let upload = MultipartUpload {
|
|
|
|
|
upload_id: upload_id.clone(),
|
|
|
|
|
bucket: bucket.clone(),
|
|
|
|
|
key: key.clone(),
|
|
|
|
|
parts: Vec::new(),
|
|
|
|
|
created_at: chrono::Utc::now(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
let mut uploads = MULTIPART_UPLOADS.write().await;
|
|
|
|
|
uploads.insert(upload_id.clone(), upload);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let (headers, xml_body) = crate::s3_xml::initiate_multipart_upload_xml(&bucket, &key, &upload_id);
|
|
|
|
|
(StatusCode::OK, headers, xml_body).into_response()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn upload_part(
|
|
|
|
|
Path((bucket, key)): Path<(String, String)>,
|
|
|
|
|
State(_state): State<crate::server::AppState>,
|
|
|
|
|
query: axum::extract::Query<UploadPartQuery>,
|
|
|
|
|
body: Body,
|
|
|
|
|
) -> impl IntoResponse {
|
|
|
|
|
let upload_id = query.upload_id.clone();
|
|
|
|
|
let part_number = query.part_number;
|
|
|
|
|
|
|
|
|
|
let uploads = MULTIPART_UPLOADS.read().await;
|
|
|
|
|
let upload = uploads.get(&upload_id);
|
|
|
|
|
|
|
|
|
|
if upload.is_none() {
|
|
|
|
|
return (StatusCode::NOT_FOUND, "Upload not found").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let upload = upload.unwrap();
|
|
|
|
|
if upload.bucket != bucket || upload.key != key {
|
|
|
|
|
return (StatusCode::BAD_REQUEST, "Bucket/key mismatch").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Collect body data
|
|
|
|
|
let mut total_size: u64 = 0;
|
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
|
let mut stream = body.into_data_stream();
|
|
|
|
|
|
|
|
|
|
// Create temp file for part data
|
|
|
|
|
let temp_dir = std::env::temp_dir();
|
|
|
|
|
let part_file_path = temp_dir.join(format!("s3_multipart_{}_{}_{}.tmp", upload_id, part_number, Uuid::new_v4()));
|
|
|
|
|
let part_file = match tokio::fs::File::create(&part_file_path).await {
|
|
|
|
|
Ok(f) => f,
|
|
|
|
|
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create temp file: {}", e)).into_response(),
|
|
|
|
|
};
|
|
|
|
|
let mut writer = tokio::io::BufWriter::new(part_file);
|
|
|
|
|
|
|
|
|
|
while let Some(chunk_result) = stream.next().await {
|
|
|
|
|
let chunk = match chunk_result {
|
|
|
|
|
Ok(c) => c,
|
|
|
|
|
Err(e) => return (StatusCode::BAD_REQUEST, format!("Failed to read chunk: {}", e)).into_response(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
total_size += chunk.len() as u64;
|
|
|
|
|
hasher.update(&chunk);
|
|
|
|
|
|
|
|
|
|
if let Err(e) = writer.write_all(&chunk).await {
|
|
|
|
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to write chunk: {}", e)).into_response();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Err(e) = writer.flush().await {
|
|
|
|
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to flush: {}", e)).into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let etag = format!("{:x}", hasher.finalize());
|
|
|
|
|
|
|
|
|
|
// Update multipart upload with new part
|
|
|
|
|
{
|
|
|
|
|
let mut uploads = MULTIPART_UPLOADS.write().await;
|
|
|
|
|
if let Some(upload) = uploads.get_mut(&upload_id) {
|
|
|
|
|
upload.parts.push(UploadedPart {
|
|
|
|
|
part_number,
|
|
|
|
|
etag: etag.clone(),
|
|
|
|
|
size: total_size,
|
|
|
|
|
});
|
|
|
|
|
upload.parts.sort_by_key(|p| p.part_number);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut headers = HeaderMap::new();
|
|
|
|
|
headers.insert("ETag", format!("\"{}\"", etag).parse().unwrap());
|
|
|
|
|
(StatusCode::OK, headers).into_response()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, serde::Deserialize)]
|
|
|
|
|
pub struct UploadPartQuery {
|
|
|
|
|
pub upload_id: String,
|
|
|
|
|
pub part_number: u32,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn complete_multipart_upload(
|
|
|
|
|
Path((bucket, key)): Path<(String, String)>,
|
|
|
|
|
State(_state): State<crate::server::AppState>,
|
|
|
|
|
query: axum::extract::Query<CompleteMultipartQuery>,
|
|
|
|
|
body: Body,
|
|
|
|
|
) -> impl IntoResponse {
|
|
|
|
|
let upload_id = query.upload_id.clone();
|
|
|
|
|
|
|
|
|
|
let uploads = MULTIPART_UPLOADS.read().await;
|
|
|
|
|
let upload = uploads.get(&upload_id);
|
|
|
|
|
|
|
|
|
|
if upload.is_none() {
|
|
|
|
|
return (StatusCode::NOT_FOUND, "Upload not found").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let upload = upload.unwrap();
|
|
|
|
|
if upload.bucket != bucket || upload.key != key {
|
|
|
|
|
return (StatusCode::BAD_REQUEST, "Bucket/key mismatch").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse CompleteMultipartUpload XML from body
|
|
|
|
|
let body_bytes = axum::body::to_bytes(body, 10000).await.ok();
|
|
|
|
|
let part_list = body_bytes.as_ref().and_then(|b| parse_complete_multipart_xml(b));
|
|
|
|
|
|
|
|
|
|
if part_list.is_none() {
|
|
|
|
|
return (StatusCode::BAD_REQUEST, "Invalid CompleteMultipartUpload XML").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Combine parts into final file
|
|
|
|
|
let base_dir = "/Users/accusys/momentry/var/sftpgo/data";
|
|
|
|
|
let file_path = format!("{}/{}/{}", base_dir, bucket, key);
|
|
|
|
|
|
|
|
|
|
if let Err(e) = tokio::fs::create_dir_all(&format!("{}/{}", base_dir, bucket)).await {
|
|
|
|
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create dir: {}", e)).into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let final_file = match tokio::fs::File::create(&file_path).await {
|
|
|
|
|
Ok(f) => f,
|
|
|
|
|
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create file: {}", e)).into_response(),
|
|
|
|
|
};
|
|
|
|
|
let mut final_writer = tokio::io::BufWriter::new(final_file);
|
|
|
|
|
|
|
|
|
|
let temp_dir = std::env::temp_dir();
|
|
|
|
|
let mut final_hasher = Sha256::new();
|
|
|
|
|
let mut final_size: u64 = 0;
|
|
|
|
|
|
|
|
|
|
for part in &upload.parts {
|
|
|
|
|
let part_file_path = temp_dir.join(format!("s3_multipart_{}_{}_*.tmp", upload_id, part.part_number));
|
|
|
|
|
|
|
|
|
|
// Find the actual part file (with UUID suffix)
|
|
|
|
|
let part_files: Option<Vec<_>> = std::fs::read_dir(&temp_dir).ok()
|
|
|
|
|
.and_then(|dir| {
|
|
|
|
|
Some(dir.filter_map(|e| e.ok())
|
|
|
|
|
.filter(|e| e.file_name().to_str().unwrap_or("").starts_with(&format!("s3_multipart_{}_{}_", upload_id, part.part_number)))
|
|
|
|
|
.collect::<Vec<_>>())
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if let Some(files) = part_files {
|
|
|
|
|
if let Some(part_file_entry) = files.first() {
|
|
|
|
|
let part_file = part_file_entry.path();
|
|
|
|
|
if let Ok(data) = tokio::fs::read(&part_file).await {
|
|
|
|
|
final_hasher.update(&data);
|
|
|
|
|
final_size += data.len() as u64;
|
|
|
|
|
if let Err(e) = final_writer.write_all(&data).await {
|
|
|
|
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to write part: {}", e)).into_response();
|
|
|
|
|
}
|
|
|
|
|
// Clean up temp file
|
|
|
|
|
let _ = tokio::fs::remove_file(&part_file).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Err(e) = final_writer.flush().await {
|
|
|
|
|
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to flush final: {}", e)).into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let final_etag = format!("{:x}", final_hasher.finalize());
|
|
|
|
|
|
|
|
|
|
// Remove upload from tracking
|
|
|
|
|
{
|
|
|
|
|
let mut uploads = MULTIPART_UPLOADS.write().await;
|
|
|
|
|
uploads.remove(&upload_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let (headers, xml_body) = crate::s3_xml::complete_multipart_upload_xml(&bucket, &key, &final_etag);
|
|
|
|
|
(StatusCode::OK, headers, xml_body).into_response()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, serde::Deserialize)]
|
|
|
|
|
pub struct CompleteMultipartQuery {
|
|
|
|
|
pub upload_id: String,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn abort_multipart_upload(
|
|
|
|
|
Path((bucket, key)): Path<(String, String)>,
|
|
|
|
|
State(_state): State<crate::server::AppState>,
|
|
|
|
|
query: axum::extract::Query<AbortMultipartQuery>,
|
|
|
|
|
) -> impl IntoResponse {
|
|
|
|
|
let upload_id = query.upload_id.clone();
|
|
|
|
|
|
|
|
|
|
let uploads = MULTIPART_UPLOADS.read().await;
|
|
|
|
|
let upload = uploads.get(&upload_id);
|
|
|
|
|
|
|
|
|
|
if upload.is_none() {
|
|
|
|
|
return (StatusCode::NOT_FOUND, "Upload not found").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let upload = upload.unwrap();
|
|
|
|
|
if upload.bucket != bucket || upload.key != key {
|
|
|
|
|
return (StatusCode::BAD_REQUEST, "Bucket/key mismatch").into_response();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Clean up temp files
|
|
|
|
|
let temp_dir = std::env::temp_dir();
|
|
|
|
|
if let Ok(dir) = std::fs::read_dir(&temp_dir) {
|
|
|
|
|
for entry in dir.filter_map(|e| e.ok()) {
|
|
|
|
|
if entry.file_name().to_str().unwrap_or("").starts_with(&format!("s3_multipart_{}_", upload_id)) {
|
|
|
|
|
let _ = tokio::fs::remove_file(entry.path()).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Remove upload from tracking
|
|
|
|
|
{
|
|
|
|
|
let mut uploads = MULTIPART_UPLOADS.write().await;
|
|
|
|
|
uploads.remove(&upload_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(StatusCode::NO_CONTENT, HeaderMap::new()).into_response()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, serde::Deserialize)]
|
|
|
|
|
pub struct AbortMultipartQuery {
|
|
|
|
|
pub upload_id: String,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn parse_complete_multipart_xml(xml: &[u8]) -> Option<Vec<(u32, String)>> {
|
|
|
|
|
let xml_str = std::str::from_utf8(xml).ok()?;
|
|
|
|
|
let mut parts = Vec::new();
|
|
|
|
|
|
|
|
|
|
for part_elem in xml_str.split("<Part>") {
|
|
|
|
|
if part_elem.contains("</Part>") {
|
|
|
|
|
let part_number = part_elem.split("<PartNumber>")
|
|
|
|
|
.nth(1)
|
|
|
|
|
.and_then(|s| s.split("</PartNumber>").next())
|
|
|
|
|
.and_then(|s| s.parse().ok());
|
|
|
|
|
|
|
|
|
|
let etag = part_elem.split("<ETag>")
|
|
|
|
|
.nth(1)
|
|
|
|
|
.and_then(|s| s.split("</ETag>").next())
|
|
|
|
|
.map(|s| s.replace("\"", ""));
|
|
|
|
|
|
|
|
|
|
if let (Some(num), Some(tag)) = (part_number, etag) {
|
|
|
|
|
parts.push((num, tag));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Some(parts)
|
|
|
|
|
}
|
|
|
|
|
|