From cdbd205972c952224b326c2dc5e76fb35d7a251f Mon Sep 17 00:00:00 2001 From: Accusys Date: Fri, 15 May 2026 12:51:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20file=20pre-processor=20in=20watcher=20?= =?UTF-8?q?=E2=80=94=20SHA256=20+=20probe=20+=20UUID=20=E2=86=92=20.pre.js?= =?UTF-8?q?on=20for=20all=20file=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/watcher/watcher.rs | 169 +++++++++++++++++++++++++++++++---------- 1 file changed, 130 insertions(+), 39 deletions(-) diff --git a/src/watcher/watcher.rs b/src/watcher/watcher.rs index aa1d693..8031ffe 100644 --- a/src/watcher/watcher.rs +++ b/src/watcher/watcher.rs @@ -1,5 +1,6 @@ use anyhow::Result; use std::path::Path; +use std::collections::HashSet; use tokio::time; use tracing::{info, warn}; @@ -10,19 +11,17 @@ pub struct WatcherConfig { impl Default for WatcherConfig { fn default() -> Self { - // Default to SFTP demo directory if not specified let default_dir = std::env::var("MOMENTRY_SFTP_ROOT") .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo/".to_string()); - Self { directories: vec![default_dir], - poll_interval_ms: 60000, // 60 seconds polling interval + poll_interval_ms: 60000, } } } /// Starts the file watcher in the background. -/// Scans directories for video files (auto-registration disabled). +/// Pre-processes new files: SHA256 + probe + UUID → .pre.json pub async fn run_watcher() -> Result<()> { let config = WatcherConfig::default(); let dirs = config.directories.clone(); @@ -32,62 +31,154 @@ pub async fn run_watcher() -> Result<()> { return Err(anyhow::anyhow!("No watch directories")); } - info!("Starting Video File Watcher (auto-registration disabled)..."); + info!("Starting File Watcher (pre-processor)..."); info!("Watch directories: {:?}", dirs); - // Spawn background task for monitoring only (no auto-registration) tokio::spawn(async move { - let mut interval = time::interval(time::Duration::from_millis(config.poll_interval_ms)); - + let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms)); loop { interval.tick().await; - scan_videos(&dirs).await; + pre_process_new_files(&dirs).await; } }); Ok(()) } -async fn scan_videos(directories: &[String]) { - // Allowed extensions list - let allowed_extensions = vec!["mp4", "mov", "mkv"]; +async fn pre_process_new_files(directories: &[String]) { + let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); + let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string()); + let python_path = std::env::var("MOMENTRY_PYTHON_PATH") + .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); + + // Collect existing .pre.json UUIDs to detect new files + let existing: HashSet = std::fs::read_dir(&output_dir) + .map(|d| d.flatten().filter_map(|e| { + let name = e.file_name().to_string_lossy().to_string(); + if name.ends_with(".pre.json") { + Some(name.trim_end_matches(".pre.json").to_string()) + } else { + None + } + }).collect()) + .unwrap_or_default(); for dir in directories { - let path = Path::new(dir); - if !path.exists() { - warn!("Directory does not exist, skipping: {}", dir); + let dir_path = Path::new(dir); + if !dir_path.is_dir() { continue; } + let entries = match std::fs::read_dir(dir_path) { + Ok(e) => e, + _ => continue, + }; - if let Ok(entries) = std::fs::read_dir(path) { - let video_count = entries - .flatten() - .filter(|entry| { - let file_path = entry.path(); - if file_path.is_dir() { - if let Some(name) = file_path.file_name().and_then(|n| n.to_str()) { - if name.starts_with('.') { - return false; - } - } - } - if !file_path.is_file() { - return false; - } - if let Some(ext) = file_path.extension().and_then(|e| e.to_str()) { - allowed_extensions.contains(&ext.to_lowercase().as_str()) + for entry in entries.flatten() { + let file_path = entry.path(); + if !file_path.is_file() { + continue; + } + let fname = file_path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if fname.starts_with('.') { continue; } + if fname.ends_with(".pre.json") { continue; } + + // Compute UUID the same way register does + let canonical = match file_path.canonicalize() { + Ok(p) => p, + _ => continue, + }; + let canonical_str = canonical.to_string_lossy().to_string(); + let filename = fname.to_string(); + + // Birthday = file creation time + let birthday = std::fs::metadata(&file_path) + .ok() + .and_then(|m| m.created().ok()) + .map(|t| { + let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(); + // Format as RFC 3339 + chrono::DateTime::from_timestamp(secs as i64, 0) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) + }) + .unwrap_or_else(|| chrono::Utc::now().to_rfc3339()); + + let mac = crate::core::storage::uuid::get_mac_address(); + let file_uuid = crate::core::storage::uuid::compute_birth_uuid( + &mac, &birthday, &canonical_str, &filename, + ); + + // Check if .pre.json already exists + let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid)); + if pre_path.exists() { + continue; // Already pre-processed + } + + info!("[PRE-PROCESS] New file: {} → {}", filename, file_uuid); + + // Compute SHA256 + let content_hash = crate::core::storage::content_hash::compute_sha256(&file_path) + .unwrap_or_default(); + + // ffprobe + let probe_json: serde_json::Value = probe_or_minimal(&file_path, &canonical_str); + + // Determine file_type + let file_type = probe_json.get("streams").and_then(|s| s.as_array()) + .and_then(|streams| { + if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) { + Some("video") + } else if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")) { + Some("audio") } else { - false + None } }) - .count(); + .or_else(|| { + let ext = file_path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase()); + match ext.as_deref() { + Some("jpg"|"jpeg"|"png"|"gif"|"bmp"|"webp"|"svg") => Some("image"), + Some("pdf"|"doc"|"docx"|"pages"|"key"|"numbers"|"ppt"|"pptx"|"xls"|"xlsx") => Some("document"), + _ => None, + } + }) + .unwrap_or("unknown"); - if video_count > 0 { - info!( - "Found {} video files in {} (use API to register)", - video_count, dir - ); + let file_size = std::fs::metadata(&file_path).ok().map(|m| m.len()).unwrap_or(0); + + let pre_data = serde_json::json!({ + "file_name": filename, + "file_path": canonical_str, + "content_hash": content_hash, + "probe_json": probe_json, + "birthday": birthday, + "file_uuid": file_uuid, + "file_size": file_size, + "file_type": file_type, + "pre_processed_at": chrono::Utc::now().to_rfc3339(), + }); + + if let Ok(content) = serde_json::to_string_pretty(&pre_data) { + if std::fs::write(&pre_path, content).is_ok() { + info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid); + } } } } } + +/// Run ffprobe or return minimal filesystem metadata +fn probe_or_minimal(file_path: &Path, canonical: &str) -> serde_json::Value { + if let Ok(result) = crate::core::probe::probe_video(canonical) { + if let Ok(val) = serde_json::to_value(&result) { + return val; + } + } + let size = std::fs::metadata(file_path).ok().map(|m| m.len()).unwrap_or(0); + serde_json::json!({ + "format": {"filename": canonical, "size": size.to_string(), "format_name": "unknown"}, + "streams": [] + }) +}