use anyhow::Context; use axum::{ extract::{Path, Query, State}, http::{HeaderMap, StatusCode}, response::{Html, IntoResponse, Json}, routing::{delete, get, patch, post, put}, Router, extract::DefaultBodyLimit, }; use serde::Deserialize; use std::str::FromStr; use std::sync::{Arc, Mutex}; use crate::audio; use crate::auth::{AuthState, LoginRequest}; use crate::render; use crate::download; use crate::archive::{self, ArchiveFormat, ArchiveProcessor, FormatDetector, ArchiveConfig, ProcessorRegistry}; use filetree::{self, FileTree}; #[derive(Clone)] pub struct AppState { pub html: Arc>, pub page_ver: Arc>, pub step_info: Arc>, pub labels: Arc>>, pub db_dir: String, pub auth: AuthState, pub auth_db_path: String, pub s3_keys: Arc>>, } pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { let welcome = if let Some(f) = &file { let md = std::fs::read_to_string(f).unwrap_or_default(); let body = render::md_to_html(&md); render::page(f, &body) } else { render::page( "MarkBase", r#"
📺

MarkBase

Momentry Display Engine

"#, ) }; let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let html = audio::inject_audio_devices(&welcome, &out_devs, &in_devs, &cur_out, &cur_in); let state = AppState { html: Arc::new(Mutex::new(html)), page_ver: Arc::new(Mutex::new(0)), step_info: Arc::new(Mutex::new(serde_json::json!({ "step": 0, "total": 0, "id": "", "label": "", "voice": "off" }))), labels: Arc::new(Mutex::new(vec![])), db_dir: "data/users".to_string(), auth: AuthState::with_sync("data/auth.sqlite"), auth_db_path: "data/auth.sqlite".to_string(), s3_keys: Arc::new(Mutex::new(load_s3_keys())), }; // Load S3 keys from file fn load_s3_keys() -> Vec { if let Ok(content) = std::fs::read_to_string("data/s3_keys.json") { serde_json::from_str(&content).unwrap_or_default() } else { vec![ crate::s3::S3AccessKey { access_key: "markbase_access_key_001".to_string(), secret_key: "markbase_secret_key_xyz123".to_string(), user_id: "warren".to_string(), permissions: vec!["GetObject".to_string(), "ListBucket".to_string()], created_at: "2026-05-27T00:00:00Z".to_string(), }, crate::s3::S3AccessKey { access_key: "markbase_access_key_002".to_string(), secret_key: "markbase_secret_key_abc789".to_string(), user_id: "demo".to_string(), permissions: vec!["GetObject".to_string(), "ListBucket".to_string()], created_at: "2026-05-27T00:00:00Z".to_string(), }, ] } } // Initial sync from SFTPGo PostgreSQL let syncer = crate::pg_client::SftpGoSync::new("data/auth.sqlite")?; tokio::spawn(async move { match syncer.full_sync().await { Ok(result) => { log::info!( "Initial sync completed: users={}, groups={}, mappings={}, status={}", result.users_synced, result.groups_synced, result.mappings_synced, result.status ); } Err(e) => { log::error!("Initial sync failed: {}", e); } } }); // Periodic sync task (every hour) let syncer_clone = crate::pg_client::SftpGoSync::new("data/auth.sqlite")?; tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); loop { interval.tick().await; match syncer_clone.full_sync().await { Ok(result) => { log::info!( "Hourly sync: users={}, groups={}, status={}", result.users_synced, result.groups_synced, result.status ); } Err(e) => { log::error!("Hourly sync failed, keeping cached data: {}", e); } } } }); let app = Router::new() .route("/", get(root_handler)) .route("/display", post(display_handler)) .route( "/command", get(crate::command::get_commands).post(crate::command::post_command), ) .route("/version", get(version_handler)) .route("/devices", get(devices_handler)) .route("/volume", get(volume_handler)) .route("/body", get(body_handler)) .route("/status", get(status_handler)) .route("/labels", get(get_labels).post(post_labels)) // Auth endpoints (public) .route("/api/v2/auth/login", post(login_handler)) .route("/api/v2/auth/logout", post(logout_handler)) .route("/api/v2/auth/verify", get(verify_handler)) .route("/api/v2/admin/sync", post(manual_sync_handler)) .route("/api/v2/admin/sync/status", get(sync_status_handler)) // Config API endpoints (public) .route("/api/v2/config", get(get_config_handler)) .route("/api/v2/config/edit", post(edit_config_handler)) .route("/api/v2/config/validate", get(validate_config_handler)) .route("/api/v2/config/s3", get(get_s3_config_handler)) .route("/api/v2/config/s3/edit", post(edit_s3_config_handler)) .route("/api/v2/config/s3/validate", get(validate_s3_config_handler)) // .route("/api/v2/config/sftp", get(get_sftp_config_handler)) // .route("/api/v2/config/sftp/edit", post(edit_sftp_config_handler)) // .route("/api/v2/config/sftp/validate", get(validate_sftp_config_handler)) // Admin authentication API endpoints (public) .route("/api/v2/admin/login", post(admin_login_handler)) .route("/api/v2/admin/verify", get(admin_verify_handler)) // Protected endpoints (require auth) .route("/api/v2/tree/:user_id", get(get_tree)) .route("/api/v2/tree/:user_id/search", get(search_tree)) .route("/api/v2/tree/:user_id/node", post(create_node)) .route( "/api/v2/tree/:user_id/node/:node_id", put(update_node).delete(delete_node), ) .route("/api/v2/tree/:user_id", delete(delete_all_nodes)) .route("/api/v2/tree/:user_id/restore", post(restore_tree)) .route("/api/v2/dupes/:user_id", get(get_dupes)) .route("/api/v2/unregister/:file_uuid", post(unregister_file)) .route("/api/v2/upload/:user_id", post(upload_file)) .route("/api/v2/render/:file_uuid", get(render_file)) .route("/api/v2/render/:file_uuid/body", get(render_file_body)) .route("/api/v2/tree/:user_id/node/:node_id/move", put(move_node)) .route( "/api/v2/tree/:user_id/node/:node_id/alias", patch(update_alias), ) .route("/api/v2/modes", get(get_modes)) .route("/api/v2/files/:user_id/:file_uuid/info", get(get_file_info)) .route("/api/v2/files/:user_id/:file_uuid/probe", get(get_file_probe)) .route("/api/v2/files/:user_id/:file_uuid/stream", get(stream_file)) .route( "/api/v2/files/:file_uuid/locations", post(add_file_location), ) // S3 API endpoints (AWS Signature V4 auth required) .route("/api/v2/s3/status", get(crate::s3::s3_status)) .route("/api/v2/s3/generate-key", post(crate::s3::generate_s3_key)) .route("/s3", get(crate::s3::list_buckets)) .route("/s3/:bucket", get(crate::s3::list_objects)) .route("/s3/:bucket/*key", get(crate::s3::get_object) .head(crate::s3::head_object) .put(crate::s3::put_object) .delete(crate::s3::delete_object) ) // Shell and Metrics API endpoints (public for monitoring) .route("/api/v2/shell/status", get(shell_status_handler)) .route("/api/v2/metrics", get(metrics_handler)) .route("/api/v2/audit", get(audit_handler)) // Upload API endpoints (unlimited file upload) // Product API endpoints .route("/api/v2/products", get(crate::download::list_all_products)) .route("/api/v2/products/stats", get(crate::download::get_series_stats)) .route("/api/v2/products/create", post(crate::download::create_product_handler)) .route("/api/v2/products/by-series/:series", get(crate::download::list_products_by_series)) .route("/api/v2/products/:product_id", delete(crate::download::delete_product)) .route("/api/v2/products/:product_id/files", get(crate::download::get_product_files)) .route("/api/v2/products/:product_id/assign-files", post(crate::download::assign_files_to_product)) .route("/api/v2/download/:file_id", get(crate::download::download_file)) .route("/api/v2/download/:user_id/*file_path", get(crate::download::download_file_by_path)) .route("/api/v2/download/products/:product_series/*file_path", get(crate::download::download_product_file)) .route("/api/v2/download/stats", get(crate::download::get_download_stats)) .route("/api/v2/files/:user_id", get(crate::download::list_uploaded_files)) .route("/api/v2/files/:user_id/:filename", get(crate::download::get_file_info)) .route("/api/v2/upload-unlimited/:user_id", post(upload_unlimited)) // Category View API endpoints (Phase 1: 双视图管理) .route("/api/v2/categories", get(get_all_categories_handler)) .route("/api/v2/categories/:category_name", get(get_category_detail_handler)) .route("/api/v2/series", get(get_all_series_handler)) .route("/api/v2/series/:series_name", get(get_series_detail_handler)) .route("/api/v2/files/search", get(search_files_handler)) .route("/api/v2/health", get(health_handler)) .route("/upload", get(|| async { Html(include_str!("upload.html")) })) .route("/files", get(|| async { Html(include_str!("file_list.html")) })) .route("/products", get(|| async { Html(include_str!("product_manager.html")) })) .layer(DefaultBodyLimit::disable()) // Disable body size limit for large file uploads .with_state(state); let addr = format!("127.0.0.1:{port}"); println!("📺 MarkBase ready on http://{addr}/"); std::process::Command::new("open") .arg(format!("http://{addr}/")) .spawn() .ok(); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve(listener, app).await?; Ok(()) } async fn root_handler(State(state): State) -> Html { Html(state.html.lock().unwrap().clone()) } async fn version_handler(State(state): State) -> Json { let v = *state.page_ver.lock().unwrap(); Json(serde_json::json!({"v": v})) } async fn health_handler() -> impl IntoResponse { ( StatusCode::OK, Json(serde_json::json!({ "status": "healthy", "service": "MarkBase", "version": "2.3", "upload_api": "unlimited", "sftpgo_status": "stopped", "data_dir": "/Users/accusys/momentry/var/sftpgo/data", "timestamp": chrono::Utc::now().to_rfc3339() })), ) } async fn get_labels(State(state): State) -> Json { let labels = state.labels.lock().unwrap().clone(); Json(serde_json::json!(labels)) } async fn post_labels( State(state): State, Json(body): Json>, ) -> impl IntoResponse { *state.labels.lock().unwrap() = body; (StatusCode::OK, Json(serde_json::json!({"ok": true}))) } async fn status_handler(State(state): State) -> Json { let info = state.step_info.lock().unwrap().clone(); Json( serde_json::json!({"paused": false, "step": info["step"], "total": info["total"], "id": info["id"], "label": info["label"], "voice": info["voice"]}), ) } async fn devices_handler() -> Json { let (out, inp, co, ci) = audio::audio_devices(); Json(serde_json::json!({"output": out, "input": inp, "current_out": co, "current_in": ci})) } async fn volume_handler() -> Json { if let Ok(r) = std::process::Command::new("osascript") .args(["-e", "output volume of (get volume settings)"]) .output() { let s = String::from_utf8_lossy(&r.stdout).trim().to_string(); if let Ok(v) = s.parse::() { return Json(serde_json::json!({"level": v})); } } Json(serde_json::json!({"level": 50})) } async fn body_handler(State(state): State) -> impl IntoResponse { let html = state.html.lock().unwrap().clone(); if let Some(s) = html.find("
") { if let Some(e) = html[s..].find("
") { let body = &html[s + 19..s + e]; return (StatusCode::OK, Html(body.to_string())).into_response(); } } (StatusCode::OK, Html("".to_string())).into_response() } async fn delete_all_nodes( State(state): State, Path(user_id): Path, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id, "untitled folder")?; let count = tree.nodes.len(); let node_ids: Vec = tree.nodes.iter().map(|n| n.node_id.clone()).collect(); for nid in node_ids { conn.execute( "DELETE FROM file_nodes WHERE node_id = ?1", rusqlite::params![nid], )?; } Ok(serde_json::json!({"ok": true, "deleted": count})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn restore_tree( State(state): State, Path(user_id): Path, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id, "untitled folder")?; let count = tree.nodes.len(); for n in &tree.nodes { conn.execute("DELETE FROM file_nodes WHERE node_id = ?1", rusqlite::params![n.node_id])?; } // Fetch files from 3002 API using environment variables let api_key = std::env::var("RESTORE_API_KEY") .unwrap_or_else(|_| "".to_string()); let api_url = std::env::var("RESTORE_API_URL") .unwrap_or_else(|_| "http://localhost:3002/api/v1/files".to_string()); let api_output = std::process::Command::new("curl") .args(["-s", &format!("{}?page=1&page_size=100", api_url)]) .args(["-H", &format!("X-API-Key: {}", api_key)]) .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) .unwrap_or_default(); let resp: serde_json::Value = serde_json::from_str(&api_output) .map_err(|e| anyhow::anyhow!("3002 API error: {}", e))?; let files = resp["data"].as_array() .ok_or_else(|| anyhow::anyhow!("No files data"))?; let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); let home_id = uuid::Uuid::new_v4().to_string(); let mut folder_ids: std::collections::HashMap = std::collections::HashMap::new(); let folders: Vec<(&str, &str, &str)> = vec![ ("Home", "🏠", ""), ("Movies", "🎬", &home_id), ("Marketing", "📢", &home_id), ("Cartoons", "🎨", &home_id), ("Other", "📁", &home_id), ]; for (name, icon, parent) in &folders { let nid = if *name == "Home" { home_id.clone() } else { uuid::Uuid::new_v4().to_string() }; let pid: Option<&str> = if parent.is_empty() { None } else { Some(parent) }; folder_ids.insert(name.to_string(), nid.clone()); conn.execute( "INSERT INTO file_nodes (node_id, label, aliases_json, node_type, icon, parent_id, created_at, updated_at) VALUES (?1, ?2, '{}', 'folder', ?3, ?4, ?5, ?6)", rusqlite::params![nid, name, icon, pid, now, now], )?; } let mut imported = 0; for f in files { let name = f["file_name"].as_str().unwrap_or("unknown"); let name_lower = name.to_lowercase(); let fuuid = f["file_uuid"].as_str().unwrap_or(""); let category = if name_lower.ends_with(".jpg") || name_lower.ends_with(".png") || name_lower.ends_with(".jpeg") || name_lower.ends_with(".gif") { "Other" } else if name_lower.contains("charade") || name_lower.contains("film") || name_lower.contains("clip") || name_lower.contains("movie") || name_lower.contains("comedy") || name_lower.contains("filmriot") { "Movies" } else if name_lower.contains("exasan") || name_lower.contains("gamma") || name_lower.contains("thunderbolt") || name_lower.contains("nab") || name_lower.contains("koba") || name_lower.contains("webinar") || name_lower.contains("top colorist") || name_lower.contains("accusys") || name_lower.contains("a12t3") { "Marketing" } else if name_lower.contains("cartoon") || name_lower.contains("alice") || name_lower.contains("felix") || name_lower.contains("disney") || name_lower.contains("steamboat") || name_lower.contains("animal") { "Cartoons" } else { "Other" }; let parent_id = folder_ids.get(category).cloned().unwrap_or(home_id.clone()); let nid = uuid::Uuid::new_v4().to_string(); conn.execute( "INSERT INTO file_nodes (node_id, label, aliases_json, file_uuid, node_type, parent_id, created_at, updated_at) VALUES (?1, ?2, '{}', ?3, 'file', ?4, ?5, ?6)", rusqlite::params![nid, name, fuuid, parent_id, now, now], )?; let demo_path = format!("/Users/accusys/momentry/var/sftpgo/data/demo/{}", name); conn.execute( "INSERT OR IGNORE INTO file_locations (file_uuid, location, label) VALUES (?1, ?2, 'origin')", rusqlite::params![fuuid, demo_path], )?; imported += 1; } Ok(serde_json::json!({"ok": true, "deleted": count, "imported": imported})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct DisplayReq { #[serde(rename = "type")] content_type: String, data: Option, file: Option, url: Option, html: Option, step_id: Option, step_num: Option, step_total: Option, } async fn display_handler( State(state): State, Json(req): Json, ) -> impl IntoResponse { let body = match req.content_type.as_str() { "md" | "markdown" => { let content = match (&req.file, &req.data) { (Some(f), _) => std::fs::read_to_string(f).unwrap_or_default(), (_, Some(d)) => d.clone(), _ => { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok":false})), ) } }; render::md_to_html(&content) } "json" => { let data = req.data.as_deref().unwrap_or(""); let formatted = serde_json::from_str::(data) .map(|v| serde_json::to_string_pretty(&v).unwrap_or_default()) .unwrap_or_else(|_| data.to_string()); format!("
{}
", html_escape(&formatted)) } "url" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "video" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "image" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "html" => req.html.clone().unwrap_or_default(), _ => { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok":false})), ) } }; let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let new_html = audio::inject_audio_devices( &render::page("Display", &body), &out_devs, &in_devs, &cur_out, &cur_in, ); *state.html.lock().unwrap() = new_html; *state.page_ver.lock().unwrap() += 1; if let (Some(id), Some(num), Some(total)) = (&req.step_id, req.step_num, req.step_total) { let mut info = state.step_info.lock().unwrap(); *info = serde_json::json!({"step": num, "total": total, "id": id, "label": "", "voice": "off"}); } (StatusCode::OK, Json(serde_json::json!({"ok": true}))) } async fn search_tree( State(state): State, _headers: HeaderMap, Path(user_id): Path, Query(query): Query, ) -> impl IntoResponse { let _ = &state.db_dir; let mode = query["mode"].as_str().unwrap_or("tree").to_string(); let search_query = query["q"].as_str().unwrap_or("").to_string(); if search_query.is_empty() { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "missing search query"})), ) .into_response(); } let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let search_pattern = format!("%{}%", search_query.to_lowercase()); let mut stmt = conn.prepare( "SELECT node_id, label, aliases_json, file_uuid, sha256, parent_id, children_json, node_type, icon, color, bg_color, file_size, registered_at, created_at, updated_at, sort_order FROM file_nodes WHERE LOWER(label) LIKE ?1 OR LOWER(aliases_json) LIKE ?1 OR LOWER(file_uuid) LIKE ?1 OR LOWER(sha256) LIKE ?1 ORDER BY sort_order ASC, created_at ASC", )?; let nodes: Vec = stmt .query_map([&search_pattern], |row| { let children_json: String = row.get(6)?; let children: Vec = serde_json::from_str(&children_json).unwrap_or_default(); use std::str::FromStr; Ok(filetree::node::FileNode { node_id: row.get(0)?, label: row.get(1)?, aliases: filetree::node::Aliases::from_json(&row.get::<_, String>(2)?), file_uuid: row.get(3)?, sha256: row.get(4)?, parent_id: row.get(5)?, children, node_type: filetree::node::NodeType::from_str(&row.get::<_, String>(7)?) .unwrap_or(filetree::node::NodeType::Folder), icon: row.get(8)?, color: row.get(9)?, bg_color: row.get(10)?, file_size: row.get(11)?, registered_at: row.get(12)?, created_at: row.get(13)?, updated_at: row.get(14)?, sort_order: row.get(15)?, }) })? .filter_map(|r| r.ok()) .collect(); let tree = filetree::FileTree { user_id: user_id.clone(), tree_type: "untitled folder".to_string(), nodes: vec![], }; let data = filetree::mode::get_mode(&mode) .map(|m| m.render(&tree)) .unwrap_or_else(|| serde_json::json!({"nodes": [], "error": "unknown mode"})); Ok(data) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_tree( State(state): State, _headers: HeaderMap, Path(user_id): Path, Query(query): Query, ) -> impl IntoResponse { // Tree API is public - no authentication required // All authentication checks commented out to preserve Settings authentication let _ = &state.db_dir; let mode = query["mode"].as_str().unwrap_or("tree").to_string(); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id, "untitled folder")?; let data = filetree::mode::get_mode(&mode) .map(|m| m.render(&tree)) .unwrap_or_else(|| serde_json::json!({"nodes": [], "error": "unknown mode"})); Ok(data) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_modes() -> impl IntoResponse { let modes: Vec = filetree::mode::list_modes() .iter() .map(|m| { serde_json::json!({ "name": m.name(), "sort_options": m.sort_options(), "filter_options": m.filter_options(), }) }) .collect(); (StatusCode::OK, Json(serde_json::json!({"modes": modes}))) } async fn create_node( State(state): State, Path(user_id): Path, Json(body): Json, ) -> impl IntoResponse { let _db_dir = state.db_dir.clone(); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id, "untitled folder")?; let label = body["label"].as_str().unwrap_or("Untitled"); let parent_id = body["parent_id"].as_str().map(|s| s.to_string()); let node_type = body["node_type"] .as_str() .map(|s| { filetree::node::NodeType::from_str(s).unwrap_or(filetree::node::NodeType::Folder) }) .unwrap_or(filetree::node::NodeType::Folder); let node = match node_type { filetree::node::NodeType::Folder => FileTree::new_folder(label, parent_id), filetree::node::NodeType::File => { let default_uuid = uuid::Uuid::new_v4().to_string().replace('-', ""); let raw_uuid = body["file_uuid"].as_str().unwrap_or(&default_uuid); let file_uuid = if raw_uuid.len() >= 32 { &raw_uuid[..32] } else { raw_uuid }; let (file_node, register_sql) = FileTree::new_file_node( label, file_uuid, body["sha256"].as_str(), body["original_name"].as_str().unwrap_or(label), body["file_size"].as_i64(), body["file_type"].as_str(), body["registered_at"].as_str(), parent_id, ); if let Some(sql) = register_sql { conn.execute_batch(&sql)?; } file_node } _ => FileTree::new_folder(label, parent_id), }; let node_id = node.node_id.clone(); tree.insert_node(&conn, &node)?; Ok(serde_json::json!({ "ok": true, "node_id": node_id, })) }) .await; match result { Ok(Ok(data)) => (StatusCode::CREATED, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn update_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(body): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id, "untitled folder")?; let existing = tree .nodes .iter() .find(|n| n.node_id == node_id) .cloned() .with_context(|| format!("Node {} not found", node_id))?; let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); let updated = filetree::node::FileNode { label: body["label"] .as_str() .unwrap_or(&existing.label) .to_string(), icon: body["icon"].as_str().map(|s| s.to_string()), color: body["color"].as_str().map(|s| s.to_string()), bg_color: body["bg_color"].as_str().map(|s| s.to_string()), updated_at: now, ..existing }; tree.update_node(&conn, &node_id, &updated)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn delete_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id, "untitled folder")?; tree.delete_node(&conn, &node_id)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct MoveNodeReq { parent_id: Option, } async fn move_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(req): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id, "untitled folder")?; tree.move_node(&conn, &node_id, req.parent_id)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct AliasReq { lang: String, value: String, } async fn update_alias( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(req): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id, "untitled folder")?; tree.update_node_alias(&conn, &node_id, &req.lang, &req.value)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } /// Extract archive file and register extracted files to database /// Returns (extracted_count, extracted_bytes, extraction_path) fn extract_and_register_archive( archive_path: &std::path::Path, user_id: &str, original_filename: &str, ) -> anyhow::Result<(u64, u64, String)> { use std::path::PathBuf; use sha2::{Sha256, Digest}; // Initialize archive system let config = ArchiveConfig::default(); let mut registry = ProcessorRegistry::new(config); registry.initialize()?; // Detect format let detector = FormatDetector::new(); let format = detector.detect(archive_path)?; eprintln!("[archive] Detected format: {} for file: {}", format, archive_path.display()); // Get processor let processor = registry.get_processor_mut(archive_path)?; // Create extraction directory let base_name = original_filename .rsplit_once('.') .map(|(name, _)| name) .unwrap_or(original_filename); let extraction_dir = archive_path.parent() .unwrap_or(std::path::Path::new(".")) .join(format!("{}_extracted", base_name)); std::fs::create_dir_all(&extraction_dir)?; // Open and extract let metadata = processor.open(archive_path)?; eprintln!("[archive] Archive metadata: {} files, {} bytes", metadata.total_files, metadata.total_size); let result = processor.extract_all(&extraction_dir)?; eprintln!("[archive] Extracted {} files ({} bytes)", result.success_files, result.total_bytes); // Register extracted files to database let conn = FileTree::init_user_db(user_id)?; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; // Get MAC address for UUID generation let mac_output = std::process::Command::new("ifconfig") .arg("en0") .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) .unwrap_or_default(); let mac = mac_output .lines() .find(|l| l.contains("ether")) .and_then(|l| l.split_whitespace().nth(1)) .unwrap_or("00:00:00:00:00:00"); let mut registered_count = 0u64; // Recursively scan extracted directory fn scan_directory( dir: &std::path::Path, conn: &rusqlite::Connection, user_id: &str, mac: &str, now: i64, ) -> anyhow::Result { let mut count = 0u64; for entry in std::fs::read_dir(dir)? { let entry = entry?; let path = entry.path(); if path.is_dir() { count += scan_directory(&path, conn, user_id, mac, now)?; } else if path.is_file() { // Calculate SHA256 let file_data = std::fs::read(&path)?; let file_hash = format!("{:x}", Sha256::digest(&file_data)); let file_size = file_data.len() as i64; let filename = path.file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown") .to_string(); let file_path_str = path.to_str() .unwrap_or("unknown") .to_string(); // Generate file UUID let mtime = std::fs::metadata(&path) .ok() .and_then(|m| m.modified().ok()) .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .map(|d| d.as_millis() as u64) .unwrap_or(0); let input = format!("{}|{}|{}|{}", file_path_str, filename, mac, mtime); let hash = Sha256::digest(input.as_bytes()); let hex = format!("{:x}", hash); let file_uuid = hex[0..32].to_string(); // Register file (no sha256 in file_registry) conn.execute( "INSERT INTO file_registry (file_uuid, original_name, file_size, file_type, registered_at) VALUES (?1, ?2, ?3, ?4, ?5)", rusqlite::params![&file_uuid, &filename, file_size, "", now], )?; // Add file location conn.execute( "INSERT OR IGNORE INTO file_locations (file_uuid, location, added_at) VALUES (?1, ?2, ?3)", rusqlite::params![&file_uuid, &file_path_str, now], )?; // Add file node let uuid_str = uuid::Uuid::new_v4().to_string().replace('-', ""); let node_id = format!("node-{}", &uuid_str[0..8]); conn.execute( "INSERT INTO file_nodes (node_id, label, file_uuid, sha256, node_type, file_size, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, 'file', ?5, ?6, ?7)", rusqlite::params![&node_id, &filename, &file_uuid, &file_hash, file_size, now, now], )?; count += 1; } } Ok(count) } registered_count = scan_directory(&extraction_dir, &conn, user_id, mac, now)?; eprintln!("[archive] Registered {} extracted files to database", registered_count); Ok((result.success_files, result.total_bytes, extraction_dir.to_str().unwrap_or("unknown").to_string())) } async fn upload_file( State(_state): State, Path(user_id): Path, mut multipart: axum_extra::extract::Multipart, ) -> impl IntoResponse { use sha2::{Digest, Sha256}; use tokio::io::AsyncWriteExt; const MAX_UPLOAD_SIZE: u64 = 107_374_182_400; // 100GB let base_dir = "/Users/accusys/momentry/var/sftpgo/data"; let user_dir = format!("{}/{}", base_dir, user_id); let mut filename = String::new(); let mut file_size: i64 = 0; let mut file_hash = String::new(); let mut extracted_info: Option<(u64, u64, String)> = None; while let Ok(Some(mut field)) = multipart.next_field().await { let name = field.name().unwrap_or("").to_string(); if name != "file" { continue; } filename = field.file_name().unwrap_or("upload.bin").to_string(); let file_path = format!("{}/{}", user_dir, filename); // Create user directory if not exists if let Err(e) = tokio::fs::create_dir_all(&user_dir).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create_dir error: {}", e)})), ) .into_response(); } let mut hasher = Sha256::new(); let mut total_written: u64 = 0; let mut file = match tokio::fs::File::create(&file_path).await { Ok(f) => f, Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create error: {}", e)})), ) .into_response() } }; while let Ok(Some(chunk)) = field.chunk().await { total_written += chunk.len() as u64; if total_written > MAX_UPLOAD_SIZE { let _ = tokio::fs::remove_file(&file_path).await; return ( StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({"error": "file too large (max 100GB)"})), ) .into_response(); } hasher.update(&chunk); if let Err(e) = file.write_all(&chunk).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("write error: {}", e)})), ) .into_response(); } } let _ = file.flush().await; drop(file); file_hash = format!("{:x}", hasher.finalize()); file_size = total_written as i64; } if filename.is_empty() || file_size == 0 { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "no file provided"})), ) .into_response(); } let file_path = format!("{}/{}", user_dir, filename); // Auto-extract archive files let file_path_buf = std::path::PathBuf::from(&file_path); let detector = FormatDetector::new(); if let Ok(format) = detector.detect(&file_path_buf) { if format != ArchiveFormat::Unknown { eprintln!("[upload] Detected archive format: {}, extracting...", format); let user_id_clone = user_id.clone(); let filename_clone = filename.clone(); // Extract in blocking thread let extraction_result = tokio::task::spawn_blocking(move || { extract_and_register_archive( &file_path_buf, &user_id_clone, &filename_clone, ) }).await; match extraction_result { Ok(Ok((count, bytes, extract_dir))) => { extracted_info = Some((count, bytes, extract_dir)); } Ok(Err(e)) => { eprintln!("[upload] Archive extraction failed: {}", e); } Err(e) => { eprintln!("[upload] Spawn blocking error: {}", e); } } } } // Generate file_uuid based on file properties (path + filename + mac + mtime) // Get MAC address let mac_output = std::process::Command::new("ifconfig") .arg("en0") .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) .unwrap_or_default(); let mac = mac_output .lines() .find(|l| l.contains("ether")) .and_then(|l| l.split_whitespace().nth(1)) .unwrap_or("00:00:00:00:00:00"); // Get file mtime (milliseconds) let mtime = std::fs::metadata(&file_path) .ok() .and_then(|m| m.modified().ok()) .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .map(|d| d.as_millis() as u64) .unwrap_or(0); // Generate UUID: SHA256(path|filename|mac|mtime) let input = format!("{}|{}|{}|{}", file_path, filename, mac, mtime); let mut hasher = sha2::Sha256::new(); hasher.update(input.as_bytes()); let hash = hasher.finalize(); let hex = format!("{:x}", hash); let file_uuid = hex[0..32].to_string(); // Save to database (user-specific SQLite) let file_uuid_clone = file_uuid.clone(); let file_hash_clone = file_hash.clone(); let filename_clone = filename.clone(); let file_path_clone = file_path.clone(); let user_id_clone = user_id.clone(); let db_result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { let conn = filetree::FileTree::init_user_db(&user_id_clone)?; // Register file let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; conn.execute( "INSERT INTO file_registry (file_uuid, sha256, file_size, mime_type, registered_at) VALUES (?1, ?2, ?3, ?4, ?5)", rusqlite::params![ &file_uuid_clone, &file_hash_clone, file_size, "", // mime_type (optional) now ], )?; // Add file location conn.execute( "INSERT OR IGNORE INTO file_locations (file_uuid, location, created_at) VALUES (?1, ?2, ?3)", rusqlite::params![&file_uuid_clone, &file_path_clone, now], )?; let uuid_str = uuid::Uuid::new_v4().to_string().replace('-', ""); let node_id = format!("node-{}", &uuid_str[0..8]); conn.execute( "INSERT INTO file_nodes (node_id, label, file_uuid, sha256, node_type, file_size, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, 'file', ?5, ?6, ?7)", rusqlite::params![ &node_id, &filename_clone, &file_uuid_clone, &file_hash_clone, file_size, now, now ], )?; Ok(()) }) .await; match db_result { Ok(Ok(())) => {} Ok(Err(e)) => { eprintln!("[markbase] tree insert error: {}", e); } Err(e) => { eprintln!("[markbase] spawn_blocking error: {}", e); } } let mut response = serde_json::json!({ "ok": true, "filename": filename, "file_uuid": file_uuid, "sha256": file_hash, "size": file_size, }); if let Some((count, bytes, extract_dir)) = extracted_info { response["extracted"] = serde_json::json!({ "count": count, "bytes": bytes, "directory": extract_dir, }); } ( StatusCode::CREATED, Json(response), ) .into_response() } async fn upload_unlimited( State(_state): State, Path(user_id): Path, mut multipart: axum_extra::extract::Multipart, ) -> impl IntoResponse { use sha2::{Digest, Sha256}; use tokio::io::AsyncWriteExt; let base_dir = "/Users/accusys/Downloads"; let user_dir = format!("{}/{}", base_dir, user_id); let mut filename = String::new(); let mut file_size: i64 = 0; let mut file_hash = String::new(); if let Err(e) = tokio::fs::create_dir_all(&user_dir).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create_dir error: {}", e)})), ) .into_response(); } while let Ok(Some(mut field)) = multipart.next_field().await { let name = field.name().unwrap_or("").to_string(); if name != "file" { continue; } filename = field.file_name().unwrap_or("upload.bin").to_string(); let file_path = format!("{}/{}", user_dir, filename); // Create subdirectory if filename contains path (webkitdirectory) if let Some(parent) = std::path::Path::new(&file_path).parent() { if parent != std::path::Path::new(&user_dir) { if let Err(e) = tokio::fs::create_dir_all(parent).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create_subdir error: {}", e)})), ) .into_response(); } } } let mut hasher = Sha256::new(); let mut total_written: u64 = 0; let mut file = match tokio::fs::File::create(&file_path).await { Ok(f) => f, Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create error: {}", e)})), ) .into_response(); } }; while let Ok(Some(chunk)) = field.chunk().await { total_written += chunk.len() as u64; hasher.update(&chunk); if let Err(e) = file.write_all(&chunk).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("write error: {}", e)})), ) .into_response(); } } let _ = file.flush().await; drop(file); file_hash = format!("{:x}", hasher.finalize()); file_size = total_written as i64; } // Allow empty files (0 bytes) for .localized, .keep, .gitkeep, etc. if filename.is_empty() { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "no filename provided"})), ) .into_response(); } ( StatusCode::OK, Json(serde_json::json!({ "ok": true, "filename": filename, "file_size": file_size, "file_hash": file_hash, "user_id": user_id, "stored_at": format!("{}/{}", user_dir, filename), "timestamp": chrono::Utc::now().to_rfc3339() })), ) .into_response() } async fn render_file_body(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let path = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get::<_, String>(0), )?; drop(conn); let content = std::fs::read_to_string(&path) .unwrap_or_else(|_| format!("(cannot read: {})", path)); let body = if path.ends_with(".md") || path.ends_with(".markdown") { crate::render::md_to_html(&content) .replace("", "
") .replace("", "
") } else { format!("
{}
", html_escape(&content)) }; Ok(body) }).await; match result { Ok(Ok(html)) => ( StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")], html, ) .into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn render_file(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let (label,): (String,) = conn.query_row( "SELECT label FROM file_nodes WHERE file_uuid = ?1 LIMIT 1", [&file_uuid], |row| Ok((row.get(0)?,)), )?; let path = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get::<_, String>(0), )?; drop(conn); let content = std::fs::read_to_string(&path) .unwrap_or_else(|_| format!("(cannot read file: {})", path)); let html = if path.ends_with(".md") || path.ends_with(".markdown") { let body = crate::render::md_to_html(&content); crate::render::render_page(&label, &body) } else { format!("
{}
", html_escape(&content)) }; Ok(html) }) .await; match result { Ok(Ok(html)) => ( StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")], html, ) .into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_dupes(Path(user_id): Path) -> impl IntoResponse { let _uid = user_id; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let query = "SELECT file_name, COUNT(*) as cnt, string_agg(file_uuid, ',') as uuids, string_agg(status, ',') as statuses FROM public.videos GROUP BY file_name HAVING COUNT(*) > 1 ORDER BY cnt DESC"; let output = std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-t", "-A", "-F", "|", "-c", query]) .output() .map_err(|e| anyhow::anyhow!("psql: {}", e))?; let text = String::from_utf8_lossy(&output.stdout); let mut dupes: Vec = Vec::new(); for line in text.trim().lines() { let parts: Vec<&str> = line.split('|').collect(); if parts.len() >= 4 { let uuids: Vec = parts[2].split(',').map(|s| s.trim().to_string()).collect(); let statuses: Vec = parts[3].split(',').map(|s| s.trim().to_string()).collect(); dupes.push(serde_json::json!({ "file_name": parts[0], "count": parts[1].parse::().unwrap_or(0), "uuids": uuids, "statuses": statuses, })); } } Ok(serde_json::json!({"dup_groups": dupes.len(), "dupes": dupes})) }).await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn unregister_file(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let tables = vec!["tkg_edges","tkg_nodes","identity_bindings","identities","face_detections","chunk_vectors","chunk","videos"]; for table in tables { let col = if table == "chunk_vectors" { "uuid" } else { "file_uuid" }; if table == "identity_bindings" { std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-c", &format!("DELETE FROM public.{} WHERE identity_id IN (SELECT id FROM public.identities WHERE file_uuid = '{}')", table, file_uuid)]) .output()?; } else { std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-c", &format!("DELETE FROM public.{} WHERE {} = '{}'", table, col, file_uuid)]) .output()?; } } Ok(serde_json::json!({"ok": true, "unregistered": file_uuid})) }).await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_file_info(Path((user_id, file_uuid)): Path<(String, String)>) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; FileTree::get_file_info(&conn, &file_uuid) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn stream_file(Path((user_id, file_uuid)): Path<(String, String)>) -> impl IntoResponse { use axum::body::Body; use axum::http::header; use tokio_util::io::ReaderStream; let (path, mime) = match tokio::task::spawn_blocking(move || -> anyhow::Result<(String, String)> { let conn = FileTree::open_user_db(&user_id)?; let location: String = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get(0), ).map_err(|e| anyhow::anyhow!("No location: {}", e))?; let ext = std::path::Path::new(&location) .extension() .and_then(|e| e.to_str()) .unwrap_or("") .to_lowercase(); // Document conversion: Phase 1 (textutil/unzip) → Phase 2 (soffice/qlmanage) if filetree::convert::is_document_ext(&ext) { if let Some((cached, mime)) = filetree::convert::get_cached_preview(&file_uuid, &ext) { return Ok((cached.to_string_lossy().to_string(), mime.to_string())); } // Convert on first access let input = std::path::Path::new(&location); if input.exists() { match filetree::convert::convert_document(input, &file_uuid) { Ok((cached, mime)) => { return Ok((cached.to_string_lossy().to_string(), mime.to_string())); } Err(e) => { eprintln!( "[markbase] Document conversion failed for {}: {}", file_uuid, e ); } } } } let mime = if location.ends_with(".mp4") || location.ends_with(".mov") { "video/mp4" } else if location.ends_with(".jpg") || location.ends_with(".jpeg") { "image/jpeg" } else if location.ends_with(".png") { "image/png" } else if location.ends_with(".svg") { "image/svg+xml" } else if location.ends_with(".html") || location.ends_with(".htm") { "text/html; charset=utf-8" } else if location.ends_with(".pdf") { "application/pdf" } else { "application/octet-stream" }; Ok((location, mime.to_string())) }) .await { Ok(Ok((p, m))) => (p, m), _ => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "no location"})), ) .into_response() } }; match tokio::fs::File::open(&path).await { Ok(file) => { let stream = ReaderStream::new(file); let body = Body::from_stream(stream); ([(header::CONTENT_TYPE, mime.as_str())], body).into_response() } Err(_) => (StatusCode::NOT_FOUND, "file not found").into_response(), } } async fn get_file_probe(Path((_user_id, file_uuid)): Path<(String, String)>) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let node: Option<(Option, Option)> = conn .query_row( "SELECT label, registered_at FROM file_nodes WHERE file_uuid = ?1 LIMIT 1", [&file_uuid], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); let (label, registered_at) = node.unwrap_or((None, None)); // Fetch probe data from 3002 PostgreSQL let _pg_url = "postgres://accusys@localhost:5432/momentry"; let pg_conn = rusqlite::Connection::open(":memory:")?; // placeholder drop(pg_conn); let query = format!( "SELECT probe_json, duration, width, height, fps, file_type, total_frames FROM public.videos WHERE file_uuid = '{}'", file_uuid.replace('\'', "''") ); let output = std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-t", "-A", "-F", "|", "-c", &query]) .output() .map_err(|e| anyhow::anyhow!("psql: {}", e))?; let text = String::from_utf8_lossy(&output.stdout); let line = text.trim(); if line.is_empty() { return Ok(serde_json::json!({"file_uuid": file_uuid, "probe": null, "error": "not found in 3002"})); } let parts: Vec<&str> = line.split('|').collect(); let probe_str = parts.first().unwrap_or(&"null"); let duration = parts.get(1).and_then(|s| s.parse::().ok()); let width = parts.get(2).and_then(|s| s.parse::().ok()); let height = parts.get(3).and_then(|s| s.parse::().ok()); let fps = parts.get(4).and_then(|s| s.parse::().ok()); let file_type = parts.get(5).map(|s| s.to_string()); let total_frames = parts.get(6).and_then(|s| s.parse::().ok()); let probe_json: Option = serde_json::from_str(probe_str).ok(); Ok(serde_json::json!({ "file_uuid": file_uuid, "label": label, "registered_at": registered_at, "duration": duration, "width": width, "height": height, "fps": fps, "file_type": file_type, "total_frames": total_frames, "probe": probe_json, })) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn add_file_location( Path(file_uuid): Path, Json(body): Json, ) -> impl IntoResponse { let location = body["location"].as_str().unwrap_or("").to_string(); let label = body["label"].as_str().map(|s| s.to_string()); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; FileTree::add_location(&conn, &file_uuid, &location, label.as_deref())?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::CREATED, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } // === Auth Handlers === async fn login_handler( State(state): State, Json(body): Json, ) -> impl IntoResponse { match state.auth.login_with_sync(&body.username, &body.password) { Some(response) => (StatusCode::OK, Json(response)).into_response(), None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid credentials"})), ) .into_response(), } } async fn logout_handler(State(state): State, headers: HeaderMap) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => { if state.auth.logout(&token) { (StatusCode::OK, Json(serde_json::json!({"success": true}))).into_response() } else { ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Token not found"})), ) .into_response() } } None => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing Authorization header"})), ) .into_response(), } } async fn verify_handler(State(state): State, headers: HeaderMap) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => match state.auth.verify_token(&token) { Some(session) => ( StatusCode::OK, Json(serde_json::json!({ "valid": true, "user_id": session.user_id, "username": session.username, "expires_at": session.expires_at })), ) .into_response(), None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"valid": false, "error": "Token expired or invalid"})), ) .into_response(), }, None => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing Authorization header"})), ) .into_response(), } } // Auth middleware helper fn verify_auth(state: &AppState, headers: &HeaderMap) -> Result { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => match state.auth.verify_token(&token) { Some(session) => Ok(session.user_id), None => Err(StatusCode::UNAUTHORIZED), }, None => Err(StatusCode::UNAUTHORIZED), } } // === Sync Handlers === async fn manual_sync_handler(State(state): State) -> impl IntoResponse { let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); match syncer { Ok(syncer) => match syncer.full_sync().await { Ok(result) => { if result.status == "success" { ( StatusCode::OK, Json(serde_json::json!({ "status": "success", "users_synced": result.users_synced, "groups_synced": result.groups_synced, "mappings_synced": result.mappings_synced })), ) .into_response() } else if result.status == "partial_success" { ( StatusCode::OK, Json(serde_json::json!({ "status": "partial_success", "users_synced": result.users_synced, "users_failed": result.users_failed, "groups_synced": result.groups_synced, "groups_failed": result.groups_failed, "errors": result.errors })), ) .into_response() } else { ( StatusCode::OK, Json(serde_json::json!({ "status": result.status, "errors": result.errors })), ) .into_response() } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "status": "failed", "error": e.to_string() })), ) .into_response(), }, Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "status": "failed", "error": e.to_string() })), ) .into_response(), } } async fn sync_status_handler(State(state): State) -> impl IntoResponse { let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); match auth_db { Ok(db) => match db.open() { Ok(conn) => { match conn.query_row( "SELECT sync_type, sync_time, users_synced, users_failed, groups_synced, groups_failed, mappings_synced, status FROM sync_log ORDER BY sync_time DESC LIMIT 5", [], |row| { Ok(serde_json::json!({ "sync_type": row.get::<_, String>(0)?, "sync_time": row.get::<_, i64>(1)?, "users_synced": row.get::<_, usize>(2)?, "users_failed": row.get::<_, usize>(3)?, "groups_synced": row.get::<_, usize>(4)?, "groups_failed": row.get::<_, usize>(5)?, "mappings_synced": row.get::<_, usize>(6)?, "status": row.get::<_, String>(7)?, })) }, ) { Ok(log) => ( StatusCode::OK, Json(serde_json::json!({ "status": "ok", "latest_sync": log })), ) .into_response(), Err(_) => ( StatusCode::OK, Json(serde_json::json!({ "status": "ok", "message": "No sync logs found" })), ) .into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } fn html_escape(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) } #[derive(Debug, serde::Deserialize)] struct EditConfigQuery { key: String, value: String, } async fn get_config_handler() -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Config file not found"})), ) .into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(config) => ( StatusCode::OK, Json(serde_json::to_value(&config).unwrap_or_default()), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Config file not found"})), ) .into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(mut config) => { let old_value = config.get(¶ms.key).unwrap_or_default(); match config.set(¶ms.key, ¶ms.value) { Ok(_) => match config.validate() { Ok(_) => match config.save(config_path) { Ok(_) => { // Log audit entry let audit = crate::audit::AuditLogger::default(); if let Err(e) = audit.log_config_change( "markbase", ¶ms.key, &old_value, ¶ms.value, "system", None, ) { log::warn!("Failed to write audit log: {}", e); } (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn validate_config_handler() -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"ok": false, "error": "Config file not found"})), ) .into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(config) => match config.validate() { Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok": false, "error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"ok": false, "error": e.to_string()})), ) .into_response(), } } async fn get_s3_config_handler() -> impl IntoResponse { match crate::s3_config::S3Config::load_default() { Ok(config) => ( StatusCode::OK, Json(serde_json::to_value(&config).unwrap_or_default()), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn edit_s3_config_handler(Query(params): Query) -> impl IntoResponse { match crate::s3_config::S3Config::load_default() { Ok(mut config) => { let old_value = config.get(¶ms.key).unwrap_or_default(); match config.set(¶ms.key, ¶ms.value) { Ok(_) => match config.validate() { Ok(_) => match config.save("config/s3.toml") { Ok(_) => { // Log audit entry let audit = crate::audit::AuditLogger::default(); if let Err(e) = audit.log_config_change( "s3", ¶ms.key, &old_value, ¶ms.value, "system", None, ) { log::warn!("Failed to write audit log: {}", e); } (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn validate_s3_config_handler() -> impl IntoResponse { match crate::s3_config::S3Config::load_default() { Ok(config) => match config.validate() { Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), Err(e) => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok": false, "error": e.to_string()})), ) .into_response(), }, Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"ok": false, "error": e.to_string()})), ) .into_response(), } } // async fn get_sftp_config_handler() -> impl IntoResponse { // match crate::sftp::SftpConfig::load_default() { // Ok(config) => ( // StatusCode::OK, // Json(serde_json::to_value(&config).unwrap_or_default()), // ) // .into_response(), // Err(e) => ( // StatusCode::INTERNAL_SERVER_ERROR, // Json(serde_json::json!({"error": e.to_string()})), // ) // .into_response(), // } // } // async fn edit_sftp_config_handler(Query(params): Query) -> impl IntoResponse { // match crate::sftp::SftpConfig::load_default() { // Ok(mut config) => { // let config_path = "config/sftp.toml"; // match config.save(config_path) { // Ok(_) => { // (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() // } // Err(e) => ( // StatusCode::INTERNAL_SERVER_ERROR, // Json(serde_json::json!({"error": e.to_string()})), // ) // .into_response(), // } // } // Err(e) => ( // StatusCode::INTERNAL_SERVER_ERROR, // Json(serde_json::json!({"error": e.to_string()})), // ) // .into_response(), // } // } // async fn validate_sftp_config_handler() -> impl IntoResponse { // match crate::sftp::SftpConfig::load_default() { // Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), // Err(e) => ( // StatusCode::INTERNAL_SERVER_ERROR, // Json(serde_json::json!({"ok": false, "error": e.to_string()})), // ) // .into_response(), // } // } async fn admin_login_handler( State(state): State, Json(body): Json, ) -> impl IntoResponse { match state.auth.admin_login(&body.username, &body.password) { Some(response) => (StatusCode::OK, Json(response)).into_response(), None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid admin credentials"})), ) .into_response(), } } async fn admin_verify_handler( State(state): State, headers: axum::http::HeaderMap, ) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")); if let Some(token) = auth_header { if let Some(session) = state.auth.verify_admin_token(token) { return ( StatusCode::OK, Json(serde_json::json!({ "ok": true, "username": session.username, "expires_at": session.expires_at })), ) .into_response(); } } ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"ok": false, "error": "Invalid admin token"})), ) .into_response() } async fn shell_status_handler() -> Json { // TODO: 使用新的ssh_server模块 // let config = crate::sftp::config::SftpConfig::load_default().unwrap_or_default(); Json(serde_json::json!({ "enabled": true, "shell_path": "/bin/bash", "allowed_commands": vec!["ls", "pwd", "whoami"], "forbidden_commands": vec!["rm", "dd"], "max_command_length": 1024, "timeout_seconds": 30, "max_shell_sessions": 10, "pty_support": false // Phase 3.2 will enable this })) } async fn metrics_handler() -> Json { // Return mock metrics data (actual metrics collected in SFTP handler) // Phase 3.1: Basic structure for Web UI Json(serde_json::json!({ "open_count": 0, "read_count": 0, "write_count": 0, "close_count": 0, "read_bytes": 0, "write_bytes": 0, "opendir_count": 0, "readdir_count": 0, "error_count": 0, "total_latency_ms": 0, "shell_count": 0, "exec_count": 0, "shell_error_count": 0, "note": "Metrics will be updated when SFTP/Shell sessions active" })) } async fn audit_handler() -> Json { // TODO: 使用新的ssh_server模块 // let config = crate::sftp::config::SftpConfig::load_default().unwrap_or_default(); // Return audit log path (actual log reading requires file access) Json(serde_json::json!({ "audit_logging": true, "audit_log_path": "logs/ssh_audit.log", "note": "Audit logs can be viewed via: tail -f logs/ssh_audit.log" })) } // Category View API handlers (Phase 1: 双视图管理) async fn get_all_categories_handler() -> impl IntoResponse { let base_path = std::path::Path::new("/Users/accusys/markbase"); match crate::category_view::get_all_categories() { Ok(response) => (StatusCode::OK, Json(response)).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_category_detail_handler( Path(category_name): Path, ) -> impl IntoResponse { let base_path = std::path::Path::new("/Users/accusys/markbase"); match crate::category_view::get_category_detail(&category_name) { Ok(response) => (StatusCode::OK, Json(response)).into_response(), Err(e) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_all_series_handler() -> impl IntoResponse { let base_path = std::path::Path::new("/Users/accusys/markbase"); match crate::category_view::get_all_series() { Ok(response) => (StatusCode::OK, Json(response)).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_series_detail_handler( Path(series_name): Path, ) -> impl IntoResponse { let base_path = std::path::Path::new("/Users/accusys/markbase"); match crate::category_view::get_series_detail(&series_name) { Ok(response) => (StatusCode::OK, Json(response)).into_response(), Err(e) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct SearchQuery { q: String, view: String, } async fn search_files_handler( Query(query): Query, ) -> impl IntoResponse { let base_path = std::path::Path::new("/Users/accusys/markbase"); match crate::category_view::search_files(&query.q, &query.view) { Ok(response) => (StatusCode::OK, Json(response)).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } }