Files
momentry_core/src/processing/decision.rs
Warren 2b23d1cfbd feat: update core API, database layer, and worker modules
- Remove unused imports (n8n_search, universal_search, Client, Arc, etc.)
- Update API endpoints for identity, face recognition, search
- Fix postgres_db.rs search_videos parent_uuid column
- Add snapshot API and identity agent API
- Clean up backup files (.bak, .bak2)
2026-04-30 15:07:02 +08:00

297 lines
9.4 KiB
Rust

//! Processing decision logic and system resource management
use std::path::Path;
use std::process::Command;
/// Outcome of deciding what to do with a video file's analysis output.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProcessingDecision {
    /// Run processing from the start.
    Process,
    /// Do not run processing.
    SkipComplete,
    /// Continue processing from an earlier checkpoint.
    ResumePartial,
    /// Run processing again even though output already exists.
    ForceReprocess,
}

impl ProcessingDecision {
    /// Returns `true` for every decision that leads to work being done,
    /// i.e. everything except [`ProcessingDecision::SkipComplete`].
    pub fn should_process(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::SkipComplete => false,
            Self::Process | Self::ResumePartial | Self::ForceReprocess => true,
        }
    }

    /// Returns `true` only when processing should pick up from a checkpoint.
    pub fn should_resume(&self) -> bool {
        *self == Self::ResumePartial
    }
}
/// System resource information
///
/// A best-effort snapshot produced by [`SystemResources::check`], which shells
/// out to `top`, `sysctl`, `vm_stat`, `nvidia-smi` and `system_profiler`.
/// Any probe that cannot be run or parsed degrades to a fallback value instead
/// of failing.
#[derive(Debug, Clone)]
pub struct SystemResources {
    /// CPU idle share in percent (50.0 when it could not be measured).
    pub cpu_idle_percent: f64,
    /// Memory counted as available (free + inactive pages), in MB.
    pub memory_available_mb: u64,
    /// Total memory reported by `sysctl hw.memsize`, in MB (0 when unknown).
    pub memory_total_mb: u64,
    /// Share of total memory that is not available, in percent.
    pub memory_used_percent: f64,
    /// Whether any GPU was detected.
    pub gpu_available: bool,
    /// Kind of GPU detected. Only meaningful when `gpu_available` is `true`;
    /// otherwise it holds a placeholder value.
    pub gpu_type: GpuType,
    /// GPU utilization in percent, when it could be determined.
    pub gpu_utilization: Option<f64>,
}

/// GPU type enumeration
#[derive(Debug, Clone, Copy)]
pub enum GpuType {
    /// Reported when `nvidia-smi` ran successfully.
    Nvidia,
    /// Reported when `system_profiler` output mentions "Metal" or "Apple".
    AppleMps,
}

impl SystemResources {
    /// CPU idle percentage assumed when `top` cannot be run or parsed.
    const FALLBACK_CPU_IDLE: f64 = 50.0;
    /// Page size (bytes) assumed when `vm_stat` does not state one.
    const DEFAULT_PAGE_SIZE: u64 = 4096;
    /// Number of bytes in one MB as used by this module.
    const BYTES_PER_MB: u64 = 1024 * 1024;

    /// Check current system resources
    ///
    /// Runs the external probes once and assembles the snapshot. Never fails:
    /// unavailable probes yield the documented fallbacks.
    pub fn check() -> Self {
        let cpu_idle_percent = Self::get_cpu_idle();
        let (memory_available_mb, memory_total_mb) = Self::get_memory_info();
        let memory_used_percent = if memory_total_mb == 0 {
            0.0
        } else if memory_available_mb > memory_total_mb {
            // Inconsistent readings: report the conservative extreme.
            100.0
        } else {
            ((memory_total_mb - memory_available_mb) as f64 / memory_total_mb as f64) * 100.0
        };
        let (gpu_available, gpu_type, gpu_utilization) = Self::get_gpu_info();

        Self {
            cpu_idle_percent,
            memory_available_mb,
            memory_total_mb,
            memory_used_percent,
            gpu_available,
            gpu_type,
            gpu_utilization,
        }
    }

