use anyhow::Context; use axum::{ extract::{Path, Query, State}, http::{HeaderMap, StatusCode}, response::{Html, IntoResponse, Json}, routing::{delete, get, patch, post, put}, Router, }; use serde::Deserialize; use std::str::FromStr; use std::sync::{Arc, Mutex}; use crate::audio; use crate::auth::{AuthState, LoginRequest}; use crate::filetree::{self, FileTree}; use crate::render; #[derive(Clone)] struct AppState { html: Arc>, page_ver: Arc>, step_info: Arc>, labels: Arc>>, db_dir: String, auth: AuthState, auth_db_path: String, } pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { let welcome = if let Some(f) = &file { let md = std::fs::read_to_string(f).unwrap_or_default(); let body = render::md_to_html(&md); render::page(f, &body) } else { render::page( "MarkBase", r#"
📺

MarkBase

Momentry Display Engine

"#, ) }; let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let html = audio::inject_audio_devices(&welcome, &out_devs, &in_devs, &cur_out, &cur_in); let state = AppState { html: Arc::new(Mutex::new(html)), page_ver: Arc::new(Mutex::new(0)), step_info: Arc::new(Mutex::new(serde_json::json!({ "step": 0, "total": 0, "id": "", "label": "", "voice": "off" }))), labels: Arc::new(Mutex::new(vec![])), db_dir: "data/users".to_string(), auth: AuthState::with_sync("data/auth.sqlite"), auth_db_path: "data/auth.sqlite".to_string(), }; // Initial sync from SFTPGo PostgreSQL let syncer = crate::pg_client::SftpGoSync::new("data/auth.sqlite")?; tokio::spawn(async move { match syncer.full_sync().await { Ok(result) => { log::info!( "Initial sync completed: users={}, groups={}, mappings={}, status={}", result.users_synced, result.groups_synced, result.mappings_synced, result.status ); } Err(e) => { log::error!("Initial sync failed: {}", e); } } }); // Periodic sync task (every hour) let syncer_clone = crate::pg_client::SftpGoSync::new("data/auth.sqlite")?; tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); loop { interval.tick().await; match syncer_clone.full_sync().await { Ok(result) => { log::info!( "Hourly sync: users={}, groups={}, status={}", result.users_synced, result.groups_synced, result.status ); } Err(e) => { log::error!("Hourly sync failed, keeping cached data: {}", e); } } } }); let app = Router::new() .route("/", get(root_handler)) .route("/display", post(display_handler)) .route( "/command", get(crate::command::get_commands).post(crate::command::post_command), ) .route("/version", get(version_handler)) .route("/devices", get(devices_handler)) .route("/volume", get(volume_handler)) .route("/body", get(body_handler)) .route("/status", get(status_handler)) .route("/labels", get(get_labels).post(post_labels)) // Auth endpoints (public) .route("/api/v2/auth/login", post(login_handler)) .route("/api/v2/auth/logout", post(logout_handler)) .route("/api/v2/auth/verify", get(verify_handler)) .route("/api/v2/admin/sync", post(manual_sync_handler)) .route("/api/v2/admin/sync/status", get(sync_status_handler)) // Config API endpoints (public) .route("/api/v2/config", get(get_config_handler)) .route("/api/v2/config/edit", post(edit_config_handler)) .route("/api/v2/config/validate", get(validate_config_handler)) // Admin authentication API endpoints (public) .route("/api/v2/admin/login", post(admin_login_handler)) .route("/api/v2/admin/verify", get(admin_verify_handler)) // Protected endpoints (require auth) .route("/api/v2/tree/:user_id", get(get_tree)) .route("/api/v2/tree/:user_id/node", post(create_node)) .route( "/api/v2/tree/:user_id/node/:node_id", put(update_node).delete(delete_node), ) .route("/api/v2/tree/:user_id", delete(delete_all_nodes)) .route("/api/v2/tree/:user_id/restore", post(restore_tree)) .route("/api/v2/dupes/:user_id", get(get_dupes)) .route("/api/v2/unregister/:file_uuid", post(unregister_file)) .route("/api/v2/upload/:user_id", post(upload_file)) .route("/api/v2/render/:file_uuid", get(render_file)) .route("/api/v2/render/:file_uuid/body", get(render_file_body)) .route("/api/v2/tree/:user_id/node/:node_id/move", put(move_node)) .route( "/api/v2/tree/:user_id/node/:node_id/alias", patch(update_alias), ) .route("/api/v2/modes", get(get_modes)) .route("/api/v2/files/:file_uuid/info", get(get_file_info)) .route("/api/v2/files/:file_uuid/probe", get(get_file_probe)) .route("/api/v2/files/:file_uuid/stream", get(stream_file)) .route( "/api/v2/files/:file_uuid/locations", post(add_file_location), ) .with_state(state); let addr = format!("127.0.0.1:{port}"); println!("📺 MarkBase ready on http://{addr}/"); std::process::Command::new("open") .arg(format!("http://{addr}/")) .spawn() .ok(); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve(listener, app).await?; Ok(()) } async fn root_handler(State(state): State) -> Html { Html(state.html.lock().unwrap().clone()) } async fn version_handler(State(state): State) -> Json { let v = *state.page_ver.lock().unwrap(); Json(serde_json::json!({"v": v})) } async fn get_labels(State(state): State) -> Json { let labels = state.labels.lock().unwrap().clone(); Json(serde_json::json!(labels)) } async fn post_labels( State(state): State, Json(body): Json>, ) -> impl IntoResponse { *state.labels.lock().unwrap() = body; (StatusCode::OK, Json(serde_json::json!({"ok": true}))) } async fn status_handler(State(state): State) -> Json { let info = state.step_info.lock().unwrap().clone(); Json( serde_json::json!({"paused": false, "step": info["step"], "total": info["total"], "id": info["id"], "label": info["label"], "voice": info["voice"]}), ) } async fn devices_handler() -> Json { let (out, inp, co, ci) = audio::audio_devices(); Json(serde_json::json!({"output": out, "input": inp, "current_out": co, "current_in": ci})) } async fn volume_handler() -> Json { if let Ok(r) = std::process::Command::new("osascript") .args(["-e", "output volume of (get volume settings)"]) .output() { let s = String::from_utf8_lossy(&r.stdout).trim().to_string(); if let Ok(v) = s.parse::() { return Json(serde_json::json!({"level": v})); } } Json(serde_json::json!({"level": 50})) } async fn body_handler(State(state): State) -> impl IntoResponse { let html = state.html.lock().unwrap().clone(); if let Some(s) = html.find("
") { if let Some(e) = html[s..].find("
") { let body = &html[s + 19..s + e]; return (StatusCode::OK, Html(body.to_string())).into_response(); } } (StatusCode::OK, Html("".to_string())).into_response() } async fn delete_all_nodes( State(state): State, Path(user_id): Path, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id)?; let count = tree.nodes.len(); let node_ids: Vec = tree.nodes.iter().map(|n| n.node_id.clone()).collect(); for nid in node_ids { conn.execute( "DELETE FROM file_nodes WHERE node_id = ?1", rusqlite::params![nid], )?; } Ok(serde_json::json!({"ok": true, "deleted": count})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn restore_tree( State(state): State, Path(user_id): Path, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id)?; let count = tree.nodes.len(); for n in &tree.nodes { conn.execute("DELETE FROM file_nodes WHERE node_id = ?1", rusqlite::params![n.node_id])?; } // Fetch files from 3002 API using environment variables let api_key = std::env::var("RESTORE_API_KEY") .unwrap_or_else(|_| "".to_string()); let api_url = std::env::var("RESTORE_API_URL") .unwrap_or_else(|_| "http://localhost:3002/api/v1/files".to_string()); let api_output = std::process::Command::new("curl") .args(["-s", &format!("{}?page=1&page_size=100", api_url)]) .args(["-H", &format!("X-API-Key: {}", api_key)]) .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) .unwrap_or_default(); let resp: serde_json::Value = serde_json::from_str(&api_output) .map_err(|e| anyhow::anyhow!("3002 API error: {}", e))?; let files = resp["data"].as_array() .ok_or_else(|| anyhow::anyhow!("No files data"))?; let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); let home_id = uuid::Uuid::new_v4().to_string(); let mut folder_ids: std::collections::HashMap = std::collections::HashMap::new(); let folders: Vec<(&str, &str, &str)> = vec![ ("Home", "🏠", ""), ("Movies", "🎬", &home_id), ("Marketing", "📢", &home_id), ("Cartoons", "🎨", &home_id), ("Other", "📁", &home_id), ]; for (name, icon, parent) in &folders { let nid = if *name == "Home" { home_id.clone() } else { uuid::Uuid::new_v4().to_string() }; let pid: Option<&str> = if parent.is_empty() { None } else { Some(parent) }; folder_ids.insert(name.to_string(), nid.clone()); conn.execute( "INSERT INTO file_nodes (node_id, label, aliases_json, node_type, icon, parent_id, created_at, updated_at) VALUES (?1, ?2, '{}', 'folder', ?3, ?4, ?5, ?6)", rusqlite::params![nid, name, icon, pid, now, now], )?; } let mut imported = 0; for f in files { let name = f["file_name"].as_str().unwrap_or("unknown"); let name_lower = name.to_lowercase(); let fuuid = f["file_uuid"].as_str().unwrap_or(""); let category = if name_lower.ends_with(".jpg") || name_lower.ends_with(".png") || name_lower.ends_with(".jpeg") || name_lower.ends_with(".gif") { "Other" } else if name_lower.contains("charade") || name_lower.contains("film") || name_lower.contains("clip") || name_lower.contains("movie") || name_lower.contains("comedy") || name_lower.contains("filmriot") { "Movies" } else if name_lower.contains("exasan") || name_lower.contains("gamma") || name_lower.contains("thunderbolt") || name_lower.contains("nab") || name_lower.contains("koba") || name_lower.contains("webinar") || name_lower.contains("top colorist") || name_lower.contains("accusys") || name_lower.contains("a12t3") { "Marketing" } else if name_lower.contains("cartoon") || name_lower.contains("alice") || name_lower.contains("felix") || name_lower.contains("disney") || name_lower.contains("steamboat") || name_lower.contains("animal") { "Cartoons" } else { "Other" }; let parent_id = folder_ids.get(category).cloned().unwrap_or(home_id.clone()); let nid = uuid::Uuid::new_v4().to_string(); conn.execute( "INSERT INTO file_nodes (node_id, label, aliases_json, file_uuid, node_type, parent_id, created_at, updated_at) VALUES (?1, ?2, '{}', ?3, 'file', ?4, ?5, ?6)", rusqlite::params![nid, name, fuuid, parent_id, now, now], )?; let demo_path = format!("/Users/accusys/momentry/var/sftpgo/data/demo/{}", name); conn.execute( "INSERT OR IGNORE INTO file_locations (file_uuid, location, label) VALUES (?1, ?2, 'origin')", rusqlite::params![fuuid, demo_path], )?; imported += 1; } Ok(serde_json::json!({"ok": true, "deleted": count, "imported": imported})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct DisplayReq { #[serde(rename = "type")] content_type: String, data: Option, file: Option, url: Option, html: Option, step_id: Option, step_num: Option, step_total: Option, } async fn display_handler( State(state): State, Json(req): Json, ) -> impl IntoResponse { let body = match req.content_type.as_str() { "md" | "markdown" => { let content = match (&req.file, &req.data) { (Some(f), _) => std::fs::read_to_string(f).unwrap_or_default(), (_, Some(d)) => d.clone(), _ => { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok":false})), ) } }; render::md_to_html(&content) } "json" => { let data = req.data.as_deref().unwrap_or(""); let formatted = serde_json::from_str::(data) .map(|v| serde_json::to_string_pretty(&v).unwrap_or_default()) .unwrap_or_else(|_| data.to_string()); format!("
{}
", html_escape(&formatted)) } "url" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "video" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "image" => format!( "", html_escape(req.url.as_deref().unwrap_or("")) ), "html" => req.html.clone().unwrap_or_default(), _ => { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok":false})), ) } }; let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let new_html = audio::inject_audio_devices( &render::page("Display", &body), &out_devs, &in_devs, &cur_out, &cur_in, ); *state.html.lock().unwrap() = new_html; *state.page_ver.lock().unwrap() += 1; if let (Some(id), Some(num), Some(total)) = (&req.step_id, req.step_num, req.step_total) { let mut info = state.step_info.lock().unwrap(); *info = serde_json::json!({"step": num, "total": total, "id": id, "label": "", "voice": "off"}); } (StatusCode::OK, Json(serde_json::json!({"ok": true}))) } async fn get_tree( State(state): State, headers: HeaderMap, Path(user_id): Path, Query(query): Query, ) -> impl IntoResponse { // Verify authentication if let Err(status) = verify_auth(&state, &headers) { return ( status, Json(serde_json::json!({"error": "Unauthorized"})), ) .into_response(); } let _ = &state.db_dir; let mode = query["mode"].as_str().unwrap_or("tree").to_string(); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let tree = FileTree::load(&conn, &user_id)?; let data = filetree::mode::get_mode(&mode) .map(|m| m.render(&tree)) .unwrap_or_else(|| serde_json::json!({"nodes": [], "error": "unknown mode"})); Ok(data) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_modes() -> impl IntoResponse { let modes: Vec = filetree::mode::list_modes() .iter() .map(|m| { serde_json::json!({ "name": m.name(), "sort_options": m.sort_options(), "filter_options": m.filter_options(), }) }) .collect(); (StatusCode::OK, Json(serde_json::json!({"modes": modes}))) } async fn create_node( State(state): State, Path(user_id): Path, Json(body): Json, ) -> impl IntoResponse { let _db_dir = state.db_dir.clone(); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id)?; let label = body["label"].as_str().unwrap_or("Untitled"); let parent_id = body["parent_id"].as_str().map(|s| s.to_string()); let node_type = body["node_type"] .as_str() .map(|s| { filetree::node::NodeType::from_str(s).unwrap_or(filetree::node::NodeType::Folder) }) .unwrap_or(filetree::node::NodeType::Folder); let node = match node_type { filetree::node::NodeType::Folder => FileTree::new_folder(label, parent_id), filetree::node::NodeType::File => { let default_uuid = uuid::Uuid::new_v4().to_string().replace('-', ""); let raw_uuid = body["file_uuid"].as_str().unwrap_or(&default_uuid); let file_uuid = if raw_uuid.len() >= 32 { &raw_uuid[..32] } else { raw_uuid }; let (file_node, register_sql) = FileTree::new_file_node( label, file_uuid, body["sha256"].as_str(), body["original_name"].as_str().unwrap_or(label), body["file_size"].as_i64(), body["file_type"].as_str(), body["registered_at"].as_str(), parent_id, ); if let Some(sql) = register_sql { conn.execute_batch(&sql)?; } file_node } _ => FileTree::new_folder(label, parent_id), }; let node_id = node.node_id.clone(); tree.insert_node(&conn, &node)?; Ok(serde_json::json!({ "ok": true, "node_id": node_id, })) }) .await; match result { Ok(Ok(data)) => (StatusCode::CREATED, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn update_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(body): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id)?; let existing = tree .nodes .iter() .find(|n| n.node_id == node_id) .cloned() .with_context(|| format!("Node {} not found", node_id))?; let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); let updated = filetree::node::FileNode { label: body["label"] .as_str() .unwrap_or(&existing.label) .to_string(), icon: body["icon"].as_str().map(|s| s.to_string()), color: body["color"].as_str().map(|s| s.to_string()), bg_color: body["bg_color"].as_str().map(|s| s.to_string()), updated_at: now, ..existing }; tree.update_node(&conn, &node_id, &updated)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn delete_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id)?; tree.delete_node(&conn, &node_id)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct MoveNodeReq { parent_id: Option, } async fn move_node( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(req): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id)?; tree.move_node(&conn, &node_id, req.parent_id)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Deserialize)] struct AliasReq { lang: String, value: String, } async fn update_alias( State(state): State, Path((user_id, node_id)): Path<(String, String)>, Json(req): Json, ) -> impl IntoResponse { let _ = &state.db_dir; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db(&user_id)?; let mut tree = FileTree::load(&conn, &user_id)?; tree.update_node_alias(&conn, &node_id, &req.lang, &req.value)?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn upload_file( State(_state): State, Path(user_id): Path, mut multipart: axum_extra::extract::Multipart, ) -> impl IntoResponse { use sha2::{Digest, Sha256}; use tokio::io::AsyncWriteExt; const MAX_UPLOAD_SIZE: u64 = 10_737_418_240; // 10GB let _uid = user_id; let demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo"; let mut filename = String::new(); let mut file_size: i64 = 0; let mut file_hash = String::new(); while let Ok(Some(mut field)) = multipart.next_field().await { let name = field.name().unwrap_or("").to_string(); if name != "file" { continue; } filename = field.file_name().unwrap_or("upload.bin").to_string(); let file_path = format!("{}/{}", demo_dir, filename); let mut hasher = Sha256::new(); let mut total_written: u64 = 0; let mut file = match tokio::fs::File::create(&file_path).await { Ok(f) => f, Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("create error: {}", e)})), ) .into_response() } }; while let Ok(Some(chunk)) = field.chunk().await { total_written += chunk.len() as u64; if total_written > MAX_UPLOAD_SIZE { let _ = tokio::fs::remove_file(&file_path).await; return ( StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({"error": "file too large (max 10GB)"})), ) .into_response(); } hasher.update(&chunk); if let Err(e) = file.write_all(&chunk).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("write error: {}", e)})), ) .into_response(); } } let _ = file.flush().await; drop(file); file_hash = format!("{:x}", hasher.finalize()); file_size = total_written as i64; } if filename.is_empty() || file_size == 0 { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "no file provided"})), ) .into_response(); } let file_path = format!("{}/{}", demo_dir, filename); // Register with 3002 API via curl let api_key = "muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69"; let register_json = format!( r#"{{"file_name":"{}","file_path":"{}"}}"#, filename, file_path ); let register_output = std::process::Command::new("curl") .args([ "-s", "-X", "POST", "http://localhost:3002/api/v1/files/register", ]) .args(["-H", &format!("X-API-Key: {}", api_key)]) .args(["-H", "Content-Type: application/json"]) .args(["-d", ®ister_json]) .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) .unwrap_or_default(); let file_uuid = serde_json::from_str::(®ister_output) .ok() .and_then(|v| v["file_uuid"].as_str().map(|s| s.to_string())) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().replace('-', "")); // Add to file tree let sha_clone = file_hash.clone(); let fname_clone = filename.clone(); let fuuid_clone = file_uuid.clone(); let fpath_clone = file_path.clone(); let db_result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { let conn = FileTree::open_user_db("demo")?; let other_id: Option = conn .query_row( "SELECT node_id FROM file_nodes WHERE label = 'Other' AND node_type = 'folder' LIMIT 1", [], |row| row.get(0), ) .ok(); let nid = uuid::Uuid::new_v4().to_string(); let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); conn.execute( "INSERT INTO file_nodes (node_id, label, aliases_json, file_uuid, sha256, node_type, parent_id, file_size, created_at, updated_at) VALUES (?1, ?2, '{}', ?3, ?4, 'file', ?5, ?6, ?7, ?8)", rusqlite::params![nid, fname_clone, fuuid_clone, sha_clone, other_id, file_size, now, now], )?; conn.execute( "INSERT OR IGNORE INTO file_locations (file_uuid, location, label) VALUES (?1, ?2, 'origin')", rusqlite::params![fuuid_clone, fpath_clone], )?; Ok(()) }) .await; match db_result { Ok(Ok(())) => {} Ok(Err(e)) => { eprintln!("[markbase] tree insert error: {}", e); } Err(e) => { eprintln!("[markbase] spawn_blocking error: {}", e); } } ( StatusCode::CREATED, Json(serde_json::json!({ "ok": true, "filename": filename, "file_uuid": file_uuid, "sha256": file_hash, "size": file_size, })), ) .into_response() } async fn render_file_body(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let path = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get::<_, String>(0), )?; drop(conn); let content = std::fs::read_to_string(&path) .unwrap_or_else(|_| format!("(cannot read: {})", path)); let body = if path.ends_with(".md") || path.ends_with(".markdown") { crate::render::md_to_html(&content) .replace("", "
") .replace("", "
") } else { format!("
{}
", html_escape(&content)) }; Ok(body) }).await; match result { Ok(Ok(html)) => ( StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")], html, ) .into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn render_file(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let (label,): (String,) = conn.query_row( "SELECT label FROM file_nodes WHERE file_uuid = ?1 LIMIT 1", [&file_uuid], |row| Ok((row.get(0)?,)), )?; let path = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get::<_, String>(0), )?; drop(conn); let content = std::fs::read_to_string(&path) .unwrap_or_else(|_| format!("(cannot read file: {})", path)); let html = if path.ends_with(".md") || path.ends_with(".markdown") { let body = crate::render::md_to_html(&content); crate::render::render_page(&label, &body) } else { format!("
{}
", html_escape(&content)) }; Ok(html) }) .await; match result { Ok(Ok(html)) => ( StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")], html, ) .into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_dupes(Path(user_id): Path) -> impl IntoResponse { let _uid = user_id; let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let query = "SELECT file_name, COUNT(*) as cnt, string_agg(file_uuid, ',') as uuids, string_agg(status, ',') as statuses FROM public.videos GROUP BY file_name HAVING COUNT(*) > 1 ORDER BY cnt DESC"; let output = std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-t", "-A", "-F", "|", "-c", query]) .output() .map_err(|e| anyhow::anyhow!("psql: {}", e))?; let text = String::from_utf8_lossy(&output.stdout); let mut dupes: Vec = Vec::new(); for line in text.trim().lines() { let parts: Vec<&str> = line.split('|').collect(); if parts.len() >= 4 { let uuids: Vec = parts[2].split(',').map(|s| s.trim().to_string()).collect(); let statuses: Vec = parts[3].split(',').map(|s| s.trim().to_string()).collect(); dupes.push(serde_json::json!({ "file_name": parts[0], "count": parts[1].parse::().unwrap_or(0), "uuids": uuids, "statuses": statuses, })); } } Ok(serde_json::json!({"dup_groups": dupes.len(), "dupes": dupes})) }).await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn unregister_file(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let tables = vec!["tkg_edges","tkg_nodes","identity_bindings","identities","face_detections","chunk_vectors","chunk","videos"]; for table in tables { let col = if table == "chunk_vectors" { "uuid" } else { "file_uuid" }; if table == "identity_bindings" { std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-c", &format!("DELETE FROM public.{} WHERE identity_id IN (SELECT id FROM public.identities WHERE file_uuid = '{}')", table, file_uuid)]) .output()?; } else { std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-c", &format!("DELETE FROM public.{} WHERE {} = '{}'", table, col, file_uuid)]) .output()?; } } Ok(serde_json::json!({"ok": true, "unregistered": file_uuid})) }).await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn get_file_info(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; FileTree::get_file_info(&conn, &file_uuid) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn stream_file(Path(file_uuid): Path) -> impl IntoResponse { use axum::body::Body; use axum::http::header; use tokio_util::io::ReaderStream; let (path, mime) = match tokio::task::spawn_blocking(move || -> anyhow::Result<(String, String)> { let conn = FileTree::open_user_db("demo")?; let location: String = conn.query_row( "SELECT location FROM file_locations WHERE file_uuid = ?1 ORDER BY added_at LIMIT 1", [&file_uuid], |row| row.get(0), ).map_err(|e| anyhow::anyhow!("No location: {}", e))?; let ext = std::path::Path::new(&location) .extension() .and_then(|e| e.to_str()) .unwrap_or("") .to_lowercase(); // Document conversion: Phase 1 (textutil/unzip) → Phase 2 (soffice/qlmanage) if crate::filetree::convert::is_document_ext(&ext) { if let Some((cached, mime)) = crate::filetree::convert::get_cached_preview(&file_uuid, &ext) { return Ok((cached.to_string_lossy().to_string(), mime.to_string())); } // Convert on first access let input = std::path::Path::new(&location); if input.exists() { match crate::filetree::convert::convert_document(input, &file_uuid) { Ok((cached, mime)) => { return Ok((cached.to_string_lossy().to_string(), mime.to_string())); } Err(e) => { eprintln!( "[markbase] Document conversion failed for {}: {}", file_uuid, e ); } } } } let mime = if location.ends_with(".mp4") || location.ends_with(".mov") { "video/mp4" } else if location.ends_with(".jpg") || location.ends_with(".jpeg") { "image/jpeg" } else if location.ends_with(".png") { "image/png" } else if location.ends_with(".svg") { "image/svg+xml" } else if location.ends_with(".html") || location.ends_with(".htm") { "text/html; charset=utf-8" } else if location.ends_with(".pdf") { "application/pdf" } else { "application/octet-stream" }; Ok((location, mime.to_string())) }) .await { Ok(Ok((p, m))) => (p, m), _ => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "no location"})), ) .into_response() } }; match tokio::fs::File::open(&path).await { Ok(file) => { let stream = ReaderStream::new(file); let body = Body::from_stream(stream); ([(header::CONTENT_TYPE, mime.as_str())], body).into_response() } Err(_) => (StatusCode::NOT_FOUND, "file not found").into_response(), } } async fn get_file_probe(Path(file_uuid): Path) -> impl IntoResponse { let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; let node: Option<(Option, Option)> = conn .query_row( "SELECT label, registered_at FROM file_nodes WHERE file_uuid = ?1 LIMIT 1", [&file_uuid], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); let (label, registered_at) = node.unwrap_or((None, None)); // Fetch probe data from 3002 PostgreSQL let _pg_url = "postgres://accusys@localhost:5432/momentry"; let pg_conn = rusqlite::Connection::open(":memory:")?; // placeholder drop(pg_conn); let query = format!( "SELECT probe_json, duration, width, height, fps, file_type, total_frames FROM public.videos WHERE file_uuid = '{}'", file_uuid.replace('\'', "''") ); let output = std::process::Command::new("psql") .args(["-U", "accusys", "-d", "momentry", "-t", "-A", "-F", "|", "-c", &query]) .output() .map_err(|e| anyhow::anyhow!("psql: {}", e))?; let text = String::from_utf8_lossy(&output.stdout); let line = text.trim(); if line.is_empty() { return Ok(serde_json::json!({"file_uuid": file_uuid, "probe": null, "error": "not found in 3002"})); } let parts: Vec<&str> = line.split('|').collect(); let probe_str = parts.first().unwrap_or(&"null"); let duration = parts.get(1).and_then(|s| s.parse::().ok()); let width = parts.get(2).and_then(|s| s.parse::().ok()); let height = parts.get(3).and_then(|s| s.parse::().ok()); let fps = parts.get(4).and_then(|s| s.parse::().ok()); let file_type = parts.get(5).map(|s| s.to_string()); let total_frames = parts.get(6).and_then(|s| s.parse::().ok()); let probe_json: Option = serde_json::from_str(probe_str).ok(); Ok(serde_json::json!({ "file_uuid": file_uuid, "label": label, "registered_at": registered_at, "duration": duration, "width": width, "height": height, "fps": fps, "file_type": file_type, "total_frames": total_frames, "probe": probe_json, })) }) .await; match result { Ok(Ok(data)) => (StatusCode::OK, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } async fn add_file_location( Path(file_uuid): Path, Json(body): Json, ) -> impl IntoResponse { let location = body["location"].as_str().unwrap_or("").to_string(); let label = body["label"].as_str().map(|s| s.to_string()); let result = tokio::task::spawn_blocking(move || -> anyhow::Result { let conn = FileTree::open_user_db("demo")?; FileTree::add_location(&conn, &file_uuid, &location, label.as_deref())?; Ok(serde_json::json!({"ok": true})) }) .await; match result { Ok(Ok(data)) => (StatusCode::CREATED, Json(data)).into_response(), Ok(Err(e)) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } // === Auth Handlers === async fn login_handler( State(state): State, Json(body): Json, ) -> impl IntoResponse { match state.auth.login_with_sync(&body.username, &body.password) { Some(response) => (StatusCode::OK, Json(response)).into_response(), None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid credentials"})), ) .into_response(), } } async fn logout_handler( State(state): State, headers: HeaderMap, ) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => { if state.auth.logout(&token) { (StatusCode::OK, Json(serde_json::json!({"success": true}))).into_response() } else { ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Token not found"})), ) .into_response() } } None => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing Authorization header"})), ) .into_response(), } } async fn verify_handler( State(state): State, headers: HeaderMap, ) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => { match state.auth.verify_token(&token) { Some(session) => { ( StatusCode::OK, Json(serde_json::json!({ "valid": true, "user_id": session.user_id, "username": session.username, "expires_at": session.expires_at })), ) .into_response() } None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"valid": false, "error": "Token expired or invalid"})), ) .into_response(), } } None => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing Authorization header"})), ) .into_response(), } } // Auth middleware helper fn verify_auth(state: &AppState, headers: &HeaderMap) -> Result { let auth_header = headers .get("Authorization") .and_then(|h| h.to_str().ok()) .and_then(|h| crate::auth::parse_auth_header(h)); match auth_header { Some(token) => { match state.auth.verify_token(&token) { Some(session) => Ok(session.user_id), None => Err(StatusCode::UNAUTHORIZED), } } None => Err(StatusCode::UNAUTHORIZED), } } // === Sync Handlers === async fn manual_sync_handler( State(state): State, ) -> impl IntoResponse { let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); match syncer { Ok(syncer) => { match syncer.full_sync().await { Ok(result) => { if result.status == "success" { ( StatusCode::OK, Json(serde_json::json!({ "status": "success", "users_synced": result.users_synced, "groups_synced": result.groups_synced, "mappings_synced": result.mappings_synced })) ).into_response() } else if result.status == "partial_success" { ( StatusCode::OK, Json(serde_json::json!({ "status": "partial_success", "users_synced": result.users_synced, "users_failed": result.users_failed, "groups_synced": result.groups_synced, "groups_failed": result.groups_failed, "errors": result.errors })) ).into_response() } else { ( StatusCode::OK, Json(serde_json::json!({ "status": result.status, "errors": result.errors })) ).into_response() } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "status": "failed", "error": e.to_string() })) ).into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "status": "failed", "error": e.to_string() })) ).into_response(), } } async fn sync_status_handler( State(state): State, ) -> impl IntoResponse { let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); match auth_db { Ok(db) => { match db.open() { Ok(conn) => { match conn.query_row( "SELECT sync_type, sync_time, users_synced, users_failed, groups_synced, groups_failed, mappings_synced, status FROM sync_log ORDER BY sync_time DESC LIMIT 5", [], |row| { Ok(serde_json::json!({ "sync_type": row.get::<_, String>(0)?, "sync_time": row.get::<_, i64>(1)?, "users_synced": row.get::<_, usize>(2)?, "users_failed": row.get::<_, usize>(3)?, "groups_synced": row.get::<_, usize>(4)?, "groups_failed": row.get::<_, usize>(5)?, "mappings_synced": row.get::<_, usize>(6)?, "status": row.get::<_, String>(7)?, })) } ) { Ok(log) => ( StatusCode::OK, Json(serde_json::json!({ "status": "ok", "latest_sync": log })) ).into_response(), Err(_) => ( StatusCode::OK, Json(serde_json::json!({ "status": "ok", "message": "No sync logs found" })) ).into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})) ).into_response(), } } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})) ).into_response(), } } fn html_escape(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) } #[derive(Debug, serde::Deserialize)] struct EditConfigQuery { key: String, value: String, } async fn get_config_handler() -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Config file not found"})) ).into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(config) => { (StatusCode::OK, Json(serde_json::to_value(&config).unwrap_or_default())).into_response() } Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))).into_response() } } } async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Config file not found"}))).into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(mut config) => { match config.set(¶ms.key, ¶ms.value) { Ok(_) => { match config.validate() { Ok(_) => { match config.save(config_path) { Ok(_) => { (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() } Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))).into_response() } } } Err(e) => { (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()}))).into_response() } } } Err(e) => { (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": e.to_string()}))).into_response() } } } Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))).into_response() } } } async fn validate_config_handler() -> impl IntoResponse { let config_path = std::path::Path::new("config/markbase.toml"); if !config_path.exists() { return (StatusCode::NOT_FOUND, Json(serde_json::json!({"ok": false, "error": "Config file not found"}))).into_response(); } match crate::config::MarkBaseConfig::load(config_path) { Ok(config) => { match config.validate() { Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), Err(e) => (StatusCode::BAD_REQUEST, Json(serde_json::json!({"ok": false, "error": e.to_string()}))).into_response() } } Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"ok": false, "error": e.to_string()}))).into_response() } } } async fn admin_login_handler( State(state): State, Json(body): Json, ) -> impl IntoResponse { match state.auth.admin_login(&body.username, &body.password) { Some(response) => (StatusCode::OK, Json(response)).into_response(), None => ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid admin credentials"})), ).into_response(), } } async fn admin_verify_handler( State(state): State, headers: axum::http::HeaderMap, ) -> impl IntoResponse { let auth_header = headers .get("Authorization") .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")); if let Some(token) = auth_header { if let Some(session) = state.auth.verify_admin_token(token) { return ( StatusCode::OK, Json(serde_json::json!({ "ok": true, "username": session.username, "expires_at": session.expires_at })), ).into_response(); } } ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"ok": false, "error": "Invalid admin token"})), ).into_response() }