diff --git a/src/watcher/watcher.rs b/src/watcher/watcher.rs index 9fe8f35..47a106b 100644 --- a/src/watcher/watcher.rs +++ b/src/watcher/watcher.rs @@ -1,6 +1,5 @@ use anyhow::Result; use std::path::Path; -use std::collections::HashSet; use tokio::time; use tracing::{info, warn}; @@ -21,7 +20,7 @@ impl Default for WatcherConfig { } /// Starts the file watcher in the background. -/// Pre-processes new files: SHA256 + probe + UUID → .pre.json +/// Detects new files and logs them. Does NOT auto-register or auto-pre-process. pub async fn run_watcher() -> Result<()> { let config = WatcherConfig::default(); let dirs = config.directories.clone(); @@ -31,40 +30,23 @@ pub async fn run_watcher() -> Result<()> { return Err(anyhow::anyhow!("No watch directories")); } - info!("Starting File Watcher (pre-processor)..."); + info!("Starting File Watcher (detection only, no auto-modification)..."); info!("Watch directories: {:?}", dirs); tokio::spawn(async move { let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms)); + // Track known files across cycles + let mut known = std::collections::HashSet::new(); loop { interval.tick().await; - pre_process_new_files(&dirs).await; + report_new_files(&dirs, &mut known).await; } }); Ok(()) } -async fn pre_process_new_files(directories: &[String]) { - let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") - .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); - let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") - .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string()); - let python_path = std::env::var("MOMENTRY_PYTHON_PATH") - .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); - - // Collect existing .pre.json UUIDs to detect new files - let existing: HashSet = std::fs::read_dir(&output_dir) - .map(|d| d.flatten().filter_map(|e| { - let name = e.file_name().to_string_lossy().to_string(); - if name.ends_with(".pre.json") { - Some(name.trim_end_matches(".pre.json").to_string()) - } else { - None - } - }).collect()) - .unwrap_or_default(); - +async fn report_new_files(directories: &[String], known: &mut std::collections::HashSet) { for dir in directories { let dir_path = Path::new(dir); if !dir_path.is_dir() { @@ -75,109 +57,96 @@ async fn pre_process_new_files(directories: &[String]) { _ => continue, }; + let mut current_files = std::collections::HashSet::new(); for entry in entries.flatten() { let file_path = entry.path(); if !file_path.is_file() { continue; } - let fname = file_path.file_name().and_then(|n| n.to_str()).unwrap_or(""); - if fname.starts_with('.') { continue; } - if fname.ends_with(".pre.json") { continue; } - - // Compute UUID the same way register does - let canonical = match file_path.canonicalize() { - Ok(p) => p, + let fname = match file_path.file_name().and_then(|n| n.to_str()) { + Some(n) if !n.starts_with('.') && !n.ends_with(".pre.json") => n.to_string(), _ => continue, }; - let canonical_str = canonical.to_string_lossy().to_string(); - let filename = fname.to_string(); - - // Birthday = file creation time - let birthday = std::fs::metadata(&file_path) - .ok() - .and_then(|m| m.created().ok()) - .map(|t| { - let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(); - // Format as RFC 3339 - chrono::DateTime::from_timestamp(secs as i64, 0) - .map(|dt| dt.to_rfc3339()) - .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) - }) - .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); - - let mac = crate::core::storage::uuid::get_mac_address(); - let file_uuid = crate::core::storage::uuid::compute_birth_uuid( - &mac, &birthday, &canonical_str, &filename, - ); - let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid)); - tracing::info!("[PRE-PROCESS] UUID inputs: mac={} birthday={} path={} name={} → {}", mac, birthday, canonical_str, filename, file_uuid); - if pre_path.exists() { - continue; // Already pre-processed - } - - info!("[PRE-PROCESS] New file: {} → {}", filename, file_uuid); - - // Compute SHA256 - let content_hash = crate::core::storage::content_hash::compute_sha256(&file_path) - .unwrap_or_default(); - - // ffprobe - let probe_json: serde_json::Value = probe_or_minimal(&file_path, &canonical_str); - - // Determine file_type - let file_type = probe_json.get("streams").and_then(|s| s.as_array()) - .and_then(|streams| { - if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) { - Some("video") - } else if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")) { - Some("audio") - } else { - None - } - }) - .or_else(|| { - let ext = file_path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase()); - match ext.as_deref() { - Some("jpg"|"jpeg"|"png"|"gif"|"bmp"|"webp"|"svg") => Some("image"), - Some("pdf"|"doc"|"docx"|"pages"|"key"|"numbers"|"ppt"|"pptx"|"xls"|"xlsx") => Some("document"), - _ => None, - } - }) - .unwrap_or("unknown"); - - let file_size = std::fs::metadata(&file_path).ok().map(|m| m.len()).unwrap_or(0); - - let pre_data = serde_json::json!({ - "file_name": filename, - "file_path": canonical_str, - "content_hash": content_hash, - "probe_json": probe_json, - "birthday": birthday, - "file_uuid": file_uuid, - "file_size": file_size, - "file_type": file_type, - "pre_processed_at": chrono::Utc::now().to_rfc3339(), - }); - - if let Ok(content) = serde_json::to_string_pretty(&pre_data) { - if std::fs::write(&pre_path, content).is_ok() { - info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid); - } + current_files.insert(fname.clone()); + if !known.contains(&fname) { + info!("[WATCHER] New file detected: {} in {}", fname, dir); + known.insert(fname); } } + // Remove files that no longer exist (deleted between cycles) + known.retain(|k| current_files.contains(k)); } } -/// Run ffprobe or return minimal filesystem metadata -fn probe_or_minimal(file_path: &Path, canonical: &str) -> serde_json::Value { - if let Ok(result) = crate::core::probe::probe_video(canonical) { - if let Ok(val) = serde_json::to_value(&result) { - return val; +/// Pre-process a single file: compute SHA256 + probe + UUID → .pre.json +/// This is called explicitly from register API, NOT from the watcher. +pub async fn pre_process_file(file_path: &str) -> Option { + let path = std::path::Path::new(file_path); + if !path.is_file() { + return None; + } + let canonical = path.canonicalize().ok()?; + let canonical_str = canonical.to_string_lossy().to_string(); + let filename = path.file_name()?.to_string_lossy().to_string(); + + let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); + + let birthday = std::fs::metadata(&path).ok() + .and_then(|m| m.created().ok()) + .map(|t| { + let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(); + chrono::DateTime::from_timestamp(secs as i64, 0) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) + }) + .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); + + let mac = crate::core::storage::uuid::get_mac_address(); + let file_uuid = crate::core::storage::uuid::compute_birth_uuid( + &mac, &birthday, &canonical_str, &filename, + ); + + let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid)); + if pre_path.exists() { + info!("[PRE-PROCESS] Already pre-processed: {}", filename); + return Some(file_uuid); + } + + info!("[PRE-PROCESS] Pre-processing: {} → {}", filename, file_uuid); + + let content_hash = crate::core::storage::content_hash::compute_sha256(&path).unwrap_or_default(); + + let probe_json: serde_json::Value = if let Ok(result) = crate::core::probe::probe_video(&canonical_str) { + serde_json::to_value(&result).unwrap_or_default() + } else { + let size = std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0); + serde_json::json!({ + "format": {"filename": canonical_str, "size": size.to_string(), "format_name": "unknown"}, + "streams": [] + }) + }; + + let file_type = if probe_json.get("streams").and_then(|s| s.as_array()) + .map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video"))) + { "video" } else { "unknown" }; + + let pre_data = serde_json::json!({ + "file_name": filename, + "file_path": canonical_str, + "content_hash": content_hash, + "probe_json": probe_json, + "birthday": birthday, + "file_uuid": file_uuid, + "file_size": std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0), + "file_type": file_type, + "pre_processed_at": chrono::Utc::now().to_rfc3339(), + }); + + if let Ok(content) = serde_json::to_string_pretty(&pre_data) { + if std::fs::write(&pre_path, content).is_ok() { + info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid); } } - let size = std::fs::metadata(file_path).ok().map(|m| m.len()).unwrap_or(0); - serde_json::json!({ - "format": {"filename": canonical, "size": size.to_string(), "format_name": "unknown"}, - "streams": [] - }) + Some(file_uuid) }