feat: unified probe — dispatcher detects category, runs ffprobe/Python/meta per file type

This commit is contained in:
Accusys
2026-05-15 14:38:47 +08:00
parent 4ee8a42e76
commit 29eca5a224
5 changed files with 270 additions and 65 deletions

93
scripts/probe_file.py Normal file
View File

@@ -0,0 +1,93 @@
#!/opt/homebrew/bin/python3.11
"""
Unified file probe — metadata extraction for all managed file types.
Called by Rust unified_probe() (through python_probe), which runs this script as a subprocess.
Output: JSON to stdout.
"""
import sys, json, os
def probe_pdf(path):
    """Extract the page count and basic document-info fields from a PDF.

    Args:
        path: Filesystem path of the PDF file.

    Returns:
        dict with "pages" (int) and, when present and non-empty,
        "author", "title" and "producer" as plain ``str`` values.

    Raises:
        Whatever PyPDF2 raises for unreadable files; the caller decides
        how to report it.
    """
    from PyPDF2 import PdfReader

    reader = PdfReader(path)
    meta = {"pages": len(reader.pages)}
    info = reader.metadata
    if info:
        for pdf_key, out_key in (
            ("/Author", "author"),
            ("/Title", "title"),
            ("/Producer", "producer"),
        ):
            # .get() is only the presence/non-empty test; the value is read
            # with [] exactly like the original code did.
            if info.get(pdf_key):
                value = info[pdf_key]
                # Metadata strings may be bytes-like objects, which json.dumps
                # cannot serialize; normalize everything to plain str.
                if isinstance(value, bytes):
                    value = value.decode("utf-8", "replace")
                meta[out_key] = str(value)
    return meta
def probe_docx(path):
    """Summarize a Word document: paragraph count, section count, author.

    Args:
        path: Filesystem path handed to python-docx's ``Document``.

    Returns:
        dict with "paragraphs" and "sections" counts, plus "author" when the
        core properties carry a non-empty value.
    """
    from docx import Document

    document = Document(path)
    summary = {
        "paragraphs": len(document.paragraphs),
        "sections": len(document.sections),
    }
    author = document.core_properties.author
    if author:
        summary["author"] = author
    return summary
def probe_xlsx(path):
    """List the worksheets of a workbook.

    Args:
        path: Filesystem path handed to ``openpyxl.load_workbook`` (opened
            with ``read_only=True``).

    Returns:
        dict with "sheet_names" (list of str) and "sheet_count" (int).
    """
    import openpyxl

    workbook = openpyxl.load_workbook(path, read_only=True)
    summary = {
        "sheet_names": workbook.sheetnames,
        "sheet_count": len(workbook.sheetnames),
    }
    # Release the file handle before returning.
    workbook.close()
    return summary
def probe_pptx(path):
    """Summarize a presentation: slide count and a short text preview.

    Args:
        path: Filesystem path handed to python-pptx's ``Presentation``.

    Returns:
        dict with "slide_count" and, when any shape holds non-blank text,
        "text_preview": the first five non-blank text blocks joined by " | ".
    """
    from pptx import Presentation

    deck = Presentation(path)
    summary = {"slide_count": len(deck.slides)}

    # Collect the stripped text of every text-bearing shape, in slide order,
    # skipping shapes whose text is empty after stripping.
    snippets = [
        stripped
        for slide in deck.slides
        for shape in slide.shapes
        if shape.has_text_frame
        for stripped in (shape.text_frame.text.strip(),)
        if stripped
    ]

    if snippets:
        summary["text_preview"] = " | ".join(snippets[:5])
    return summary
