diff --git a/Cargo.lock b/Cargo.lock index 3ebb91b..25a336c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3055,6 +3055,7 @@ dependencies = [ "tokio-postgres", "tokio-util", "toml", + "tower-http 0.5.2", "tracing", "tracing-subscriber", "unrar", @@ -4633,7 +4634,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tower", - "tower-http", + "tower-http 0.6.11", "tower-service", "url", "wasm-bindgen", @@ -6117,6 +6118,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.11.1", + "bytes", + "http", + "http-body", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.6.11" diff --git a/config/markbase.toml b/config/markbase.toml index 21aabd9..c9cccee 100644 --- a/config/markbase.toml +++ b/config/markbase.toml @@ -4,6 +4,8 @@ port = 11438 log_level = "info" auth_db_path = "data/auth.sqlite" users_db_dir = "data/users" +webdav_root = "/Users/accusys/momentry/var/sftpgo/data/demo" +upload_path = "/Users/accusys/momentry/var/sftpgo/data" [postgresql] host = "127.0.0.1" diff --git a/data/auth.sqlite b/data/auth.sqlite index ca8463d..298dc3e 100644 Binary files a/data/auth.sqlite and b/data/auth.sqlite differ diff --git a/data/users/demo.sqlite b/data/users/demo.sqlite index 14574e3..1cbdede 100644 Binary files a/data/users/demo.sqlite and b/data/users/demo.sqlite differ diff --git a/markbase-core/Cargo.toml b/markbase-core/Cargo.toml index 1435918..1f4c133 100644 --- a/markbase-core/Cargo.toml +++ b/markbase-core/Cargo.toml @@ -73,6 +73,7 @@ rusty-s3 = "0.10" # S3 API 签名(AWS Signature V4) ureq = "2.12" # 輕量同步 HTTP 客戶端 reqwest = { version = "0.12", optional = true } # Async HTTP client for AsyncS3Vfs rayon = "1.10" # Phase 4: 并行加密 +tower-http = { version = "0.5", features = ["cors"] } url = "2" # URL 解析(rusty-s3 依賴) xattr = "1.0" # Extended attributes support (AFP_AfpInfo, Time Machine) diff --git a/markbase-core/src/api/admin.rs b/markbase-core/src/api/admin.rs new file mode 100644 index 0000000..1516b5c --- /dev/null +++ b/markbase-core/src/api/admin.rs @@ -0,0 +1,324 @@ +use axum::{ + extract::{Path, State}, + http::HeaderMap, + http::StatusCode, + response::{Html, IntoResponse, Json}, +}; +use serde_json::json; + +use crate::server::AppState; + +// === Admin Auth Helper === + +fn verify_admin_or_401( + state: &AppState, + headers: &HeaderMap, +) -> Result<(), impl IntoResponse> { + let auth_header = headers + .get("Authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")); + + match auth_header { + Some(token) if state.auth.verify_admin_token(token).is_some() => Ok(()), + _ => Err(( + StatusCode::UNAUTHORIZED, + Json(json!({"ok": false, "error": "Invalid admin token"})), + )), + } +} + +// === Admin Authentication Handlers === + +pub async fn admin_login_handler( + State(state): State, + Json(body): Json, +) -> impl IntoResponse { + match state.auth.admin_login(&body.username, &body.password) { + Some(response) => (StatusCode::OK, Json(response)).into_response(), + None => ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "Invalid admin credentials"})), + ) + .into_response(), + } +} + +pub async fn admin_verify_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + let auth_header = headers + .get("Authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")); + + if let Some(token) = auth_header { + if let Some(session) = state.auth.verify_admin_token(token) { + return ( + StatusCode::OK, + Json(json!({ + "ok": true, + "username": session.username, + "expires_at": session.expires_at + })), + ) + .into_response(); + } + } + + ( + StatusCode::UNAUTHORIZED, + Json(json!({"ok": false, "error": "Invalid admin token"})), + ) + .into_response() +} + +// === Admin Page Handlers === + +pub async fn admin_products_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../product_manager.html")).into_response() +} + +pub async fn admin_files_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../file_list.html")).into_response() +} + +pub async fn admin_upload_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../upload.html")).into_response() +} + +// === Admin-Wrapped Product/File API Handlers === + +pub async fn admin_list_all_products( + State(state): State, + headers: HeaderMap, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::list_all_products(State(state)) + .await + .into_response() +} + +pub async fn admin_create_product( + State(state): State, + headers: HeaderMap, + Json(payload): Json, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::create_product_handler(State(state), Json(payload)) + .await + .into_response() +} + +pub async fn admin_get_series_stats( + State(state): State, + headers: HeaderMap, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::get_series_stats(State(state)) + .await + .into_response() +} + +pub async fn admin_get_product_files( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::get_product_files(Path(product_id), State(state)) + .await + .into_response() +} + +pub async fn admin_delete_product( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::delete_product(Path(product_id), State(state)) + .await + .into_response() +} + +pub async fn admin_assign_files( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, + Json(payload): Json, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::assign_files_to_product( + Path(product_id), + State(state), + Json(payload), + ) + .await + .into_response() +} + +pub async fn admin_list_uploaded_files( + State(state): State, + headers: HeaderMap, + Path(user_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::handlers::list_uploaded_files(Path(user_id)) + .await + .into_response() +} + +// === Sync Handlers === + +pub async fn manual_sync_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); + + match syncer { + Ok(syncer) => match syncer.full_sync().await { + Ok(result) => { + if result.status == "success" { + ( + StatusCode::OK, + Json(json!({ + "status": "success", + "users_synced": result.users_synced, + "groups_synced": result.groups_synced, + "mappings_synced": result.mappings_synced + })), + ) + .into_response() + } else if result.status == "partial_success" { + ( + StatusCode::OK, + Json(json!({ + "status": "partial_success", + "users_synced": result.users_synced, + "users_failed": result.users_failed, + "groups_synced": result.groups_synced, + "groups_failed": result.groups_failed, + "errors": result.errors + })), + ) + .into_response() + } else { + ( + StatusCode::OK, + Json(json!({ + "status": result.status, + "errors": result.errors + })), + ) + .into_response() + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "status": "failed", + "error": e.to_string() + })), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "status": "failed", + "error": e.to_string() + })), + ) + .into_response(), + } +} + +pub async fn sync_status_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); + + match auth_db { + Ok(db) => match db.open() { + Ok(conn) => { + match conn.query_row( + "SELECT sync_type, sync_time, users_synced, users_failed, + groups_synced, groups_failed, mappings_synced, status + FROM sync_log ORDER BY sync_time DESC LIMIT 5", + [], + |row| { + Ok(json!({ + "sync_type": row.get::<_, String>(0)?, + "sync_time": row.get::<_, i64>(1)?, + "users_synced": row.get::<_, usize>(2)?, + "users_failed": row.get::<_, usize>(3)?, + "groups_synced": row.get::<_, usize>(4)?, + "groups_failed": row.get::<_, usize>(5)?, + "mappings_synced": row.get::<_, usize>(6)?, + "status": row.get::<_, String>(7)?, + })) + }, + ) { + Ok(entries) => (StatusCode::OK, Json(entries)).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + } +} diff --git a/markbase-core/src/api/config.rs b/markbase-core/src/api/config.rs new file mode 100644 index 0000000..b4acd98 --- /dev/null +++ b/markbase-core/src/api/config.rs @@ -0,0 +1,208 @@ +use axum::{ + extract::Query, + http::StatusCode, + response::{IntoResponse, Json}, +}; + +#[derive(Debug, serde::Deserialize)] +pub struct EditConfigQuery { + pub key: String, + pub value: String, +} + +pub async fn get_config_handler() -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + // Return defaults if config file doesn't exist yet (loadSettings in admin UI needs it) + if !config_path.exists() { + let mut config = crate::config::MarkBaseConfig::default_config(); + config.merge_env(); + return ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(); + } + + match crate::config::MarkBaseConfig::load(config_path) { + Ok(config) => ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + // Load existing or use defaults, so admin can save settings without a pre-existing file + let mut config = if config_path.exists() { + match crate::config::MarkBaseConfig::load(config_path) { + Ok(c) => c, + Err(e) => { + return (StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()}))).into_response(); + } + } + } else { + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + }; + + let old_value = config.get(¶ms.key).unwrap_or_default(); + + match config.set(¶ms.key, ¶ms.value) { + Ok(_) => match config.validate() { + Ok(_) => match config.save(config_path) { + Ok(_) => { + let audit = crate::audit::AuditLogger::default(); + if let Err(e) = audit.log_config_change( + "markbase", + ¶ms.key, + &old_value, + ¶ms.value, + "system", + None, + ) { + log::warn!("Failed to write audit log: {}", e); + } + + (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn validate_config_handler() -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + if !config_path.exists() { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"ok": false, "error": "Config file not found"})), + ) + .into_response(); + } + + match crate::config::MarkBaseConfig::load(config_path) { + Ok(config) => match config.validate() { + Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn get_s3_config_handler() -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(config) => ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn edit_s3_config_handler(Query(params): Query) -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(mut config) => { + let old_value = config.get(¶ms.key).unwrap_or_default(); + + match config.set(¶ms.key, ¶ms.value) { + Ok(_) => match config.validate() { + Ok(_) => match config.save("config/s3.toml") { + Ok(_) => { + let audit = crate::audit::AuditLogger::default(); + if let Err(e) = audit.log_config_change( + "s3", + ¶ms.key, + &old_value, + ¶ms.value, + "system", + None, + ) { + log::warn!("Failed to write audit log: {}", e); + } + + (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn validate_s3_config_handler() -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(config) => match config.validate() { + Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + } +} diff --git a/markbase-core/src/api/mod.rs b/markbase-core/src/api/mod.rs index 158db53..8f160d0 100644 --- a/markbase-core/src/api/mod.rs +++ b/markbase-core/src/api/mod.rs @@ -1,12 +1,3 @@ +pub mod admin; +pub mod config; pub mod handlers; - -// API Module - Future Modular Architecture -// -// This module provides the structure for modular API handlers. -// Current implementation remains in server.rs for stability. -// -// Benefits of this architecture: -// - Clear separation of concerns -// - Easier maintenance for new features -// - Gradual migration path from server.rs -// - Independent testing per handler module diff --git a/markbase-core/src/auth.rs b/markbase-core/src/auth.rs index 439677d..c5dd21d 100644 --- a/markbase-core/src/auth.rs +++ b/markbase-core/src/auth.rs @@ -158,53 +158,78 @@ impl AuthState { } pub fn admin_login(&self, username: &str, password: &str) -> Option { + // Try auth_db first (legacy PostgreSQL sync) if let Some(auth_db) = &self.auth_db { match auth_db.get_admin(username) { Ok(Some(admin)) if admin.status == 1 => { if verify(password, &admin.password_hash).unwrap_or(false) { - let token = Uuid::new_v4().to_string(); - let now = Utc::now(); - let expires_at = now + Duration::hours(24); - - let session = AdminSession { - token: token.clone(), - username: username.to_string(), - created_at: now.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - }; - - let mut admin_sessions = self.admin_sessions.lock().unwrap(); - admin_sessions.insert(token.clone(), session); - - log::info!("Admin {} logged in successfully", username); - - Some(AdminLoginResponse { - token, - expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - username: username.to_string(), - }) + return self.create_admin_session(username, password); } else { log::warn!("Invalid password for admin {}", username); - None + return None; } } Ok(Some(_)) => { log::warn!("Admin {} is not active", username); - None - } - Ok(None) => { - log::warn!("Admin {} not found", username); - None + return None; } + Ok(None) => {} Err(e) => { log::error!("Failed to get admin {}: {}", username, e); - None + return None; } } - } else { - log::warn!("Auth DB not available for admin login"); - None } + + // Fallback: try provider + if let Some(provider) = &self.provider { + match provider.get_user(username) { + Ok(Some(user)) if user.status == 1 => { + if verify(password, &user.password_hash).unwrap_or(false) { + return self.create_admin_session(username, password); + } else { + log::warn!("Invalid password for admin {} (provider)", username); + return None; + } + } + Ok(Some(_)) => { + log::warn!("Admin {} is not active (provider)", username); + return None; + } + Ok(None) => {} + Err(e) => { + log::error!("Failed to get admin {} from provider: {}", username, e); + return None; + } + } + } + + log::warn!("Admin {} not found (auth_db + provider)", username); + None + } + + fn create_admin_session(&self, username: &str, _password: &str) -> Option { + let token = Uuid::new_v4().to_string(); + let now = Utc::now(); + let expires_at = now + Duration::hours(24); + + let session = AdminSession { + token: token.clone(), + username: username.to_string(), + created_at: now.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + }; + + let mut admin_sessions = self.admin_sessions.lock().unwrap(); + admin_sessions.insert(token.clone(), session); + + log::info!("Admin {} logged in successfully", username); + + Some(AdminLoginResponse { + token, + expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + username: username.to_string(), + }) } pub fn verify_admin_token(&self, token: &str) -> Option { diff --git a/markbase-core/src/cli/interface/ssh.rs b/markbase-core/src/cli/interface/ssh.rs index 5dc54ae..a87c3cf 100644 --- a/markbase-core/src/cli/interface/ssh.rs +++ b/markbase-core/src/cli/interface/ssh.rs @@ -28,7 +28,7 @@ pub async fn handle_ssh_command(cmd: SshCommand) -> anyhow::Result<()> { println!("Security: ⭐⭐⭐⭐⭐ (RustCrypto authoritative libraries)"); println!(); - crate::ssh_server::server::run_ssh_server(Some(port), pg_conn.as_deref())?; + crate::ssh_server::server::run_ssh_server(Some(port), pg_conn.as_deref()).await?; } } Ok(()) diff --git a/markbase-core/src/config/web.rs b/markbase-core/src/config/web.rs index 0d05107..afc77e9 100644 --- a/markbase-core/src/config/web.rs +++ b/markbase-core/src/config/web.rs @@ -13,12 +13,30 @@ pub struct MarkBaseConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { + #[serde(default = "default_host")] pub host: String, + #[serde(default = "default_port")] pub port: u16, + #[serde(default = "default_log_level")] pub log_level: String, + #[serde(default = "default_auth_db_path")] pub auth_db_path: String, + #[serde(default = "default_users_db_dir")] pub users_db_dir: String, + #[serde(default = "default_webdav_root")] pub webdav_root: String, + #[serde(default = "default_upload_path")] + pub upload_path: String, +} + +fn default_host() -> String { "127.0.0.1".to_string() } +fn default_port() -> u16 { 11438 } +fn default_log_level() -> String { "info".to_string() } +fn default_auth_db_path() -> String { "data/auth.sqlite".to_string() } +fn default_users_db_dir() -> String { "data/users".to_string() } +fn default_webdav_root() -> String { "/Users/accusys/momentry/var/sftpgo/data/demo".to_string() } +fn default_upload_path() -> String { + "/Users/accusys/momentry/var/sftpgo/data".to_string() } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -89,6 +107,7 @@ impl MarkBaseConfig { auth_db_path: "data/auth.sqlite".to_string(), users_db_dir: "data/users".to_string(), webdav_root: "/Users/accusys/momentry/var/sftpgo/data/demo".to_string(), + upload_path: "/Users/accusys/momentry/var/sftpgo/data".to_string(), }, postgresql: PostgreSQLConfig { host: "127.0.0.1".to_string(), @@ -143,6 +162,9 @@ impl MarkBaseConfig { if let Ok(webdav_root) = std::env::var("MB_WEBDAV_ROOT") { self.server.webdav_root = webdav_root; } + if let Ok(upload_path) = std::env::var("MB_WEBDAV_PARENT") { + self.server.upload_path = upload_path; + } if let Ok(pg_host) = std::env::var("PG_HOST") { self.postgresql.host = pg_host; @@ -182,6 +204,7 @@ impl MarkBaseConfig { "server.auth_db_path" => Some(self.server.auth_db_path.clone()), "server.users_db_dir" => Some(self.server.users_db_dir.clone()), "server.webdav_root" => Some(self.server.webdav_root.clone()), + "server.upload_path" => Some(self.server.upload_path.clone()), "postgresql.host" => Some(self.postgresql.host.clone()), "postgresql.port" => Some(self.postgresql.port.to_string()), @@ -228,6 +251,7 @@ impl MarkBaseConfig { "server.auth_db_path" => self.server.auth_db_path = value.to_string(), "server.users_db_dir" => self.server.users_db_dir = value.to_string(), "server.webdav_root" => self.server.webdav_root = value.to_string(), + "server.upload_path" => self.server.upload_path = value.to_string(), "postgresql.host" => self.postgresql.host = value.to_string(), "postgresql.port" => self.postgresql.port = value.parse()?, @@ -290,6 +314,10 @@ impl MarkBaseConfig { return Err(anyhow::anyhow!("server.users_db_dir cannot be empty")); } + if self.server.upload_path.is_empty() { + return Err(anyhow::anyhow!("server.upload_path cannot be empty")); + } + if self.postgresql.port == 0 { return Err(anyhow::anyhow!( "Invalid PostgreSQL port: {}", diff --git a/markbase-core/src/myfiles.rs b/markbase-core/src/myfiles.rs index d852408..b9fed11 100644 --- a/markbase-core/src/myfiles.rs +++ b/markbase-core/src/myfiles.rs @@ -1,11 +1,13 @@ use axum::{ + body::Body, extract::{Path, Query, State}, - http::StatusCode, - response::{Html, Json}, + http::{header, StatusCode, HeaderMap}, + response::{Html, IntoResponse, Json, Response}, }; use rusqlite::{params, Connection}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use std::sync::OnceLock; use crate::server::AppState; @@ -26,18 +28,25 @@ CREATE INDEX IF NOT EXISTS idx_file_tags_tag ON file_tags(tag); CREATE INDEX IF NOT EXISTS idx_file_tags_filename ON file_tags(filename); "; +static MYFILES_UPLOAD_PATH: OnceLock = OnceLock::new(); + +pub fn init_upload_path(path: String) { + let _ = MYFILES_UPLOAD_PATH.set(path); +} + +fn upload_base_path() -> &'static str { + MYFILES_UPLOAD_PATH.get().map(|s| s.as_str()) + .unwrap_or("/Users/accusys/momentry/var/sftpgo/data") +} + fn user_db_path(state: &AppState, username: &str) -> PathBuf { - let root = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); - PathBuf::from(root) + PathBuf::from(&state.upload_path) .join(username) .join("webdav_virtual.sqlite") } -fn user_root(username: &str) -> PathBuf { - let root = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); - PathBuf::from(root).join(username) +fn user_root(base_path: &str, username: &str) -> PathBuf { + PathBuf::from(base_path).join(username) } fn ensure_schema(db_path: &PathBuf) -> anyhow::Result { @@ -159,12 +168,32 @@ pub async fn delete_folder( Ok(Json(serde_json::json!({"status": "ok", "deleted": folder}))) } +pub async fn delete_file( + State(state): State, + Path((username, filename)): Path<(String, String)>, +) -> Result, (StatusCode, String)> { + let root = user_root(&state.upload_path, &username); + let file_path = root.join(&filename); + let db_path = user_db_path(&state, &username); + + if tokio::fs::remove_file(&file_path).await.is_err() { + return Err((StatusCode::NOT_FOUND, "File not found".to_string())); + } + + // Remove tags associated with this file + if let Ok(conn) = ensure_schema(&db_path) { + let _ = conn.execute("DELETE FROM file_tags WHERE filename = ?1", params![filename]); + } + + Ok(Json(serde_json::json!({"status": "ok", "deleted": filename}))) +} + pub async fn list_files( State(state): State, Path(username): Path, Query(q): Query>, ) -> Result>, (StatusCode, String)> { - let root = user_root(&username); + let root = user_root(&state.upload_path, &username); let db_path = user_db_path(&state, &username); let conn = ensure_schema(&db_path).map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; @@ -296,6 +325,64 @@ pub async fn file_tags( Ok(Json(tags)) } +pub async fn preview_file( + Path((username, filename)): Path<(String, String)>, +) -> Response { + let root = user_root(upload_base_path(), &username); + let file_path = root.join(&filename); + + if !file_path.exists() || !file_path.is_file() { + return (StatusCode::NOT_FOUND, "File not found").into_response(); + } + + let ext = file_path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or("") + .to_lowercase(); + + let mime = match ext.as_str() { + "png" => "image/png", + "jpg" | "jpeg" => "image/jpeg", + "gif" => "image/gif", + "webp" => "image/webp", + "svg" => "image/svg+xml", + "pdf" => "application/pdf", + "mp4" | "m4v" => "video/mp4", + "webm" => "video/webm", + "mov" => "video/quicktime", + "avi" => "video/x-msvideo", + "mkv" => "video/x-matroska", + "mp3" => "audio/mpeg", + "m4a" => "audio/mp4", + "wav" => "audio/wav", + "flac" => "audio/flac", + "ogg" => "audio/ogg", + "aac" => "audio/aac", + "txt" | "md" | "json" | "yaml" | "yml" | "toml" | "log" | "csv" | "xml" | "html" | "js" | "ts" | "rs" | "py" | "sh" => "text/plain; charset=utf-8", + _ => "application/octet-stream", + }; + + let is_text = mime.starts_with("text/"); + if is_text { + match tokio::fs::read_to_string(&file_path).await { + Ok(content) => { + let headers = [(header::CONTENT_TYPE, "text/plain; charset=utf-8")]; + (headers, content).into_response() + } + Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response(), + } + } else { + match tokio::fs::read(&file_path).await { + Ok(data) => { + let headers = [(header::CONTENT_TYPE, mime)]; + (headers, Body::from(data)).into_response() + } + Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response(), + } + } +} + pub async fn ui_page() -> Html { Html(MYFILES_HTML.to_string()) } @@ -345,6 +432,18 @@ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; b .modal label { font-size: 14px; color: #6e6e73; display: block; margin-bottom: 4px; } .modal input { width: 100%; padding: 8px 12px; border: 1px solid #d2d2d7; border-radius: 8px; font-size: 14px; margin-bottom: 12px; } .modal .actions { display: flex; gap: 8px; justify-content: flex-end; margin-top: 16px; } +.preview-modal { max-width: 90vw; max-height: 90vh; width: 800px; display: flex; flex-direction: column; padding: 0; overflow: hidden; } +.preview-header { display: flex; align-items: center; justify-content: space-between; padding: 16px 20px; border-bottom: 1px solid #d2d2d7; } +.preview-header h3 { font-size: 16px; margin: 0; } +.btn-sm { padding: 4px 12px; font-size: 13px; } +.preview-content { flex: 1; overflow: auto; padding: 20px; min-height: 200px; max-height: calc(90vh - 60px); } +.preview-loading { text-align: center; padding: 40px; color: #6e6e73; } +.preview-content img { max-width: 100%; height: auto; display: block; margin: 0 auto; } +.preview-content pre { background: #f5f5f7; padding: 16px; border-radius: 8px; overflow: auto; font-size: 13px; line-height: 1.5; max-height: 60vh; } +.preview-content iframe { width: 100%; height: 70vh; border: none; } +.preview-content .file-meta { font-size: 14px; color: #6e6e73; text-align: center; padding: 40px; } +.preview-content .file-meta a { color: #0071e3; text-decoration: none; } +.preview-content .file-meta a:hover { text-decoration: underline; } @@ -396,6 +495,17 @@ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; b + diff --git a/markbase-core/src/server.rs b/markbase-core/src/server.rs index 8292b97..26c1c74 100644 --- a/markbase-core/src/server.rs +++ b/markbase-core/src/server.rs @@ -11,7 +11,7 @@ use axum::{ use base64::Engine as _; use serde::Deserialize; use std::str::FromStr; -use std::sync::{Arc, LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex, OnceLock}; use std::time::{Duration, Instant}; use dashmap::DashMap; @@ -24,6 +24,7 @@ use crate::auth::{AuthState, LoginRequest}; use crate::provider::sqlite::SqliteProvider; use crate::render; use filetree::{self, FileTree}; +use tower_http::cors::CorsLayer; #[derive(Clone)] pub struct AppState { @@ -35,6 +36,7 @@ pub struct AppState { pub auth: AuthState, pub auth_db_path: String, pub s3_keys: Arc>>, + pub upload_path: String, } pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { @@ -56,6 +58,30 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let html = audio::inject_audio_devices(&welcome, &out_devs, &in_devs, &cur_out, &cur_in); + // Load config for upload_path (env var override is handled by MarkBaseConfig::merge_env) + let config_path = std::path::Path::new("config/markbase.toml"); + let markbase_config = if config_path.exists() { + match crate::config::MarkBaseConfig::load(config_path) { + Ok(mut c) => { c.merge_env(); c } + Err(e) => { + log::warn!("Failed to load config/markbase.toml: {}. Using defaults.", e); + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + } + } + } else { + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + }; + // If MB_WEBDAV_PARENT env var is set, it overrides config via merge_env above + let upload_path = markbase_config.server.upload_path.clone(); + + // Initialize admin WebDAV upload path + MyFiles upload path (replaces env var reads) + let _ = UPLOAD_PATH.set(upload_path.clone()); + crate::myfiles::init_upload_path(upload_path.clone()); + let state = AppState { html: Arc::new(Mutex::new(html)), page_ver: Arc::new(Mutex::new(0)), @@ -69,6 +95,7 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .map_err(|e| anyhow::anyhow!("Failed to init SqliteProvider: {}", e))?, )), auth_db_path: "data/auth.sqlite".to_string(), + upload_path: upload_path.clone(), s3_keys: Arc::new(Mutex::new(load_s3_keys())), }; @@ -138,10 +165,7 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { }); // ===== WebDAV multi-user configuration (Phase 20 + P1) ===== - let webdav_parent = std::path::PathBuf::from( - std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()), - ); + let webdav_parent = std::path::PathBuf::from(&upload_path); // WebDAV versioning storage let version_storage = std::path::PathBuf::from("data/webdav_versions"); @@ -190,21 +214,21 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/api/v2/auth/login", post(login_handler)) .route("/api/v2/auth/logout", post(logout_handler)) .route("/api/v2/auth/verify", get(verify_handler)) - .route("/api/v2/admin/sync", post(manual_sync_handler)) - .route("/api/v2/admin/sync/status", get(sync_status_handler)) + .route("/api/v2/admin/sync", post(crate::api::admin::manual_sync_handler)) + .route("/api/v2/admin/sync/status", get(crate::api::admin::sync_status_handler)) // Config API endpoints (public) - .route("/api/v2/config", get(get_config_handler)) - .route("/api/v2/config/edit", post(edit_config_handler)) - .route("/api/v2/config/validate", get(validate_config_handler)) - .route("/api/v2/config/s3", get(get_s3_config_handler)) - .route("/api/v2/config/s3/edit", post(edit_s3_config_handler)) - .route("/api/v2/config/s3/validate", get(validate_s3_config_handler)) + .route("/api/v2/config", get(crate::api::config::get_config_handler)) + .route("/api/v2/config/edit", post(crate::api::config::edit_config_handler)) + .route("/api/v2/config/validate", get(crate::api::config::validate_config_handler)) + .route("/api/v2/config/s3", get(crate::api::config::get_s3_config_handler)) + .route("/api/v2/config/s3/edit", post(crate::api::config::edit_s3_config_handler)) + .route("/api/v2/config/s3/validate", get(crate::api::config::validate_s3_config_handler)) // .route("/api/v2/config/sftp", get(get_sftp_config_handler)) // .route("/api/v2/config/sftp/edit", post(edit_sftp_config_handler)) // .route("/api/v2/config/sftp/validate", get(validate_sftp_config_handler)) // Admin authentication API endpoints (public) - .route("/api/v2/admin/login", post(admin_login_handler)) - .route("/api/v2/admin/verify", get(admin_verify_handler)) + .route("/api/v2/admin/login", post(crate::api::admin::admin_login_handler)) + .route("/api/v2/admin/verify", get(crate::api::admin::admin_verify_handler)) // Protected endpoints (require auth) .route("/api/v2/tree/:user_id", get(get_tree)) .route("/api/v2/tree/:user_id/search", get(search_tree)) @@ -286,6 +310,19 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/files", get(|| async { Html(include_str!("file_list.html")) })) .route("/products", get(|| async { Html(include_str!("product_manager.html")) })) .route("/downloads", get(|| async { Html(include_str!("category_view.html")) })) + // Admin GUI pages (require admin auth) + .route("/admin/products", get(crate::api::admin::admin_products_page)) + .route("/admin/files", get(crate::api::admin::admin_files_page)) + .route("/tools/usb-ssd-test", get(|| async { Html(include_str!("usb_ssd_test.html")) })) + .route("/upload", get(|| async { Html(include_str!("upload.html")) })) + // Product management API (admin auth required) + .route("/api/v2/products", get(crate::api::admin::admin_list_all_products)) + .route("/api/v2/products/create", post(crate::api::admin::admin_create_product)) + .route("/api/v2/products/stats", get(crate::api::admin::admin_get_series_stats)) + .route("/api/v2/products/:product_id/files", get(crate::api::admin::admin_get_product_files)) + .route("/api/v2/products/:product_id", delete(crate::api::admin::admin_delete_product)) + .route("/api/v2/products/:product_id/assign-files", post(crate::api::admin::admin_assign_files)) + .route("/api/v2/files/:user_id", get(crate::api::admin::admin_list_uploaded_files)) // WebDAV API endpoints (Phase 20, multi-user P1) .route("/webdav", any(handle_webdav_multi)) .route("/webdav/", any(handle_webdav_multi)) @@ -299,14 +336,17 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/api/v2/myfiles/:username/folders", get(crate::myfiles::list_folders).post(crate::myfiles::create_folder)) .route("/api/v2/myfiles/:username/folders/:folder_name", delete(crate::myfiles::delete_folder)) .route("/api/v2/myfiles/:username/files", get(crate::myfiles::list_files)) + .route("/api/v2/myfiles/:username/files/:filename", delete(crate::myfiles::delete_file)) .route("/api/v2/myfiles/:username/tags", post(crate::myfiles::add_tag).delete(crate::myfiles::remove_tag)) .route("/api/v2/myfiles/:username/files/:filename/tags", get(crate::myfiles::file_tags)) + .route("/api/v2/myfiles/:username/preview/:filename", get(crate::myfiles::preview_file)) .layer(Extension(webdav_parent)) .layer(Extension(upload_hook)) .layer(Extension(webdav_versioning)) .layer(Extension(use_s3)) .layer(Extension(s3_cfg)) .layer(DefaultBodyLimit::disable()) + .layer(CorsLayer::permissive()) .with_state(state); let addr = format!("0.0.0.0:{port}"); @@ -1381,14 +1421,14 @@ async fn upload_file( } async fn upload_unlimited( - State(_state): State, + State(state): State, Path(user_id): Path, mut multipart: axum_extra::extract::Multipart, ) -> impl IntoResponse { use sha2::{Digest, Sha256}; use tokio::io::AsyncWriteExt; - let base_dir = "/Users/accusys/Downloads"; + let base_dir = &state.upload_path; let user_dir = format!("{}/{}", base_dir, user_id); let mut filename = String::new(); @@ -1953,123 +1993,6 @@ fn verify_auth(state: &AppState, headers: &HeaderMap) -> Result) -> impl IntoResponse { - let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); - - match syncer { - Ok(syncer) => match syncer.full_sync().await { - Ok(result) => { - if result.status == "success" { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "success", - "users_synced": result.users_synced, - "groups_synced": result.groups_synced, - "mappings_synced": result.mappings_synced - })), - ) - .into_response() - } else if result.status == "partial_success" { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "partial_success", - "users_synced": result.users_synced, - "users_failed": result.users_failed, - "groups_synced": result.groups_synced, - "groups_failed": result.groups_failed, - "errors": result.errors - })), - ) - .into_response() - } else { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": result.status, - "errors": result.errors - })), - ) - .into_response() - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "status": "failed", - "error": e.to_string() - })), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "status": "failed", - "error": e.to_string() - })), - ) - .into_response(), - } -} - -async fn sync_status_handler(State(state): State) -> impl IntoResponse { - let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); - - match auth_db { - Ok(db) => match db.open() { - Ok(conn) => { - match conn.query_row( - "SELECT sync_type, sync_time, users_synced, users_failed, - groups_synced, groups_failed, mappings_synced, status - FROM sync_log ORDER BY sync_time DESC LIMIT 5", - [], - |row| { - Ok(serde_json::json!({ - "sync_type": row.get::<_, String>(0)?, - "sync_time": row.get::<_, i64>(1)?, - "users_synced": row.get::<_, usize>(2)?, - "users_failed": row.get::<_, usize>(3)?, - "groups_synced": row.get::<_, usize>(4)?, - "groups_failed": row.get::<_, usize>(5)?, - "mappings_synced": row.get::<_, usize>(6)?, - "status": row.get::<_, String>(7)?, - })) - }, - ) { - Ok(log) => ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "ok", - "latest_sync": log - })), - ) - .into_response(), - Err(_) => ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "ok", - "message": "No sync logs found" - })), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - fn html_escape(s: &str) -> String { s.replace('&', "&") .replace('<', "<") @@ -2077,210 +2000,6 @@ fn html_escape(s: &str) -> String { .replace('"', """) } -#[derive(Debug, serde::Deserialize)] -struct EditConfigQuery { - key: String, - value: String, -} - -async fn get_config_handler() -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(config) => ( - StatusCode::OK, - Json(serde_json::to_value(&config).unwrap_or_default()), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(mut config) => { - let old_value = config.get(¶ms.key).unwrap_or_default(); - - match config.set(¶ms.key, ¶ms.value) { - Ok(_) => match config.validate() { - Ok(_) => match config.save(config_path) { - Ok(_) => { - // Log audit entry - let audit = crate::audit::AuditLogger::default(); - if let Err(e) = audit.log_config_change( - "markbase", - ¶ms.key, - &old_value, - ¶ms.value, - "system", - None, - ) { - log::warn!("Failed to write audit log: {}", e); - } - - (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn validate_config_handler() -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"ok": false, "error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(config) => match config.validate() { - Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - } -} - -async fn get_s3_config_handler() -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(config) => ( - StatusCode::OK, - Json(serde_json::to_value(&config).unwrap_or_default()), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn edit_s3_config_handler(Query(params): Query) -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(mut config) => { - let old_value = config.get(¶ms.key).unwrap_or_default(); - - match config.set(¶ms.key, ¶ms.value) { - Ok(_) => match config.validate() { - Ok(_) => match config.save("config/s3.toml") { - Ok(_) => { - // Log audit entry - let audit = crate::audit::AuditLogger::default(); - if let Err(e) = audit.log_config_change( - "s3", - ¶ms.key, - &old_value, - ¶ms.value, - "system", - None, - ) { - log::warn!("Failed to write audit log: {}", e); - } - - (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn validate_s3_config_handler() -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(config) => match config.validate() { - Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - } -} - // async fn get_sftp_config_handler() -> impl IntoResponse { // match crate::sftp::SftpConfig::load_default() { // Ok(config) => ( @@ -2330,50 +2049,6 @@ async fn validate_s3_config_handler() -> impl IntoResponse { // } // } -async fn admin_login_handler( - State(state): State, - Json(body): Json, -) -> impl IntoResponse { - match state.auth.admin_login(&body.username, &body.password) { - Some(response) => (StatusCode::OK, Json(response)).into_response(), - None => ( - StatusCode::UNAUTHORIZED, - Json(serde_json::json!({"error": "Invalid admin credentials"})), - ) - .into_response(), - } -} - -async fn admin_verify_handler( - State(state): State, - headers: axum::http::HeaderMap, -) -> impl IntoResponse { - let auth_header = headers - .get("Authorization") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.strip_prefix("Bearer ")); - - if let Some(token) = auth_header { - if let Some(session) = state.auth.verify_admin_token(token) { - return ( - StatusCode::OK, - Json(serde_json::json!({ - "ok": true, - "username": session.username, - "expires_at": session.expires_at - })), - ) - .into_response(); - } - } - - ( - StatusCode::UNAUTHORIZED, - Json(serde_json::json!({"ok": false, "error": "Invalid admin token"})), - ) - .into_response() -} - async fn shell_status_handler() -> Json { // TODO: 使用新的ssh_server模块 // let config = crate::sftp::config::SftpConfig::load_default().unwrap_or_default(); @@ -2636,9 +2311,11 @@ fn unauthorized_response() -> axum::response::Response { ).into_response() } +static UPLOAD_PATH: OnceLock = OnceLock::new(); + static ADMIN_WEBDAV_HANDLER: LazyLock> = LazyLock::new(|| { - let parent = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); + let parent = UPLOAD_PATH.get().cloned() + .unwrap_or_else(|| "/Users/accusys/momentry/var/sftpgo/data".to_string()); let parent_path = std::path::PathBuf::from(&parent); if !parent_path.exists() { return None; diff --git a/markbase-core/src/ssh_server/cipher.rs b/markbase-core/src/ssh_server/cipher.rs index d38ebb3..1444e2b 100644 --- a/markbase-core/src/ssh_server/cipher.rs +++ b/markbase-core/src/ssh_server/cipher.rs @@ -20,6 +20,7 @@ use hmac::{Hmac, Mac}; use log::info; use sha2::Sha256; use std::io::Write; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; type Aes128Ctr = Ctr128BE; // AES-128-CTR(16字节密钥) type HmacSha256 = Hmac; @@ -1167,6 +1168,187 @@ impl EncryptedPacket { pub fn take_payload(&mut self) -> Vec { std::mem::take(&mut self.payload) } + + // ── Async I/O (tokio) ───────────────────────────────────────────── + + /// Async write encrypted packet (tokio::io::AsyncWriteExt) + pub async fn write_async(&self, stream: &mut W) -> Result<()> { + if self.payload.len() > 4 && self.payload[0..4] == self.packet_length.to_be_bytes() { + stream.write_all(&self.payload).await?; + } else { + stream.write_all(&self.payload).await?; + stream.write_all(&self.mac).await?; + } + Ok(()) + } + + /// Async read encrypted packet (tokio::io::AsyncReadExt) + pub async fn read_async( + stream: &mut R, + encryption_ctx: &mut EncryptionContext, + is_client_to_server: bool, + ) -> Result { + if encryption_ctx.cipher_mode == CipherMode::AesGcm { + let mut packet_length_bytes = [0u8; 4]; + stream.read_exact(&mut packet_length_bytes).await?; + let packet_length = u32::from_be_bytes(packet_length_bytes); + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let ciphertext_length = packet_length as usize + 16; + let mut ciphertext = vec![0u8; ciphertext_length]; + stream.read_exact(&mut ciphertext).await?; + + let sequence_number = if is_client_to_server { + encryption_ctx.sequence_number_ctos + } else { + encryption_ctx.sequence_number_stoc + }; + let iv_bytes = if is_client_to_server { + &encryption_ctx.iv_ctos + } else { + &encryption_ctx.iv_stoc + }; + let mut nonce_bytes = [0u8; 12]; + nonce_bytes.copy_from_slice(&iv_bytes[..12]); + let mut carry = sequence_number; + for i in (8..12).rev() { + let sum = nonce_bytes[i] as u16 + (carry & 0xFF) as u16; + nonce_bytes[i] = (sum & 0xFF) as u8; + carry = (carry >> 8) + ((sum >> 8) as u32); + } + if carry > 0 { + for i in (4..8).rev() { + let sum = nonce_bytes[i] as u16 + (carry & 0xFF) as u16; + nonce_bytes[i] = (sum & 0xFF) as u8; + carry = (carry >> 8) + ((sum >> 8) as u32); + if carry == 0 { break; } + } + } + let key_bytes = if is_client_to_server { + &encryption_ctx.encryption_key_ctos + } else { + &encryption_ctx.encryption_key_stoc + }; + let cipher = Aes256GcmAead::new_from_slice(&key_bytes[..32]) + .map_err(|e| anyhow!("AES-GCM key init failed: {}", e))?; + let nonce = Nonce::from_slice(&nonce_bytes); + let plaintext_payload_buffer = cipher.decrypt(nonce, Payload { + msg: ciphertext.as_slice(), + aad: &packet_length_bytes, + }).map_err(|e| anyhow!("AES-GCM decrypt failed: {}", e))?; + + let padding_length = plaintext_payload_buffer[0]; + let payload_len = packet_length as usize - padding_length as usize - 1; + let compressed_payload = plaintext_payload_buffer[1..1 + payload_len].to_vec(); + let payload = if is_client_to_server { + if encryption_ctx.compression_ctos.is_enabled() { + encryption_ctx.compression_ctos.decompress(&compressed_payload)? + } else { compressed_payload } + } else { compressed_payload }; + let mac = ciphertext[ciphertext.len() - 16..].to_vec(); + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding: Vec::new(), mac }); + } else if encryption_ctx.cipher_mode == CipherMode::ChaChaPoly { + let mut packet_length_bytes = [0u8; 4]; + stream.read_exact(&mut packet_length_bytes).await?; + let packet_length = u32::from_be_bytes(packet_length_bytes); + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let ciphertext_length = packet_length as usize + 16; + let mut ciphertext = vec![0u8; ciphertext_length]; + stream.read_exact(&mut ciphertext).await?; + + let sequence_number = if is_client_to_server { + encryption_ctx.sequence_number_ctos + } else { + encryption_ctx.sequence_number_stoc + }; + let iv_bytes = if is_client_to_server { + &encryption_ctx.iv_ctos + } else { + &encryption_ctx.iv_stoc + }; + let nonce_bytes: [u8; 12] = { + let mut n = [0u8; 12]; + n[0..4].copy_from_slice(&sequence_number.to_be_bytes()); + n[4..12].copy_from_slice(&iv_bytes[..8]); + n + }; + let key_bytes = if is_client_to_server { + &encryption_ctx.encryption_key_ctos + } else { + &encryption_ctx.encryption_key_stoc + }; + let cipher_cha = ChaCha20Poly1305::new(ChaKey::from_slice(&key_bytes[..32])); + let nonce = ChaNonce::from_slice(&nonce_bytes); + let plaintext_payload_buffer = cipher_cha.decrypt(nonce, ChaPayload { + msg: ciphertext.as_slice(), + aad: &packet_length_bytes, + }).map_err(|e| anyhow!("ChaCha20Poly1305 decrypt failed: {}", e))?; + + let padding_length = plaintext_payload_buffer[0]; + let payload_len = packet_length as usize - padding_length as usize - 1; + let payload = plaintext_payload_buffer[1..1 + payload_len].to_vec(); + let mac = ciphertext[ciphertext.len() - 16..].to_vec(); + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding: Vec::new(), mac }); + } else { + let mut first_block_encrypted = [0u8; 16]; + stream.read_exact(&mut first_block_encrypted).await?; + let cipher = if is_client_to_server { + encryption_ctx.cipher_ctos.as_mut() + .ok_or_else(|| anyhow!("cipher_ctos not initialized"))? + } else { + encryption_ctx.cipher_stoc.as_mut() + .ok_or_else(|| anyhow!("cipher_stoc not initialized"))? + }; + let mut first_block_decrypted = first_block_encrypted; + cipher.apply_keystream(&mut first_block_decrypted); + let packet_length = u32::from_be_bytes([first_block_decrypted[0], first_block_decrypted[1], first_block_decrypted[2], first_block_decrypted[3]]); + let padding_length = first_block_decrypted[4]; + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let total_encrypted_size = packet_length as usize + 4; + let remaining_size = total_encrypted_size.saturating_sub(16); + let mut remaining_encrypted = vec![0u8; remaining_size]; + if remaining_size > 0 { + stream.read_exact(&mut remaining_encrypted).await?; + } + cipher.apply_keystream(&mut remaining_encrypted); + let payload_len = packet_length as usize - padding_length as usize - 1; + let part1_len = std::cmp::min(payload_len, 11); + let part1 = &first_block_decrypted[5..5 + part1_len]; + let part2 = &remaining_encrypted[..payload_len.saturating_sub(part1_len)]; + let mut payload = Vec::with_capacity(payload_len); + payload.extend_from_slice(part1); + payload.extend_from_slice(part2); + let payload = if is_client_to_server { + if encryption_ctx.compression_ctos.is_enabled() { + encryption_ctx.compression_ctos.decompress(&payload)? + } else { payload } + } else { payload }; + let padding = remaining_encrypted[payload_len.saturating_sub(part1_len)..].to_vec(); + let mut mac = vec![0u8; 32]; + stream.read_exact(&mut mac).await?; + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding, mac }); + } + } } #[cfg(test)] diff --git a/markbase-core/src/ssh_server/packet.rs b/markbase-core/src/ssh_server/packet.rs index 9ea636e..f6437db 100644 --- a/markbase-core/src/ssh_server/packet.rs +++ b/markbase-core/src/ssh_server/packet.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// SSH Packet类型(参考OpenSSH SSH_MSG_*定义) #[derive(Debug, Clone, Copy, PartialEq)] @@ -160,6 +161,39 @@ impl SshPacket { }) } + /// Async write (tokio) + pub async fn write_async(&self, stream: &mut W) -> Result<()> { + stream.write_all(&self.packet_length.to_be_bytes()).await?; + stream.write_all(&[self.padding_length]).await?; + stream.write_all(&self.payload).await?; + stream.write_all(&self.padding).await?; + stream.flush().await?; + Ok(()) + } + + /// Async read (tokio) + pub async fn read_async(stream: &mut R) -> Result { + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let packet_length = u32::from_be_bytes(len_buf); + if packet_length > 256 * 1024 { + return Err(anyhow!("Packet too large: {}", packet_length)); + } + let mut pad_buf = [0u8; 1]; + stream.read_exact(&mut pad_buf).await?; + let padding_length = pad_buf[0]; + let payload_len = packet_length.saturating_sub(padding_length as u32 + 1); + let mut payload = vec![0u8; payload_len as usize]; + if !payload.is_empty() { + stream.read_exact(&mut payload).await?; + } + let mut padding = vec![0u8; padding_length as usize]; + if !padding.is_empty() { + stream.read_exact(&mut padding).await?; + } + Ok(Self { packet_length, padding_length, payload, padding }) + } + /// 获取payload中的packet type pub fn get_type(&self) -> Result { if self.payload.is_empty() { diff --git a/markbase-core/src/ssh_server/server.rs b/markbase-core/src/ssh_server/server.rs index 1dfa741..8e9e973 100644 --- a/markbase-core/src/ssh_server/server.rs +++ b/markbase-core/src/ssh_server/server.rs @@ -17,10 +17,10 @@ use crate::ssh_server::version::VersionExchange; use anyhow::{anyhow, Result}; use log::{error, info, warn}; use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; +use std::net::TcpStream; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::thread; +use tokio::net::TcpListener; pub struct SshServerConfig { pub port: u16, @@ -71,11 +71,11 @@ impl SshServer { } } - pub fn run(&self) -> Result<()> { + pub async fn run(&self) -> Result<()> { let bind_addr = format!("{}:{}", self.config.bind_address, self.config.port); - let listener = TcpListener::bind(&bind_addr)?; + let listener = TcpListener::bind(&bind_addr).await?; - info!("MarkBaseSSH server listening on {}", bind_addr); + info!("MarkBaseSSH server listening on {} (async tokio)", bind_addr); info!("Implementation: Complete SSH/SFTP + Port Forwarding (Phase 1-13)"); info!( "Security config: GatewayPorts={}, PermitOpen={:?}, MaxSessions={}", @@ -88,23 +88,30 @@ impl SshServer { let pg_conn = self.config.pg_conn.clone(); let upload_hook_config = self.config.upload_hook_config.clone(); - for stream in listener.incoming() { - match stream { - Ok(stream) => { - let client_addr = stream.peer_addr()?; - info!("New SSH connection from {}", client_addr); + loop { + match listener.accept().await { + Ok((stream, addr)) => { + info!("New SSH connection from {}", addr); let security_config_clone = security_config.clone(); let pg_conn_clone = pg_conn.clone(); let upload_hook_config_clone = upload_hook_config.clone(); - thread::spawn(move || { - if let Err(e) = handle_connection_complete( - stream, - security_config_clone, - pg_conn_clone, - upload_hook_config_clone, - ) + // ⭐⭐⭐⭐⭐ Convert tokio TcpStream to std TcpStream for blocking handler + // Set blocking explicitly since into_std() may preserve non-blocking mode + let std_stream = stream.into_std()?; + std_stream.set_nonblocking(false)?; + + tokio::spawn(async move { + // Run the existing sync connection handler in a blocking thread + if let Err(e) = tokio::task::spawn_blocking(move || { + handle_connection_complete( + std_stream, + security_config_clone, + pg_conn_clone, + upload_hook_config_clone, + ) + }).await.unwrap_or(Err(anyhow!("Task join error"))) { error!("SSH connection error: {}", e); } @@ -115,8 +122,6 @@ impl SshServer { } } } - - Ok(()) } } @@ -787,7 +792,7 @@ fn extract_username_from_auth_request( } /// SSH服务器CLI入口 -pub fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { +pub async fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { let config = SshServerConfig { port: port.unwrap_or(2024), bind_address: "0.0.0.0".to_string(), // ⭐⭐⭐⭐⭐ Phase 8.3: Allow Docker container access @@ -797,5 +802,5 @@ pub fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { }; let server = SshServer::new(config); - server.run() + server.run().await } diff --git a/markbase-core/src/upload.html b/markbase-core/src/upload.html index 9ee1fbe..86f2781 100644 --- a/markbase-core/src/upload.html +++ b/markbase-core/src/upload.html @@ -2,268 +2,145 @@ + File Upload -
-

