use filetree::{FileTree, node::{FileNode, Aliases}}; use axum::{ body::Body, extract::{Path, State}, http::{HeaderMap, StatusCode}, response::{IntoResponse, Json}, }; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use serde_json::Value; use sha2::{Digest, Sha256}; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_util::io::ReaderStream; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct S3AccessKey { pub access_key: String, pub secret_key: String, pub user_id: String, pub permissions: Vec, pub created_at: String, } pub async fn list_buckets(State(state): State) -> impl IntoResponse { let mut buckets = vec![]; if let Ok(dir) = std::fs::read_dir(&state.db_dir) { for entry in dir.flatten() { if let Some(name) = entry.file_name().to_str() { if name.ends_with(".sqlite") { let bucket_name = name.replace(".sqlite", ""); buckets.push(bucket_name); } } } } let (headers, xml_body) = crate::s3_xml::list_buckets_xml(&buckets); (StatusCode::OK, headers, xml_body).into_response() } pub async fn list_objects( Path(bucket): Path, State(state): State, ) -> impl IntoResponse { println!("S3 List Objects: bucket={}", bucket); let conn = match FileTree::open_user_db(&bucket) { Ok(c) => c, Err(e) => { println!("Error opening DB: {}", e); return (StatusCode::NOT_FOUND, "Bucket not found").into_response(); } }; let tree = match FileTree::load(&conn, &bucket, "untitled folder") { Ok(t) => t, Err(e) => { println!("Error loading tree: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load tree").into_response(); } }; let objects: Vec = tree .nodes .iter() .filter(|n| n.node_type == filetree::node::NodeType::File) .map(|n| { serde_json::json!({ "Key": build_s3_key(&tree, n), "LastModified": n.registered_at.clone().unwrap_or_default(), "ETag": n.sha256.clone().unwrap_or_default(), "Size": n.file_size.clone().unwrap_or(0), }) }) .collect(); println!("Listed {} objects for bucket {}", objects.len(), bucket); let (headers, xml_body) = crate::s3_xml::list_objects_xml(&bucket, &objects); (StatusCode::OK, headers, xml_body).into_response() } pub async fn get_object( Path((bucket, key)): Path<(String, String)>, State(state): State, headers: HeaderMap, ) -> impl IntoResponse { println!("S3 GET Object: bucket={}, key={}", bucket, key); let conn = match FileTree::open_user_db(&bucket) { Ok(c) => c, Err(e) => { println!("Error opening DB: {}", e); return (StatusCode::NOT_FOUND, "Bucket not found").into_response(); } }; let tree = match FileTree::load(&conn, &bucket, "untitled folder") { Ok(t) => t, Err(e) => { println!("Error loading tree: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load tree").into_response(); } }; println!("Tree loaded, {} nodes", tree.nodes.len()); let node = find_node_by_s3_key(&tree, &key); if node.is_none() { println!("Node not found for key: {}", key); return (StatusCode::NOT_FOUND, "Object not found").into_response(); } let node = node.unwrap(); println!( "Node found: file_uuid={}", node.file_uuid.clone().unwrap_or_default() ); let file_uuid = node.file_uuid.clone().unwrap_or_default(); let file_size = node.file_size.clone().unwrap_or(0); let sha256 = node.sha256.clone().unwrap_or_default(); let real_path = get_real_file_path(&conn, &file_uuid); if real_path.is_none() { println!("File location not found for uuid: {}", file_uuid); return (StatusCode::NOT_FOUND, "File location not found").into_response(); } let real_path = real_path.unwrap(); println!("Real path: {}", real_path); // 检查Range header let range_header = headers.get("Range").and_then(|v| v.to_str().ok()); if let Some(range) = range_header { println!("Range request: {}", range); return handle_range_request(real_path, range, file_size, sha256).await; } // 完整文件下载 let file = match tokio::fs::File::open(&real_path).await { Ok(f) => f, Err(e) => { println!("Error opening file: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response(); } }; println!("File opened successfully for streaming"); let stream = ReaderStream::new(file); let body = Body::from_stream(stream); let mut response_headers = HeaderMap::new(); response_headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); response_headers.insert("ETag", format!("\"{}\"", sha256).parse().unwrap()); response_headers.insert("Content-Length", file_size.into()); response_headers.insert("Accept-Ranges", "bytes".parse().unwrap()); (StatusCode::OK, response_headers, body).into_response() } pub async fn put_object( Path((bucket, key)): Path<(String, String)>, State(_state): State, body: Body, ) -> impl IntoResponse { println!("S3 PUT Object: bucket={}, key={}", bucket, key); let base_dir = "/Users/accusys/momentry/var/sftpgo/data"; let file_path = format!("{}/{}/{}", base_dir, bucket, key); if let Err(e) = tokio::fs::create_dir_all(&format!("{}/{}", base_dir, bucket)).await { println!("Error creating directory: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, "Failed to create directory", ) .into_response(); } let file = match tokio::fs::File::create(&file_path).await { Ok(f) => f, Err(e) => { println!("Error creating file: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create file").into_response(); } }; let mut writer = tokio::io::BufWriter::with_capacity(64 * 1024, file); let mut hasher = Sha256::new(); let mut total_size: u64 = 0; let mut stream = body.into_data_stream(); while let Some(chunk_result) = stream.next().await { let chunk = match chunk_result { Ok(c) => c, Err(e) => { println!("Error reading chunk: {}", e); let _ = tokio::fs::remove_file(&file_path).await; return (StatusCode::BAD_REQUEST, "Failed to read body").into_response(); } }; total_size += chunk.len() as u64; if total_size > 100_000_000_000 { println!("File too large: {} bytes", total_size); let _ = tokio::fs::remove_file(&file_path).await; return (StatusCode::BAD_REQUEST, "File too large (>100GB)").into_response(); } hasher.update(&chunk); if let Err(e) = writer.write_all(&chunk).await { println!("Error writing chunk: {}", e); let _ = tokio::fs::remove_file(&file_path).await; return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to write file").into_response(); } } if let Err(e) = writer.flush().await { println!("Error flushing writer: {}", e); let _ = tokio::fs::remove_file(&file_path).await; return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to flush file").into_response(); } let sha256_hash = format!("{:x}", hasher.finalize()); println!("File written: {} bytes, SHA256={}", total_size, sha256_hash); let sha256_hash_clone = sha256_hash.clone(); let file_path_clone = file_path.clone(); let label = key.split('/').last().unwrap_or(&key).to_string(); let result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { let conn = match FileTree::open_user_db(&bucket) { Ok(c) => { // Check if database has tables let has_tables: bool = c .query_row( "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='file_nodes'", [], |row| row.get::<_, i32>(0), ) .unwrap_or(0) > 0; if !has_tables { // Initialize tables if not exist c.execute_batch(filetree::CREATE_TABLES)?; } c } Err(_) => FileTree::init_user_db(&bucket)?, }; let file_uuid = sha256_hash_clone.clone(); let (node, _) = FileTree::new_file_node( &label, &file_uuid, Some(&sha256_hash_clone), &label, Some(total_size as i64), None, None, None, ); let mut tree = FileTree::load(&conn, &bucket, "untitled folder")?; tree.insert_node(&conn, &node)?; FileTree::add_location(&conn, &file_uuid, &file_path_clone, Some(&label))?; Ok(()) }) .await; match result { Ok(Ok(_)) => { println!("PutObject success: {}", key); let mut headers = HeaderMap::new(); headers.insert("ETag", format!("\"{}\"", sha256_hash).parse().unwrap()); (StatusCode::OK, headers).into_response() } Ok(Err(e)) => { println!("DB error: {}", e); let _ = tokio::fs::remove_file(&file_path).await; (StatusCode::INTERNAL_SERVER_ERROR, "Database error").into_response() } Err(e) => { println!("Task error: {}", e); let _ = tokio::fs::remove_file(&file_path).await; (StatusCode::INTERNAL_SERVER_ERROR, "Task error").into_response() } } } pub async fn head_object( Path((bucket, key)): Path<(String, String)>, State(state): State, ) -> impl IntoResponse { let conn = match FileTree::open_user_db(&bucket) { Ok(c) => c, Err(_) => return (StatusCode::NOT_FOUND, HeaderMap::new()), }; let tree = match FileTree::load(&conn, &bucket, "untitled folder") { Ok(t) => t, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()), }; let node = find_node_by_s3_key(&tree, &key); if node.is_none() { return (StatusCode::NOT_FOUND, HeaderMap::new()); } let node = node.unwrap(); let mut headers = HeaderMap::new(); headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); headers.insert( "ETag", node.sha256.clone().unwrap_or_default().parse().unwrap(), ); headers.insert("Content-Length", node.file_size.clone().unwrap_or(0).into()); (StatusCode::OK, headers) } pub async fn s3_status(State(state): State) -> impl IntoResponse { let buckets = count_buckets(&state.db_dir); let keys_count = state.s3_keys.lock().unwrap().len(); Json(serde_json::json!({ "enabled": true, "endpoint": "http://localhost:11438/s3", "region": "us-east-1", "buckets_count": buckets, "keys_count": keys_count })) } pub async fn generate_s3_key(State(state): State) -> impl IntoResponse { let new_key = S3AccessKey { access_key: format!("markbase_access_key_{}", uuid::Uuid::new_v4()), secret_key: format!("markbase_secret_key_{}", uuid::Uuid::new_v4()), user_id: "warren".to_string(), permissions: vec!["GetObject".to_string(), "ListBucket".to_string()], created_at: chrono::Utc::now().to_rfc3339(), }; state.s3_keys.lock().unwrap().push(new_key.clone()); Json(serde_json::json!({ "access_key": new_key.access_key, "secret_key": new_key.secret_key, "user_id": new_key.user_id })) } pub async fn delete_object( Path((bucket, key)): Path<(String, String)>, State(_state): State, ) -> impl IntoResponse { println!("S3 DELETE Object: bucket={}, key={}", bucket, key); let result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { let conn = FileTree::open_user_db(&bucket)?; let mut tree = FileTree::load(&conn, &bucket, "untitled folder")?; let node = find_node_by_s3_key(&tree, &key); if node.is_none() { return Err(anyhow::anyhow!("Object not found")); } let node = node.unwrap(); let file_uuid = node.file_uuid.clone().unwrap_or_default(); let file_path = get_real_file_path(&conn, &file_uuid); if let Some(path) = file_path { std::fs::remove_file(&path)?; } tree.delete_node(&conn, &node.node_id)?; Ok(()) }) .await; match result { Ok(Ok(_)) => (StatusCode::NO_CONTENT, HeaderMap::new()).into_response(), Ok(Err(e)) => { println!("Delete error: {}", e); if e.to_string().contains("Object not found") { (StatusCode::NOT_FOUND, "Object not found").into_response() } else { (StatusCode::INTERNAL_SERVER_ERROR, "Delete error").into_response() } } Err(e) => { println!("Task error: {}", e); (StatusCode::INTERNAL_SERVER_ERROR, "Task error").into_response() } } } fn build_s3_key(tree: &FileTree, node: &FileNode) -> String { let mut path_parts = vec![]; let mut current_parent = node.parent_id.clone(); while let Some(parent_id) = current_parent { let parent = tree.nodes.iter().find(|n| n.node_id == parent_id); if let Some(p) = parent { path_parts.push(p.label.clone()); current_parent = p.parent_id.clone(); } else { break; } } path_parts.reverse(); path_parts.push(node.label.clone()); path_parts.join("/") } fn find_node_by_s3_key(tree: &FileTree, key: &str) -> Option { // 方法1:通过完整路径匹配 let node_by_path = tree .nodes .iter() .filter(|n| n.node_type == filetree::node::NodeType::File) .find(|n| build_s3_key(tree, n) == key) .cloned(); if node_by_path.is_some() { return node_by_path; } // 方法2:通过filename直接匹配(fallback) let filename = key.split('/').last().unwrap_or(key); tree.nodes .iter() .filter(|n| n.node_type == filetree::node::NodeType::File) .find(|n| n.label == filename) .cloned() } fn get_real_file_path(conn: &rusqlite::Connection, file_uuid: &str) -> Option { let mut stmt = conn .prepare("SELECT location FROM file_locations WHERE file_uuid = ?1 LIMIT 1") .ok()?; stmt.query_row([file_uuid], |row| row.get(0)).ok() } fn count_buckets(db_dir: &str) -> usize { if let Ok(dir) = std::fs::read_dir(db_dir) { dir.flatten() .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".sqlite")) .count() } else { 0 } } async fn handle_range_request( real_path: String, range: &str, file_size: i64, sha256: String, ) -> axum::response::Response { let range_spec = parse_range_header(range, file_size); if range_spec.is_none() { println!("Invalid Range header: {}", range); return (StatusCode::BAD_REQUEST, "Invalid Range header").into_response(); } let (start, end) = range_spec.unwrap(); let content_length = end - start + 1; println!( "Range request: bytes {}-{}, content_length={}", start, end, content_length ); let mut file = match tokio::fs::File::open(&real_path).await { Ok(f) => f, Err(e) => { println!("Error opening file for range: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response(); } }; // Seek到start位置 use tokio::io::AsyncSeekExt; if let Err(e) = file.seek(tokio::io::SeekFrom::Start(start)).await { println!("Error seeking file: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to seek file").into_response(); } // 使用take限制读取长度 let limited_file = file.take(content_length as u64); let stream = ReaderStream::new(limited_file); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); headers.insert( "Content-Range", format!("bytes {}-{}/{}", start, end, file_size) .parse() .unwrap(), ); headers.insert("Content-Length", content_length.into()); headers.insert("ETag", format!("\"{}\"", sha256).parse().unwrap()); headers.insert("Accept-Ranges", "bytes".parse().unwrap()); (StatusCode::PARTIAL_CONTENT, headers, body).into_response() } fn parse_range_header(range: &str, file_size: i64) -> Option<(u64, u64)> { let range_str = range.strip_prefix("bytes=")?; if range_str.contains(',') { return None; } let parts: Vec<&str> = range_str.split('-').collect(); if parts.len() != 2 { return None; } let (start, end) = if parts[0].is_empty() { // "bytes=-N"格式:最后N字节 let suffix_length = parts[1].parse::().ok()?; let start = if suffix_length > file_size as u64 { 0 } else { file_size as u64 - suffix_length }; (start, file_size as u64 - 1) } else if parts[1].is_empty() { // "bytes=N-"格式:从N到结尾 let start = parts[0].parse::().ok()?; (start, file_size as u64 - 1) } else { // "bytes=N-M"格式:从N到M let start = parts[0].parse::().ok()?; let end = parts[1].parse::().ok()?; (start, end) }; if start > end || end >= file_size as u64 { return None; } Some((start, end)) }