//! Momentry Core - Digital asset management system with video analysis and RAG //! //! This is the main entry point for the CLI application. use anyhow::{Context, Result}; use clap::Parser; use futures_util::StreamExt; use std::io::Write; use std::path::Path; use std::str; use std::sync::{Arc, Mutex}; // Local modules mod cli; mod processing; use cli::*; use processing::*; use processing::modules::*; // Core dependencies use momentry_core::core::api_key::{ApiKeyService, ApiKeyType}; use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType}; use momentry_core::core::db::Database; use momentry_core::core::time::FrameTime; use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi}; use momentry_core::{ Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus, }; #[derive(Debug, Clone)] pub struct SystemResources { pub cpu_idle_percent: f64, pub memory_available_mb: u64, pub memory_total_mb: u64, pub memory_used_percent: f64, pub gpu_available: bool, pub gpu_type: GpuType, pub gpu_utilization: Option, } #[derive(Debug, Clone, Copy)] pub enum GpuType { Nvidia, AppleMps, } impl SystemResources { pub fn check() -> Self { let cpu_idle = Self::get_cpu_idle(); let (mem_available, mem_total) = Self::get_memory_info(); let mem_used_pct = if mem_total > 0 && mem_available <= mem_total { ((mem_total - mem_available) as f64 / mem_total as f64) * 100.0 } else if mem_total > 0 { 100.0 } else { 0.0 }; let (gpu_available, gpu_type, gpu_util) = Self::get_gpu_info(); Self { cpu_idle_percent: cpu_idle, memory_available_mb: mem_available, memory_total_mb: mem_total, memory_used_percent: mem_used_pct, gpu_available, gpu_type, gpu_utilization: gpu_util, } } pub fn can_parallel(&self, required_memory_mb: u64) -> bool { const MIN_CPU_IDLE: f64 = 30.0; const MIN_MEMORY_MB: u64 = 4096; self.cpu_idle_percent >= MIN_CPU_IDLE && self.memory_available_mb >= required_memory_mb && self.memory_available_mb >= MIN_MEMORY_MB } pub fn recommend_parallel_modules(&self) -> Vec<&'static str> { let mut recommended = Vec::new(); if self.gpu_available { recommended.push("yolo"); } if self.memory_available_mb >= 8192 { recommended.push("ocr"); recommended.push("face"); recommended.push("pose"); } recommended } fn get_cpu_idle() -> f64 { use std::process::Command; let output = Command::new("top").args(["-l", "1", "-n", "1"]).output(); match output { Ok(o) => { let s = String::from_utf8_lossy(&o.stdout); if let Some(line) = s.lines().find(|l| l.contains("idle")) { if let Some(pct) = line .split_whitespace() .find_map(|s| s.strip_suffix("%idle")) { pct.trim().parse().ok().unwrap_or(50.0) } else { 50.0 } } else { 50.0 } } Err(_) => 50.0, } } fn get_memory_info() -> (u64, u64) { use std::process::Command; let output = Command::new("sysctl").args(["hw.memsize"]).output(); match output { Ok(o) => { let s = String::from_utf8_lossy(&o.stdout); let total = s .split_whitespace() .nth(1) .and_then(|v| v.parse::().ok()) .unwrap_or(0) / 1024 / 1024; let vm_stat = Command::new("vm_stat").output(); let available = match vm_stat { Ok(v) => { let vs = String::from_utf8_lossy(&v.stdout); let mut free_pages: u64 = 0; let mut inactive_pages: u64 = 0; for line in vs.lines() { if line.contains("Pages free:") { free_pages = line .split_whitespace() .last() .and_then(|v| v.trim_end_matches('.').parse().ok()) .unwrap_or(0); } else if line.contains("Pages inactive:") { inactive_pages = line .split_whitespace() .last() .and_then(|v| v.trim_end_matches('.').parse().ok()) .unwrap_or(0); } } // Pages * 4096 bytes / 1024 / 1024 = MB (free_pages + inactive_pages) * 4096 / 1024 / 1024 } Err(_) => total / 4, }; (available, total) } Err(_) => (0, 0), } } fn get_gpu_info() -> (bool, GpuType, Option) { use std::process::Command; // Check NVIDIA GPU let nvidia_output = Command::new("nvidia-smi") .args([ "--query-gpu=utilization.gpu", "--format=csv,noheader,nounits", ]) .output(); if let Ok(o) = nvidia_output { if o.status.success() { let s = String::from_utf8_lossy(&o.stdout); let util = s.trim().parse::().ok(); return (true, GpuType::Nvidia, util); } } // Check Apple MPS (Metal Performance Shaders) let mps_output = Command::new("system_profiler") .args(["SPDisplaysDataType", "-detailLevel", "mini"]) .output(); if let Ok(o) = mps_output { let s = String::from_utf8_lossy(&o.stdout); if s.contains("Metal") || s.contains("Apple") { return (true, GpuType::AppleMps, Some(0.0)); } } (false, GpuType::Nvidia, None) } } impl std::fmt::Display for SystemResources { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "CPU: {:.1}% idle, Memory: {:.1}GB/{:.1}GB ({:.0}% used), GPU: {}", self.cpu_idle_percent, self.memory_available_mb as f64 / 1024.0, self.memory_total_mb as f64 / 1024.0, self.memory_used_percent, if self.gpu_available { format!("{:.0}% utilized", self.gpu_utilization.unwrap_or(0.0)) } else { "N/A".to_string() } ) } } fn decide_processing(json_path: &Path, force: bool, resume: bool) -> ProcessingDecision { if !json_path.exists() { return ProcessingDecision::Process; } if force { return ProcessingDecision::ForceReprocess; } if resume { return ProcessingDecision::ResumePartial; } match check_json_completeness(json_path) { JsonCompleteness::Complete => ProcessingDecision::SkipComplete, JsonCompleteness::Partial { current, total } => { eprintln!("\n⚠️ Found incomplete JSON file: {}", json_path.display()); eprintln!( " Progress: {}/{} ({:.1}%)", current, total, (current as f64 / total as f64) * 100.0 ); eprintln!(" Use --resume to continue from checkpoint"); eprintln!(" Use --force to reprocess from scratch"); ProcessingDecision::SkipComplete } JsonCompleteness::Empty => ProcessingDecision::Process, } } #[derive(Debug, Clone, PartialEq)] pub enum JsonCompleteness { Complete, Partial { current: u32, total: u32 }, Empty, } fn check_json_completeness(json_path: &Path) -> JsonCompleteness { let content = match std::fs::read_to_string(json_path) { Ok(c) => c, Err(_) => return JsonCompleteness::Empty, }; if content.trim().is_empty() { return JsonCompleteness::Empty; } let json: serde_json::Value = match serde_json::from_str(&content) { Ok(v) => v, Err(_) => return JsonCompleteness::Empty, }; match json.get("segments") { Some(serde_json::Value::Array(arr)) if !arr.is_empty() => JsonCompleteness::Complete, Some(serde_json::Value::Object(obj)) => { let current = obj.get("current").and_then(|v| v.as_u64()).unwrap_or(0) as u32; let total = obj.get("total").and_then(|v| v.as_u64()).unwrap_or(0) as u32; if total > 0 && current < total { JsonCompleteness::Partial { current, total } } else { JsonCompleteness::Complete } } _ => JsonCompleteness::Complete, } } async fn process_asr_module( asr_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Asr).start(1); } let asr_result = momentry_core::core::processor::process_asr( video_path, asr_path.to_str().unwrap(), Some(uuid), ) .await?; let asr_json = serde_json::to_string_pretty(&asr_result)?; std::fs::write(asr_path, &asr_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "asr.json"); println!(" ✓ ASR saved: {} segments", asr_result.segments.len()); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Asr) .complete(&format!("{} segments", asr_result.segments.len())); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_cut_module( cut_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Cut).start(1); } let cut_result = momentry_core::core::processor::process_cut( video_path, cut_path.to_str().unwrap(), Some(uuid), ) .await?; let cut_json = serde_json::to_string_pretty(&cut_result)?; std::fs::write(cut_path, &cut_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "cut.json"); println!(" ✓ CUT saved: {} scenes", cut_result.scenes.len()); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Cut) .complete(&format!("{} scenes", cut_result.scenes.len())); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_asrx_module( asrx_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Asrx).start(1); } let asrx_result = momentry_core::core::processor::process_asrx( video_path, asrx_path.to_str().unwrap(), Some(uuid), ) .await?; let asrx_json = serde_json::to_string_pretty(&asrx_result)?; std::fs::write(asrx_path, &asrx_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "asrx.json"); println!(" ✓ ASRX saved: {} segments", asrx_result.segments.len()); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Asrx) .complete(&format!("{} segments", asrx_result.segments.len())); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_yolo_module( yolo_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Yolo).start(1); } let yolo_result = momentry_core::core::processor::process_yolo( video_path, yolo_path.to_str().unwrap(), Some(uuid), ) .await?; let yolo_json = serde_json::to_string_pretty(&yolo_result)?; std::fs::write(yolo_path, &yolo_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "yolo.json"); println!(" ✓ YOLO saved: {} frames", yolo_result.frame_count); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Yolo) .complete(&format!("{} frames", yolo_result.frame_count)); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_ocr_module( ocr_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Ocr).start(1); } let ocr_result = momentry_core::core::processor::process_ocr( video_path, ocr_path.to_str().unwrap(), Some(uuid), ) .await?; let ocr_json = serde_json::to_string_pretty(&ocr_result)?; std::fs::write(ocr_path, &ocr_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "ocr.json"); println!( " ✓ OCR saved: {} frames with text", ocr_result.frames.len() ); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Ocr) .complete(&format!("{} frames", ocr_result.frames.len())); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_face_module( face_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Face).start(1); } let face_result = momentry_core::core::processor::process_face( video_path, face_path.to_str().unwrap(), Some(uuid), ) .await?; let face_json = serde_json::to_string_pretty(&face_result)?; std::fs::write(face_path, &face_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "face.json"); println!(" ✓ Face saved: {} frames", face_result.frames.len()); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Face) .complete(&format!("{} frames", face_result.frames.len())); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_pose_module( pose_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Pose).start(1); } let pose_result = momentry_core::core::processor::process_pose( video_path, pose_path.to_str().unwrap(), Some(uuid), ) .await?; let pose_json = serde_json::to_string_pretty(&pose_result)?; std::fs::write(pose_path, &pose_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "pose.json"); println!(" ✓ Pose saved: {} frames", pose_result.frames.len()); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Pose) .complete(&format!("{} frames", pose_result.frames.len())); state.stop(); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_story_module( story_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Story).start(1); } let story_result = momentry_core::core::processor::process_story( video_path, story_path.to_str().unwrap(), Some(uuid), ) .await?; let story_json = serde_json::to_string_pretty(&story_result)?; std::fs::write(story_path, &story_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "story.json"); println!( " ✓ Story saved: {} parent chunks, {} child chunks", story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks ); { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Story).complete(&format!( "{} parents, {} children", story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks )); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } async fn process_caption_module( caption_path: &Path, video_path: &str, uuid: &str, progress_state: &Arc>, ui: &Arc>>, ) -> anyhow::Result<()> { { let mut state = progress_state.lock().unwrap(); state.get_processor(ProcessorType::Caption).start(1); } let caption_result = momentry_core::core::processor::process_caption( video_path, caption_path.to_str().unwrap(), Some(uuid), ) .await?; let caption_json = serde_json::to_string_pretty(&caption_result)?; std::fs::write(caption_path, &caption_json)?; let output_dir = OutputDir::new(); let _ = output_dir.backup_file(uuid, "caption.json"); println!(" ✓ Caption saved: {} frames", caption_result.total_frames); { let mut state = progress_state.lock().unwrap(); state .get_processor(ProcessorType::Caption) .complete(&format!("{} frames", caption_result.total_frames)); } if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } Ok(()) } #[derive(Parser)] #[command(name = "momentry")] #[command(about = "Digital asset management system with video analysis and RAG")] #[command(version = env!("BUILD_VERSION"))] struct Cli { #[command(subcommand)] command: Commands, } #[derive(Subcommand)] enum Commands { /// Register a video file Register { /// Video file path or URL path: String, }, /// Process video (generate all JSON files) Process { /// UUID or path target: String, /// Modules to process (comma separated: asr,cut,asrx,yolo,ocr,face,pose,story,caption) /// If not specified, processes all modules #[arg(short, long, value_delimiter = ',')] modules: Option>, /// Modules to process via cloud (comma separated) /// Example: --cloud asr,yolo #[arg(long, value_delimiter = ',')] cloud: Option>, /// Force reprocess even if JSON exists (skip completeness check) #[arg(long, default_value = "false")] force: bool, /// Resume from last checkpoint if processing was interrupted #[arg(long, default_value = "false")] resume: bool, }, /// Generate chunks and store in database Chunk { /// UUID uuid: String, }, /// Generate story for cut scenes Story { /// UUID uuid: String, }, /// Vectorize chunks Vectorize { /// UUID (or 'all' for all) uuid: String, }, /// Play video with overlays Play { /// Video path or UUID target: String, }, /// Start watching directories Watch { /// Directories to watch (comma separated) directories: Option, }, /// Check system resources and recommend processing strategy System { /// Show detailed GPU info (NVIDIA/MPS) #[arg(long)] gpu: bool, }, /// Start API server Server { /// Host #[arg(long, default_value = "127.0.0.1")] host: String, /// Port (defaults to MOMENTRY_SERVER_PORT env var, or3002 for production) #[arg(long)] port: Option, }, /// Start job worker Worker { /// Max concurrent processors #[arg(long)] max_concurrent: Option, /// Poll interval in seconds #[arg(long)] poll_interval: Option, /// Batch size #[arg(long)] batch_size: Option, }, /// Query using RAG Query { /// Query text query: String, }, /// Lookup UUID from path Lookup { /// File path path: String, }, /// Resolve path from UUID Resolve { /// UUID uuid: String, }, /// Generate thumbnails for videos Thumbnails { /// UUID (optional, generates for all if not specified) uuid: Option, /// Number of thumbnails per video #[arg(short, long, default_value = "6")] count: u32, }, /// Show storage status report Status { /// UUID (optional, shows all if not specified) uuid: Option, }, /// Manage output backups Backup { /// Action: list, cleanup action: String, /// Days to keep (for cleanup) days: Option, }, /// Manage API keys ApiKey { /// Action: create, list, validate, revoke, rotate, stats #[arg(value_enum)] action: ApiKeyAction, /// Key name (for create) name: Option, /// Key type (system, user, service, integration, emergency) #[arg(long)] key_type: Option, /// TTL in days (for create) #[arg(long)] ttl: Option, /// API key to validate/revoke #[arg(long)] key: Option, }, /// Manage Gitea API tokens Gitea { /// Action: create, list, delete, verify #[arg(value_enum)] action: GiteaAction, /// Gitea username #[arg(long)] username: Option, /// Gitea password (for create/list/delete) #[arg(long)] password: Option, /// Token name (for create/delete) #[arg(long)] token_name: Option, /// Token scopes (comma separated: read:repository,write:issue) #[arg(long)] scopes: Option, }, /// Manage n8n API keys N8n { /// Action: create, list, delete, verify #[arg(value_enum)] action: N8nAction, /// n8n API key (for create/list/delete) #[arg(long)] api_key: Option, /// API key label (for create/delete) #[arg(long)] label: Option, /// Expiration days (for create) #[arg(long)] expires_in_days: Option, }, } #[derive(clap::ValueEnum, Clone)] enum ApiKeyAction { Create, List, Validate, Revoke, Rotate, Stats, } #[derive(clap::ValueEnum, Clone)] enum GiteaAction { Create, List, Delete, Verify, } #[derive(clap::ValueEnum, Clone)] enum N8nAction { Create, List, Delete, Verify, } #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().ok(); tracing_subscriber::fmt::init(); let cli = Cli::parse(); match cli.command { Commands::Register { path } => { println!("Registering: {}", path); // Compute UUID let uuid = momentry_core::uuid::compute_uuid_from_path(&path); println!("UUID: {}", uuid); // Run ffprobe let probe_result = momentry_core::core::probe::probe_video(&path)?; println!("\nVideo probe results:"); let duration = probe_result .format .duration .as_ref() .and_then(|s| s.parse::().ok()) .unwrap_or(0.0); println!(" Duration: {}s", duration); if let Some(size) = &probe_result.format.size { println!(" Size: {}", size); } let mut width = 0u32; let mut height = 0u32; let mut fps = 0.0; for stream in &probe_result.streams { if stream.codec_type.as_deref() == Some("video") { width = stream.width.unwrap_or(0); height = stream.height.unwrap_or(0); if let Some(fps_str) = &stream.r_frame_rate { if let Some((num, den)) = fps_str.split_once('/') { if let (Ok(n), Ok(d)) = (num.parse::(), den.parse::()) { if d > 0.0 { fps = n / d; } } } } println!(" Video: {}x{}", width, height); if let Some(fps) = &stream.r_frame_rate { println!(" FPS: {}", fps); } } if stream.codec_type.as_deref() == Some("audio") { println!(" Audio: {} channels", stream.channels.unwrap_or(0)); if let Some(sr) = &stream.sample_rate { println!(" Sample Rate: {}", sr); } } } // Save probe JSON to file let file_manager = momentry_core::FileManager::new(std::path::PathBuf::from(".")); let json_str = serde_json::to_string_pretty(&probe_result)?; let json_path = file_manager.save_json(&uuid, "probe", &json_str)?; println!("\nProbe JSON saved to: {:?}", json_path); // Store in PostgreSQL println!("\nStoring in database..."); let db = PostgresDb::init().await?; let file_path = Path::new(&path) .canonicalize() .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| path.clone()); let file_name = Path::new(&path) .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); let record = VideoRecord { id: 0, uuid: uuid.clone(), file_path, file_name, duration, width, height, fps, probe_json: Some(json_str), storage: Default::default(), status: VideoStatus::Pending, user_id: None, job_id: None, created_at: String::new(), registration_time: None, }; let video_id = db.register_video(&record).await?; println!("Video registered with ID: {}", video_id); Ok(()) } Commands::Process { target, modules, cloud, force, resume, } => { println!("Processing: {}", target); println!(" force: {}, resume: {}", force, resume); // Parse selected modules let selected_modules: Option> = modules.as_ref().map(|m| { m.iter() .filter_map(|name| { let name_lower = name.to_lowercase(); match name_lower.as_str() { "asr" => Some(ProcessorType::Asr), "cut" => Some(ProcessorType::Cut), "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), "ocr" => Some(ProcessorType::Ocr), "face" => Some(ProcessorType::Face), "pose" => Some(ProcessorType::Pose), "story" => Some(ProcessorType::Story), "caption" => Some(ProcessorType::Caption), _ => { eprintln!("Unknown module: {}", name); None } } }) .collect() }); // Parse cloud modules let cloud_modules: Vec = cloud .as_ref() .map(|c| { c.iter() .filter_map(|name| { let name_lower = name.to_lowercase(); match name_lower.as_str() { "asr" => Some(ProcessorType::Asr), "cut" => Some(ProcessorType::Cut), "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), "ocr" => Some(ProcessorType::Ocr), "face" => Some(ProcessorType::Face), "pose" => Some(ProcessorType::Pose), "story" => Some(ProcessorType::Story), "caption" => Some(ProcessorType::Caption), _ => { eprintln!("Unknown cloud module: {}", name); None } } }) .collect() }) .unwrap_or_default(); if let Some(ref mods) = selected_modules { println!( " Modules: {}", mods.iter() .map(|m| m.to_string()) .collect::>() .join(", ") ); } else { println!(" Modules: ALL"); } if !cloud_modules.is_empty() { println!( " Cloud: {}", cloud_modules .iter() .map(|m| m.to_string()) .collect::>() .join(", ") ); } let processing_mode = if force { "FORCE (reprocess all)" } else if resume { "RESUME (continue from checkpoint)" } else { "SMART (skip complete, resume partial)" }; println!(" Mode: {}", processing_mode); // Compute UUID if path is given let uuid = if target.len() == 16 && !target.contains('/') { target.clone() } else { momentry_core::uuid::compute_uuid_from_path(&target) }; // Get video from database let db = PostgresDb::init().await?; let video = db .get_video_by_uuid(&uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?; let video_path = &video.file_path; let video_name = video.file_name.clone(); let _file_manager = momentry_core::FileManager::new(std::path::PathBuf::from(".")); // Initialize output directory let output_dir = OutputDir::new(); output_dir.ensure_dir()?; println!("Output directory: {:?}", output_dir.get_base_path()); // Initialize progress UI let progress_state = Arc::new(Mutex::new(ProgressState::new(&video_name))); progress_state.lock().unwrap().start(); // Helper closure to check if a module should be processed let should_process = |module: ProcessorType| -> bool { selected_modules .as_ref() .map(|mods| mods.contains(&module)) .unwrap_or(true) }; // Helper closure to check if a module should run in the cloud let is_cloud = |module: ProcessorType| -> bool { cloud_modules.contains(&module) }; // Create UI and wrap in Arc for sharing with Redis subscriber let ui = Arc::new(Mutex::new(ProgressUi::new(&video_name).ok())); if let Some(ref mut ui) = *ui.lock().unwrap() { let _ = ui.render(); } // Spawn Redis subscriber for real-time progress updates let redis_progress_state = progress_state.clone(); let redis_ui = ui.clone(); let redis_uuid = uuid.clone(); let redis_handle = tokio::spawn(async move { if let Ok(redis_client) = momentry_core::core::db::RedisClient::new() { loop { if let Ok(mut pubsub) = redis_client.subscribe_progress(&redis_uuid).await { let mut stream = pubsub.on_message(); while let Some(msg) = stream.next().await { if let Ok(payload) = msg.get_payload::() { if let Ok(progress_msg) = serde_json::from_str::< momentry_core::core::db::ProgressMessage, >(&payload) { let mut state = redis_progress_state.lock().unwrap(); state.update_from_redis( &progress_msg.msg_type, &progress_msg.processor, progress_msg.data.current, progress_msg.data.total, progress_msg.data.message.as_deref(), ); // Store progress in Redis Hash for HTTP API let uuid = progress_msg.uuid.clone(); let processor = progress_msg.processor.clone(); let msg_type = progress_msg.msg_type.clone(); let current = progress_msg.data.current; let total = progress_msg.data.total; let message = progress_msg.data.message.clone(); tokio::spawn(async move { if let Ok(redis_client) = momentry_core::core::db::RedisClient::new() { if let Ok(mut conn) = redis_client.get_conn().await { let prefix = momentry_core::core::config::REDIS_KEY_PREFIX.as_str(); let key = format!( "{}job:{}:processor:{}", prefix, uuid, processor ); let _: () = redis::cmd("HSET") .arg(&key) .arg("status") .arg(&msg_type) .query_async(&mut conn) .await .unwrap_or(()); if let Some(c) = current { let _: () = redis::cmd("HSET") .arg(&key) .arg("current") .arg(c) .query_async(&mut conn) .await .unwrap_or(()); } if let Some(t) = total { let _: () = redis::cmd("HSET") .arg(&key) .arg("total") .arg(t) .query_async(&mut conn) .await .unwrap_or(()); } if let Some(ref m) = message { let _: () = redis::cmd("HSET") .arg(&key) .arg("message") .arg(m) .query_async(&mut conn) .await .unwrap_or(()); } let _: () = redis::cmd("EXPIRE") .arg(&key) .arg(86400i64) .query_async(&mut conn) .await .unwrap_or(()); } } }); // Trigger UI render on progress update if let Some(ref mut ui) = *redis_ui.lock().unwrap() { let _ = ui.render(); } } } } } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } } }); // Process ASR (Automatic Speech Recognition) if should_process(ProcessorType::Asr) { let asr_path = output_dir.get_output_path(&uuid, "asr.json"); let decision = decide_processing(&asr_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nASR: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nASR: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&asr_path).ok(); if is_cloud(ProcessorType::Asr) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::ResumePartial => { println!("\nASR: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Asr) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Asr) { println!("\nASR: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nASR: ⚙️ Processing..."); process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui) .await?; } } } } // Update storage status db.update_storage_status(&uuid, "fs_json", true).await?; // Process CUT (scene detection) if should_process(ProcessorType::Cut) { let cut_path = output_dir.get_output_path(&uuid, "cut.json"); let decision = decide_processing(&cut_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nCUT: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nCUT: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&cut_path).ok(); if is_cloud(ProcessorType::Cut) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::ResumePartial => { println!("\nCUT: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Cut) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Cut) { println!("\nCUT: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nCUT: ⚙️ Processing..."); process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui) .await?; } } } } // Process ASRX (speaker diarization) if should_process(ProcessorType::Asrx) { let asrx_path = output_dir.get_output_path(&uuid, "asrx.json"); let decision = decide_processing(&asrx_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nASRX: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nASRX: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&asrx_path).ok(); if is_cloud(ProcessorType::Asrx) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_asrx_module( &asrx_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nASRX: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Asrx) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_asrx_module( &asrx_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Asrx) { println!("\nASRX: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nASRX: ⚙️ Processing..."); process_asrx_module( &asrx_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // Process YOLO (object detection) if should_process(ProcessorType::Yolo) { let yolo_path = output_dir.get_output_path(&uuid, "yolo.json"); let decision = decide_processing(&yolo_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nYOLO: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nYOLO: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&yolo_path).ok(); if is_cloud(ProcessorType::Yolo) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_yolo_module( &yolo_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nYOLO: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Yolo) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_yolo_module( &yolo_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Yolo) { println!("\nYOLO: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nYOLO: ⚙️ Processing..."); process_yolo_module( &yolo_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // Process OCR (text recognition) if should_process(ProcessorType::Ocr) { let ocr_path = output_dir.get_output_path(&uuid, "ocr.json"); let decision = decide_processing(&ocr_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nOCR: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nOCR: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&ocr_path).ok(); if is_cloud(ProcessorType::Ocr) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::ResumePartial => { println!("\nOCR: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Ocr) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Ocr) { println!("\nOCR: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nOCR: ⚙️ Processing..."); process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui) .await?; } } } } // Process Face (face detection) if should_process(ProcessorType::Face) { let face_path = output_dir.get_output_path(&uuid, "face.json"); let decision = decide_processing(&face_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nFace: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nFace: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&face_path).ok(); if is_cloud(ProcessorType::Face) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_face_module( &face_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nFace: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Face) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_face_module( &face_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Face) { println!("\nFace: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nFace: ⚙️ Processing..."); process_face_module( &face_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // Process Pose (pose estimation) if should_process(ProcessorType::Pose) { let pose_path = output_dir.get_output_path(&uuid, "pose.json"); let decision = decide_processing(&pose_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nPose: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nPose: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&pose_path).ok(); if is_cloud(ProcessorType::Pose) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_pose_module( &pose_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nPose: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Pose) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_pose_module( &pose_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Pose) { println!("\nPose: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nPose: ⚙️ Processing..."); process_pose_module( &pose_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // Process Story (video narrative) if should_process(ProcessorType::Story) { let story_path = output_dir.get_output_path(&uuid, "story.json"); let decision = decide_processing(&story_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nStory: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nStory: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&story_path).ok(); if is_cloud(ProcessorType::Story) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_story_module( &story_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nStory: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Story) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_story_module( &story_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Story) { println!("\nStory: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nStory: ⚙️ Processing..."); process_story_module( &story_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // Process Caption (image captions) if should_process(ProcessorType::Caption) { let caption_path = output_dir.get_output_path(&uuid, "caption.json"); let decision = decide_processing(&caption_path, force, resume); match decision { ProcessingDecision::SkipComplete => { println!("\nCaption: ✓ Already complete, skipping"); } ProcessingDecision::ForceReprocess => { println!("\nCaption: ⟳ Force reprocessing from scratch..."); std::fs::remove_file(&caption_path).ok(); if is_cloud(ProcessorType::Caption) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_caption_module( &caption_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::ResumePartial => { println!("\nCaption: ↻ Resuming from checkpoint..."); if is_cloud(ProcessorType::Caption) { println!(" [Cloud processing not implemented yet - run locally]"); } else { process_caption_module( &caption_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } ProcessingDecision::Process => { if is_cloud(ProcessorType::Caption) { println!("\nCaption: ☁️ Running via cloud..."); println!(" [Cloud processing not implemented yet - run locally]"); } else { println!("\nCaption: ⚙️ Processing..."); process_caption_module( &caption_path, video_path, &uuid, &progress_state, &ui, ) .await?; } } } } // TODO: Store pre_chunks and frames to database // Stop Redis subscriber redis_handle.abort(); println!("\n✓ Process stage completed!"); if should_process(ProcessorType::Asr) { let path = output_dir.get_output_path(&uuid, "asr.json"); println!(" - ASR JSON: {}", path.display()); } if should_process(ProcessorType::Cut) { let path = output_dir.get_output_path(&uuid, "cut.json"); println!(" - CUT JSON: {}", path.display()); } if should_process(ProcessorType::Asrx) { let path = output_dir.get_output_path(&uuid, "asrx.json"); println!(" - ASRX JSON: {}", path.display()); } if should_process(ProcessorType::Yolo) { let path = output_dir.get_output_path(&uuid, "yolo.json"); println!(" - YOLO JSON: {}", path.display()); } if should_process(ProcessorType::Ocr) { let path = output_dir.get_output_path(&uuid, "ocr.json"); println!(" - OCR JSON: {}", path.display()); } if should_process(ProcessorType::Face) { let path = output_dir.get_output_path(&uuid, "face.json"); println!(" - Face JSON: {}", path.display()); } if should_process(ProcessorType::Pose) { let path = output_dir.get_output_path(&uuid, "pose.json"); println!(" - Pose JSON: {}", path.display()); } if should_process(ProcessorType::Story) { let path = output_dir.get_output_path(&uuid, "story.json"); println!(" - Story JSON: {}", path.display()); } if should_process(ProcessorType::Caption) { let path = output_dir.get_output_path(&uuid, "caption.json"); println!(" - Caption JSON: {}", path.display()); } Ok(()) } Commands::Chunk { uuid } => { println!("Chunking: {}", uuid); let db = PostgresDb::init().await?; let video = db .get_video_by_uuid(&uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?; let file_id = video.id; let fps = video.fps; // ========== Read all JSON files ========== // Read ASR JSON let asr_path = format!("{}.asr.json", uuid); let asr_json = std::fs::read_to_string(&asr_path) .context("ASR file not found. Run 'process' first.")?; let asr_result: momentry_core::core::processor::asr::AsrResult = serde_json::from_str(&asr_json)?; println!("Loaded ASR: {} segments", asr_result.segments.len()); // Read CUT JSON let cut_path = format!("{}.cut.json", uuid); let cut_json = std::fs::read_to_string(&cut_path) .context("CUT file not found. Run 'process' first.")?; let cut_result: momentry_core::core::processor::cut::CutResult = serde_json::from_str(&cut_json)?; println!("Loaded CUT: {} scenes", cut_result.scenes.len()); // Read YOLO JSON (optional) let yolo_path = format!("{}.yolo.json", uuid); let yolo_result = match std::fs::read_to_string(&yolo_path) { Ok(yolo_json) => match serde_json::from_str::< momentry_core::core::processor::yolo::YoloResult, >(&yolo_json) { Ok(result) => { println!("Loaded YOLO: {} frames", result.frames.len()); result } Err(e) => { println!("Warning: Failed to parse YOLO JSON: {}. Skipping YOLO.", e); momentry_core::core::processor::yolo::YoloResult { frame_count: 0, fps: 0.0, frames: vec![], } } }, Err(_) => { println!("Warning: YOLO file not found. Skipping YOLO."); momentry_core::core::processor::yolo::YoloResult { frame_count: 0, fps: 0.0, frames: vec![], } } }; // Read OCR JSON (optional) let ocr_path = format!("{}.ocr.json", uuid); let ocr_result = match std::fs::read_to_string(&ocr_path) { Ok(ocr_json) => match serde_json::from_str::< momentry_core::core::processor::ocr::OcrResult, >(&ocr_json) { Ok(result) => { println!("Loaded OCR: {} frames", result.frames.len()); result } Err(e) => { println!("Warning: Failed to parse OCR JSON: {}. Skipping OCR.", e); momentry_core::core::processor::ocr::OcrResult { frame_count: 0, fps: 0.0, frames: vec![], } } }, Err(_) => { println!("Warning: OCR file not found. Skipping OCR."); momentry_core::core::processor::ocr::OcrResult { frame_count: 0, fps: 0.0, frames: vec![], } } }; // Read Face JSON (optional) let face_path = format!("{}.face.json", uuid); let face_result = match std::fs::read_to_string(&face_path) { Ok(face_json) => match serde_json::from_str::< momentry_core::core::processor::face::FaceResult, >(&face_json) { Ok(result) => { println!("Loaded Face: {} frames", result.frames.len()); result } Err(e) => { println!("Warning: Failed to parse Face JSON: {}. Skipping Face.", e); momentry_core::core::processor::face::FaceResult { frame_count: 0, fps: 0.0, frames: vec![], } } }, Err(_) => { println!("Warning: Face file not found. Skipping Face."); momentry_core::core::processor::face::FaceResult { frame_count: 0, fps: 0.0, frames: vec![], } } }; // Read Pose JSON (optional) let pose_path = format!("{}.pose.json", uuid); let pose_result = match std::fs::read_to_string(&pose_path) { Ok(pose_json) => match serde_json::from_str::< momentry_core::core::processor::pose::PoseResult, >(&pose_json) { Ok(result) => { println!("Loaded Pose: {} frames", result.frames.len()); result } Err(e) => { println!("Warning: Failed to parse Pose JSON: {}. Skipping Pose.", e); momentry_core::core::processor::pose::PoseResult { frame_count: 0, fps: 0.0, frames: vec![], } } }, Err(_) => { println!("Warning: Pose file not found. Skipping Pose."); momentry_core::core::processor::pose::PoseResult { frame_count: 0, fps: 0.0, frames: vec![], } } }; // Read ASRX JSON (optional) let asrx_path = format!("{}.asrx.json", uuid); let asrx_result = match std::fs::read_to_string(&asrx_path) { Ok(asrx_json) => match serde_json::from_str::< momentry_core::core::processor::asrx::AsrxResult, >(&asrx_json) { Ok(result) => { println!("Loaded ASRX: {} segments", result.segments.len()); result } Err(e) => { println!("Warning: Failed to parse ASRX JSON: {}. Skipping ASRX.", e); momentry_core::core::processor::asrx::AsrxResult { language: None, segments: vec![], } } }, Err(_) => { println!("Warning: ASRX file not found. Skipping ASRX."); momentry_core::core::processor::asrx::AsrxResult { language: None, segments: vec![], } } }; // ========== Store pre_chunks (from ASR, CUT) ========== println!("\nStoring pre_chunks..."); // Store ASR sentence pre_chunks let mut asr_pre_chunk_ids = Vec::new(); for seg in asr_result.segments.iter() { let start_frame = FrameTime::from_seconds(seg.start, fps).frames(); let end_frame = FrameTime::from_seconds(seg.end, fps).frames(); let pre_chunk = momentry_core::core::db::postgres_db::PreChunk { id: 0, file_id, source_type: "asr".to_string(), source_file: Some(asr_path.clone()), chunk_type: "sentence".to_string(), start_frame, end_frame, fps, raw_json: serde_json::json!({"text": seg.text}), text_content: Some(seg.text.clone()), processed: false, chunk_id: None, created_at: String::new(), }; let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?; asr_pre_chunk_ids.push(pre_chunk_id); } // Store CUT scene pre_chunks let mut cut_pre_chunk_ids = Vec::new(); for scene in &cut_result.scenes { let pre_chunk = momentry_core::core::db::postgres_db::PreChunk { id: 0, file_id, source_type: "cut".to_string(), source_file: Some(cut_path.clone()), chunk_type: "cut".to_string(), start_frame: scene.start_frame as i64, end_frame: scene.end_frame as i64, fps, raw_json: serde_json::json!({ "scene_number": scene.scene_number, }), text_content: None, processed: false, chunk_id: None, created_at: String::new(), }; let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?; cut_pre_chunk_ids.push(pre_chunk_id); } // Store time-based pre_chunks (every 10 seconds) let duration = video.duration; let mut time_pre_chunk_ids = Vec::new(); let mut time_start = 0.0; while time_start < duration { let time_end = (time_start + 10.0).min(duration); let start_frame = FrameTime::from_seconds(time_start, fps).frames(); let end_frame = FrameTime::from_seconds(time_end, fps).frames(); let pre_chunk = momentry_core::core::db::postgres_db::PreChunk { id: 0, file_id, source_type: "time".to_string(), source_file: None, chunk_type: "time".to_string(), start_frame, end_frame, fps, raw_json: serde_json::json!({"interval": 10.0}), text_content: None, processed: false, chunk_id: None, created_at: String::new(), }; let pre_chunk_id = db.store_pre_chunk(&pre_chunk).await?; time_pre_chunk_ids.push(pre_chunk_id); time_start = time_end; } println!( "Stored pre_chunks: {} asr + {} cut + {} time", asr_result.segments.len(), cut_result.scenes.len(), time_pre_chunk_ids.len() ); // ========== Store frames (from YOLO, OCR, Face) ========== println!("\nStoring frames..."); // Group YOLO, OCR, Face results by frame_number let mut frame_data: std::collections::HashMap< u64, momentry_core::core::processor::yolo::YoloFrame, > = std::collections::HashMap::new(); for frame in &yolo_result.frames { frame_data.insert(frame.frame, frame.clone()); } let mut ocr_by_frame: std::collections::HashMap< u64, momentry_core::core::processor::ocr::OcrFrame, > = std::collections::HashMap::new(); for frame in &ocr_result.frames { ocr_by_frame.insert(frame.frame, frame.clone()); } let mut face_by_frame: std::collections::HashMap< u64, momentry_core::core::processor::face::FaceFrame, > = std::collections::HashMap::new(); for frame in &face_result.frames { face_by_frame.insert(frame.frame, frame.clone()); } let mut pose_by_frame: std::collections::HashMap< u64, momentry_core::core::processor::pose::PoseFrame, > = std::collections::HashMap::new(); for frame in &pose_result.frames { pose_by_frame.insert(frame.frame, frame.clone()); } // Store frames (merge data from YOLO, OCR, Face, Pose) let mut all_frames: Vec = frame_data .keys() .cloned() .chain(ocr_by_frame.keys().cloned()) .chain(face_by_frame.keys().cloned()) .chain(pose_by_frame.keys().cloned()) .collect(); all_frames.sort(); all_frames.dedup(); for frame_num in &all_frames { let timestamp = (*frame_num as f64) / fps; let yolo_frame = frame_data.get(frame_num); let ocr_frame = ocr_by_frame.get(frame_num); let face_frame = face_by_frame.get(frame_num); let pose_frame = pose_by_frame.get(frame_num); let frame = momentry_core::core::db::postgres_db::Frame { id: 0, file_id, frame_number: *frame_num as i64, timestamp, fps, yolo_objects: yolo_frame.map(|f| serde_json::json!(&f.objects)), ocr_results: ocr_frame.map(|f| serde_json::json!(&f.texts)), face_results: face_frame.map(|f| serde_json::json!(&f.faces)), pose_results: pose_frame.map(|f| serde_json::json!(&f.persons)), frame_path: None, created_at: String::new(), }; db.store_frame(&frame).await?; } println!("Stored {} frames", all_frames.len()); // ========== Create chunks ========== println!("\nCreating chunks..."); // Rule 1: Direct conversion (sentence pre_chunk -> sentence chunk) // Merge ASRX speaker_id by time overlap let mut sentence_chunks = Vec::new(); for (i, seg) in asr_result.segments.iter().enumerate() { let pre_chunk_id = asr_pre_chunk_ids.get(i).copied().unwrap_or(0); // Find matching ASRX segment by time overlap let speaker_id = asrx_result .segments .iter() .find(|ax| { // Overlap: ASRX segment overlaps with ASR segment ax.start <= seg.end && ax.end >= seg.start }) .and_then(|ax| ax.speaker_id.clone()); let content = if let Some(ref sid) = speaker_id { serde_json::json!({ "text": seg.text, "speaker_id": sid, }) } else { serde_json::json!({ "text": seg.text, }) }; let mut chunk = Chunk::from_seconds( file_id as i32, uuid.clone(), i as u32, ChunkType::Sentence, ChunkRule::Rule1, seg.start, seg.end, fps, content, ) .with_text_content(seg.text.clone()) .with_pre_chunk_ids(vec![pre_chunk_id as i32]); // Add ASRX metadata if available if speaker_id.is_some() { chunk = chunk.with_metadata(serde_json::json!({ "language": asr_result.language, "language_probability": asr_result.language_probability, "speaker_matched": true, })); } sentence_chunks.push(chunk); } if !asrx_result.segments.is_empty() { let matched = sentence_chunks .iter() .filter(|c| { c.content .get("speaker_id") .and_then(|v| v.as_str()) .is_some() }) .count(); println!( " ASRX merge: {}/{} sentence chunks matched to speakers", matched, sentence_chunks.len() ); } // Rule 1: CUT chunks let mut cut_chunks = Vec::new(); for (i, scene) in cut_result.scenes.iter().enumerate() { let pre_chunk_id = cut_pre_chunk_ids.get(i).copied().unwrap_or(0); let chunk = Chunk::from_seconds( file_id as i32, uuid.clone(), i as u32, ChunkType::Cut, ChunkRule::Rule1, scene.start_time, scene.end_time, fps, serde_json::json!({ "scene_number": scene.scene_number, }), ) .with_pre_chunk_ids(vec![pre_chunk_id as i32]); cut_chunks.push(chunk); } // Rule 1: Time-based chunks let splitter = momentry_core::core::chunk::ChunkSplitter::new(10.0); let mut time_chunks = Vec::new(); let time_chunk_list = splitter.split_time_based(&uuid, video.duration); for (i, tc) in time_chunk_list.iter().enumerate() { let pre_chunk_id = time_pre_chunk_ids.get(i).copied().unwrap_or(0); let chunk = Chunk::new( file_id as i32, uuid.clone(), i as u32, ChunkType::TimeBased, ChunkRule::Rule1, tc.start_frame, tc.end_frame, fps, serde_json::json!({"interval": 10.0}), ) .with_pre_chunk_ids(vec![pre_chunk_id as i32]); time_chunks.push(chunk); } // Store chunks println!( "Storing {} sentence chunks (rule_1)...", sentence_chunks.len() ); for chunk in &sentence_chunks { db.store_chunk(chunk).await?; } println!("Storing {} cut chunks (rule_1)...", cut_chunks.len()); for chunk in &cut_chunks { db.store_chunk(chunk).await?; } println!( "Storing {} time-based chunks (rule_1)...", time_chunks.len() ); for chunk in &time_chunks { db.store_chunk(chunk).await?; } let total_chunks = sentence_chunks.len() + cut_chunks.len() + time_chunks.len(); // Update storage status db.update_storage_status(&uuid, "psql_chunk", true).await?; println!("\n✓ Chunk stage completed!"); println!( " - pre_chunks: {} (asr + cut + time)", asr_result.segments.len() + cut_result.scenes.len() + time_pre_chunk_ids.len() ); println!(" - frames: {}", all_frames.len()); println!(" - chunks: {} (sentence + cut + time_based)", total_chunks); Ok(()) } Commands::Story { uuid } => { println!("Generating story for: {}", uuid); let db = PostgresDb::init().await?; let video = db .get_video_by_uuid(&uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?; let file_id = video.id; let _fps = video.fps; let duration = video.duration; // Get all chunks let all_chunks = db.get_chunks_by_uuid(&uuid).await?; // Try cut chunks first, fall back to sentence chunks let mut story_chunks: Vec<&Chunk> = all_chunks .iter() .filter(|c| c.chunk_type == ChunkType::Cut) .collect(); let story_type = if story_chunks.is_empty() { // Fall back to sentence chunks story_chunks = all_chunks .iter() .filter(|c| c.chunk_type == ChunkType::Sentence && c.text_content.is_some()) .collect(); "sentence" } else { "cut" }; if story_chunks.is_empty() { println!("No story chunks found. Run 'chunk' command first."); return Ok(()); } println!("Found {} {} scenes", story_chunks.len(), story_type); // Generate story for each scene for (i, story_chunk) in story_chunks.iter().enumerate() { println!("\n=== Scene {} ===", i + 1); println!( "Time: {:.2}s - {:.2}s", story_chunk.start_time().seconds(), story_chunk.end_time().seconds() ); // Get context: expand time range by 5 seconds before and after let context_start = (story_chunk.start_time().seconds() - 5.0).max(0.0); let context_end = (story_chunk.end_time().seconds() + 5.0).min(duration); // Get chunks in context range (sentence chunks with ASR text) let context_chunks = db .get_chunks_by_time_range(file_id, context_start, context_end) .await?; // Get frames in context range let context_frames = db .get_frames_by_time_range(file_id, context_start, context_end) .await?; // Build story let mut story = String::new(); story.push_str(&format!( "Scene {} ({:.1}s - {:.1}s)\n\n", i + 1, story_chunk.start_time().seconds(), story_chunk.end_time().seconds() )); // Add audio/text content let sentence_chunks: Vec<&Chunk> = context_chunks .iter() .filter(|c| c.chunk_type == ChunkType::Sentence) .collect(); if !sentence_chunks.is_empty() { story.push_str("【Speech】\n"); for sc in &sentence_chunks { if let Some(text) = &sc.text_content { story.push_str(&format!(" - {}\n", text)); } } story.push('\n'); } // Aggregate YOLO objects let mut all_objects: std::collections::HashMap = std::collections::HashMap::new(); for frame in &context_frames { if let Some(objects) = &frame.yolo_objects { if let Some(arr) = objects.as_array() { for obj in arr { if let Some(class_name) = obj.get("class_name").and_then(|v| v.as_str()) { *all_objects.entry(class_name.to_string()).or_insert(0) += 1; } } } } } if !all_objects.is_empty() { story.push_str("【Objects】\n"); let mut sorted_objects: Vec<_> = all_objects.iter().collect(); sorted_objects.sort_by(|a, b| b.1.cmp(a.1)); for (obj, count) in sorted_objects.iter().take(10) { story.push_str(&format!(" - {} ({} frames)\n", obj, count)); } story.push('\n'); } // Aggregate OCR text let mut all_texts: Vec = Vec::new(); for frame in &context_frames { if let Some(texts) = &frame.ocr_results { if let Some(arr) = texts.as_array() { for txt in arr { if let Some(text) = txt.get("text").and_then(|v| v.as_str()) { if !text.is_empty() && text.len() > 2 { all_texts.push(text.to_string()); } } } } } } if !all_texts.is_empty() { story.push_str("【Text in video】\n"); for txt in all_texts.iter().take(10) { story.push_str(&format!(" - {}\n", txt)); } story.push('\n'); } // Aggregate faces let mut face_count = 0; for frame in &context_frames { if let Some(faces) = &frame.face_results { if let Some(arr) = faces.as_array() { face_count += arr.len(); } } } if face_count > 0 { story.push_str(&format!( "【Faces】\n - {} face(s) detected\n\n", face_count )); } println!("{}", story); } Ok(()) } Commands::Vectorize { uuid } => { println!("Vectorizing: {}", uuid); let pg = PostgresDb::init() .await .context("Failed to init PostgreSQL")?; let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?; let embedder = Embedder::new("nomic-embed-text-v2-moe:latest".to_string()); let mut stored_count = 0usize; // Get list of videos to process let videos_to_process = if uuid == "all" { // Get all videos let videos = pg.list_videos(10000, 0).await?.0; videos.into_iter().map(|v| v.uuid).collect::>() } else { // Process single video vec![uuid.clone()] }; for target in &videos_to_process { println!("\n=== Processing video: {} ===", target); let chunks = pg.get_chunks_by_uuid(target.as_str()).await?; let sentence_chunks: Vec<_> = chunks .into_iter() .filter(|c| c.chunk_type == ChunkType::Sentence) .collect(); println!( "Found {} sentence chunks for {}", sentence_chunks.len(), target ); let mut video_stored_count = 0usize; for chunk in sentence_chunks { // Try to extract text from different possible locations let text = chunk .content .get("data") // Try data->text structure first .and_then(|data| data.get("text")) .and_then(|v| v.as_str()) .or_else(|| chunk.content.get("text").and_then(|v| v.as_str())) // Try root text structure .unwrap_or(""); if text.is_empty() { eprintln!( "Empty text for chunk {}, content: {:?}", chunk.chunk_id, chunk.content ); continue; } print!("Embedding chunk {}... ", chunk.chunk_id); std::io::stdout().flush().unwrap(); match embedder.embed_document(text).await { Ok(vector) => { println!("embedding success ({} dims)", vector.len()); let vector_id = format!("{}_{}", chunk.uuid, chunk.chunk_id); if let Err(e) = pg.store_vector(&chunk.chunk_id, &vector, &chunk.uuid).await { eprintln!("store_vector error for {}: {}", chunk.chunk_id, e); continue; } let qdrant_payload = VectorPayload { uuid: chunk.uuid.clone(), chunk_id: chunk.chunk_id.clone(), chunk_type: "sentence".to_string(), start_time: chunk.start_time().seconds(), end_time: chunk.end_time().seconds(), text: Some(text.to_string()), }; if let Err(e) = qdrant .upsert_vector(&chunk.chunk_id, &vector, qdrant_payload) .await { eprintln!("upsert_vector error for {}: {}", chunk.chunk_id, e); continue; } if let Err(e) = pg.update_vector_id(&chunk.chunk_id, &vector_id).await { eprintln!("update_vector_id error for {}: {}", chunk.chunk_id, e); continue; } stored_count += 1; video_stored_count += 1; println!( "stored (video: {}, total: {})", video_stored_count, stored_count ); } Err(e) => { println!("embedding failed: {}", e); } } } // Only update storage status if vectors were actually stored for this video if video_stored_count > 0 { pg.update_storage_status(target.as_str(), "pvector_chunk", true) .await?; pg.update_storage_status(target.as_str(), "qvector_chunk", true) .await?; println!( "✓ Vectorize stage completed for {}! ({} vectors stored)", target, video_stored_count ); } else { println!( "✗ Vectorize stage failed for {}! (0 vectors stored)", target ); } } println!("\n=== Vectorization Summary ==="); println!("Total vectors stored: {}", stored_count); if uuid == "all" { println!("✓ Vectorize stage completed for all videos!"); } Ok(()) } Commands::Play { target } => { println!("Playing: {}", target); // TODO: Implement play Ok(()) } Commands::Watch { directories } => { println!("Starting watcher: {:?}", directories); // TODO: Implement watch Ok(()) } Commands::System { gpu } => { let resources = SystemResources::check(); println!("╔══════════════════════════════════════════════════════════════╗"); println!("║ System Resources Report ║"); println!("╠══════════════════════════════════════════════════════════════╣"); println!( "║ CPU: {:.1}% idle ║", resources.cpu_idle_percent ); println!( "║ Memory: {:.1}GB / {:.1}GB available ({:.0}% used) ║", resources.memory_available_mb as f64 / 1024.0, resources.memory_total_mb as f64 / 1024.0, resources.memory_used_percent ); if resources.gpu_available { match resources.gpu_type { GpuType::Nvidia => { let util = resources.gpu_utilization.unwrap_or(0.0); println!( "║ GPU: NVIDIA - {:.0}% utilized ║", util ); } GpuType::AppleMps => { println!( "║ GPU: Apple MPS (Metal) - available ║" ); } } } else { println!("║ GPU: None detected ║"); } println!("╠══════════════════════════════════════════════════════════════╣"); if resources.can_parallel(4096) { println!("║ Mode: PARALLEL - Can run multiple modules together ║"); println!( "║ Recommended modules: {} ║", resources.recommend_parallel_modules().join(", ") ); } else { println!("║ Mode: SEQUENTIAL - Low resources, run one at a time ║"); } println!("╚══════════════════════════════════════════════════════════════╝"); if gpu { println!("\n=== GPU Details ==="); let output = std::process::Command::new("system_profiler") .args(["SPDisplaysDataType", "-detailLevel", "mini"]) .output(); if let Ok(o) = output { println!("{}", String::from_utf8_lossy(&o.stdout)); } } Ok(()) } Commands::Server { host, port } => { let port = port.unwrap_or_else(|| *momentry_core::core::config::SERVER_PORT); momentry_core::api::start_server(&host, port).await?; Ok(()) } Commands::Worker { max_concurrent, poll_interval, batch_size, } => { use momentry_core::worker::{JobWorker, WorkerConfig}; let mut config = WorkerConfig::default(); if let Some(max) = max_concurrent { config.max_concurrent = max; } if let Some(interval) = poll_interval { config.poll_interval_secs = interval; } if let Some(batch) = batch_size { config.batch_size = batch; } let db = PostgresDb::init().await?; let redis = RedisClient::new()?; let worker = JobWorker::new( std::sync::Arc::new(db), std::sync::Arc::new(redis), config.clone(), ); println!( "Starting worker with max_concurrent={}, poll_interval={}s", config.max_concurrent, config.poll_interval_secs ); worker.run().await?; Ok(()) } Commands::Query { query } => { println!("Query: {}", query); // TODO: Implement query Ok(()) } Commands::Lookup { path } => { let uuid = momentry_core::uuid::compute_uuid_from_path(&path); println!("Path: {}", path); println!("UUID: {}", uuid); Ok(()) } Commands::Resolve { uuid } => { println!("Resolving UUID: {}", uuid); // TODO: Look up path from UUID in database println!("(Database lookup not implemented yet)"); Ok(()) } Commands::Thumbnails { uuid, count } => { let db = PostgresDb::init().await?; let videos = if let Some(ref uuid) = uuid { vec![db .get_video_by_uuid(uuid) .await? .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?] } else { db.list_videos(10000, 0).await?.0 }; let output_dir = std::path::PathBuf::from("thumbnails"); let extractor = momentry_core::ThumbnailExtractor::new(output_dir, count); for video in videos { println!( "\nGenerating thumbnails for: {} ({})", video.file_name, video.uuid ); match extractor.get_or_create(&video.file_path, &video.uuid) { Ok(result) => { println!(" Generated {} thumbnails", result.count); } Err(e) => { println!(" Error: {}", e); } } } println!("\nThumbnails generated successfully!"); Ok(()) } Commands::Status { uuid } => { let db = PostgresDb::init().await?; let videos = if let Some(ref u) = uuid { vec![db .get_video_by_uuid(u) .await? .ok_or_else(|| anyhow::anyhow!("Video not found: {}", u))?] } else { db.list_videos(10000, 0).await?.0 }; println!("\n╔══════════════════════════════════════════════════════════════════════════════════╗"); println!( "║ 📊 Storage Status Report ║" ); println!("╠══════════════════════════════════════════════════════════════════════════════════╣"); println!( "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║", "Video", "FS", "FS", "PSQL", "PObj", "MObj", "PVec", "QVec" ); println!( "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║", "", "Video", "JSON", "Chunk", "Chunk", "Chunk", "Chunk", "Chunk" ); println!( "╠{:33}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╪{:9}╣", str::repeat("─", 32), str::repeat("─", 8), str::repeat("─", 8), str::repeat("─", 8), str::repeat("─", 8), str::repeat("─", 8), str::repeat("─", 8), str::repeat("─", 8) ); for video in videos { let (sentence_count, time_count) = db.get_chunk_count(&video.uuid).await.unwrap_or((0, 0)); let vector_count = db.get_vector_count(&video.uuid).await.unwrap_or(0); let total_chunks = sentence_count + time_count; let psql_status = if total_chunks > 0 { "✓" } else { "-" }; let pvec_status = if vector_count > 0 && total_chunks > 0 { if vector_count >= total_chunks { "✓" } else { "◐" } } else { "-" }; let qvec_status = if video.storage.qvector_chunk { "✓" } else { "-" }; let file_name = if video.file_name.len() > 30 { format!("...{}", &video.file_name[video.file_name.len() - 27..]) } else { video.file_name }; println!( "║ {:32} │ {} │ {} │ {} │ - │ - │ {} │ {} ║", file_name, if video.storage.fs_video { "✓" } else { "✗" }, if video.storage.fs_json { "✓" } else { "-" }, psql_status, pvec_status, qvec_status ); } println!("╠══════════════════════════════════════════════════════════════════════════════════╣"); println!( "║ Storage Types: ║" ); println!( "║ FS_Video - Video file on filesystem ║" ); println!( "║ FS_JSON - JSON files (probe, ASR, YOLO, etc.) ║" ); println!( "║ PSQL_Chunk - Chunks stored in PostgreSQL ║" ); println!( "║ PObject - Chunks as JSON objects in PostgreSQL (future) ║" ); println!( "║ MObject - Chunks as JSON objects in MongoDB (future) ║" ); println!( "║ PVector - Vectors in PostgreSQL ║" ); println!( "║ QVector - Vectors in Qdrant ║" ); println!("╚══════════════════════════════════════════════════════════════════════════════════╝"); Ok(()) } Commands::Backup { action, days } => { let output_dir = OutputDir::new(); output_dir.ensure_dir()?; println!("\n📁 Backup directory: {:?}", output_dir.get_backup_dir()); match action.as_str() { "list" => { let backups = output_dir.list_backups()?; println!("\n📦 Available backups:"); if backups.is_empty() { println!(" (no backups found)"); } else { for backup in &backups { println!(" - {}", backup.filename); } } println!("\nTotal: {} backup(s)", backups.len()); } "cleanup" => { let days = days.unwrap_or(30); let deleted = output_dir.cleanup_old_backups(days)?; println!( "\n🗑️ Cleaned up {} old backup(s) (older than {} days)", deleted, days ); } "verify" => { println!("\n🔍 Verifying backups..."); let backups = output_dir.list_backups()?; let mut verified = 0; let mut failed = 0; for backup in &backups { match output_dir.verify_backup(&backup.path) { Ok(true) => { println!(" ✓ {}", backup.filename); verified += 1; } Ok(false) => { println!(" ✗ {} (missing checksum)", backup.filename); failed += 1; } Err(e) => { println!(" ✗ {} ({})", backup.filename, e); failed += 1; } } } println!("\nVerified: {} OK, {} failed", verified, failed); } _ => { println!("\n⚠️ Unknown action: {}", action); println!("Available actions: list, cleanup, verify"); } } Ok(()) } Commands::ApiKey { action, name, key_type, ttl, key, } => { let db = PostgresDb::init().await?; let db_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string()); let service = ApiKeyService::new(db_url); match action { ApiKeyAction::Create => { let name = name.unwrap_or_else(|| "unnamed-key".to_string()); let kt = parse_key_type(key_type.as_deref()); let request = momentry_core::core::api_key::CreateApiKeyRequest { name: name.clone(), key_type: kt, user_id: None, service_name: None, permissions: vec!["read".to_string(), "write".to_string()], ttl_days: ttl, }; match service.create_key(request) { Ok(response) => { let key_hash = service.hash_key(&response.key); let key_type_str = serde_json::to_string(&kt).unwrap_or_else(|_| "user".to_string()); let permissions = serde_json::json!(["read", "write"]); let config = momentry_core::core::db::CreateApiKeyConfig::new( &response.key_id, &key_hash, kt.prefix(), &name, &key_type_str, ) .with_permissions(&permissions) .with_expires_at(response.expires_at); if let Err(e) = db.create_api_key(config).await { eprintln!( "\n⚠️ Key generated but failed to store in database: {}", e ); } println!("\n✅ API Key created successfully!"); println!("\n┌─────────────────────────────────────────────────────────────────────────────┐"); println!("│ ⚠️ IMPORTANT: Save this key now - it will not be shown again! │"); println!("└─────────────────────────────────────────────────────────────────────────────┘"); println!("\nKey ID: {}", response.key_id); println!("API Key: {}", response.key); println!("Expires: {}", response.expires_at); if !response.warning.is_empty() { println!("\n⚠️ {}", response.warning); } } Err(e) => { eprintln!("\n❌ Failed to create API key: {}", e); } } } ApiKeyAction::List => match db.list_api_keys().await { Ok(keys) => { println!("\n📋 API Key List"); if keys.is_empty() { println!(" (no API keys found)"); } else { println!("\n┌────────────────────────────────────────────────────────────────────────────┐"); println!( "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │", "Status", "Name", "Type", "Usage", "Last Used" ); println!("├────────────────────────────────────────────────────────────────────────────┤"); for k in &keys { let status = if k.status == "active" { "✓ active" } else { &k.status }; let last_used = k .last_used_at .map(|dt| dt.format("%Y-%m-%d %H:%M").to_string()) .unwrap_or_else(|| "never".to_string()); println!( "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │", status, if k.name.len() > 20 { &k.name[..17] } else { &k.name }, k.key_type, k.usage_count, last_used ); } println!("└────────────────────────────────────────────────────────────────────────────┘"); println!("\nTotal: {} key(s)", keys.len()); } } Err(e) => { eprintln!("\n❌ Failed to list API keys: {}", e); } }, ApiKeyAction::Validate => { let api_key = key.ok_or_else(|| anyhow::anyhow!("--key required for validate"))?; let key_hash = service.hash_key(&api_key); match db.get_api_key_by_hash(&key_hash).await { Ok(Some(record)) => { if record.status == "active" { db.update_api_key_usage(&record.key_id, None).await.ok(); println!("\n✅ API Key is valid"); println!("Key ID: {}", record.key_id); println!("Name: {}", record.name); println!("Type: {}", record.key_type); println!("Usage: {} times", record.usage_count + 1); if record.rotation_required { println!( "⚠️ Rotation required: {}", record.rotation_reason.as_deref().unwrap_or("unknown") ); } } else { println!("\n❌ API Key is {}", record.status); } } Ok(None) => { println!("\n❌ API Key is invalid or not found"); } Err(e) => { eprintln!("\n❌ Validation error: {}", e); } } } ApiKeyAction::Revoke => { let key = key.ok_or_else(|| anyhow::anyhow!("--key required for revoke"))?; let key_id = service.extract_key_id(&key); match db.revoke_api_key(&key_id).await { Ok(_) => { println!("\n🔴 API Key {} revoked successfully", key_id); } Err(e) => { eprintln!("\n❌ Failed to revoke API key: {}", e); } } } ApiKeyAction::Rotate => { let key = key.ok_or_else(|| anyhow::anyhow!("--key required for rotate"))?; let key_id = service.extract_key_id(&key); let grace_period_end = service.calculate_grace_period_end(parse_key_type(key_type.as_deref())); match db .require_api_key_rotation( &key_id, "manual rotation requested", grace_period_end, ) .await { Ok(_) => { println!("\n🔄 Rotation requested for key: {}", key_id); println!("Grace period ends: {}", grace_period_end); } Err(e) => { eprintln!("\n❌ Rotation request failed: {}", e); } } } ApiKeyAction::Stats => { match db.get_api_key_stats().await { Ok(stats) => { println!("\n📊 API Key Statistics"); println!("\n┌─────────────────────────────────────────┐"); println!("│ Total Keys: {:5} │", stats.total_keys); println!( "│ Active Keys: {:5} │", stats.active_keys ); println!( "│ Expired Keys: {:5} │", stats.expired_keys ); println!( "│ Rotation Required: {:4} │", stats.rotation_required ); println!( "│ Anomalies (24h): {:5} │", stats.anomalies_last_24h ); println!("└─────────────────────────────────────────┘"); } Err(e) => { eprintln!("\n⚠️ Failed to get stats: {}", e); } } let config = service.get_config(); println!("\n┌─────────────────────────────────────────┐"); println!("│ Anomaly Detection Thresholds │"); println!("├─────────────────────────────────────────┤"); println!( "│ Requests/minute: {:5} │", config.requests_per_minute_threshold ); println!( "│ Requests/hour: {:5} │", config.requests_per_hour_threshold ); println!( "│ Error rate: {:5.1}% │", config.error_rate_threshold * 100.0 ); println!( "│ Unique IPs/hour: {:5} │", config.unique_ips_per_hour_threshold ); println!( "│ Lockout threshold: {:5} │", config.lockout_threshold ); println!("└─────────────────────────────────────────┘"); } } Ok(()) } Commands::Gitea { action, username, password, token_name, scopes, } => { use momentry_core::core::api_key::gitea::{ CreateGiteaTokenRequest, GiteaClient, GiteaScope, }; let db = PostgresDb::init().await?; let gitea = GiteaClient::new()?; match action { GiteaAction::Create => { let username = username .ok_or_else(|| anyhow::anyhow!("--username required for create"))?; let password = password .ok_or_else(|| anyhow::anyhow!("--password required for create"))?; let token_name = token_name .ok_or_else(|| anyhow::anyhow!("--token-name required for create"))?; let scopes_vec: Vec = scopes .map(|s| { s.split(',') .filter_map(|scope| scope.trim().parse::().ok()) .collect() }) .unwrap_or_else(|| { vec![GiteaScope::ReadRepository, GiteaScope::WriteRepository] }); let request = CreateGiteaTokenRequest { username: username.clone(), password, token_name: token_name.clone(), scopes: scopes_vec.clone(), }; match gitea.create_token(&request).await { Ok(response) => { if let Err(e) = db .create_gitea_token( response.id, &username, &token_name, &response.token_last_eight, &serde_json::json!(scopes_vec .iter() .map(|s| s.as_str()) .collect::>()), None, ) .await { eprintln!("\n⚠️ Token created but failed to store: {}", e); } println!("\n✅ Gitea Token created successfully!"); println!("\n┌─────────────────────────────────────────────────────────────────────────────┐"); println!("│ ⚠️ IMPORTANT: Save this token now - it will not be shown again! │"); println!("└─────────────────────────────────────────────────────────────────────────────┘"); println!("\nToken ID: {}", response.id); println!("Token Name: {}", response.name); println!("SHA1: {}", response.sha1); println!("Last 8: {}", response.token_last_eight); println!("\nAuthorization Header:"); println!(" Authorization: token {}", response.sha1); } Err(e) => { eprintln!("\n❌ Failed to create Gitea token: {}", e); } } } GiteaAction::List => { let username = username.ok_or_else(|| anyhow::anyhow!("--username required for list"))?; let password = password.ok_or_else(|| anyhow::anyhow!("--password required for list"))?; match gitea.list_tokens(&username, &password).await { Ok(tokens) => { println!("\n📋 Gitea Tokens for user: {}", username); if tokens.is_empty() { println!(" (no tokens found)"); } else { println!("\n┌────────────────────────────────────────────────────────────────────────────┐"); println!("│ ID │ Name │ Last 8 │ Registered │"); println!("├────────────────────────────────────────────────────────────────────────────┤"); for token in &tokens { let registered = db .get_gitea_token_by_name(&username, &token.name) .await .ok() .flatten() .map(|_| "✓") .unwrap_or("-"); println!( "│ {:8} │ {:20} │ {:9} │ {:27} │", token.id, if token.name.len() > 20 { &token.name[..17] } else { &token.name }, token.token_last_eight, registered ); } println!("└────────────────────────────────────────────────────────────────────────────┘"); println!("\nTotal: {} token(s)", tokens.len()); } } Err(e) => { eprintln!("\n❌ Failed to list Gitea tokens: {}", e); } } } GiteaAction::Delete => { let username = username .ok_or_else(|| anyhow::anyhow!("--username required for delete"))?; let password = password .ok_or_else(|| anyhow::anyhow!("--password required for delete"))?; let token_name = token_name .ok_or_else(|| anyhow::anyhow!("--token-name required for delete"))?; match gitea.delete_token(&username, &password, &token_name).await { Ok(_) => { let _ = db.delete_gitea_token(&username, &token_name).await; println!("\n🗑️ Token '{}' deleted successfully", token_name); } Err(e) => { eprintln!("\n❌ Failed to delete Gitea token: {}", e); } } } GiteaAction::Verify => { let token_name = token_name .ok_or_else(|| anyhow::anyhow!("--token-name required for verify"))?; let record = db .get_gitea_token_by_name( &username.unwrap_or_else(|| "unknown".to_string()), &token_name, ) .await?; match record { Some(r) => { println!("\n📋 Gitea Token: {}", r.token_name); println!(" User: {}", r.gitea_user); println!(" Token ID: {}", r.gitea_token_id); println!(" Last 8: {}", r.token_last_eight); println!(" Scopes: {}", r.scopes); println!(" Created: {}", r.created_at); if let Some(verified) = r.last_verified { println!(" Last Verified: {}", verified); } else { println!(" Last Verified: never"); } } None => { println!("\n❌ Token not found in local database"); } } } } Ok(()) } Commands::N8n { action, api_key, label, expires_in_days, } => { use momentry_core::core::api_key::n8n::{ extract_last_eight, CreateN8nApiKeyRequest, N8nClient, }; let db = PostgresDb::init().await?; match action { N8nAction::Create => { let api_key_value = api_key.ok_or_else(|| { anyhow::anyhow!("--api-key required for create (existing n8n API key)") })?; let label = label.ok_or_else(|| anyhow::anyhow!("--label required for create"))?; let n8n = N8nClient::new(api_key_value)?; let expires_at = expires_in_days .map(|days| chrono::Utc::now() + chrono::Duration::days(days)); let request = CreateN8nApiKeyRequest { label: label.clone(), expires_at, }; match n8n.create_api_key(&request).await { Ok(response) => { if let Err(e) = db .create_n8n_api_key( &response.id, &label, &extract_last_eight(&response.api_key), None, response.expires_at, ) .await { eprintln!("\n⚠️ API key created but failed to store: {}", e); } println!("\n✅ n8n API Key created successfully!"); println!("\n┌─────────────────────────────────────────────────────────────────────────────┐"); println!("│ ⚠️ IMPORTANT: Save this API key now - it will not be shown again! │"); println!("└─────────────────────────────────────────────────────────────────────────────┘"); println!("\nKey ID: {}", response.id); println!("Label: {}", response.label); println!("API Key: {}", response.api_key); println!("\nUsage:"); println!(" curl -H 'X-N8N-API-KEY: {}' https://n8n.momentry.ddns.net/api/v1/workflows", response.api_key); } Err(e) => { eprintln!("\n❌ Failed to create n8n API key: {}", e); } } } N8nAction::List => { let api_key_value = api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for list"))?; let n8n = N8nClient::new(api_key_value)?; match n8n.list_api_keys().await { Ok(keys) => { println!("\n📋 n8n API Keys"); if keys.is_empty() { println!(" (no API keys found)"); } else { println!("\n┌────────────────────────────────────────────────────────────────────────────┐"); println!("│ Label │ ID │"); println!("├────────────────────────────────────────────────────────────────────────────┤"); for key in &keys { println!( "│ {:27} │ {:39} │", if key.label.len() > 27 { &key.label[..24] } else { &key.label }, key.id ); } println!("└────────────────────────────────────────────────────────────────────────────┘"); println!("\nTotal: {} key(s)", keys.len()); } } Err(e) => { eprintln!("\n❌ Failed to list n8n API keys: {}", e); } } } N8nAction::Delete => { let api_key_value = api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for delete"))?; let label = label.ok_or_else(|| anyhow::anyhow!("--label required for delete"))?; let record = db.get_n8n_api_key_by_label(&label).await?; if let Some(r) = record { let n8n = N8nClient::new(api_key_value)?; match n8n.delete_api_key(&r.n8n_key_id).await { Ok(_) => { let _ = db.delete_n8n_api_key(&label).await; println!("\n🗑️ API key '{}' deleted successfully", label); } Err(e) => { eprintln!("\n❌ Failed to delete n8n API key: {}", e); } } } else { println!("\n❌ API key '{}' not found in local database", label); } } N8nAction::Verify => { let label = label.ok_or_else(|| anyhow::anyhow!("--label required for verify"))?; let record = db.get_n8n_api_key_by_label(&label).await?; match record { Some(r) => { println!("\n📋 n8n API Key: {}", r.label); println!(" Key ID: {}", r.n8n_key_id); println!(" Last 8: {}", r.api_key_last_eight); println!(" Created: {}", r.created_at); if let Some(expires) = r.expires_at { println!(" Expires: {}", expires); } if let Some(verified) = r.last_verified { println!(" Last Verified: {}", verified); } else { println!(" Last Verified: never"); } } None => { println!("\n❌ API key not found in local database"); } } } } Ok(()) } } }