feat: file pre-processor in watcher — SHA256 + probe + UUID → .pre.json for all file types
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use std::path::Path;
|
||||
use std::collections::HashSet;
|
||||
use tokio::time;
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -10,19 +11,17 @@ pub struct WatcherConfig {
|
||||
|
||||
impl Default for WatcherConfig {
|
||||
fn default() -> Self {
|
||||
// Default to SFTP demo directory if not specified
|
||||
let default_dir = std::env::var("MOMENTRY_SFTP_ROOT")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data/demo/".to_string());
|
||||
|
||||
Self {
|
||||
directories: vec![default_dir],
|
||||
poll_interval_ms: 60000, // 60 seconds polling interval
|
||||
poll_interval_ms: 60000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the file watcher in the background.
|
||||
/// Scans directories for video files (auto-registration disabled).
|
||||
/// Pre-processes new files: SHA256 + probe + UUID → .pre.json
|
||||
pub async fn run_watcher() -> Result<()> {
|
||||
let config = WatcherConfig::default();
|
||||
let dirs = config.directories.clone();
|
||||
@@ -32,62 +31,154 @@ pub async fn run_watcher() -> Result<()> {
|
||||
return Err(anyhow::anyhow!("No watch directories"));
|
||||
}
|
||||
|
||||
info!("Starting Video File Watcher (auto-registration disabled)...");
|
||||
info!("Starting File Watcher (pre-processor)...");
|
||||
info!("Watch directories: {:?}", dirs);
|
||||
|
||||
// Spawn background task for monitoring only (no auto-registration)
|
||||
tokio::spawn(async move {
|
||||
let mut interval = time::interval(time::Duration::from_millis(config.poll_interval_ms));
|
||||
|
||||
let mut interval = time::interval(std::time::Duration::from_millis(config.poll_interval_ms));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
scan_videos(&dirs).await;
|
||||
pre_process_new_files(&dirs).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn scan_videos(directories: &[String]) {
|
||||
// Allowed extensions list
|
||||
let allowed_extensions = vec!["mp4", "mov", "mkv"];
|
||||
async fn pre_process_new_files(directories: &[String]) {
|
||||
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
|
||||
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
|
||||
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
|
||||
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
|
||||
|
||||
// Collect existing .pre.json UUIDs to detect new files
|
||||
let existing: HashSet<String> = std::fs::read_dir(&output_dir)
|
||||
.map(|d| d.flatten().filter_map(|e| {
|
||||
let name = e.file_name().to_string_lossy().to_string();
|
||||
if name.ends_with(".pre.json") {
|
||||
Some(name.trim_end_matches(".pre.json").to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
for dir in directories {
|
||||
let path = Path::new(dir);
|
||||
if !path.exists() {
|
||||
warn!("Directory does not exist, skipping: {}", dir);
|
||||
let dir_path = Path::new(dir);
|
||||
if !dir_path.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let entries = match std::fs::read_dir(dir_path) {
|
||||
Ok(e) => e,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
if let Ok(entries) = std::fs::read_dir(path) {
|
||||
let video_count = entries
|
||||
.flatten()
|
||||
.filter(|entry| {
|
||||
let file_path = entry.path();
|
||||
if file_path.is_dir() {
|
||||
if let Some(name) = file_path.file_name().and_then(|n| n.to_str()) {
|
||||
if name.starts_with('.') {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !file_path.is_file() {
|
||||
return false;
|
||||
}
|
||||
if let Some(ext) = file_path.extension().and_then(|e| e.to_str()) {
|
||||
allowed_extensions.contains(&ext.to_lowercase().as_str())
|
||||
for entry in entries.flatten() {
|
||||
let file_path = entry.path();
|
||||
if !file_path.is_file() {
|
||||
continue;
|
||||
}
|
||||
let fname = file_path.file_name().and_then(|n| n.to_str()).unwrap_or("");
|
||||
if fname.starts_with('.') { continue; }
|
||||
if fname.ends_with(".pre.json") { continue; }
|
||||
|
||||
// Compute UUID the same way register does
|
||||
let canonical = match file_path.canonicalize() {
|
||||
Ok(p) => p,
|
||||
_ => continue,
|
||||
};
|
||||
let canonical_str = canonical.to_string_lossy().to_string();
|
||||
let filename = fname.to_string();
|
||||
|
||||
// Birthday = file creation time
|
||||
let birthday = std::fs::metadata(&file_path)
|
||||
.ok()
|
||||
.and_then(|m| m.created().ok())
|
||||
.map(|t| {
|
||||
let secs = t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
|
||||
// Format as RFC 3339
|
||||
chrono::DateTime::from_timestamp(secs as i64, 0)
|
||||
.map(|dt| dt.to_rfc3339())
|
||||
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
|
||||
})
|
||||
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
|
||||
|
||||
let mac = crate::core::storage::uuid::get_mac_address();
|
||||
let file_uuid = crate::core::storage::uuid::compute_birth_uuid(
|
||||
&mac, &birthday, &canonical_str, &filename,
|
||||
);
|
||||
|
||||
// Check if .pre.json already exists
|
||||
let pre_path = std::path::PathBuf::from(&output_dir).join(format!("{}.pre.json", file_uuid));
|
||||
if pre_path.exists() {
|
||||
continue; // Already pre-processed
|
||||
}
|
||||
|
||||
info!("[PRE-PROCESS] New file: {} → {}", filename, file_uuid);
|
||||
|
||||
// Compute SHA256
|
||||
let content_hash = crate::core::storage::content_hash::compute_sha256(&file_path)
|
||||
.unwrap_or_default();
|
||||
|
||||
// ffprobe
|
||||
let probe_json: serde_json::Value = probe_or_minimal(&file_path, &canonical_str);
|
||||
|
||||
// Determine file_type
|
||||
let file_type = probe_json.get("streams").and_then(|s| s.as_array())
|
||||
.and_then(|streams| {
|
||||
if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) {
|
||||
Some("video")
|
||||
} else if streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")) {
|
||||
Some("audio")
|
||||
} else {
|
||||
false
|
||||
None
|
||||
}
|
||||
})
|
||||
.count();
|
||||
.or_else(|| {
|
||||
let ext = file_path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase());
|
||||
match ext.as_deref() {
|
||||
Some("jpg"|"jpeg"|"png"|"gif"|"bmp"|"webp"|"svg") => Some("image"),
|
||||
Some("pdf"|"doc"|"docx"|"pages"|"key"|"numbers"|"ppt"|"pptx"|"xls"|"xlsx") => Some("document"),
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.unwrap_or("unknown");
|
||||
|
||||
if video_count > 0 {
|
||||
info!(
|
||||
"Found {} video files in {} (use API to register)",
|
||||
video_count, dir
|
||||
);
|
||||
let file_size = std::fs::metadata(&file_path).ok().map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
let pre_data = serde_json::json!({
|
||||
"file_name": filename,
|
||||
"file_path": canonical_str,
|
||||
"content_hash": content_hash,
|
||||
"probe_json": probe_json,
|
||||
"birthday": birthday,
|
||||
"file_uuid": file_uuid,
|
||||
"file_size": file_size,
|
||||
"file_type": file_type,
|
||||
"pre_processed_at": chrono::Utc::now().to_rfc3339(),
|
||||
});
|
||||
|
||||
if let Ok(content) = serde_json::to_string_pretty(&pre_data) {
|
||||
if std::fs::write(&pre_path, content).is_ok() {
|
||||
info!("[PRE-PROCESS] {} → {}.pre.json", filename, file_uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run ffprobe or return minimal filesystem metadata
|
||||
fn probe_or_minimal(file_path: &Path, canonical: &str) -> serde_json::Value {
|
||||
if let Ok(result) = crate::core::probe::probe_video(canonical) {
|
||||
if let Ok(val) = serde_json::to_value(&result) {
|
||||
return val;
|
||||
}
|
||||
}
|
||||
let size = std::fs::metadata(file_path).ok().map(|m| m.len()).unwrap_or(0);
|
||||
serde_json::json!({
|
||||
"format": {"filename": canonical, "size": size.to_string(), "format_name": "unknown"},
|
||||
"streams": []
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user