    /// Check if parallel processing is possible
    ///
    /// Requires at least 30% idle CPU and available memory of at least
    /// `required_memory_mb`, with a floor of 4096 MB.
    pub fn can_parallel(&self, required_memory_mb: u64) -> bool {
        const MIN_CPU_IDLE: f64 = 30.0;
        const MIN_MEMORY_MB: u64 = 4096;

        self.cpu_idle_percent >= MIN_CPU_IDLE
            && self.memory_available_mb >= required_memory_mb.max(MIN_MEMORY_MB)
    }

    /// Recommend which modules can be processed in parallel
    ///
    /// `"yolo"` is recommended when a GPU is available; `"ocr"`, `"face"` and
    /// `"pose"` are recommended when at least 8192 MB of memory is available.
    pub fn recommend_parallel_modules(&self) -> Vec<&'static str> {
        let mut recommended = Vec::new();
        if self.gpu_available {
            recommended.push("yolo");
        }
        if self.memory_available_mb >= 8192 {
            recommended.extend(["ocr", "face", "pose"]);
        }
        recommended
    }

    /// Get CPU idle percentage
    ///
    /// Runs `top -l 1 -n 1` and parses its output; returns 50.0 when the
    /// command cannot be run or no idle figure is found.
    fn get_cpu_idle() -> f64 {
        Command::new("top")
            .args(["-l", "1", "-n", "1"])
            .output()
            .ok()
            .and_then(|out| Self::parse_cpu_idle(&String::from_utf8_lossy(&out.stdout)))
            .unwrap_or(Self::FALLBACK_CPU_IDLE)
    }

    /// Extract the idle percentage from `top` output.
    ///
    /// Looks at the first line containing "idle" and accepts both the
    /// two-token form (`... 81.98% idle`) and the fused form (`81.98%idle`).
    /// Returns `None` when no value in `0..=100` can be parsed.
    fn parse_cpu_idle(top_output: &str) -> Option<f64> {
        let line = top_output.lines().find(|l| l.contains("idle"))?;
        let tokens: Vec<&str> = line.split_whitespace().collect();

        let fused = tokens.iter().copied().find_map(|t| t.strip_suffix("%idle"));
        let raw = match fused {
            Some(value) => value,
            // The percentage and its label are separate whitespace-delimited
            // tokens, so look for the token that precedes "idle".
            None => tokens.windows(2).find_map(|pair| {
                let label = pair[1].trim_end_matches(|c: char| !c.is_alphanumeric());
                if label == "idle" {
                    pair[0].strip_suffix('%')
                } else {
                    None
                }
            })?,
        };

        raw.trim()
            .parse::<f64>()
            .ok()
            .filter(|pct| (0.0..=100.0).contains(pct))
    }

    /// Get memory information (available and total in MB)
    ///
    /// Total comes from `sysctl hw.memsize`, available from `vm_stat`.
    /// Returns `(0, 0)` when `sysctl` cannot be run; when only `vm_stat`
    /// cannot be run, available memory is estimated as a quarter of the total.
    fn get_memory_info() -> (u64, u64) {
        let sysctl = match Command::new("sysctl").args(["hw.memsize"]).output() {
            Ok(out) => out,
            Err(_) => return (0, 0),
        };
        let total = Self::parse_total_memory_mb(&String::from_utf8_lossy(&sysctl.stdout));

        let available = match Command::new("vm_stat").output() {
            Ok(out) => Self::parse_available_memory_mb(&String::from_utf8_lossy(&out.stdout)),
            Err(_) => total / 4,
        };
        (available, total)
    }

    /// Parse `sysctl hw.memsize` output (`hw.memsize: <bytes>`) into MB.
    /// Returns 0 when the byte count is missing or not a number.
    fn parse_total_memory_mb(sysctl_output: &str) -> u64 {
        sysctl_output
            .split_whitespace()
            .nth(1)
            .and_then(|bytes| bytes.parse::<u64>().ok())
            .unwrap_or(0)
            / Self::BYTES_PER_MB
    }

    /// Parse `vm_stat` output into available memory (free + inactive) in MB.
    ///
    /// The page size is taken from the header line
    /// (`... (page size of N bytes)`) when present, because it is not 4096 on
    /// every machine; 4096 is only used when the header does not state a size.
    fn parse_available_memory_mb(vm_stat_output: &str) -> u64 {
        let mut page_size = Self::DEFAULT_PAGE_SIZE;
        let mut free_pages: u64 = 0;
        let mut inactive_pages: u64 = 0;

        for line in vm_stat_output.lines() {
            if let Some((_, rest)) = line.split_once("page size of ") {
                let reported = rest
                    .split_whitespace()
                    .next()
                    .and_then(|n| n.parse::<u64>().ok())
                    .filter(|n| *n > 0);
                if let Some(bytes) = reported {
                    page_size = bytes;
                }
            } else if line.contains("Pages free:") {
                free_pages = Self::parse_page_count(line);
            } else if line.contains("Pages inactive:") {
                inactive_pages = Self::parse_page_count(line);
            }
        }

        free_pages
            .saturating_add(inactive_pages)
            .saturating_mul(page_size)
            / Self::BYTES_PER_MB
    }

    /// Read the trailing page count of a `vm_stat` line such as
    /// `Pages free:   1234.` (the value ends with a period). Returns 0 when
    /// the count cannot be parsed.
    fn parse_page_count(line: &str) -> u64 {
        line.split_whitespace()
            .last()
            .and_then(|count| count.trim_end_matches('.').parse().ok())
            .unwrap_or(0)
    }

    /// Get GPU information
    ///
    /// Returns `(available, type, utilization)`. `nvidia-smi` is tried first;
    /// otherwise `system_profiler` output is searched for "Metal"/"Apple", in
    /// which case utilization is reported as `Some(0.0)` because no reading is
    /// taken. When nothing is found the type is a placeholder.
    fn get_gpu_info() -> (bool, GpuType, Option<f64>) {
        // Check NVIDIA GPU
        let nvidia_output = Command::new("nvidia-smi")
            .args([
                "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits",
            ])
            .output();
        if let Ok(out) = nvidia_output {
            if out.status.success() {
                let text = String::from_utf8_lossy(&out.stdout);
                return (
                    true,
                    GpuType::Nvidia,
                    Self::parse_nvidia_utilization(&text),
                );
            }
        }

        // Check Apple MPS (Metal Performance Shaders)
        let mps_output = Command::new("system_profiler")
            .args(["SPDisplaysDataType", "-detailLevel", "mini"])
            .output();
        if let Ok(out) = mps_output {
            let text = String::from_utf8_lossy(&out.stdout);
            if text.contains("Metal") || text.contains("Apple") {
                return (true, GpuType::AppleMps, Some(0.0));
            }
        }

        // No GPU: `GpuType::Nvidia` is only a placeholder here.
        (false, GpuType::Nvidia, None)
    }

    /// Parse `nvidia-smi --query-gpu=utilization.gpu` CSV output.
    ///
    /// The output has one row per GPU, so the rows are averaged; rows that are
    /// not numbers are skipped. Returns `None` when no row is numeric.
    fn parse_nvidia_utilization(smi_output: &str) -> Option<f64> {
        let (sum, count) = smi_output
            .lines()
            .filter_map(|row| row.trim().parse::<f64>().ok())
            .filter(|value| value.is_finite())
            .fold((0.0_f64, 0_u32), |(sum, count), value| {
                (sum + value, count + 1)
            });

        if count == 0 {
            None
        } else {
            Some(sum / f64::from(count))
        }
    }
}
/// One-line human-readable summary: CPU idle share, available/total memory in
/// GB with the used percentage, and GPU utilization (or `N/A` without a GPU).
impl std::fmt::Display for SystemResources {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The struct stores MB; the summary shows GB.
        let available_gb = self.memory_available_mb as f64 / 1024.0;
        let total_gb = self.memory_total_mb as f64 / 1024.0;

