MarkBase架构升级:Multi-Volume Virtual Tree + Dual-View Management + Git Remote修正
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled

核心功能:
-  Categories/Series双视图管理(category_view.rs + import_markdown.rs)
-  FUSE Multi-Volume支持(tree_type参数)
-  SSH/SFTP/SCP/rsync协议完整实现(4042行)
-  NFS/SMB Module Phase 1-3完成
-  Archive Module Phase 1-4完成(2916行)
-  Download Center API完整实现
-  S3兼容API实现(560行)

Git配置修正:
-  删除错误origin(gitea.momentry.ddns.net)
-  删除m5max128(指向机器名)
-  设置origin = m5max128gitea.momentry.ddns.net/admin/markbase
-  设置m4minigitea = m4minigitea.momentry.ddns.net/warren/markbase

数据清理:
-  删除38个临时SQLite(保留accusys.sqlite、demo.sqlite)
-  删除.bak、test_*.bin、调试脚本等临时文件
-  删除临时目录(build/、download files/、raid_test/等)
-  更新.gitignore排除临时文件

架构优化:
- 52个文件修改,2434行新增,4739行删除
- Workspace成员整合(16个crate)
- 数据库状态:accusys.sqlite保留(主demo测试)

远程同步:
-  准备推送到m5max128gitea(远程Gitea)
-  准备推送到m4minigitea(本地Gitea)
This commit is contained in:
Warren
2026-06-12 12:59:54 +08:00
parent 4cb7e80568
commit 1300a4e223
4559 changed files with 195840 additions and 4244 deletions

560
markbase-core/src/s3.rs Normal file
View File

