fix: watcher is detection-only — pre_process_file is now explicit, not automatic

This commit is contained in:
Accusys
2026-05-15 13:18:22 +08:00
parent 43cf702d05
commit 5af8df9201

View File

@@ -1,6 +1,5 @@
use anyhow::Result;
use std::path::Path;
use std::collections::HashSet;
use tokio::time;
use tracing::{info, warn};
@@ -21,7 +20,7 @@ impl Default for WatcherConfig {
}
/// Starts the file watcher in the background.
/// Pre-processes new files: SHA256 + probe + UUID → .pre.json
/// Detects new files and logs them. Does NOT auto-register or auto-pre-process.
pub async fn run_watcher() -> Result<()> {
let config = WatcherConfig::default();
let dirs = config.directories.clone();
@@ -31,40 +30,23 @@ pub async fn run_watcher() -> Result<()> {
return Err(anyhow::anyhow!("No watch directories"));
}
info!("Starting File Watcher (pre-processor)...");
info!("Starting File Watcher (detection only, no auto-modification)...");
info!("Watch directories: {:?}", dirs);
tokio::spawn(async move {
let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms));
// Track known files across cycles
let mut known = std::collections::HashSet::new();
loop {
interval.tick().await;
pre_process_new_files(&dirs).await;
report_new_files(&dirs, &mut known).await;
}
});
Ok(())
}
async fn pre_process_new_files(directories: &[String]) {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
// Collect existing .pre.json UUIDs to detect new files
let existing: HashSet<String> = std::fs::read_dir(&output_dir)
.map(|d| d.flatten().filter_map(|e| {
let name = e.file_name().to_string_lossy().to_string();
if name.ends_with(".pre.json") {
Some(name.trim_end_matches(".pre.json").to_string())
} else {
None
}
}).collect())
.unwrap_or_default();
async fn report_new_files(directories: &[String], known: &mut std::collections::HashSet<String>) {
for dir in directories {
let dir_path = Path::new(dir);
if !dir_path.is_dir() {
@@ -75,109 +57,96 @@ async fn pre_process_new_files(directories: &[String]) {
_ => continue,
};
let mut current_files = std::collections::HashSet::new();
for entry in entries.flatten() {
let file_path = entry.path();
if !file_path.is_file() {
continue;
}
let fname = file_path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if fname.starts_with('.') { continue; }
if fname.ends_with(".pre.json") { continue; }
// Compute UUID the same way register does
let canonical = match file_path.canonicalize() {
Ok(p) => p,
let fname = match file_path.file_name().and_then(|n| n.to_str()) {
Some(n) if !n.starts_with('.') && !n.ends_with(".pre.json") => n.to_string(),
_ => continue,
};
let canonical_str = canonical.to_string_lossy().to_string();
let filename = fname.to_string();
// Birthday = file creation time
let birthday = std::fs::metadata(&file_path)
.ok()
.and_then(|m| m.created().ok())
.map(|t| {
let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
// Format as RFC 3339
chrono::DateTime::from_timestamp(secs as i64, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
})
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
let mac = crate::core::storage::uuid::get_mac_address();
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac, &birthday, &canonical_str, &filename,
);
let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid));
tracing::info!("[PRE-PROCESS] UUID inputs: mac={} birthday={} path={} name={} → {}", mac, birthday, canonical_str, filename, file_uuid);
if pre_path.exists() {
continue; // Already pre-processed
}
info!("[PRE-PROCESS] New file: {} → {}", filename, file_uuid);
// Compute SHA256
let content_hash = crate::core::storage::content_hash::compute_sha256(&file_path)
.unwrap_or_default();
// ffprobe
let probe_json: serde_json::Value = probe_or_minimal(&file_path, &canonical_str);
// Determine file_type
let file_type = probe_json.get("streams").and_then(|s| s.as_array())
.and_then(|streams| {
if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) {
Some("video")
} else if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")) {
Some("audio")
} else {
None
}
})
.or_else(|| {
let ext = file_path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase());
match ext.as_deref() {
Some("jpg"|"jpeg"|"png"|"gif"|"bmp"|"webp"|"svg") => Some("image"),
Some("pdf"|"doc"|"docx"|"pages"|"key"|"numbers"|"ppt"|"pptx"|"xls"|"xlsx") => Some("document"),
_ => None,
}
})
.unwrap_or("unknown");
let file_size = std::fs::metadata(&file_path).ok().map(|m| m.len()).unwrap_or(0);
let pre_data = serde_json::json!({
"file_name": filename,
"file_path": canonical_str,
"content_hash": content_hash,
"probe_json": probe_json,
"birthday": birthday,
"file_uuid": file_uuid,
"file_size": file_size,
"file_type": file_type,
"pre_processed_at": chrono::Utc::now().to_rfc3339(),
});
if let Ok(content) = serde_json::to_string_pretty(&pre_data) {
if std::fs::write(&pre_path, content).is_ok() {
info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid);
}
current_files.insert(fname.clone());
if !known.contains(&fname) {
info!("[WATCHER] New file detected: {} in {}", fname, dir);
known.insert(fname);
}
}
// Remove files that no longer exist (deleted between cycles)
known.retain(|k| current_files.contains(k));
}
}
/// Run ffprobe or return minimal filesystem metadata
fn probe_or_minimal(file_path: &Path, canonical: &str) -> serde_json::Value {
if let Ok(result) = crate::core::probe::probe_video(canonical) {
if let Ok(val) = serde_json::to_value(&result) {
return val;
/// Pre-process a single file: compute SHA256 + probe + UUID → .pre.json
/// This is called explicitly from register API, NOT from the watcher.
pub async fn pre_process_file(file_path: &str) -> Option<String> {
let path = std::path::Path::new(file_path);
if !path.is_file() {
return None;
}
let canonical = path.canonicalize().ok()?;
let canonical_str = canonical.to_string_lossy().to_string();
let filename = path.file_name()?.to_string_lossy().to_string();
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
let birthday = std::fs::metadata(&path).ok()
.and_then(|m| m.created().ok())
.map(|t| {
let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
chrono::DateTime::from_timestamp(secs as i64, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
})
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
let mac = crate::core::storage::uuid::get_mac_address();
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
&mac, &birthday, &canonical_str, &filename,
);
let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid));
if pre_path.exists() {
info!("[PRE-PROCESS] Already pre-processed: {}", filename);
return Some(file_uuid);
}
info!("[PRE-PROCESS] Pre-processing: {} → {}", filename, file_uuid);
let content_hash = crate::core::storage::content_hash::compute_sha256(&path).unwrap_or_default();
let probe_json: serde_json::Value = if let Ok(result) = crate::core::probe::probe_video(&canonical_str) {
serde_json::to_value(&result).unwrap_or_default()
} else {
let size = std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0);
serde_json::json!({
"format": {"filename": canonical_str, "size": size.to_string(), "format_name": "unknown"},
"streams": []
})
};
let file_type = if probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")))
{ "video" } else { "unknown" };
let pre_data = serde_json::json!({
"file_name": filename,
"file_path": canonical_str,
"content_hash": content_hash,
"probe_json": probe_json,
"birthday": birthday,
"file_uuid": file_uuid,
"file_size": std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0),
"file_type": file_type,
"pre_processed_at": chrono::Utc::now().to_rfc3339(),
});
if let Ok(content) = serde_json::to_string_pretty(&pre_data) {
if std::fs::write(&pre_path, content).is_ok() {
info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid);
}
}
let size = std::fs::metadata(file_path).ok().map(|m| m.len()).unwrap_or(0);
serde_json::json!({
"format": {"filename": canonical, "size": size.to_string(), "format_name": "unknown"},
"streams": []
})
Some(file_uuid)
}