📁 File Upload Service

- -
-
- - -
- -
- -
- - -
-
- -
- - -

- Upload entire folder with subdirectories -

-
- - - - +
+

Upload

+

Upload files to user storage directory

+ +
+
+ + +
+ +
+ +
+ + +
+
+ +
+
+
+
- +
+ +
+ + + +
+
+
+
+
+
+
+ +function toggleMode() { + const mode = document.querySelector('input[name="mode"]:checked').value; + document.getElementById('single-group').style.display = mode === 'file' ? 'block' : 'none'; + document.getElementById('folder-group').style.display = mode === 'folder' ? 'block' : 'none'; +} + +async function uploadFiles() { + const uid = document.getElementById('user_id').value.trim(); + if (!uid) return showError('Enter a user ID'); + + const mode = document.querySelector('input[name="mode"]:checked').value; + const files = mode === 'folder' + ? document.getElementById('folder').files + : document.getElementById('single_file').files; + + if (!files || files.length === 0) return showError('Select a file or folder'); + + const btn = document.getElementById('upload-btn'); + btn.disabled = true; + + const progress = document.getElementById('progress'); + const fill = document.getElementById('progress-fill'); + const ptext = document.getElementById('progress-text'); + const result = document.getElementById('result'); + progress.style.display = 'block'; + result.style.display = 'none'; + + let uploaded = 0; + const total = files.length; + + for (let i = 0; i < total; i++) { + const f = files[i]; + const fd = new FormData(); + fd.append('file', f); + ptext.textContent = `Uploading ${f.name} (${i+1}/${total})`; + + try { + const res = await fetch(`/api/v2/upload-unlimited/${uid}`, { method: 'POST', body: fd }); + if (!res.ok) { showError(`${f.name}: HTTP ${res.status}`); btn.disabled = false; return; } + const data = await res.json(); + if (!data.ok) { showError(`${f.name}: ${data.error || 'unknown'}`); btn.disabled = false; return; } + uploaded++; + const pct = Math.round(uploaded / total * 100); + fill.style.width = pct + '%'; + ptext.textContent = `${pct}% (${uploaded}/${total})`; + } catch(e) { + showError(`${f.name}: ${e.message}`); + btn.disabled = false; + return; + } + } + + showSuccess(`Uploaded ${uploaded} file${uploaded > 1 ? 's' : ''}`); + btn.disabled = false; +} + +function showSuccess(m) { showResult(m, 'success'); } +function showError(m) { showResult(m, 'error'); } +function showResult(m, t) { + const r = document.getElementById('result'); + r.className = 'result ' + t; + r.textContent = m; + r.style.display = 'block'; +} + \ No newline at end of file