def probe_archive(path):
    """List the members of a zip or tar archive.

    The archive kind is chosen from the file name, compared
    case-insensitively: ``.zip`` is read with ``zipfile``; ``.tar``,
    ``.tgz`` and ``.tar.<compression>`` are read with ``tarfile``. Any
    other name (for example ``.7z``, ``.rar`` or a bare ``.gz``) yields an
    empty listing because the standard library cannot read it.

    Args:
        path: Filesystem path of the archive.

    Returns:
        dict with "entry_count" (total number of members) and "entries"
        (at most the first 50 members, each ``{"name", "size"}``).
    """
    import zipfile, tarfile

    # Lower-case so "BACKUP.ZIP" / "Data.TGZ" are recognised; the dispatcher
    # in probe() already lower-cases the extension before routing here.
    name = os.path.basename(path).lower()
    entries = []
    if name.endswith('.zip'):
        with zipfile.ZipFile(path) as z:
            entries = [{"name": e.filename, "size": e.file_size} for e in z.infolist()]
    elif name.endswith(('.tar', '.tgz')) or '.tar.' in name:
        # tarfile.open() auto-detects the compression.
        with tarfile.open(path) as t:
            entries = [{"name": e.name, "size": e.size} for e in t.getmembers()]
    # Cap the listing so huge archives do not bloat the JSON output.
    return {"entry_count": len(entries), "entries": entries[:50]}
def probe_iwork(path):
    """Report whether an iWork file (opened as a zip) embeds a preview.

    Args:
        path: Filesystem path of the .pages/.numbers/.key file.

    Returns:
        ``{"has_preview": bool, "preview_files": [...]}`` when the file can
        be opened as a zip archive, where "preview_files" lists every member
        whose name contains "Thumbnail" or "preview"; otherwise (any error)
        just ``{"has_preview": False}``.
    """
    import zipfile

    try:
        with zipfile.ZipFile(path) as package:
            members = package.namelist()
    except Exception:
        # Best effort: anything that is not a readable zip has no preview.
        return {"has_preview": False}

    found = any(candidate in members for candidate in ("preview.pdf", "preview.jpg"))
    previews = []
    for member in members:
        if "Thumbnail" in member or "preview" in member:
            previews.append(member)
    return {"has_preview": found, "preview_files": previews}
def probe(path):
    """Probe one file and return its metadata as a JSON string.

    The prober is selected from the lower-cased file extension. The result
    always contains ``"streams": []``; the prober's output is stored under a
    category key ("document", "spreadsheet", "presentation", "apple_iwork"
    or "archive"). Extensions without a prober produce only the empty
    "streams" list.

    A prober failure (corrupt file, missing third-party library, or a legacy
    binary format such as .doc/.xls/.ppt that the OOXML libraries cannot
    open) no longer aborts the script: the category key is omitted and the
    failure is reported in an "error" field, so the caller still receives
    valid JSON.

    Args:
        path: Filesystem path of the file to probe.

    Returns:
        JSON text (str).
    """
    ext = os.path.splitext(path)[1].lower()
    result = {"streams": []}
    try:
        if ext in ('.pdf',):
            result["document"] = probe_pdf(path)
        elif ext in ('.docx', '.doc'):
            result["document"] = probe_docx(path)
        elif ext in ('.xlsx', '.xls'):
            result["spreadsheet"] = probe_xlsx(path)
        elif ext in ('.pptx', '.ppt'):
            result["presentation"] = probe_pptx(path)
        elif ext in ('.pages', '.numbers', '.key'):
            result["apple_iwork"] = probe_iwork(path)
        elif ext in ('.zip', '.tar', '.gz', '.tgz', '.7z', '.rar'):
            result["archive"] = probe_archive(path)
    except Exception as exc:
        # Surface the reason instead of dying with a traceback and no output.
        result["error"] = "%s: %s" % (type(exc).__name__, exc)
    # default=str keeps serialization from failing on library-specific
    # value types a prober may have passed through.
    return json.dumps(result, default=str)
if __name__ == '__main__':
    # CLI entry point: exactly one positional argument (the file to probe) is
    # consumed; the JSON result, or a JSON usage error, is written to stdout.
    cli_args = sys.argv[1:]
    if not cli_args:
        print(json.dumps({"error": "Usage: probe_file.py <path>"}))
        sys.exit(1)
    print(probe(cli_args[0]))

View File