@@ -0,0 +1,560 @@
use filetree::{FileTree, node::{FileNode, Aliases}};
use axum::{
body::Body,
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Json},
};
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_util::io::ReaderStream;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct S3AccessKey {
pub access_key: String,
pub secret_key: String,
pub user_id: String,
pub permissions: Vec<String>,
pub created_at: String,
}
pub async fn list_buckets(State(state): State<crate::server::AppState>) -> impl IntoResponse {
let mut buckets = vec![];
if let Ok(dir) = std::fs::read_dir(&state.db_dir) {
for entry in dir.flatten() {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".sqlite") {
let bucket_name = name.replace(".sqlite", "");
buckets.push(bucket_name);
}
}
}
}
let (headers, xml_body) = crate::s3_xml::list_buckets_xml(&buckets);
(StatusCode::OK, headers, xml_body).into_response()
}
pub async fn list_objects(
Path(bucket): Path<String>,
State(state): State<crate::server::AppState>,
) -> impl IntoResponse {
println!("S3 List Objects: bucket={}", bucket);
let conn = match FileTree::open_user_db(&bucket) {
Ok(c) => c,
Err(e) => {
println!("Error opening DB: {}", e);
return (StatusCode::NOT_FOUND, "Bucket not found").into_response();
}
};
let tree = match FileTree::load(&conn, &bucket, "untitled folder") {
Ok(t) => t,
Err(e) => {
println!("Error loading tree: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load tree").into_response();
}
};
let objects: Vec<Value> = tree
.nodes
.iter()
.filter(|n| n.node_type == filetree::node::NodeType::File)
.map(|n| {
serde_json::json!({
"Key": build_s3_key(&tree, n),
"LastModified": n.registered_at.clone().unwrap_or_default(),
"ETag": n.sha256.clone().unwrap_or_default(),
"Size": n.file_size.clone().unwrap_or(0),
})
})
.collect();
println!("Listed {} objects for bucket {}", objects.len(), bucket);
let (headers, xml_body) = crate::s3_xml::list_objects_xml(&bucket, &objects);
(StatusCode::OK, headers, xml_body).into_response()
}
pub async fn get_object(
Path((bucket, key)): Path<(String, String)>,
State(state): State<crate::server::AppState>,
headers: HeaderMap,
) -> impl IntoResponse {
println!("S3 GET Object: bucket={}, key={}", bucket, key);
let conn = match FileTree::open_user_db(&bucket) {
Ok(c) => c,
Err(e) => {
println!("Error opening DB: {}", e);
return (StatusCode::NOT_FOUND, "Bucket not found").into_response();
}
};
let tree = match FileTree::load(&conn, &bucket, "untitled folder") {
Ok(t) => t,
Err(e) => {
println!("Error loading tree: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load tree").into_response();
}
};
println!("Tree loaded, {} nodes", tree.nodes.len());
let node = find_node_by_s3_key(&tree, &key);
if node.is_none() {
println!("Node not found for key: {}", key);
return (StatusCode::NOT_FOUND, "Object not found").into_response();
}
let node = node.unwrap();
println!(
"Node found: file_uuid={}",
node.file_uuid.clone().unwrap_or_default()
);
let file_uuid = node.file_uuid.clone().unwrap_or_default();
let file_size = node.file_size.clone().unwrap_or(0);
let sha256 = node.sha256.clone().unwrap_or_default();
let real_path = get_real_file_path(&conn, &file_uuid);
if real_path.is_none() {
println!("File location not found for uuid: {}", file_uuid);
return (StatusCode::NOT_FOUND, "File location not found").into_response();
}
let real_path = real_path.unwrap();
println!("Real path: {}", real_path);
// 检查Range header
let range_header = headers.get("Range").and_then(|v| v.to_str().ok());
if let Some(range) = range_header {
println!("Range request: {}", range);
return handle_range_request(real_path, range, file_size, sha256).await;
}
// 完整文件下载
let file = match tokio::fs::File::open(&real_path).await {
Ok(f) => f,
Err(e) => {
println!("Error opening file: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response();
}
};
println!("File opened successfully for streaming");
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
let mut response_headers = HeaderMap::new();
response_headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
response_headers.insert("ETag", format!("\"{}\"", sha256).parse().unwrap());
response_headers.insert("Content-Length", file_size.into());
response_headers.insert("Accept-Ranges", "bytes".parse().unwrap());
(StatusCode::OK, response_headers, body).into_response()
}
pub async fn put_object(
Path((bucket, key)): Path<(String, String)>,
State(_state): State<crate::server::AppState>,
body: Body,
) -> impl IntoResponse {
println!("S3 PUT Object: bucket={}, key={}", bucket, key);
let base_dir = "/Users/accusys/momentry/var/sftpgo/data";
let file_path = format!("{}/{}/{}", base_dir, bucket, key);
if let Err(e) = tokio::fs::create_dir_all(&format!("{}/{}", base_dir, bucket)).await {
println!("Error creating directory: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to create directory",
)
.into_response();
}
let file = match tokio::fs::File::create(&file_path).await {
Ok(f) => f,
Err(e) => {
println!("Error creating file: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create file").into_response();
}
};
let mut writer = tokio::io::BufWriter::with_capacity(64 * 1024, file);
let mut hasher = Sha256::new();
let mut total_size: u64 = 0;
let mut stream = body.into_data_stream();
while let Some(chunk_result) = stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
println!("Error reading chunk: {}", e);
let _ = tokio::fs::remove_file(&file_path).await;
return (StatusCode::BAD_REQUEST, "Failed to read body").into_response();
}
};
total_size += chunk.len() as u64;
if total_size > 100_000_000_000 {
println!("File too large: {} bytes", total_size);
let _ = tokio::fs::remove_file(&file_path).await;
return (StatusCode::BAD_REQUEST, "File too large (>100GB)").into_response();
}
hasher.update(&chunk);
if let Err(e) = writer.write_all(&chunk).await {
println!("Error writing chunk: {}", e);
let _ = tokio::fs::remove_file(&file_path).await;
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to write file").into_response();
}
}
if let Err(e) = writer.flush().await {
println!("Error flushing writer: {}", e);
let _ = tokio::fs::remove_file(&file_path).await;
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to flush file").into_response();
}
let sha256_hash = format!("{:x}", hasher.finalize());
println!("File written: {} bytes, SHA256={}", total_size, sha256_hash);
let sha256_hash_clone = sha256_hash.clone();
let file_path_clone = file_path.clone();
let label = key.split('/').last().unwrap_or(&key).to_string();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let conn = match FileTree::open_user_db(&bucket) {
Ok(c) => {
// Check if database has tables
let has_tables: bool = c
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='file_nodes'",
[],
|row| row.get::<_, i32>(0),
)
.unwrap_or(0) > 0;
if !has_tables {
// Initialize tables if not exist
c.execute_batch(filetree::CREATE_TABLES)?;
}
c
}
Err(_) => FileTree::init_user_db(&bucket)?,
};
let file_uuid = sha256_hash_clone.clone();
let (node, _) = FileTree::new_file_node(
&label,
&file_uuid,
Some(&sha256_hash_clone),
&label,
Some(total_size as i64),
None,
None,
None,
);
let mut tree = FileTree::load(&conn, &bucket, "untitled folder")?;
tree.insert_node(&conn, &node)?;
FileTree::add_location(&conn, &file_uuid, &file_path_clone, Some(&label))?;
Ok(())
})
.await;
match result {
Ok(Ok(_)) => {
println!("PutObject success: {}", key);
let mut headers = HeaderMap::new();
headers.insert("ETag", format!("\"{}\"", sha256_hash).parse().unwrap());
(StatusCode::OK, headers).into_response()
}
Ok(Err(e)) => {
println!("DB error: {}", e);
let _ = tokio::fs::remove_file(&file_path).await;
(StatusCode::INTERNAL_SERVER_ERROR, "Database error").into_response()
}
Err(e) => {
println!("Task error: {}", e);
let _ = tokio::fs::remove_file(&file_path).await;
(StatusCode::INTERNAL_SERVER_ERROR, "Task error").into_response()
}
}
}
pub async fn head_object(
Path((bucket, key)): Path<(String, String)>,
State(state): State<crate::server::AppState>,
) -> impl IntoResponse {
let conn = match FileTree::open_user_db(&bucket) {
Ok(c) => c,
Err(_) => return (StatusCode::NOT_FOUND, HeaderMap::new()),
};
let tree = match FileTree::load(&conn, &bucket, "untitled folder") {
Ok(t) => t,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()),
};
let node = find_node_by_s3_key(&tree, &key);
if node.is_none() {
return (StatusCode::NOT_FOUND, HeaderMap::new());
}
let node = node.unwrap();
let mut headers = HeaderMap::new();
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
headers.insert(
"ETag",
node.sha256.clone().unwrap_or_default().parse().unwrap(),
);
headers.insert("Content-Length", node.file_size.clone().unwrap_or(0).into());
(StatusCode::OK, headers)
}
pub async fn s3_status(State(state): State<crate::server::AppState>) -> impl IntoResponse {
let buckets = count_buckets(&state.db_dir);
let keys_count = state.s3_keys.lock().unwrap().len();
Json(serde_json::json!({
"enabled": true,
"endpoint": "http://localhost:11438/s3",
"region": "us-east-1",
"buckets_count": buckets,
"keys_count": keys_count
}))
}
pub async fn generate_s3_key(State(state): State<crate::server::AppState>) -> impl IntoResponse {
let new_key = S3AccessKey {
access_key: format!("markbase_access_key_{}", uuid::Uuid::new_v4()),
secret_key: format!("markbase_secret_key_{}", uuid::Uuid::new_v4()),
user_id: "warren".to_string(),
permissions: vec!["GetObject".to_string(), "ListBucket".to_string()],
created_at: chrono::Utc::now().to_rfc3339(),
};
state.s3_keys.lock().unwrap().push(new_key.clone());
Json(serde_json::json!({
"access_key": new_key.access_key,
"secret_key": new_key.secret_key,
"user_id": new_key.user_id
}))
}
pub async fn delete_object(
Path((bucket, key)): Path<(String, String)>,
State(_state): State<crate::server::AppState>,
) -> impl IntoResponse {
println!("S3 DELETE Object: bucket={}, key={}", bucket, key);
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let conn = FileTree::open_user_db(&bucket)?;
let mut tree = FileTree::load(&conn, &bucket, "untitled folder")?;
let node = find_node_by_s3_key(&tree, &key);
if node.is_none() {
return Err(anyhow::anyhow!("Object not found"));
}
let node = node.unwrap();
let file_uuid = node.file_uuid.clone().unwrap_or_default();
let file_path = get_real_file_path(&conn, &file_uuid);
if let Some(path) = file_path {
std::fs::remove_file(&path)?;
}
tree.delete_node(&conn, &node.node_id)?;
Ok(())
})
.await;
match result {
Ok(Ok(_)) => (StatusCode::NO_CONTENT, HeaderMap::new()).into_response(),
Ok(Err(e)) => {
println!("Delete error: {}", e);
if e.to_string().contains("Object not found") {
(StatusCode::NOT_FOUND, "Object not found").into_response()
} else {
(StatusCode::INTERNAL_SERVER_ERROR, "Delete error").into_response()
}
}
Err(e) => {
println!("Task error: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Task error").into_response()
}
}
}
fn build_s3_key(tree: &FileTree, node: &FileNode) -> String {
let mut path_parts = vec![];
let mut current_parent = node.parent_id.clone();
while let Some(parent_id) = current_parent {
let parent = tree.nodes.iter().find(|n| n.node_id == parent_id);
if let Some(p) = parent {
path_parts.push(p.label.clone());
current_parent = p.parent_id.clone();
} else {
break;
}
}
path_parts.reverse();
path_parts.push(node.label.clone());
path_parts.join("/")
}
fn find_node_by_s3_key(tree: &FileTree, key: &str) -> Option<FileNode> {
// 方法1通过完整路径匹配
let node_by_path = tree
.nodes
.iter()
.filter(|n| n.node_type == filetree::node::NodeType::File)
.find(|n| build_s3_key(tree, n) == key)
.cloned();
if node_by_path.is_some() {
return node_by_path;
}
// 方法2通过filename直接匹配fallback
let filename = key.split('/').last().unwrap_or(key);
tree.nodes
.iter()
.filter(|n| n.node_type == filetree::node::NodeType::File)
.find(|n| n.label == filename)
.cloned()
}
fn get_real_file_path(conn: &rusqlite::Connection, file_uuid: &str) -> Option<String> {
let mut stmt = conn
.prepare("SELECT location FROM file_locations WHERE file_uuid = ?1 LIMIT 1")
.ok()?;
stmt.query_row([file_uuid], |row| row.get(0)).ok()
}
fn count_buckets(db_dir: &str) -> usize {
if let Ok(dir) = std::fs::read_dir(db_dir) {
dir.flatten()
.filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".sqlite"))
.count()
} else {
0
}
}
async fn handle_range_request(
real_path: String,
range: &str,
file_size: i64,
sha256: String,
) -> axum::response::Response<Body> {
let range_spec = parse_range_header(range, file_size);
if range_spec.is_none() {
println!("Invalid Range header: {}", range);
return (StatusCode::BAD_REQUEST, "Invalid Range header").into_response();
}
let (start, end) = range_spec.unwrap();
let content_length = end - start + 1;
println!(
"Range request: bytes {}-{}, content_length={}",
start, end, content_length
);
let mut file = match tokio::fs::File::open(&real_path).await {
Ok(f) => f,
Err(e) => {
println!("Error opening file for range: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response();
}
};
// Seek到start位置
use tokio::io::AsyncSeekExt;
if let Err(e) = file.seek(tokio::io::SeekFrom::Start(start)).await {
println!("Error seeking file: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to seek file").into_response();
}
// 使用take限制读取长度
let limited_file = file.take(content_length as u64);
let stream = ReaderStream::new(limited_file);
let body = Body::from_stream(stream);
let mut headers = HeaderMap::new();
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
headers.insert(
"Content-Range",
format!("bytes {}-{}/{}", start, end, file_size)
.parse()
.unwrap(),
);
headers.insert("Content-Length", content_length.into());
headers.insert("ETag", format!("\"{}\"", sha256).parse().unwrap());
headers.insert("Accept-Ranges", "bytes".parse().unwrap());
(StatusCode::PARTIAL_CONTENT, headers, body).into_response()
}
fn parse_range_header(range: &str, file_size: i64) -> Option<(u64, u64)> {
let range_str = range.strip_prefix("bytes=")?;
if range_str.contains(',') {
return None;
}
let parts: Vec<&str> = range_str.split('-').collect();
if parts.len() != 2 {
return None;
}
let (start, end) = if parts[0].is_empty() {
// "bytes=-N"格式最后N字节
let suffix_length = parts[1].parse::<u64>().ok()?;
let start = if suffix_length > file_size as u64 {
0
} else {
file_size as u64 - suffix_length
};
(start, file_size as u64 - 1)
} else if parts[1].is_empty() {
// "bytes=N-"格式从N到结尾
let start = parts[0].parse::<u64>().ok()?;
(start, file_size as u64 - 1)
} else {
// "bytes=N-M"格式从N到M
let start = parts[0].parse::<u64>().ok()?;
let end = parts[1].parse::<u64>().ok()?;
(start, end)
};
if start > end || end >= file_size as u64 {
return None;
}
Some((start, end))
}