use anyhow::Result; use std::path::Path; use std::collections::HashSet; use tokio::time; use tracing::{info, warn}; pub struct WatcherConfig { pub directories: Vec, pub poll_interval_ms: u64, } impl Default for WatcherConfig { fn default() -> Self { let default_dir = std::env::var("MOMENTRY_SFTP_ROOT") .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo/".to_string()); Self { directories: vec![default_dir], poll_interval_ms: 60000, } } } /// Starts the file watcher in the background. /// Pre-processes new files: SHA256 + probe + UUID → .pre.json pub async fn run_watcher() -> Result<()> { let config = WatcherConfig::default(); let dirs = config.directories.clone(); if dirs.is_empty() { warn!("No directories configured for watching."); return Err(anyhow::anyhow!("No watch directories")); } info!("Starting File Watcher (pre-processor)..."); info!("Watch directories: {:?}", dirs); tokio::spawn(async move { let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms)); loop { interval.tick().await; pre_process_new_files(&dirs).await; } }); Ok(()) } async fn pre_process_new_files(directories: &[String]) { let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string()); let python_path = std::env::var("MOMENTRY_PYTHON_PATH") .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); // Collect existing .pre.json UUIDs to detect new files let existing: HashSet = std::fs::read_dir(&output_dir) .map(|d| d.flatten().filter_map(|e| { let name = e.file_name().to_string_lossy().to_string(); if name.ends_with(".pre.json") { Some(name.trim_end_matches(".pre.json").to_string()) } else { None } }).collect()) .unwrap_or_default(); for dir in directories { let dir_path = Path::new(dir); if !dir_path.is_dir() { continue; } let entries = match std::fs::read_dir(dir_path) { Ok(e) => e, _ => continue, }; for entry in entries.flatten() { let file_path = entry.path(); if !file_path.is_file() { continue; } let fname = file_path.file_name().and_then(|n| n.to_str()).unwrap_or(""); if fname.starts_with('.') { continue; } if fname.ends_with(".pre.json") { continue; } // Compute UUID the same way register does let canonical = match file_path.canonicalize() { Ok(p) => p, _ => continue, }; let canonical_str = canonical.to_string_lossy().to_string(); let filename = fname.to_string(); // Birthday = file creation time let birthday = std::fs::metadata(&file_path) .ok() .and_then(|m| m.created().ok()) .map(|t| { let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(); // Format as RFC 3339 chrono::DateTime::from_timestamp(secs as i64, 0) .map(|dt| dt.to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) }) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); let mac = crate::core::storage::uuid::get_mac_address(); let file_uuid = crate::core::storage::uuid::compute_birth_uuid( &mac, &birthday, &canonical_str, &filename, ); let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid)); tracing::info!("[PRE-PROCESS] UUID inputs: mac={} birthday={} path={} name={} → {}", mac, birthday, canonical_str, filename, file_uuid); if pre_path.exists() { continue; // Already pre-processed } info!("[PRE-PROCESS] New file: {} → {}", filename, file_uuid); // Compute SHA256 let content_hash = crate::core::storage::content_hash::compute_sha256(&file_path) .unwrap_or_default(); // ffprobe let probe_json: serde_json::Value = probe_or_minimal(&file_path, &canonical_str); // Determine file_type let file_type = probe_json.get("streams").and_then(|s| s.as_array()) .and_then(|streams| { if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) { Some("video") } else if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")) { Some("audio") } else { None } }) .or_else(|| { let ext = file_path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase()); match ext.as_deref() { Some("jpg"|"jpeg"|"png"|"gif"|"bmp"|"webp"|"svg") => Some("image"), Some("pdf"|"doc"|"docx"|"pages"|"key"|"numbers"|"ppt"|"pptx"|"xls"|"xlsx") => Some("document"), _ => None, } }) .unwrap_or("unknown"); let file_size = std::fs::metadata(&file_path).ok().map(|m| m.len()).unwrap_or(0); let pre_data = serde_json::json!({ "file_name": filename, "file_path": canonical_str, "content_hash": content_hash, "probe_json": probe_json, "birthday": birthday, "file_uuid": file_uuid, "file_size": file_size, "file_type": file_type, "pre_processed_at": chrono::Utc::now().to_rfc3339(), }); if let Ok(content) = serde_json::to_string_pretty(&pre_data) { if std::fs::write(&pre_path, content).is_ok() { info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid); } } } } } /// Run ffprobe or return minimal filesystem metadata fn probe_or_minimal(file_path: &Path, canonical: &str) -> serde_json::Value { if let Ok(result) = crate::core::probe::probe_video(canonical) { if let Ok(val) = serde_json::to_value(&result) { return val; } } let size = std::fs::metadata(file_path).ok().map(|m| m.len()).unwrap_or(0); serde_json::json!({ "format": {"filename": canonical, "size": size.to_string(), "format_name": "unknown"}, "streams": [] }) }