@@ -999,70 +999,52 @@ async fn register_single_file(
&final_name,
);
// Step 5: Probe — use pre.json if available, otherwise run ffprobe
let cached_probe = pre_data.as_ref()
.and_then(|p| p.get("probe_json"))
.and_then(|v| serde_json::from_value::<crate::core::probe::ProbeResult>(v.clone()).ok());
let probe_result = cached_probe.or_else(|| crate::core::probe::probe_video(&canonical_path).ok());
let file_meta = std::fs::metadata(&canonical_path).ok();
let probe_json: Option<serde_json::Value> = if let Some(ref pre) = pre_data {
pre.get("probe_json").cloned()
// Step 5: Unified probe — use pre.json, otherwise run unified_probe()
let temp_probe_json: serde_json::Value = if let Some(ref pre) = pre_data {
pre.get("probe_json").cloned().unwrap_or_default()
} else {
probe_result.as_ref().map(|r| serde_json::to_value(r)).and_then(|r| r.ok()).or_else(|| {
file_meta.map(|m| serde_json::json!({
"format": {"size": m.len().to_string(), "filename": &canonical_path, "format_name": "unknown"},
"streams": []
}))
})
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
crate::core::probe::unified::unified_probe(&path, &scripts_dir, &python_path).await
};
let probe_json = Some(temp_probe_json.clone());
let has_video = probe_result.as_ref().map_or(false, |r| r.streams.iter().any(|s| s.codec_type.as_deref() == Some("video")));
let has_audio = probe_result.as_ref().map_or(false, |r| r.streams.iter().any(|s| s.codec_type.as_deref() == Some("audio")));
let has_video = temp_probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")));
let has_audio = temp_probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio")));
// Determine file_type: check ffprobe result, then extension
let final_file_type = if has_video {
Some("video".to_string())
} else if has_audio {
Some("audio".to_string())
} else {
let ext = std::path::Path::new(&canonical_path).extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase());
match ext.as_deref() {
Some("jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "svg") => Some("image".to_string()),
Some("pdf") => Some("document".to_string()),
Some("doc" | "docx") => Some("document".to_string()),
Some("pages") => Some("document".to_string()),
Some("xls" | "xlsx" | "numbers") => Some("spreadsheet".to_string()),
Some("ppt" | "pptx" | "key") => Some("presentation".to_string()),
_ => probe_result.as_ref().and_then(|r| {
if r.streams.is_empty() && r.format.duration.is_some() { Some("unknown".to_string()) } else { None }
}),
}
Some(temp_probe_json.get("format").and_then(|f| f.get("file_type")).and_then(|v| v.as_str()).unwrap_or("unknown").to_string())
};
let duration = probe_result.as_ref()
.and_then(|r| r.format.duration.as_ref())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let duration = temp_probe_json.get("format").and_then(|f| {
let src = if has_video { f.get("duration") } else { None };
src.and_then(|v| v.as_str()).and_then(|s| s.parse::<f64>().ok())
}).unwrap_or(0.0);
let mut width = 0u32;
let mut height = 0u32;
let mut fps = 0.0;
let mut total_frames = 0u64;
if let Some(ref probe) = probe_result {
if let Some(s) = probe.streams.iter().find(|s| s.codec_type.as_deref() == Some("video")) {
width = s.width.unwrap_or(0);
height = s.height.unwrap_or(0);
if let Some(fps_str) = &s.r_frame_rate {
if let Some(streams) = temp_probe_json.get("streams").and_then(|s| s.as_array()) {
if let Some(s) = streams.iter().find(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")) {
width = s.get("width").and_then(|v| v.as_i64()).unwrap_or(0) as u32;
height = s.get("height").and_then(|v| v.as_i64()).unwrap_or(0) as u32;
if let Some(fps_str) = s.get("r_frame_rate").and_then(|v| v.as_str()) {
if let Some((num, den)) = fps_str.split_once('/') {
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
if d > 0.0 {
fps = n / d;
}
if d > 0.0 { fps = n / d; }
}
}
}
total_frames = s.nb_frames.as_ref().and_then(|s| s.parse().ok()).unwrap_or((duration * fps) as u64);
total_frames = s.get("nb_frames").and_then(|v| v.as_str())
.and_then(|s| s.parse().ok()).unwrap_or((duration * fps) as u64);
}
}
@@ -1158,16 +1140,16 @@ async fn register_single_file(
}
// 更新 DB: cut_done, scene_done, audio_tracks
let audio_tracks: Vec<serde_json::Value> = probe_result.as_ref().map_or(vec![], |pr| {
pr.streams.iter()
.filter(|s| s.codec_type.as_deref() == Some("audio"))
.map(|s| {
let audio_tracks: Vec<serde_json::Value> = temp_probe_json.get("streams").and_then(|s| s.as_array()).map_or(vec![], |streams| {
streams.iter()
.filter(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("audio"))
.map(|st| {
serde_json::json!({
"index": s.index,
"codec": s.codec_name,
"channels": s.channels,
"sample_rate": s.sample_rate,
"language": s.tags.as_ref().and_then(|t| t.get("language")).unwrap_or(&serde_json::Value::Null),
"index": st.get("index").and_then(|v| v.as_i64()),
"codec": st.get("codec_name").and_then(|v| v.as_str()),
"channels": st.get("channels").and_then(|v| v.as_i64()),
"sample_rate": st.get("sample_rate").and_then(|v| v.as_str()),
"language": st.get("tags").and_then(|t| t.get("language")),
})
})
.collect()

View File

@@ -1,3 +1,4 @@
pub mod ffprobe;
pub mod unified;
pub use ffprobe::{probe_video, FormatInfo, ProbeResult, StreamInfo};

135
src/core/probe/unified.rs Normal file
View File

@@ -0,0 +1,135 @@
use std::path::Path;
use std::time::SystemTime;
/// File category derived from the path extension (see `detect_category`).
///
/// The enum is fieldless, so it is `Copy` and usable as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileCategory {
    /// Video containers such as mp4/mov/mkv.
    Video,
    /// Still images such as jpg/png/heic.
    Image,
    /// Text-like documents such as pdf/docx/pages/txt.
    Document,
    /// Spreadsheets such as xlsx/csv/numbers.
    Spreadsheet,
    /// Slide decks such as pptx/key.
    Presentation,
    /// Archives such as zip/tar/7z.
    Archive,
    /// Anything with a missing or unrecognised extension.
    Unknown,
}
/// Detect the file category from the path's extension.
///
/// The extension is compared case-insensitively. Paths without an
/// extension, with a non-UTF-8 extension, or with an extension that is not
/// listed below map to [`FileCategory::Unknown`].
///
/// The common short aliases `mpg` (for `mpeg`) and `tif` (for `tiff`) are
/// recognised alongside their long forms.
pub fn detect_category(path: &Path) -> FileCategory {
    let ext = path
        .extension()
        .and_then(|e| e.to_str())
        .map(|e| e.to_lowercase());
    match ext.as_deref() {
        Some("mp4" | "mov" | "mkv" | "avi" | "webm" | "m4v" | "mpeg" | "mpg") => {
            FileCategory::Video
        }
        Some(
            "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "svg" | "heic" | "tiff" | "tif",
        ) => FileCategory::Image,
        Some("pdf" | "doc" | "docx" | "odt" | "pages" | "rtf" | "txt" | "md" | "rst") => {
            FileCategory::Document
        }
        Some("xls" | "xlsx" | "csv" | "ods" | "numbers") => FileCategory::Spreadsheet,
        Some("ppt" | "pptx" | "odp" | "key") => FileCategory::Presentation,
        Some("zip" | "tar" | "gz" | "tgz" | "7z" | "rar") => FileCategory::Archive,
        _ => FileCategory::Unknown,
    }
}
/// Build the format object shared by every probe result from filesystem
/// metadata alone.
///
/// The returned JSON object holds `filename` (the path, lossily converted),
/// `format_name` (lower-cased extension, empty when absent), `file_type`
/// (category name from `detect_category`), `size` (bytes, as a string) and
/// `mtime` (RFC 3339, whole seconds). When metadata cannot be read, `size`
/// is `"0"` and `mtime` is empty.
pub fn base_format_info(path: &Path) -> serde_json::Value {
    let metadata = std::fs::metadata(path).ok();
    let size = metadata.as_ref().map_or(0, |m| m.len());

    // Modification time as whole seconds since the Unix epoch, rendered as
    // RFC 3339; any failure along the way collapses to an empty string.
    let mtime = metadata
        .as_ref()
        .and_then(|m| m.modified().ok())
        .and_then(|modified| modified.duration_since(SystemTime::UNIX_EPOCH).ok())
        .and_then(|since_epoch| chrono::DateTime::from_timestamp(since_epoch.as_secs() as i64, 0))
        .map(|dt| dt.to_rfc3339())
        .unwrap_or_default();

    let filename = path.to_string_lossy().into_owned();
    let extension = path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_lowercase)
        .unwrap_or_default();

    let file_type = match detect_category(path) {
        FileCategory::Video => "video",
        FileCategory::Image => "image",
        FileCategory::Document => "document",
        FileCategory::Spreadsheet => "spreadsheet",
        FileCategory::Presentation => "presentation",
        FileCategory::Archive => "archive",
        FileCategory::Unknown => "unknown",
    };

    serde_json::json!({
        "filename": filename,
        "format_name": extension,
        "file_type": file_type,
        "size": size.to_string(),
        "mtime": mtime,
    })
}
/// Run ffprobe for video/image files.
///
/// On success the serialized ffprobe result is returned with its `format`
/// object *merged* with `format_base`: every key of `format_base`
/// (`filename`, `format_name`, `file_type`, `size`, `mtime`) takes
/// precedence, while keys only ffprobe provides (such as `duration`) are
/// kept. Replacing the whole object would discard `duration`, which
/// consumers of the probe JSON read from `format`.
///
/// When ffprobe fails, or its result cannot be turned into a JSON object,
/// a minimal result with `format_base` and an empty `streams` array is
/// returned.
fn ffprobe_probe(path: &Path, format_base: serde_json::Value) -> serde_json::Value {
    let canonical = path.to_string_lossy();
    let probed = crate::core::probe::probe_video(&canonical)
        .ok()
        .and_then(|result| serde_json::to_value(&result).ok());

    let mut root = match probed {
        Some(serde_json::Value::Object(root)) => root,
        // ffprobe failed — return minimal
        _ => {
            return serde_json::json!({
                "format": format_base,
                "streams": []
            });
        }
    };

    let merged_format = match (root.remove("format"), format_base) {
        (Some(serde_json::Value::Object(mut probed_format)), serde_json::Value::Object(base)) => {
            // Base keys overwrite same-named ffprobe keys; the rest survive.
            probed_format.extend(base);
            serde_json::Value::Object(probed_format)
        }
        // No usable ffprobe format section: fall back to the base as-is.
        (_, base) => base,
    };
    root.insert("format".to_string(), merged_format);
    serde_json::Value::Object(root)
}
/// Run the Python probe script for document/spreadsheet/presentation files.
///
/// Executes `<python_path> <scripts_dir>/probe_file.py <path>` and parses
/// its stdout as JSON. When the script exits successfully and prints a JSON
/// object, that object is returned with its `format` key set to
/// `format_base`. In every other case (script missing, spawn failure,
/// non-zero exit, unparsable or non-object output) the minimal
/// filesystem-only result is returned instead.
///
/// The file path is handed to the child process as an OS string, so paths
/// that are not valid UTF-8 reach the script unmodified.
///
/// NOTE(review): the child process is waited on synchronously and without a
/// timeout.
fn python_probe(path: &Path, category: &FileCategory, scripts_dir: &str, python_path: &str, format_base: serde_json::Value) -> serde_json::Value {
    // The script picks its prober from the file extension, so the category
    // is accepted for signature compatibility but not forwarded.
    let _ = category;

    let script = Path::new(scripts_dir).join("probe_file.py");
    if !script.exists() {
        return minimal_probe(format_base);
    }

    let output = match std::process::Command::new(python_path)
        .arg(&script)
        .arg(path)
        .output()
    {
        Ok(output) if output.status.success() => output,
        _ => return minimal_probe(format_base),
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    match serde_json::from_str::<serde_json::Value>(&stdout) {
        Ok(serde_json::Value::Object(mut fields)) => {
            fields.insert("format".to_string(), format_base);
            serde_json::Value::Object(fields)
        }
        // Anything but a JSON object cannot carry the format section.
        _ => minimal_probe(format_base),
    }
}
/// Fallback result carrying filesystem metadata only: the given format
/// object under `format` and an empty `streams` array.
fn minimal_probe(format_base: serde_json::Value) -> serde_json::Value {
    let mut root = serde_json::Map::new();
    root.insert("format".to_string(), format_base);
    root.insert("streams".to_string(), serde_json::Value::Array(Vec::new()));
    serde_json::Value::Object(root)
}
/// Unified probe: dispatches to the right probe based on file type.
/// Returns a probe_json-compatible Value.
///
/// Dispatch by category:
/// - video / image: ffprobe
/// - document / spreadsheet / presentation / archive: the Python script
///   (which implements an archive prober of its own)
/// - unknown category with a known audio extension: ffprobe, and
///   `format.file_type` is set to `"audio"`, so audio-only files still
///   expose their streams to callers that look for audio tracks
/// - everything else: filesystem metadata only
///
/// NOTE(review): the probes called here run synchronously inside this
/// `async fn`; consider moving them onto a blocking thread if this is
/// awaited on a latency-sensitive executor.
pub async fn unified_probe(
    path: &Path,
    scripts_dir: &str,
    python_path: &str,
) -> serde_json::Value {
    let cat = detect_category(path);
    let format_base = base_format_info(path);
    match cat {
        FileCategory::Video | FileCategory::Image => ffprobe_probe(path, format_base),
        FileCategory::Document
        | FileCategory::Spreadsheet
        | FileCategory::Presentation
        | FileCategory::Archive => python_probe(path, &cat, scripts_dir, python_path, format_base),
        FileCategory::Unknown if has_audio_extension(path) => {
            let mut probed = ffprobe_probe(path, format_base);
            // The base format labels these files "unknown"; correct the label
            // so it agrees with the audio streams ffprobe reports.
            if let Some(format) = probed.get_mut("format").and_then(|f| f.as_object_mut()) {
                format.insert("file_type".to_string(), serde_json::Value::from("audio"));
            }
            probed
        }
        FileCategory::Unknown => minimal_probe(format_base),
    }
}

/// Returns true when the path's extension (case-insensitive) is one of a
/// fixed list of common audio-only file extensions.
fn has_audio_extension(path: &Path) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map(|e| e.to_lowercase())
        .map_or(false, |e| {
            matches!(
                e.as_str(),
                "mp3" | "wav" | "flac" | "aac" | "m4a" | "ogg" | "opus" | "wma" | "aiff"
            )
        })
}

View File

@@ -117,19 +117,13 @@ pub async fn pre_process_file(file_path: &str) -> Option<String> {
let content_hash = crate::core::storage::content_hash::compute_sha256(&path).unwrap_or_default();
let probe_json: serde_json::Value = if let Ok(result) = crate::core::probe::probe_video(&canonical_str) {
serde_json::to_value(&result).unwrap_or_default()
} else {
let size = std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0);
serde_json::json!({
"format": {"filename": canonical_str, "size": size.to_string(), "format_name": "unknown"},
"streams": []
})
};
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
let probe_json = crate::core::probe::unified::unified_probe(&path, &scripts_dir, &python_path).await;
let file_type = if probe_json.get("streams").and_then(|s| s.as_array())
.map_or(false, |streams| streams.iter().any(|st| st.get("codec_type").and_then(|c| c.as_str()) == Some("video")))
{ "video" } else { "unknown" };
let file_type = probe_json.get("format").and_then(|f| f.get("file_type")).and_then(|v| v.as_str()).unwrap_or("unknown").to_string();
let pre_data = serde_json::json!({
"file_name": filename,