        let gpu_status = if !self.gpu_available {
            String::from("N/A")
        } else {
            // An unknown utilization is displayed as 0%.
            let utilization = self.gpu_utilization.unwrap_or(0.0);
            format!("{:.0}% utilized", utilization)
        };

        write!(
            f,
            "CPU: {:.1}% idle, Memory: {:.1}GB/{:.1}GB ({:.0}% used), GPU: {}",
            self.cpu_idle_percent, available_gb, total_gb, self.memory_used_percent, gpu_status
        )
    }
}
/// How complete a result JSON file is, as classified by
/// `check_json_completeness`.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonCompleteness {
    /// The file is treated as finished.
    Complete,
    /// The file records unfinished progress: `current` of `total` done.
    Partial {
        /// Progress counter read from the file.
        current: u32,
        /// Target count read from the file.
        total: u32,
    },
    /// The file is unreadable, blank, or not valid JSON.
    Empty,
}
/// Choose a processing strategy from the state of the result file at `json_path`.
///
/// Rules, in order of precedence:
/// 1. the file does not exist → [`ProcessingDecision::Process`]
/// 2. `force` is set → [`ProcessingDecision::ForceReprocess`]
/// 3. `resume` is set → [`ProcessingDecision::ResumePartial`]
/// 4. otherwise the file is inspected: a complete file is skipped, an empty
///    one is processed, and a partial one is skipped after printing a hint
///    about `--resume` / `--force` to stderr.
pub fn decide_processing(json_path: &Path, force: bool, resume: bool) -> ProcessingDecision {
    // Report an unfinished result file and the available options on stderr.
    fn warn_incomplete(json_path: &Path, current: u32, total: u32) {
        let percent = (current as f64 / total as f64) * 100.0;
        eprintln!("\n⚠️ Found incomplete JSON file: {}", json_path.display());
        eprintln!(" Progress: {}/{} ({:.1}%)", current, total, percent);
        eprintln!(" Use --resume to continue from checkpoint");
        eprintln!(" Use --force to reprocess from scratch");
    }

    if !json_path.exists() {
        ProcessingDecision::Process
    } else if force {
        ProcessingDecision::ForceReprocess
    } else if resume {
        ProcessingDecision::ResumePartial
    } else {
        match check_json_completeness(json_path) {
            JsonCompleteness::Empty => ProcessingDecision::Process,
            JsonCompleteness::Complete => ProcessingDecision::SkipComplete,
            JsonCompleteness::Partial { current, total } => {
                // Without an explicit flag a partial file is left alone.
                warn_incomplete(json_path, current, total);
                ProcessingDecision::SkipComplete
            }
        }
    }
}
/// Check JSON file completeness
///
/// Classification:
/// - the file cannot be read as text, is blank, or is not valid JSON →
///   [`JsonCompleteness::Empty`]
/// - the top-level `"segments"` value is an object whose `"total"` is
///   positive and whose `"current"` is below it →
///   [`JsonCompleteness::Partial`] (missing or non-integer counters count as 0)
/// - anything else, including a missing `"segments"` key or an array of any
///   length → [`JsonCompleteness::Complete`]
///
/// Counters are compared at full `u64` width; values above `u32::MAX` are
/// clamped to `u32::MAX` in the returned `Partial` instead of wrapping.
pub fn check_json_completeness(json_path: &Path) -> JsonCompleteness {
    let content = match std::fs::read_to_string(json_path) {
        Ok(text) => text,
        Err(_) => return JsonCompleteness::Empty,
    };
    if content.trim().is_empty() {
        return JsonCompleteness::Empty;
    }
    let json: serde_json::Value = match serde_json::from_str(&content) {
        Ok(value) => value,
        Err(_) => return JsonCompleteness::Empty,
    };

    // Only an object-shaped "segments" carries progress counters.
    let progress = match json.get("segments") {
        Some(serde_json::Value::Object(progress)) => progress,
        _ => return JsonCompleteness::Complete,
    };

    let counter = |key: &str| progress.get(key).and_then(|v| v.as_u64()).unwrap_or(0);
    let current = counter("current");
    let total = counter("total");

    if total > 0 && current < total {
        // Lossless after clamping: the value is at most u32::MAX.
        let clamp = |n: u64| n.min(u64::from(u32::MAX)) as u32;
        JsonCompleteness::Partial {
            current: clamp(current),
            total: clamp(total),
        }
    } else {
        JsonCompleteness::Complete
    }
}