Files
momentry_core/src/main.rs.backup
2026-05-08 00:48:15 +08:00

3323 lines
139 KiB
Plaintext
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Momentry Core - Digital asset management system with video analysis and RAG
//!
//! This is the main entry point for the CLI application.
use anyhow::{Context, Result};
use clap::Parser;
use futures_util::StreamExt;
use std::io::Write;
use std::path::Path;
use std::str;
use std::sync::{Arc, Mutex};
// Local modules
mod cli;
mod processing;
use cli::*;
use processing::*;
use processing::modules::*;
// Core dependencies
use momentry_core::core::api_key::{ApiKeyService, ApiKeyType};
use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType};
use momentry_core::core::db::Database;
use momentry_core::core::time::FrameTime;
use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi};
use momentry_core::{
Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus,
};
/// Snapshot of host resources used to decide how many analysis modules can run in parallel.
///
/// Populated by [`SystemResources::check`], which shells out to `top`, `sysctl`, `vm_stat`,
/// `system_profiler` (macOS tools) and `nvidia-smi`.
#[derive(Debug, Clone)]
pub struct SystemResources {
    /// Idle CPU percentage; 50.0 when it could not be measured.
    pub cpu_idle_percent: f64,
    /// Available (free + inactive pages) memory in MiB.
    pub memory_available_mb: u64,
    /// Total physical memory in MiB; 0 when it could not be measured.
    pub memory_total_mb: u64,
    /// Share of total memory not counted as available (0.0 when the total is unknown).
    pub memory_used_percent: f64,
    /// Whether an NVIDIA GPU or an Apple Metal-capable display adapter was detected.
    pub gpu_available: bool,
    /// Detected GPU kind. `Nvidia` is also used as a placeholder when `gpu_available` is false.
    pub gpu_type: GpuType,
    /// GPU utilization in percent (averaged across NVIDIA GPUs; Apple MPS reports `Some(0.0)`).
    pub gpu_utilization: Option<f64>,
}

/// Kind of GPU acceleration detected on the host.
#[derive(Debug, Clone, Copy)]
pub enum GpuType {
    /// GPU reachable through `nvidia-smi`.
    Nvidia,
    /// Apple GPU detected via `system_profiler` (Metal Performance Shaders).
    AppleMps,
}

impl SystemResources {
    /// Idle percentage assumed when `top` cannot be run or its output cannot be parsed.
    const DEFAULT_CPU_IDLE: f64 = 50.0;
    /// Page size assumed when `vm_stat` output has no `page size of N bytes` header.
    const DEFAULT_PAGE_SIZE: u64 = 4096;

    /// Samples CPU, memory and GPU state by running system tools.
    ///
    /// Never fails: a missing tool or unexpected output falls back to the defaults
    /// documented on each field.
    pub fn check() -> Self {
        let cpu_idle_percent = Self::get_cpu_idle();
        let (memory_available_mb, memory_total_mb) = Self::get_memory_info();
        let memory_used_percent = if memory_total_mb == 0 {
            0.0
        } else if memory_available_mb > memory_total_mb {
            // Inconsistent readings: report fully used rather than a negative share.
            100.0
        } else {
            ((memory_total_mb - memory_available_mb) as f64 / memory_total_mb as f64) * 100.0
        };
        let (gpu_available, gpu_type, gpu_utilization) = Self::get_gpu_info();
        Self {
            cpu_idle_percent,
            memory_available_mb,
            memory_total_mb,
            memory_used_percent,
            gpu_available,
            gpu_type,
            gpu_utilization,
        }
    }

    /// Returns `true` when at least 30% of CPU is idle and available memory is at least
    /// `required_memory_mb` and at least 4096 MiB.
    pub fn can_parallel(&self, required_memory_mb: u64) -> bool {
        const MIN_CPU_IDLE: f64 = 30.0;
        const MIN_MEMORY_MB: u64 = 4096;
        self.cpu_idle_percent >= MIN_CPU_IDLE
            && self.memory_available_mb >= required_memory_mb
            && self.memory_available_mb >= MIN_MEMORY_MB
    }

    /// Suggests modules that may run in parallel.
    ///
    /// `yolo` is suggested when a GPU is present. `ocr`, `face` and `pose` are suggested
    /// when at least 8192 MiB is available.
    pub fn recommend_parallel_modules(&self) -> Vec<&'static str> {
        let mut recommended = Vec::new();
        if self.gpu_available {
            recommended.push("yolo");
        }
        if self.memory_available_mb >= 8192 {
            recommended.extend(["ocr", "face", "pose"]);
        }
        recommended
    }

    /// Reads the idle CPU percentage from a single `top -l 1 -n 1` sample (macOS syntax).
    fn get_cpu_idle() -> f64 {
        use std::process::Command;
        Command::new("top")
            .args(["-l", "1", "-n", "1"])
            .output()
            .ok()
            .and_then(|o| {
                let stdout = String::from_utf8_lossy(&o.stdout);
                stdout
                    .lines()
                    .find(|l| l.contains("idle"))
                    .and_then(Self::parse_cpu_idle)
            })
            .unwrap_or(Self::DEFAULT_CPU_IDLE)
    }

    /// Extracts the idle percentage from a `top` summary line.
    ///
    /// Accepts the spaced form macOS prints (`CPU usage: 10.5% user, 15.2% sys, 74.2% idle`)
    /// as well as a fused form (`74.2%idle`).
    fn parse_cpu_idle(line: &str) -> Option<f64> {
        let tokens: Vec<&str> = line.split_whitespace().collect();
        // Fused form, e.g. `74.2%idle`.
        if let Some(pct) = tokens.iter().find_map(|t| t.strip_suffix("%idle")) {
            return pct.trim().parse().ok();
        }
        // Spaced form: the value is the token right before `idle` (or `idle,`).
        tokens.windows(2).find_map(|pair| {
            if pair[1].trim_end_matches(',') == "idle" {
                pair[0].trim_end_matches('%').parse().ok()
            } else {
                None
            }
        })
    }

    /// Returns `(available_mb, total_mb)`.
    ///
    /// The total comes from `sysctl hw.memsize`; available memory comes from the free and
    /// inactive page counts in `vm_stat`. Returns `(0, 0)` if `sysctl` cannot be spawned.
    /// If `vm_stat` cannot be spawned, a quarter of the total is assumed available.
    fn get_memory_info() -> (u64, u64) {
        use std::process::Command;
        let sysctl = match Command::new("sysctl").args(["hw.memsize"]).output() {
            Ok(o) => o,
            Err(_) => return (0, 0),
        };
        // Output looks like `hw.memsize: 17179869184`.
        let total_mb = String::from_utf8_lossy(&sysctl.stdout)
            .split_whitespace()
            .nth(1)
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(0)
            / 1024
            / 1024;
        let available_mb = match Command::new("vm_stat").output() {
            Ok(v) => Self::parse_vm_stat_available_mb(&String::from_utf8_lossy(&v.stdout)),
            Err(_) => total_mb / 4,
        };
        (available_mb, total_mb)
    }

    /// Computes available memory (free + inactive pages) in MiB from `vm_stat` output.
    ///
    /// The page size is taken from the header
    /// (`Mach Virtual Memory Statistics: (page size of 16384 bytes)`). Apple Silicon uses
    /// 16 KiB pages, so assuming 4 KiB would under-report memory by a factor of four.
    fn parse_vm_stat_available_mb(vm_stat: &str) -> u64 {
        // Page-count lines end with the number followed by a period, e.g. `Pages free: 123.`.
        let page_count = |line: &str| -> u64 {
            line.split_whitespace()
                .last()
                .and_then(|v| v.trim_end_matches('.').parse().ok())
                .unwrap_or(0)
        };
        let mut page_size = Self::DEFAULT_PAGE_SIZE;
        let mut free_pages = 0u64;
        let mut inactive_pages = 0u64;
        for line in vm_stat.lines() {
            if let Some(rest) = line.split("page size of ").nth(1) {
                if let Some(size) = rest
                    .split_whitespace()
                    .next()
                    .and_then(|v| v.parse::<u64>().ok())
                    .filter(|&s| s > 0)
                {
                    page_size = size;
                }
            } else if line.contains("Pages free:") {
                free_pages = page_count(line);
            } else if line.contains("Pages inactive:") {
                inactive_pages = page_count(line);
            }
        }
        free_pages
            .saturating_add(inactive_pages)
            .saturating_mul(page_size)
            / 1024
            / 1024
    }

    /// Detects a GPU. NVIDIA (via `nvidia-smi`) is tried first, then Apple Metal (via
    /// `system_profiler`).
    ///
    /// Returns `(false, GpuType::Nvidia, None)` when neither is found.
    fn get_gpu_info() -> (bool, GpuType, Option<f64>) {
        use std::process::Command;
        let nvidia_output = Command::new("nvidia-smi")
            .args([
                "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits",
            ])
            .output();
        if let Ok(o) = nvidia_output {
            if o.status.success() {
                let util = Self::parse_nvidia_utilization(&String::from_utf8_lossy(&o.stdout));
                return (true, GpuType::Nvidia, util);
            }
        }
        let mps_output = Command::new("system_profiler")
            .args(["SPDisplaysDataType", "-detailLevel", "mini"])
            .output();
        if let Ok(o) = mps_output {
            let s = String::from_utf8_lossy(&o.stdout);
            if s.contains("Metal") || s.contains("Apple") {
                return (true, GpuType::AppleMps, Some(0.0));
            }
        }
        (false, GpuType::Nvidia, None)
    }

    /// Averages the per-GPU utilization values `nvidia-smi` prints, one per line.
    ///
    /// Returns `None` when no line parses as a number.
    fn parse_nvidia_utilization(output: &str) -> Option<f64> {
        let values: Vec<f64> = output
            .lines()
            .filter_map(|l| l.trim().parse::<f64>().ok())
            .collect();
        if values.is_empty() {
            None
        } else {
            Some(values.iter().sum::<f64>() / values.len() as f64)
        }
    }
}
impl std::fmt::Display for SystemResources {
    // Renders a one-line summary: CPU idle %, available/total memory in GB, memory used %,
    // and GPU utilization (or "N/A" when no GPU was detected).
    // NOTE(review): the first memory figure is *available* memory while the parenthesised
    // percentage is *used* memory, which can read as contradictory; consider relabelling.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MB_PER_GB: f64 = 1024.0;
        let gpu_summary = if !self.gpu_available {
            "N/A".to_owned()
        } else {
            format!("{:.0}% utilized", self.gpu_utilization.unwrap_or(0.0))
        };
        write!(
            f,
            "CPU: {:.1}% idle, Memory: {:.1}GB/{:.1}GB ({:.0}% used), GPU: {}",
            self.cpu_idle_percent,
            self.memory_available_mb as f64 / MB_PER_GB,
            self.memory_total_mb as f64 / MB_PER_GB,
            self.memory_used_percent,
            gpu_summary,
        )
    }
}
/// Decides how to handle one module's output JSON before the module runs.
///
/// * No file on disk → `ProcessingDecision::Process`.
/// * `force` → `ProcessingDecision::ForceReprocess`.
/// * Otherwise the completeness of the existing file decides:
///   * complete → `ProcessingDecision::SkipComplete`, even when `resume` is set, so a
///     resumed run re-does only modules whose output was actually interrupted;
///   * partial with `resume` → `ProcessingDecision::ResumePartial`;
///   * partial without `resume` → a warning with the progress and available flags is
///     printed to stderr, and the module is skipped (`SkipComplete`);
///   * empty/unreadable → `ProcessingDecision::Process`.
fn decide_processing(json_path: &Path, force: bool, resume: bool) -> ProcessingDecision {
    if !json_path.exists() {
        return ProcessingDecision::Process;
    }
    if force {
        return ProcessingDecision::ForceReprocess;
    }
    match check_json_completeness(json_path) {
        JsonCompleteness::Complete => ProcessingDecision::SkipComplete,
        JsonCompleteness::Partial { .. } if resume => ProcessingDecision::ResumePartial,
        JsonCompleteness::Partial { current, total } => {
            eprintln!("\n⚠ Found incomplete JSON file: {}", json_path.display());
            // `Partial` is only produced with `total > 0`, so this division is safe.
            eprintln!(
                " Progress: {}/{} ({:.1}%)",
                current,
                total,
                (current as f64 / total as f64) * 100.0
            );
            eprintln!(" Use --resume to continue from checkpoint");
            eprintln!(" Use --force to reprocess from scratch");
            ProcessingDecision::SkipComplete
        }
        JsonCompleteness::Empty => ProcessingDecision::Process,
    }
}
/// How much work an existing module output JSON file records.
#[derive(Clone, Debug, PartialEq)]
pub enum JsonCompleteness {
    /// The file is valid JSON with no outstanding progress marker.
    Complete,
    /// The file's `segments` object reports `current` of `total` units done.
    Partial { current: u32, total: u32 },
    /// The file is missing, unreadable, blank, or not valid JSON.
    Empty,
}
/// Classifies an output JSON file by how much of its work it records.
///
/// Returns `JsonCompleteness::Empty` when the file cannot be read, is blank, or is not valid
/// JSON. Returns `JsonCompleteness::Partial` only when `segments` is an object whose
/// `total` is non-zero and greater than its `current`. Every other valid document counts as
/// `JsonCompleteness::Complete`, including one without a `segments` key or with an empty
/// `segments` array.
fn check_json_completeness(json_path: &Path) -> JsonCompleteness {
    let parsed = std::fs::read_to_string(json_path)
        .ok()
        .filter(|text| !text.trim().is_empty())
        .and_then(|text| serde_json::from_str::<serde_json::Value>(&text).ok());
    let json = match parsed {
        Some(value) => value,
        None => return JsonCompleteness::Empty,
    };
    // Only an object-shaped `segments` carries a progress marker.
    let progress = match json.get("segments") {
        Some(serde_json::Value::Object(obj)) => obj,
        _ => return JsonCompleteness::Complete,
    };
    let read_count = |key: &str| {
        progress
            .get(key)
            .and_then(serde_json::Value::as_u64)
            .unwrap_or(0) as u32
    };
    let (current, total) = (read_count("current"), read_count("total"));
    if total > 0 && current < total {
        JsonCompleteness::Partial { current, total }
    } else {
        JsonCompleteness::Complete
    }
}
async fn process_asr_module(
asr_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Asr).start(1);
}
let asr_result = momentry_core::core::processor::process_asr(
video_path,
asr_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let asr_json = serde_json::to_string_pretty(&asr_result)?;
std::fs::write(asr_path, &asr_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "asr.json");
println!(" ✓ ASR saved: {} segments", asr_result.segments.len());
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Asr)
.complete(&format!("{} segments", asr_result.segments.len()));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_cut_module(
cut_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Cut).start(1);
}
let cut_result = momentry_core::core::processor::process_cut(
video_path,
cut_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let cut_json = serde_json::to_string_pretty(&cut_result)?;
std::fs::write(cut_path, &cut_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "cut.json");
println!(" ✓ CUT saved: {} scenes", cut_result.scenes.len());
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Cut)
.complete(&format!("{} scenes", cut_result.scenes.len()));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_asrx_module(
asrx_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Asrx).start(1);
}
let asrx_result = momentry_core::core::processor::process_asrx(
video_path,
asrx_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let asrx_json = serde_json::to_string_pretty(&asrx_result)?;
std::fs::write(asrx_path, &asrx_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "asrx.json");
println!(" ✓ ASRX saved: {} segments", asrx_result.segments.len());
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Asrx)
.complete(&format!("{} segments", asrx_result.segments.len()));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_yolo_module(
yolo_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Yolo).start(1);
}
let yolo_result = momentry_core::core::processor::process_yolo(
video_path,
yolo_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let yolo_json = serde_json::to_string_pretty(&yolo_result)?;
std::fs::write(yolo_path, &yolo_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "yolo.json");
println!(" ✓ YOLO saved: {} frames", yolo_result.frame_count);
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Yolo)
.complete(&format!("{} frames", yolo_result.frame_count));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_ocr_module(
ocr_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Ocr).start(1);
}
let ocr_result = momentry_core::core::processor::process_ocr(
video_path,
ocr_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let ocr_json = serde_json::to_string_pretty(&ocr_result)?;
std::fs::write(ocr_path, &ocr_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "ocr.json");
println!(
" ✓ OCR saved: {} frames with text",
ocr_result.frames.len()
);
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Ocr)
.complete(&format!("{} frames", ocr_result.frames.len()));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_face_module(
face_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Face).start(1);
}
let face_result = momentry_core::core::processor::process_face(
video_path,
face_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let face_json = serde_json::to_string_pretty(&face_result)?;
std::fs::write(face_path, &face_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "face.json");
println!(" ✓ Face saved: {} frames", face_result.frames.len());
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Face)
.complete(&format!("{} frames", face_result.frames.len()));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_pose_module(
pose_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Pose).start(1);
}
let pose_result = momentry_core::core::processor::process_pose(
video_path,
pose_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let pose_json = serde_json::to_string_pretty(&pose_result)?;
std::fs::write(pose_path, &pose_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "pose.json");
println!(" ✓ Pose saved: {} frames", pose_result.frames.len());
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Pose)
.complete(&format!("{} frames", pose_result.frames.len()));
state.stop();
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_story_module(
story_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Story).start(1);
}
let story_result = momentry_core::core::processor::process_story(
video_path,
story_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let story_json = serde_json::to_string_pretty(&story_result)?;
std::fs::write(story_path, &story_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "story.json");
println!(
" ✓ Story saved: {} parent chunks, {} child chunks",
story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks
);
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Story).complete(&format!(
"{} parents, {} children",
story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks
));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
async fn process_caption_module(
caption_path: &Path,
video_path: &str,
uuid: &str,
progress_state: &Arc<Mutex<ProgressState>>,
ui: &Arc<Mutex<Option<ProgressUi>>>,
) -> anyhow::Result<()> {
{
let mut state = progress_state.lock().unwrap();
state.get_processor(ProcessorType::Caption).start(1);
}
let caption_result = momentry_core::core::processor::process_caption(
video_path,
caption_path.to_str().unwrap(),
Some(uuid),
)
.await?;
let caption_json = serde_json::to_string_pretty(&caption_result)?;
std::fs::write(caption_path, &caption_json)?;
let output_dir = OutputDir::new();
let _ = output_dir.backup_file(uuid, "caption.json");
println!(" ✓ Caption saved: {} frames", caption_result.total_frames);
{
let mut state = progress_state.lock().unwrap();
state
.get_processor(ProcessorType::Caption)
.complete(&format!("{} frames", caption_result.total_frames));
}
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
Ok(())
}
// Top-level command-line interface for the `momentry` binary.
// Plain `//` comments are used here because `///` doc comments on clap-derived items
// become help text.
#[derive(Parser)]
#[command(
    name = "momentry",
    about = "Digital asset management system with video analysis and RAG",
    version = env!("BUILD_VERSION")
)]
struct Cli {
    // The subcommand selected on the command line.
    #[command(subcommand)]
    command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Register a video file
Register {
/// Video file path or URL
path: String,
},
/// Process video (generate all JSON files)
Process {
/// UUID or path
target: String,
/// Modules to process (comma separated: asr,cut,asrx,yolo,ocr,face,pose,story,caption)
/// If not specified, processes all modules
#[arg(short, long, value_delimiter = ',')]
modules: Option<Vec<String>>,
/// Modules to process via cloud (comma separated)
/// Example: --cloud asr,yolo
#[arg(long, value_delimiter = ',')]
cloud: Option<Vec<String>>,
/// Force reprocess even if JSON exists (skip completeness check)
#[arg(long, default_value = "false")]
force: bool,
/// Resume from last checkpoint if processing was interrupted
#[arg(long, default_value = "false")]
resume: bool,
},
/// Generate chunks and store in database
Chunk {
/// UUID
uuid: String,
},
/// Generate story for cut scenes
Story {
/// UUID
uuid: String,
},
/// Vectorize chunks
Vectorize {
/// UUID (or 'all' for all)
uuid: String,
},
/// Play video with overlays
Play {
/// Video path or UUID
target: String,
},
/// Start watching directories
Watch {
/// Directories to watch (comma separated)
directories: Option<String>,
},
/// Check system resources and recommend processing strategy
System {
/// Show detailed GPU info (NVIDIA/MPS)
#[arg(long)]
gpu: bool,
},
/// Start API server
Server {
/// Host
#[arg(long, default_value = "127.0.0.1")]
host: String,
/// Port (defaults to MOMENTRY_SERVER_PORT env var, or3002 for production)
#[arg(long)]
port: Option<u16>,
},
/// Start job worker
Worker {
/// Max concurrent processors
#[arg(long)]
max_concurrent: Option<usize>,
/// Poll interval in seconds
#[arg(long)]
poll_interval: Option<u64>,
/// Batch size
#[arg(long)]
batch_size: Option<i32>,
},
/// Query using RAG
Query {
/// Query text
query: String,
},
/// Lookup UUID from path
Lookup {
/// File path
path: String,
},
/// Resolve path from UUID
Resolve {
/// UUID
uuid: String,
},
/// Generate thumbnails for videos
Thumbnails {
/// UUID (optional, generates for all if not specified)
uuid: Option<String>,
/// Number of thumbnails per video
#[arg(short, long, default_value = "6")]
count: u32,
},
/// Show storage status report
Status {
/// UUID (optional, shows all if not specified)
uuid: Option<String>,
},
/// Manage output backups
Backup {
/// Action: list, cleanup
action: String,
/// Days to keep (for cleanup)
days: Option<u32>,
},
/// Manage API keys
ApiKey {
/// Action: create, list, validate, revoke, rotate, stats
#[arg(value_enum)]
action: ApiKeyAction,
/// Key name (for create)
name: Option<String>,
/// Key type (system, user, service, integration, emergency)
#[arg(long)]
key_type: Option<String>,
/// TTL in days (for create)
#[arg(long)]
ttl: Option<i64>,
/// API key to validate/revoke
#[arg(long)]
key: Option<String>,
},
/// Manage Gitea API tokens
Gitea {
/// Action: create, list, delete, verify
#[arg(value_enum)]
action: GiteaAction,
/// Gitea username
#[arg(long)]
username: Option<String>,
/// Gitea password (for create/list/delete)
#[arg(long)]
password: Option<String>,
/// Token name (for create/delete)
#[arg(long)]
token_name: Option<String>,
/// Token scopes (comma separated: read:repository,write:issue)
#[arg(long)]
scopes: Option<String>,
},
/// Manage n8n API keys
N8n {
/// Action: create, list, delete, verify
#[arg(value_enum)]
action: N8nAction,
/// n8n API key (for create/list/delete)
#[arg(long)]
api_key: Option<String>,
/// API key label (for create/delete)
#[arg(long)]
label: Option<String>,
/// Expiration days (for create)
#[arg(long)]
expires_in_days: Option<i64>,
},
}
// Actions for `momentry api-key`. Plain `//` comments are used because `///` comments on
// `ValueEnum` variants would become help text for the possible values.
// `Debug`/`Copy`/`PartialEq`/`Eq` are derived so these fieldless actions can be logged and
// compared without cloning.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum ApiKeyAction {
    Create,
    List,
    Validate,
    Revoke,
    Rotate,
    Stats,
}
// Actions for `momentry gitea`.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum GiteaAction {
    Create,
    List,
    Delete,
    Verify,
}
// Actions for `momentry n8n`.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum N8nAction {
    Create,
    List,
    Delete,
    Verify,
}
#[tokio::main]
async fn main() -> Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt::init();
let cli = Cli::parse();
match cli.command {
Commands::Register { path } => {
println!("Registering: {}", path);
// Compute UUID
let uuid = momentry_core::uuid::compute_uuid_from_path(&path);
println!("UUID: {}", uuid);
// Run ffprobe
let probe_result = momentry_core::core::probe::probe_video(&path)?;
println!("\nVideo probe results:");
let duration = probe_result
.format
.duration
.as_ref()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
println!(" Duration: {}s", duration);
if let Some(size) = &probe_result.format.size {
println!(" Size: {}", size);
}
let mut width = 0u32;
let mut height = 0u32;
let mut fps = 0.0;
for stream in &probe_result.streams {
if stream.codec_type.as_deref() == Some("video") {
width = stream.width.unwrap_or(0);
height = stream.height.unwrap_or(0);
if let Some(fps_str) = &stream.r_frame_rate {
if let Some((num, den)) = fps_str.split_once('/') {
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
if d > 0.0 {
fps = n / d;
}
}
}
}
println!(" Video: {}x{}", width, height);
if let Some(fps) = &stream.r_frame_rate {
println!(" FPS: {}", fps);
}
}
if stream.codec_type.as_deref() == Some("audio") {
println!(" Audio: {} channels", stream.channels.unwrap_or(0));
if let Some(sr) = &stream.sample_rate {
println!(" Sample Rate: {}", sr);
}
}
}
// Save probe JSON to file
let file_manager = momentry_core::FileManager::new(std::path::PathBuf::from("."));
let json_str = serde_json::to_string_pretty(&probe_result)?;
let json_path = file_manager.save_json(&uuid, "probe", &json_str)?;
println!("\nProbe JSON saved to: {:?}", json_path);
// Store in PostgreSQL
println!("\nStoring in database...");
let db = PostgresDb::init().await?;
let file_path = Path::new(&path)
.canonicalize()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| path.clone());
let file_name = Path::new(&path)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let record = VideoRecord {
id: 0,
uuid: uuid.clone(),
file_path,
file_name,
duration,
width,
height,
fps,
probe_json: Some(json_str),
storage: Default::default(),
status: VideoStatus::Pending,
user_id: None,
job_id: None,
created_at: String::new(),
registration_time: None,
};
let video_id = db.register_video(&record).await?;
println!("Video registered with ID: {}", video_id);
Ok(())
}
Commands::Process {
target,
modules,
cloud,
force,
resume,
} => {
println!("Processing: {}", target);
println!(" force: {}, resume: {}", force, resume);
// Parse selected modules
let selected_modules: Option<Vec<ProcessorType>> = modules.as_ref().map(|m| {
m.iter()
.filter_map(|name| {
let name_lower = name.to_lowercase();
match name_lower.as_str() {
"asr" => Some(ProcessorType::Asr),
"cut" => Some(ProcessorType::Cut),
"asrx" => Some(ProcessorType::Asrx),
"yolo" => Some(ProcessorType::Yolo),
"ocr" => Some(ProcessorType::Ocr),
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
"story" => Some(ProcessorType::Story),
"caption" => Some(ProcessorType::Caption),
_ => {
eprintln!("Unknown module: {}", name);
None
}
}
})
.collect()
});
// Parse cloud modules
let cloud_modules: Vec<ProcessorType> = cloud
.as_ref()
.map(|c| {
c.iter()
.filter_map(|name| {
let name_lower = name.to_lowercase();
match name_lower.as_str() {
"asr" => Some(ProcessorType::Asr),
"cut" => Some(ProcessorType::Cut),
"asrx" => Some(ProcessorType::Asrx),
"yolo" => Some(ProcessorType::Yolo),
"ocr" => Some(ProcessorType::Ocr),
"face" => Some(ProcessorType::Face),
"pose" => Some(ProcessorType::Pose),
"story" => Some(ProcessorType::Story),
"caption" => Some(ProcessorType::Caption),
_ => {
eprintln!("Unknown cloud module: {}", name);
None
}
}
})
.collect()
})
.unwrap_or_default();
if let Some(ref mods) = selected_modules {
println!(
" Modules: {}",
mods.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>()
.join(", ")
);
} else {
println!(" Modules: ALL");
}
if !cloud_modules.is_empty() {
println!(
" Cloud: {}",
cloud_modules
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>()
.join(", ")
);
}
let processing_mode = if force {
"FORCE (reprocess all)"
} else if resume {
"RESUME (continue from checkpoint)"
} else {
"SMART (skip complete, resume partial)"
};
println!(" Mode: {}", processing_mode);
// Compute UUID if path is given
let uuid = if target.len() == 16 && !target.contains('/') {
target.clone()
} else {
momentry_core::uuid::compute_uuid_from_path(&target)
};
// Get video from database
let db = PostgresDb::init().await?;
let video = db
.get_video_by_uuid(&uuid)
.await?
.ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;
let video_path = &video.file_path;
let video_name = video.file_name.clone();
let _file_manager = momentry_core::FileManager::new(std::path::PathBuf::from("."));
// Initialize output directory
let output_dir = OutputDir::new();
output_dir.ensure_dir()?;
println!("Output directory: {:?}", output_dir.get_base_path());
// Initialize progress UI
let progress_state = Arc::new(Mutex::new(ProgressState::new(&video_name)));
progress_state.lock().unwrap().start();
// Helper closure to check if a module should be processed
let should_process = |module: ProcessorType| -> bool {
selected_modules
.as_ref()
.map(|mods| mods.contains(&module))
.unwrap_or(true)
};
// Helper closure to check if a module should run in the cloud
let is_cloud = |module: ProcessorType| -> bool { cloud_modules.contains(&module) };
// Create UI and wrap in Arc for sharing with Redis subscriber
let ui = Arc::new(Mutex::new(ProgressUi::new(&video_name).ok()));
if let Some(ref mut ui) = *ui.lock().unwrap() {
let _ = ui.render();
}
// Spawn Redis subscriber for real-time progress updates
let redis_progress_state = progress_state.clone();
let redis_ui = ui.clone();
let redis_uuid = uuid.clone();
let redis_handle = tokio::spawn(async move {
if let Ok(redis_client) = momentry_core::core::db::RedisClient::new() {
loop {
if let Ok(mut pubsub) = redis_client.subscribe_progress(&redis_uuid).await {
let mut stream = pubsub.on_message();
while let Some(msg) = stream.next().await {
if let Ok(payload) = msg.get_payload::<String>() {
if let Ok(progress_msg) =
serde_json::from_str::<
momentry_core::core::db::ProgressMessage,
>(&payload)
{
let mut state = redis_progress_state.lock().unwrap();
state.update_from_redis(
&progress_msg.msg_type,
&progress_msg.processor,
progress_msg.data.current,
progress_msg.data.total,
progress_msg.data.message.as_deref(),
);
// Store progress in Redis Hash for HTTP API
let uuid = progress_msg.uuid.clone();
let processor = progress_msg.processor.clone();
let msg_type = progress_msg.msg_type.clone();
let current = progress_msg.data.current;
let total = progress_msg.data.total;
let message = progress_msg.data.message.clone();
tokio::spawn(async move {
if let Ok(redis_client) =
momentry_core::core::db::RedisClient::new()
{
if let Ok(mut conn) = redis_client.get_conn().await
{
let prefix = momentry_core::core::config::REDIS_KEY_PREFIX.as_str();
let key = format!(
"{}job:{}:processor:{}",
prefix, uuid, processor
);
let _: () = redis::cmd("HSET")
.arg(&key)
.arg("status")
.arg(&msg_type)
.query_async(&mut conn)
.await
.unwrap_or(());
if let Some(c) = current {
let _: () = redis::cmd("HSET")
.arg(&key)
.arg("current")
.arg(c)
.query_async(&mut conn)
.await
.unwrap_or(());
}
if let Some(t) = total {
let _: () = redis::cmd("HSET")
.arg(&key)
.arg("total")
.arg(t)
.query_async(&mut conn)
.await
.unwrap_or(());
}
if let Some(ref m) = message {
let _: () = redis::cmd("HSET")
.arg(&key)
.arg("message")
.arg(m)
.query_async(&mut conn)
.await
.unwrap_or(());
}
let _: () = redis::cmd("EXPIRE")
.arg(&key)
.arg(86400i64)
.query_async(&mut conn)
.await
.unwrap_or(());
}
}
});
// Trigger UI render on progress update
if let Some(ref mut ui) = *redis_ui.lock().unwrap() {
let _ = ui.render();
}
}
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
});
// Process ASR (Automatic Speech Recognition)
if should_process(ProcessorType::Asr) {
let asr_path = output_dir.get_output_path(&uuid, "asr.json");
let decision = decide_processing(&asr_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nASR: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nASR: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&asr_path).ok();
if is_cloud(ProcessorType::Asr) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nASR: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Asr) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Asr) {
println!("\nASR: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nASR: ⚙️ Processing...");
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
}
}
// Update storage status
db.update_storage_status(&uuid, "fs_json", true).await?;
// Process CUT (scene detection)
if should_process(ProcessorType::Cut) {
let cut_path = output_dir.get_output_path(&uuid, "cut.json");
let decision = decide_processing(&cut_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nCUT: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nCUT: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&cut_path).ok();
if is_cloud(ProcessorType::Cut) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nCUT: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Cut) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Cut) {
println!("\nCUT: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nCUT: ⚙️ Processing...");
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
}
}
// Process ASRX (speaker diarization)
if should_process(ProcessorType::Asrx) {
let asrx_path = output_dir.get_output_path(&uuid, "asrx.json");
let decision = decide_processing(&asrx_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nASRX: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nASRX: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&asrx_path).ok();
if is_cloud(ProcessorType::Asrx) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_asrx_module(
&asrx_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nASRX: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Asrx) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_asrx_module(
&asrx_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Asrx) {
println!("\nASRX: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nASRX: ⚙️ Processing...");
process_asrx_module(
&asrx_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process YOLO (object detection)
if should_process(ProcessorType::Yolo) {
let yolo_path = output_dir.get_output_path(&uuid, "yolo.json");
let decision = decide_processing(&yolo_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nYOLO: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nYOLO: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&yolo_path).ok();
if is_cloud(ProcessorType::Yolo) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_yolo_module(
&yolo_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nYOLO: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Yolo) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_yolo_module(
&yolo_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Yolo) {
println!("\nYOLO: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nYOLO: ⚙️ Processing...");
process_yolo_module(
&yolo_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process OCR (text recognition)
if should_process(ProcessorType::Ocr) {
let ocr_path = output_dir.get_output_path(&uuid, "ocr.json");
let decision = decide_processing(&ocr_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nOCR: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nOCR: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&ocr_path).ok();
if is_cloud(ProcessorType::Ocr) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nOCR: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Ocr) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Ocr) {
println!("\nOCR: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nOCR: ⚙️ Processing...");
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
.await?;
}
}
}
}
// Process Face (face detection)
if should_process(ProcessorType::Face) {
let face_path = output_dir.get_output_path(&uuid, "face.json");
let decision = decide_processing(&face_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nFace: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nFace: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&face_path).ok();
if is_cloud(ProcessorType::Face) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_face_module(
&face_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nFace: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Face) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_face_module(
&face_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Face) {
println!("\nFace: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nFace: ⚙️ Processing...");
process_face_module(
&face_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process Pose (pose estimation)
if should_process(ProcessorType::Pose) {
let pose_path = output_dir.get_output_path(&uuid, "pose.json");
let decision = decide_processing(&pose_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nPose: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nPose: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&pose_path).ok();
if is_cloud(ProcessorType::Pose) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_pose_module(
&pose_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nPose: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Pose) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_pose_module(
&pose_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Pose) {
println!("\nPose: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nPose: ⚙️ Processing...");
process_pose_module(
&pose_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process Story (video narrative)
if should_process(ProcessorType::Story) {
let story_path = output_dir.get_output_path(&uuid, "story.json");
let decision = decide_processing(&story_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nStory: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nStory: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&story_path).ok();
if is_cloud(ProcessorType::Story) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nStory: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Story) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Story) {
println!("\nStory: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nStory: ⚙️ Processing...");
process_story_module(
&story_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// Process Caption (image captions)
if should_process(ProcessorType::Caption) {
let caption_path = output_dir.get_output_path(&uuid, "caption.json");
let decision = decide_processing(&caption_path, force, resume);
match decision {
ProcessingDecision::SkipComplete => {
println!("\nCaption: ✓ Already complete, skipping");
}
ProcessingDecision::ForceReprocess => {
println!("\nCaption: ⟳ Force reprocessing from scratch...");
std::fs::remove_file(&caption_path).ok();
if is_cloud(ProcessorType::Caption) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::ResumePartial => {
println!("\nCaption: ↻ Resuming from checkpoint...");
if is_cloud(ProcessorType::Caption) {
println!(" [Cloud processing not implemented yet - run locally]");
} else {
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
ProcessingDecision::Process => {
if is_cloud(ProcessorType::Caption) {
println!("\nCaption: ☁️ Running via cloud...");
println!(" [Cloud processing not implemented yet - run locally]");
} else {
println!("\nCaption: ⚙️ Processing...");
process_caption_module(
&caption_path,
video_path,
&uuid,
&progress_state,
&ui,
)
.await?;
}
}
}
}
// TODO: Store pre_chunks and frames to database
// Shut down the Redis progress subscriber before reporting.
redis_handle.abort();
println!("\n✓ Process stage completed!");
// Report the output JSON of each module selected for this run, in pipeline
// order: (processor, display label, output file name).
for (module, label, file_name) in [
    (ProcessorType::Asr, "ASR", "asr.json"),
    (ProcessorType::Cut, "CUT", "cut.json"),
    (ProcessorType::Asrx, "ASRX", "asrx.json"),
    (ProcessorType::Yolo, "YOLO", "yolo.json"),
    (ProcessorType::Ocr, "OCR", "ocr.json"),
    (ProcessorType::Face, "Face", "face.json"),
    (ProcessorType::Pose, "Pose", "pose.json"),
    (ProcessorType::Story, "Story", "story.json"),
    (ProcessorType::Caption, "Caption", "caption.json"),
] {
    if should_process(module) {
        let json_path = output_dir.get_output_path(&uuid, file_name);
        println!(" - {} JSON: {}", label, json_path.display());
    }
}
Ok(())
}
Commands::Chunk { uuid } => {
    // Build database rows for one video from the JSON files written by
    // `process`: pre_chunks (ASR sentences, CUT scenes, 10 s windows), merged
    // per-frame detections (YOLO/OCR/Face/Pose) and rule-1 chunks.
    //
    // NOTE(review): inputs are read as `<uuid>.<module>.json` relative to the
    // current directory, while `process` writes them via
    // `OutputDir::get_output_path` — confirm both resolve to the same files.
    use momentry_core::core::db::postgres_db::{Frame as DbFrame, PreChunk};
    use momentry_core::core::processor::{
        asr::AsrResult, asrx::AsrxResult, cut::CutResult, face::FaceResult,
        ocr::OcrResult, pose::PoseResult, yolo::YoloResult,
    };
    use std::collections::{BTreeSet, HashMap};

    println!("Chunking: {}", uuid);
    let db = PostgresDb::init().await?;
    let video = db
        .get_video_by_uuid(&uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;
    let file_id = video.id;
    let fps = video.fps;

    // ========== Read all JSON files ==========
    // ASR and CUT are required inputs.
    let asr_path = format!("{}.asr.json", uuid);
    let asr_json = std::fs::read_to_string(&asr_path)
        .context("ASR file not found. Run 'process' first.")?;
    let asr_result: AsrResult = serde_json::from_str(&asr_json)?;
    println!("Loaded ASR: {} segments", asr_result.segments.len());

    let cut_path = format!("{}.cut.json", uuid);
    let cut_json = std::fs::read_to_string(&cut_path)
        .context("CUT file not found. Run 'process' first.")?;
    let cut_result: CutResult = serde_json::from_str(&cut_json)?;
    println!("Loaded CUT: {} scenes", cut_result.scenes.len());

    // The remaining module outputs are optional: an unreadable or unparsable
    // file is reported and replaced by `$empty`, so chunking can continue.
    macro_rules! load_optional {
        ($ty:ty, $path:expr, $label:literal, $items:ident, $unit:literal, $empty:expr) => {
            match std::fs::read_to_string($path) {
                Ok(json) => match serde_json::from_str::<$ty>(&json) {
                    Ok(result) => {
                        println!("Loaded {}: {} {}", $label, result.$items.len(), $unit);
                        result
                    }
                    Err(e) => {
                        println!(
                            "Warning: Failed to parse {} JSON: {}. Skipping {}.",
                            $label, e, $label
                        );
                        $empty
                    }
                },
                Err(_) => {
                    println!("Warning: {} file not found. Skipping {}.", $label, $label);
                    $empty
                }
            }
        };
    }
    let yolo_result = load_optional!(
        YoloResult,
        format!("{}.yolo.json", uuid),
        "YOLO",
        frames,
        "frames",
        YoloResult {
            frame_count: 0,
            fps: 0.0,
            frames: vec![],
        }
    );
    let ocr_result = load_optional!(
        OcrResult,
        format!("{}.ocr.json", uuid),
        "OCR",
        frames,
        "frames",
        OcrResult {
            frame_count: 0,
            fps: 0.0,
            frames: vec![],
        }
    );
    let face_result = load_optional!(
        FaceResult,
        format!("{}.face.json", uuid),
        "Face",
        frames,
        "frames",
        FaceResult {
            frame_count: 0,
            fps: 0.0,
            frames: vec![],
        }
    );
    let pose_result = load_optional!(
        PoseResult,
        format!("{}.pose.json", uuid),
        "Pose",
        frames,
        "frames",
        PoseResult {
            frame_count: 0,
            fps: 0.0,
            frames: vec![],
        }
    );
    let asrx_result = load_optional!(
        AsrxResult,
        format!("{}.asrx.json", uuid),
        "ASRX",
        segments,
        "segments",
        AsrxResult {
            language: None,
            segments: vec![],
        }
    );

    // ========== Store pre_chunks (from ASR, CUT) ==========
    println!("\nStoring pre_chunks...");
    // Store ASR sentence pre_chunks (one per transcript segment)
    let mut asr_pre_chunk_ids = Vec::with_capacity(asr_result.segments.len());
    for seg in &asr_result.segments {
        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "asr".to_string(),
            source_file: Some(asr_path.clone()),
            chunk_type: "sentence".to_string(),
            start_frame: FrameTime::from_seconds(seg.start, fps).frames(),
            end_frame: FrameTime::from_seconds(seg.end, fps).frames(),
            fps,
            raw_json: serde_json::json!({"text": seg.text}),
            text_content: Some(seg.text.clone()),
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        asr_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
    }
    // Store CUT scene pre_chunks
    let mut cut_pre_chunk_ids = Vec::with_capacity(cut_result.scenes.len());
    for scene in &cut_result.scenes {
        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "cut".to_string(),
            source_file: Some(cut_path.clone()),
            chunk_type: "cut".to_string(),
            start_frame: scene.start_frame as i64,
            end_frame: scene.end_frame as i64,
            fps,
            raw_json: serde_json::json!({
                "scene_number": scene.scene_number,
            }),
            text_content: None,
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        cut_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
    }
    // Store time-based pre_chunks (every 10 seconds; the last one is clamped
    // to the video duration)
    let duration = video.duration;
    let mut time_pre_chunk_ids = Vec::new();
    let mut time_start = 0.0;
    while time_start < duration {
        let time_end = (time_start + 10.0).min(duration);
        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "time".to_string(),
            source_file: None,
            chunk_type: "time".to_string(),
            start_frame: FrameTime::from_seconds(time_start, fps).frames(),
            end_frame: FrameTime::from_seconds(time_end, fps).frames(),
            fps,
            raw_json: serde_json::json!({"interval": 10.0}),
            text_content: None,
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        time_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
        time_start = time_end;
    }
    println!(
        "Stored pre_chunks: {} asr + {} cut + {} time",
        asr_result.segments.len(),
        cut_result.scenes.len(),
        time_pre_chunk_ids.len()
    );

    // ========== Store frames (from YOLO, OCR, Face, Pose) ==========
    println!("\nStoring frames...");
    // Index each module's detections by frame number (borrowed, not cloned).
    let yolo_by_frame: HashMap<u64, _> =
        yolo_result.frames.iter().map(|f| (f.frame, f)).collect();
    let ocr_by_frame: HashMap<u64, _> =
        ocr_result.frames.iter().map(|f| (f.frame, f)).collect();
    let face_by_frame: HashMap<u64, _> =
        face_result.frames.iter().map(|f| (f.frame, f)).collect();
    let pose_by_frame: HashMap<u64, _> =
        pose_result.frames.iter().map(|f| (f.frame, f)).collect();
    // Every frame reported by any module, ascending and deduplicated.
    let all_frames: BTreeSet<u64> = yolo_by_frame
        .keys()
        .chain(ocr_by_frame.keys())
        .chain(face_by_frame.keys())
        .chain(pose_by_frame.keys())
        .copied()
        .collect();
    // One row per frame, merging whatever each module detected there.
    for &frame_num in &all_frames {
        let frame = DbFrame {
            id: 0,
            file_id,
            frame_number: frame_num as i64,
            timestamp: frame_num as f64 / fps,
            fps,
            yolo_objects: yolo_by_frame
                .get(&frame_num)
                .map(|f| serde_json::json!(&f.objects)),
            ocr_results: ocr_by_frame
                .get(&frame_num)
                .map(|f| serde_json::json!(&f.texts)),
            face_results: face_by_frame
                .get(&frame_num)
                .map(|f| serde_json::json!(&f.faces)),
            pose_results: pose_by_frame
                .get(&frame_num)
                .map(|f| serde_json::json!(&f.persons)),
            frame_path: None,
            created_at: String::new(),
        };
        db.store_frame(&frame).await?;
    }
    println!("Stored {} frames", all_frames.len());

    // ========== Create chunks ==========
    println!("\nCreating chunks...");
    // Rule 1: Direct conversion (sentence pre_chunk -> sentence chunk),
    // tagged with the ASRX speaker whose turn overlaps the sentence most.
    let mut sentence_chunks = Vec::with_capacity(asr_result.segments.len());
    for (i, seg) in asr_result.segments.iter().enumerate() {
        let pre_chunk_id = asr_pre_chunk_ids.get(i).copied().unwrap_or(0);
        // Only a strictly positive overlap counts. A diarization turn that
        // merely ends where this sentence starts (or starts where it ends)
        // belongs to the neighbouring sentence. Segments without a speaker id
        // are not candidates, and among real overlaps the longest one wins.
        let speaker_id = asrx_result
            .segments
            .iter()
            .filter_map(|ax| {
                let sid = ax.speaker_id.as_ref()?;
                let overlap = ax.end.min(seg.end) - ax.start.max(seg.start);
                (overlap > 0.0).then_some((overlap, sid))
            })
            .max_by(|a, b| a.0.total_cmp(&b.0))
            .map(|(_, sid)| sid.clone());
        let content = if let Some(ref sid) = speaker_id {
            serde_json::json!({
                "text": seg.text,
                "speaker_id": sid,
            })
        } else {
            serde_json::json!({
                "text": seg.text,
            })
        };
        let mut chunk = Chunk::from_seconds(
            file_id as i32,
            uuid.clone(),
            i as u32,
            ChunkType::Sentence,
            ChunkRule::Rule1,
            seg.start,
            seg.end,
            fps,
            content,
        )
        .with_text_content(seg.text.clone())
        .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
        // Add ASRX metadata if available
        if speaker_id.is_some() {
            chunk = chunk.with_metadata(serde_json::json!({
                "language": asr_result.language,
                "language_probability": asr_result.language_probability,
                "speaker_matched": true,
            }));
        }
        sentence_chunks.push(chunk);
    }
    if !asrx_result.segments.is_empty() {
        let matched = sentence_chunks
            .iter()
            .filter(|c| {
                c.content
                    .get("speaker_id")
                    .and_then(|v| v.as_str())
                    .is_some()
            })
            .count();
        println!(
            " ASRX merge: {}/{} sentence chunks matched to speakers",
            matched,
            sentence_chunks.len()
        );
    }
    // Rule 1: CUT chunks
    let mut cut_chunks = Vec::with_capacity(cut_result.scenes.len());
    for (i, scene) in cut_result.scenes.iter().enumerate() {
        let pre_chunk_id = cut_pre_chunk_ids.get(i).copied().unwrap_or(0);
        let chunk = Chunk::from_seconds(
            file_id as i32,
            uuid.clone(),
            i as u32,
            ChunkType::Cut,
            ChunkRule::Rule1,
            scene.start_time,
            scene.end_time,
            fps,
            serde_json::json!({
                "scene_number": scene.scene_number,
            }),
        )
        .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
        cut_chunks.push(chunk);
    }
    // Rule 1: Time-based chunks
    let splitter = momentry_core::core::chunk::ChunkSplitter::new(10.0);
    let mut time_chunks = Vec::new();
    let time_chunk_list = splitter.split_time_based(&uuid, video.duration);
    for (i, tc) in time_chunk_list.iter().enumerate() {
        let pre_chunk_id = time_pre_chunk_ids.get(i).copied().unwrap_or(0);
        let chunk = Chunk::new(
            file_id as i32,
            uuid.clone(),
            i as u32,
            ChunkType::TimeBased,
            ChunkRule::Rule1,
            tc.start_frame,
            tc.end_frame,
            fps,
            serde_json::json!({"interval": 10.0}),
        )
        .with_pre_chunk_ids(vec![pre_chunk_id as i32]);
        time_chunks.push(chunk);
    }
    // Store chunks
    println!(
        "Storing {} sentence chunks (rule_1)...",
        sentence_chunks.len()
    );
    for chunk in &sentence_chunks {
        db.store_chunk(chunk).await?;
    }
    println!("Storing {} cut chunks (rule_1)...", cut_chunks.len());
    for chunk in &cut_chunks {
        db.store_chunk(chunk).await?;
    }
    println!(
        "Storing {} time-based chunks (rule_1)...",
        time_chunks.len()
    );
    for chunk in &time_chunks {
        db.store_chunk(chunk).await?;
    }
    let total_chunks = sentence_chunks.len() + cut_chunks.len() + time_chunks.len();
    // Update storage status
    db.update_storage_status(&uuid, "psql_chunk", true).await?;
    println!("\n✓ Chunk stage completed!");
    println!(
        " - pre_chunks: {} (asr + cut + time)",
        asr_result.segments.len() + cut_result.scenes.len() + time_pre_chunk_ids.len()
    );
    println!(" - frames: {}", all_frames.len());
    println!(" - chunks: {} (sentence + cut + time_based)", total_chunks);
    Ok(())
}
Commands::Story { uuid } => {
    // Print a plain-text summary per scene, built from the chunks and frames
    // stored by `chunk` within 5 s before and after each scene.
    println!("Generating story for: {}", uuid);
    let db = PostgresDb::init().await?;
    let video = db
        .get_video_by_uuid(&uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;
    let file_id = video.id;
    let duration = video.duration;
    // Get all chunks
    let all_chunks = db.get_chunks_by_uuid(&uuid).await?;
    // Scenes come from cut chunks; without any, fall back to sentence chunks
    // that carry text.
    let mut story_chunks: Vec<&Chunk> = all_chunks
        .iter()
        .filter(|c| c.chunk_type == ChunkType::Cut)
        .collect();
    let story_type = if story_chunks.is_empty() {
        story_chunks = all_chunks
            .iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence && c.text_content.is_some())
            .collect();
        "sentence"
    } else {
        "cut"
    };
    if story_chunks.is_empty() {
        println!("No story chunks found. Run 'chunk' command first.");
        return Ok(());
    }
    println!("Found {} {} scenes", story_chunks.len(), story_type);
    // Generate story for each scene
    for (i, story_chunk) in story_chunks.iter().enumerate() {
        let scene_start = story_chunk.start_time().seconds();
        let scene_end = story_chunk.end_time().seconds();
        println!("\n=== Scene {} ===", i + 1);
        println!("Time: {:.2}s - {:.2}s", scene_start, scene_end);
        // Context window: the scene widened by 5 s on each side, clamped to
        // [0, duration].
        let context_start = (scene_start - 5.0).max(0.0);
        let context_end = (scene_end + 5.0).min(duration);
        let context_chunks = db
            .get_chunks_by_time_range(file_id, context_start, context_end)
            .await?;
        let context_frames = db
            .get_frames_by_time_range(file_id, context_start, context_end)
            .await?;

        let mut story = String::new();
        story.push_str(&format!(
            "Scene {} ({:.1}s - {:.1}s)\n\n",
            i + 1,
            scene_start,
            scene_end
        ));

        // Speech: text of the sentence chunks in the window.
        let sentence_chunks: Vec<&Chunk> = context_chunks
            .iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence)
            .collect();
        if !sentence_chunks.is_empty() {
            story.push_str("【Speech】\n");
            for text in sentence_chunks.iter().filter_map(|sc| sc.text_content.as_ref()) {
                story.push_str(&format!(" - {}\n", text));
            }
            story.push('\n');
        }

        // Objects: number of detections per YOLO class across the window.
        let mut all_objects: std::collections::HashMap<&str, u32> =
            std::collections::HashMap::new();
        for frame in &context_frames {
            if let Some(arr) = frame.yolo_objects.as_ref().and_then(|v| v.as_array()) {
                for class_name in arr
                    .iter()
                    .filter_map(|obj| obj.get("class_name").and_then(|v| v.as_str()))
                {
                    *all_objects.entry(class_name).or_insert(0) += 1;
                }
            }
        }
        if !all_objects.is_empty() {
            story.push_str("【Objects】\n");
            let mut sorted_objects: Vec<(&str, u32)> = all_objects.into_iter().collect();
            // Most frequent first; ties are broken by name so the listing is
            // stable (HashMap iteration order is unspecified).
            sorted_objects.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
            for (obj, count) in sorted_objects.iter().take(10) {
                story.push_str(&format!(" - {} ({} frames)\n", obj, count));
            }
            story.push('\n');
        }

        // Text in video: the same OCR text typically repeats on consecutive
        // frames, so each distinct text is listed once, in first-seen order.
        // Texts of 2 bytes or fewer are dropped as noise.
        let mut seen_texts = std::collections::HashSet::new();
        let mut all_texts: Vec<&str> = Vec::new();
        for frame in &context_frames {
            if let Some(arr) = frame.ocr_results.as_ref().and_then(|v| v.as_array()) {
                for text in arr
                    .iter()
                    .filter_map(|txt| txt.get("text").and_then(|v| v.as_str()))
                {
                    if text.len() > 2 && seen_texts.insert(text) {
                        all_texts.push(text);
                    }
                }
            }
        }
        if !all_texts.is_empty() {
            story.push_str("【Text in video】\n");
            for txt in all_texts.iter().take(10) {
                story.push_str(&format!(" - {}\n", txt));
            }
            story.push('\n');
        }

        // Faces: total detections summed over all frames in the window.
        let face_count: usize = context_frames
            .iter()
            .filter_map(|frame| frame.face_results.as_ref().and_then(|v| v.as_array()))
            .map(|arr| arr.len())
            .sum();
        if face_count > 0 {
            story.push_str(&format!(
                "【Faces】\n - {} face(s) detected\n\n",
                face_count
            ));
        }
        println!("{}", story);
    }
    Ok(())
}
Commands::Vectorize { uuid } => {
    // Embed every sentence chunk of one video (or of every listed video when
    // `uuid` is "all") and store each vector in PostgreSQL and Qdrant.
    println!("Vectorizing: {}", uuid);
    let pg = PostgresDb::init()
        .await
        .context("Failed to init PostgreSQL")?;
    let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?;
    let embedder = Embedder::new("nomic-embed-text-v2-moe:latest".to_string());
    let mut stored_count = 0usize;
    // Videos for which not a single vector could be stored.
    let mut failed_videos: Vec<String> = Vec::new();
    // Get list of videos to process
    let videos_to_process = if uuid == "all" {
        // NOTE(review): only the first 10000 videos returned are processed.
        let videos = pg.list_videos(10000, 0).await?.0;
        videos.into_iter().map(|v| v.uuid).collect::<Vec<_>>()
    } else {
        // Process single video
        vec![uuid.clone()]
    };
    for target in &videos_to_process {
        println!("\n=== Processing video: {} ===", target);
        let chunks = pg.get_chunks_by_uuid(target.as_str()).await?;
        let sentence_chunks: Vec<_> = chunks
            .into_iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence)
            .collect();
        println!(
            "Found {} sentence chunks for {}",
            sentence_chunks.len(),
            target
        );
        let mut video_stored_count = 0usize;
        for chunk in sentence_chunks {
            // Text may live under content.data.text or directly at content.text.
            let text = chunk
                .content
                .get("data")
                .and_then(|data| data.get("text"))
                .and_then(|v| v.as_str())
                .or_else(|| chunk.content.get("text").and_then(|v| v.as_str()))
                .unwrap_or("");
            if text.is_empty() {
                eprintln!(
                    "Empty text for chunk {}, content: {:?}",
                    chunk.chunk_id, chunk.content
                );
                continue;
            }
            print!("Embedding chunk {}... ", chunk.chunk_id);
            // Progress output is best-effort; a failed flush must not abort
            // the run.
            let _ = std::io::stdout().flush();
            match embedder.embed_document(text).await {
                Ok(vector) => {
                    println!("embedding success ({} dims)", vector.len());
                    let vector_id = format!("{}_{}", chunk.uuid, chunk.chunk_id);
                    if let Err(e) =
                        pg.store_vector(&chunk.chunk_id, &vector, &chunk.uuid).await
                    {
                        eprintln!("store_vector error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }
                    let qdrant_payload = VectorPayload {
                        uuid: chunk.uuid.clone(),
                        chunk_id: chunk.chunk_id.clone(),
                        chunk_type: "sentence".to_string(),
                        start_time: chunk.start_time().seconds(),
                        end_time: chunk.end_time().seconds(),
                        text: Some(text.to_string()),
                    };
                    if let Err(e) = qdrant
                        .upsert_vector(&chunk.chunk_id, &vector, qdrant_payload)
                        .await
                    {
                        eprintln!("upsert_vector error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }
                    if let Err(e) = pg.update_vector_id(&chunk.chunk_id, &vector_id).await {
                        eprintln!("update_vector_id error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }
                    stored_count += 1;
                    video_stored_count += 1;
                    println!(
                        "stored (video: {}, total: {})",
                        video_stored_count, stored_count
                    );
                }
                Err(e) => {
                    println!("embedding failed: {}", e);
                }
            }
        }
        // Only update storage status if vectors were actually stored for this video
        if video_stored_count > 0 {
            pg.update_storage_status(target.as_str(), "pvector_chunk", true)
                .await?;
            pg.update_storage_status(target.as_str(), "qvector_chunk", true)
                .await?;
            println!(
                "✓ Vectorize stage completed for {}! ({} vectors stored)",
                target, video_stored_count
            );
        } else {
            println!(
                "✗ Vectorize stage failed for {}! (0 vectors stored)",
                target
            );
            failed_videos.push(target.clone());
        }
    }
    println!("\n=== Vectorization Summary ===");
    println!("Total vectors stored: {}", stored_count);
    if uuid == "all" {
        if failed_videos.is_empty() {
            println!("✓ Vectorize stage completed for all videos!");
        } else {
            // Don't claim full success when some videos stored nothing.
            println!(
                "⚠ Vectorize stage finished: {} of {} videos stored no vectors: {}",
                failed_videos.len(),
                videos_to_process.len(),
                failed_videos.join(", ")
            );
        }
    }
    Ok(())
}
Commands::Play { target } => {
    // Playback is not wired up yet; only the requested target is echoed.
    // TODO: Implement play
    println!("Playing: {target}");
    Ok(())
}
Commands::Watch { directories } => {
    // Directory watching is not implemented yet; only the requested
    // directories are echoed.
    // TODO: Implement watch
    println!("Starting watcher: {directories:?}");
    Ok(())
}
Commands::System { gpu } => {
    // Print a boxed report of CPU, memory and GPU availability plus the
    // suggested execution mode; with `gpu`, also dump GPU details.
    let resources = SystemResources::check();
    println!("╔══════════════════════════════════════════════════════════════╗");
    println!("║ System Resources Report ║");
    println!("╠══════════════════════════════════════════════════════════════╣");
    println!(
        "║ CPU: {:.1}% idle ║",
        resources.cpu_idle_percent
    );
    println!(
        "║ Memory: {:.1}GB / {:.1}GB available ({:.0}% used) ║",
        resources.memory_available_mb as f64 / 1024.0,
        resources.memory_total_mb as f64 / 1024.0,
        resources.memory_used_percent
    );
    if resources.gpu_available {
        match resources.gpu_type {
            GpuType::Nvidia => {
                let util = resources.gpu_utilization.unwrap_or(0.0);
                println!(
                    "║ GPU: NVIDIA - {:.0}% utilized ║",
                    util
                );
            }
            GpuType::AppleMps => {
                println!(
                    "║ GPU: Apple MPS (Metal) - available ║"
                );
            }
        }
    } else {
        println!("║ GPU: None detected ║");
    }
    println!("╠══════════════════════════════════════════════════════════════╣");
    // NOTE(review): 4096 is presumably a memory threshold in MB — confirm
    // against `can_parallel`.
    if resources.can_parallel(4096) {
        println!("║ Mode: PARALLEL - Can run multiple modules together ║");
        println!(
            "║ Recommended modules: {} ║",
            resources.recommend_parallel_modules().join(", ")
        );
    } else {
        println!("║ Mode: SEQUENTIAL - Low resources, run one at a time ║");
    }
    println!("╚══════════════════════════════════════════════════════════════╝");
    if gpu {
        println!("\n=== GPU Details ===");
        // `system_profiler` exists only on macOS; on other hosts query
        // `nvidia-smi` instead of silently printing nothing. A tool that
        // cannot be launched is reported rather than ignored.
        let (program, args): (&str, &[&str]) = if cfg!(target_os = "macos") {
            (
                "system_profiler",
                &["SPDisplaysDataType", "-detailLevel", "mini"],
            )
        } else {
            ("nvidia-smi", &[])
        };
        match std::process::Command::new(program).args(args).output() {
            Ok(o) => println!("{}", String::from_utf8_lossy(&o.stdout)),
            Err(e) => println!("(could not run {}: {})", program, e),
        }
    }
    Ok(())
}
Commands::Server { host, port } => {
    // An explicit port argument wins; otherwise use the configured SERVER_PORT.
    let port = match port {
        Some(explicit) => explicit,
        None => *momentry_core::core::config::SERVER_PORT,
    };
    momentry_core::api::start_server(&host, port).await?;
    Ok(())
}
Commands::Worker {
    max_concurrent,
    poll_interval,
    batch_size,
} => {
    use momentry_core::worker::{JobWorker, WorkerConfig};
    // Start from the default worker config and override only the values that
    // were supplied on the command line.
    let mut config = WorkerConfig::default();
    config.max_concurrent = max_concurrent.unwrap_or(config.max_concurrent);
    config.poll_interval_secs = poll_interval.unwrap_or(config.poll_interval_secs);
    config.batch_size = batch_size.unwrap_or(config.batch_size);
    // PostgreSQL is initialised before Redis; both are shared with the worker.
    let worker = JobWorker::new(
        std::sync::Arc::new(PostgresDb::init().await?),
        std::sync::Arc::new(RedisClient::new()?),
        config.clone(),
    );
    println!(
        "Starting worker with max_concurrent={}, poll_interval={}s",
        config.max_concurrent, config.poll_interval_secs
    );
    worker.run().await?;
    Ok(())
}
Commands::Query { query } => {
    // Querying is not implemented yet; only the query text is echoed.
    // TODO: Implement query
    println!("Query: {query}");
    Ok(())
}
Commands::Lookup { path } => {
    // Print the UUID that `compute_uuid_from_path` derives for `path`.
    let computed = momentry_core::uuid::compute_uuid_from_path(&path);
    println!("Path: {path}");
    println!("UUID: {computed}");
    Ok(())
}
Commands::Resolve { uuid } => {
    // Reverse lookup (UUID -> path) is still a stub.
    println!("Resolving UUID: {uuid}");
    // TODO: Look up path from UUID in database
    println!("(Database lookup not implemented yet)");
    Ok(())
}
Commands::Thumbnails { uuid, count } => {
    // Run `ThumbnailExtractor::get_or_create` into ./thumbnails for one video,
    // or for every listed video when no uuid is given.
    let db = PostgresDb::init().await?;
    let videos = if let Some(ref uuid) = uuid {
        vec![db
            .get_video_by_uuid(uuid)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?]
    } else {
        db.list_videos(10000, 0).await?.0
    };
    let output_dir = std::path::PathBuf::from("thumbnails");
    let extractor = momentry_core::ThumbnailExtractor::new(output_dir, count);
    // A failing video is reported and skipped. Failures are counted so the
    // final message does not claim success when some videos failed.
    let mut failures = 0usize;
    for video in videos {
        println!(
            "\nGenerating thumbnails for: {} ({})",
            video.file_name, video.uuid
        );
        match extractor.get_or_create(&video.file_path, &video.uuid) {
            Ok(result) => {
                println!(" Generated {} thumbnails", result.count);
            }
            Err(e) => {
                println!(" Error: {}", e);
                failures += 1;
            }
        }
    }
    if failures == 0 {
        println!("\nThumbnails generated successfully!");
    } else {
        println!(
            "\nThumbnail generation finished with {} error(s)",
            failures
        );
    }
    Ok(())
}
Commands::Status { uuid } => {
    // Table layout. Every cell is padded by one space on each side and cells
    // are separated by a single `│`, so each row spans INNER_WIDTH characters
    // between the outer `║` borders. Borders, separators, rows and legend
    // lines are all derived from these constants so they stay aligned.
    const NAME_WIDTH: usize = 32;
    const CELL_WIDTH: usize = 8;
    const CELL_COUNT: usize = 7;
    const INNER_WIDTH: usize = (NAME_WIDTH + 2) + CELL_COUNT * (CELL_WIDTH + 3);
    // File names longer than NAME_MAX_CHARS are shown as "..." + their last
    // NAME_TAIL_CHARS characters.
    const NAME_MAX_CHARS: usize = 30;
    const NAME_TAIL_CHARS: usize = 27;
    const LEGEND: [&str; 8] = [
        "Storage Types:",
        "  FS_Video   - Video file on filesystem",
        "  FS_JSON    - JSON files (probe, ASR, YOLO, etc.)",
        "  PSQL_Chunk - Chunks stored in PostgreSQL",
        "  PObject    - Chunks as JSON objects in PostgreSQL (future)",
        "  MObject    - Chunks as JSON objects in MongoDB (future)",
        "  PVector    - Vectors in PostgreSQL",
        "  QVector    - Vectors in Qdrant",
    ];

    /// Prints one table row: the name cell followed by CELL_COUNT status cells.
    fn print_row(name: &str, cells: [&str; CELL_COUNT]) {
        let mut row = format!("║ {:<width$} ", name, width = NAME_WIDTH);
        for cell in &cells {
            row.push_str(&format!("│ {:<width$} ", cell, width = CELL_WIDTH));
        }
        row.push('║');
        println!("{}", row);
    }

    /// Prints a full-width line of free text inside the outer borders.
    fn print_text(text: &str) {
        println!("║ {:<width$} ║", text, width = INNER_WIDTH - 2);
    }

    let db = PostgresDb::init().await?;
    let videos = if let Some(ref u) = uuid {
        vec![db
            .get_video_by_uuid(u)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Video not found: {}", u))?]
    } else {
        db.list_videos(10000, 0).await?.0
    };

    // Column separator: the `╪` junctions line up with the `│` of every row.
    let mut separator = format!("╠{}", "─".repeat(NAME_WIDTH + 2));
    for _ in 0..CELL_COUNT {
        separator.push('╪');
        separator.push_str(&"─".repeat(CELL_WIDTH + 2));
    }
    separator.push('╣');

    println!("\n╔{}╗", "═".repeat(INNER_WIDTH));
    println!(
        "║ {:^width$} ║",
        "📊 Storage Status Report",
        width = INNER_WIDTH - 2
    );
    println!("╠{}╣", "═".repeat(INNER_WIDTH));
    print_row("Video", ["FS", "FS", "PSQL", "PObj", "MObj", "PVec", "QVec"]);
    print_row("", ["Video", "JSON", "Chunk", "Chunk", "Chunk", "Chunk", "Chunk"]);
    println!("{}", separator);

    for video in videos {
        // Lookup failures are shown as zero counts rather than aborting the report.
        let (sentence_count, time_count) =
            db.get_chunk_count(&video.uuid).await.unwrap_or((0, 0));
        let vector_count = db.get_vector_count(&video.uuid).await.unwrap_or(0);
        let total_chunks = sentence_count + time_count;
        let psql_status = if total_chunks > 0 { "✓" } else { "-" };
        // ✓ = every chunk has a vector, ◐ = only some do, - = none.
        let pvec_status = if vector_count > 0 && total_chunks > 0 {
            if vector_count >= total_chunks {
                "✓"
            } else {
                "◐"
            }
        } else {
            "-"
        };
        let qvec_status = if video.storage.qvector_chunk {
            "✓"
        } else {
            "-"
        };
        // Count characters rather than bytes: slicing a String at a byte
        // offset panics when it falls inside a multi-byte UTF-8 character.
        let name_chars = video.file_name.chars().count();
        let file_name = if name_chars > NAME_MAX_CHARS {
            let tail: String = video
                .file_name
                .chars()
                .skip(name_chars - NAME_TAIL_CHARS)
                .collect();
            format!("...{}", tail)
        } else {
            video.file_name
        };
        print_row(
            &file_name,
            [
                if video.storage.fs_video { "✓" } else { "✗" },
                if video.storage.fs_json { "✓" } else { "-" },
                psql_status,
                "-",
                "-",
                pvec_status,
                qvec_status,
            ],
        );
    }

    println!("╠{}╣", "═".repeat(INNER_WIDTH));
    for line in LEGEND.iter().copied() {
        print_text(line);
    }
    println!("╚{}╝", "═".repeat(INNER_WIDTH));
    Ok(())
}
Commands::Backup { action, days } => {
    let output_dir = OutputDir::new();
    output_dir.ensure_dir()?;
    println!("\n📁 Backup directory: {:?}", output_dir.get_backup_dir());

    // Dispatch on the textual sub-action; unknown actions only print help.
    if action == "list" {
        let backups = output_dir.list_backups()?;
        println!("\n📦 Available backups:");
        if backups.is_empty() {
            println!(" (no backups found)");
        }
        for entry in &backups {
            println!(" - {}", entry.filename);
        }
        println!("\nTotal: {} backup(s)", backups.len());
    } else if action == "cleanup" {
        // Default retention when --days is not given.
        let retention_days = days.unwrap_or(30);
        let removed = output_dir.cleanup_old_backups(retention_days)?;
        println!(
            "\n🗑 Cleaned up {} old backup(s) (older than {} days)",
            removed, retention_days
        );
    } else if action == "verify" {
        println!("\n🔍 Verifying backups...");
        let backups = output_dir.list_backups()?;
        let (mut ok_count, mut bad_count) = (0, 0);
        for entry in &backups {
            let passed = match output_dir.verify_backup(&entry.path) {
                Ok(true) => {
                    println!(" ✓ {}", entry.filename);
                    true
                }
                Ok(false) => {
                    println!(" ✗ {} (missing checksum)", entry.filename);
                    false
                }
                Err(err) => {
                    println!(" ✗ {} ({})", entry.filename, err);
                    false
                }
            };
            if passed {
                ok_count += 1;
            } else {
                bad_count += 1;
            }
        }
        println!("\nVerified: {} OK, {} failed", ok_count, bad_count);
    } else {
        println!("\n⚠ Unknown action: {}", action);
        println!("Available actions: list, cleanup, verify");
    }
    Ok(())
}
Commands::ApiKey {
    action,
    name,
    key_type,
    ttl,
    key,
} => {
    /// Returns the first `keep` characters of `s` when `s` has more than `max`
    /// characters, otherwise `s` unchanged. Cuts on char boundaries, so
    /// multi-byte UTF-8 text (e.g. CJK key names) cannot cause the slicing
    /// panic that a byte index like `&s[..17]` would.
    fn truncate_chars(s: &str, max: usize, keep: usize) -> &str {
        if s.chars().count() <= max {
            return s;
        }
        match s.char_indices().nth(keep) {
            Some((end, _)) => &s[..end],
            None => s,
        }
    }

    let db = PostgresDb::init().await?;
    // DATABASE_URL overrides the local development default for the key service.
    let db_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string());
    let service = ApiKeyService::new(db_url);
    match action {
        ApiKeyAction::Create => {
            let name = name.unwrap_or_else(|| "unnamed-key".to_string());
            let kt = parse_key_type(key_type.as_deref());
            let request = momentry_core::core::api_key::CreateApiKeyRequest {
                name: name.clone(),
                key_type: kt,
                user_id: None,
                service_name: None,
                permissions: vec!["read".to_string(), "write".to_string()],
                ttl_days: ttl,
            };
            match service.create_key(request) {
                Ok(response) => {
                    // Only the hash is handed to the database; the plaintext
                    // key is printed below.
                    let key_hash = service.hash_key(&response.key);
                    // NOTE(review): serde_json::to_string emits JSON, so if
                    // ApiKeyType serializes as a string the stored value keeps
                    // its surrounding quotes, unlike the bare "user" fallback.
                    // Confirm which form readers of key_type expect.
                    let key_type_str =
                        serde_json::to_string(&kt).unwrap_or_else(|_| "user".to_string());
                    let permissions = serde_json::json!(["read", "write"]);
                    let config = momentry_core::core::db::CreateApiKeyConfig::new(
                        &response.key_id,
                        &key_hash,
                        kt.prefix(),
                        &name,
                        &key_type_str,
                    )
                    .with_permissions(&permissions)
                    .with_expires_at(response.expires_at);
                    if let Err(e) = db.create_api_key(config).await {
                        eprintln!(
                            "\n⚠ Key generated but failed to store in database: {}",
                            e
                        );
                    }
                    println!("\n✅ API Key created successfully!");
                    println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
                    println!("│ ⚠️ IMPORTANT: Save this key now - it will not be shown again! │");
                    println!("└─────────────────────────────────────────────────────────────────────────────┘");
                    println!("\nKey ID: {}", response.key_id);
                    println!("API Key: {}", response.key);
                    println!("Expires: {}", response.expires_at);
                    if !response.warning.is_empty() {
                        println!("\n⚠ {}", response.warning);
                    }
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to create API key: {}", e);
                }
            }
        }
        ApiKeyAction::List => match db.list_api_keys().await {
            Ok(keys) => {
                println!("\n📋 API Key List");
                if keys.is_empty() {
                    println!(" (no API keys found)");
                } else {
                    println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
                    println!(
                        "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │",
                        "Status", "Name", "Type", "Usage", "Last Used"
                    );
                    println!("├────────────────────────────────────────────────────────────────────────────┤");
                    for k in &keys {
                        let status = if k.status == "active" {
                            "✓ active"
                        } else {
                            &k.status
                        };
                        let last_used = k
                            .last_used_at
                            .map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
                            .unwrap_or_else(|| "never".to_string());
                        println!(
                            "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │",
                            status,
                            truncate_chars(&k.name, 20, 17),
                            k.key_type,
                            k.usage_count,
                            last_used
                        );
                    }
                    println!("└────────────────────────────────────────────────────────────────────────────┘");
                    println!("\nTotal: {} key(s)", keys.len());
                }
            }
            Err(e) => {
                eprintln!("\n❌ Failed to list API keys: {}", e);
            }
        },
        ApiKeyAction::Validate => {
            let api_key =
                key.ok_or_else(|| anyhow::anyhow!("--key required for validate"))?;
            // Keys are looked up by hash, never by plaintext.
            let key_hash = service.hash_key(&api_key);
            match db.get_api_key_by_hash(&key_hash).await {
                Ok(Some(record)) => {
                    if record.status == "active" {
                        // Usage bookkeeping is best-effort; a failure here
                        // must not turn a valid key into an error.
                        db.update_api_key_usage(&record.key_id, None).await.ok();
                        println!("\n✅ API Key is valid");
                        println!("Key ID: {}", record.key_id);
                        println!("Name: {}", record.name);
                        println!("Type: {}", record.key_type);
                        println!("Usage: {} times", record.usage_count + 1);
                        if record.rotation_required {
                            println!(
                                "⚠️ Rotation required: {}",
                                record.rotation_reason.as_deref().unwrap_or("unknown")
                            );
                        }
                    } else {
                        println!("\n❌ API Key is {}", record.status);
                    }
                }
                Ok(None) => {
                    println!("\n❌ API Key is invalid or not found");
                }
                Err(e) => {
                    eprintln!("\n❌ Validation error: {}", e);
                }
            }
        }
        ApiKeyAction::Revoke => {
            let key = key.ok_or_else(|| anyhow::anyhow!("--key required for revoke"))?;
            let key_id = service.extract_key_id(&key);
            match db.revoke_api_key(&key_id).await {
                Ok(_) => {
                    println!("\n🔴 API Key {} revoked successfully", key_id);
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to revoke API key: {}", e);
                }
            }
        }
        ApiKeyAction::Rotate => {
            let key = key.ok_or_else(|| anyhow::anyhow!("--key required for rotate"))?;
            let key_id = service.extract_key_id(&key);
            // NOTE(review): the grace period is derived from the --key-type flag
            // (via parse_key_type), not from the stored key's own type; confirm
            // this is intended.
            let grace_period_end =
                service.calculate_grace_period_end(parse_key_type(key_type.as_deref()));
            match db
                .require_api_key_rotation(
                    &key_id,
                    "manual rotation requested",
                    grace_period_end,
                )
                .await
            {
                Ok(_) => {
                    println!("\n🔄 Rotation requested for key: {}", key_id);
                    println!("Grace period ends: {}", grace_period_end);
                }
                Err(e) => {
                    eprintln!("\n❌ Rotation request failed: {}", e);
                }
            }
        }
        ApiKeyAction::Stats => {
            match db.get_api_key_stats().await {
                Ok(stats) => {
                    println!("\n📊 API Key Statistics");
                    println!("\n┌─────────────────────────────────────────┐");
                    println!("│ Total Keys: {:5} │", stats.total_keys);
                    println!(
                        "│ Active Keys: {:5} │",
                        stats.active_keys
                    );
                    println!(
                        "│ Expired Keys: {:5} │",
                        stats.expired_keys
                    );
                    println!(
                        "│ Rotation Required: {:4} │",
                        stats.rotation_required
                    );
                    println!(
                        "│ Anomalies (24h): {:5} │",
                        stats.anomalies_last_24h
                    );
                    println!("└─────────────────────────────────────────┘");
                }
                Err(e) => {
                    eprintln!("\n⚠ Failed to get stats: {}", e);
                }
            }
            // Thresholds come from the service configuration, so they are
            // shown even if the statistics query failed.
            let config = service.get_config();
            println!("\n┌─────────────────────────────────────────┐");
            println!("│ Anomaly Detection Thresholds │");
            println!("├─────────────────────────────────────────┤");
            println!(
                "│ Requests/minute: {:5} │",
                config.requests_per_minute_threshold
            );
            println!(
                "│ Requests/hour: {:5} │",
                config.requests_per_hour_threshold
            );
            println!(
                "│ Error rate: {:5.1}% │",
                config.error_rate_threshold * 100.0
            );
            println!(
                "│ Unique IPs/hour: {:5} │",
                config.unique_ips_per_hour_threshold
            );
            println!(
                "│ Lockout threshold: {:5} │",
                config.lockout_threshold
            );
            println!("└─────────────────────────────────────────┘");
        }
    }
    Ok(())
}
Commands::Gitea {
    action,
    username,
    password,
    token_name,
    scopes,
} => {
    use momentry_core::core::api_key::gitea::{
        CreateGiteaTokenRequest, GiteaClient, GiteaScope,
    };

    /// Returns the first `keep` characters of `s` when `s` has more than `max`
    /// characters, otherwise `s` unchanged. Cuts on char boundaries, so
    /// multi-byte UTF-8 token names cannot cause a byte-slicing panic.
    fn truncate_chars(s: &str, max: usize, keep: usize) -> &str {
        if s.chars().count() <= max {
            return s;
        }
        match s.char_indices().nth(keep) {
            Some((end, _)) => &s[..end],
            None => s,
        }
    }

    let db = PostgresDb::init().await?;
    let gitea = GiteaClient::new()?;
    match action {
        GiteaAction::Create => {
            let username = username
                .ok_or_else(|| anyhow::anyhow!("--username required for create"))?;
            let password = password
                .ok_or_else(|| anyhow::anyhow!("--password required for create"))?;
            let token_name = token_name
                .ok_or_else(|| anyhow::anyhow!("--token-name required for create"))?;
            // Parse the comma-separated --scopes list. Unknown scope names are
            // rejected instead of silently dropped, so a typo cannot create a
            // token with fewer permissions than requested. Empty entries
            // (e.g. from a trailing comma) are ignored.
            let scopes_vec: Vec<GiteaScope> = match scopes {
                Some(list) => list
                    .split(',')
                    .map(str::trim)
                    .filter(|scope| !scope.is_empty())
                    .map(|scope| {
                        scope
                            .parse::<GiteaScope>()
                            .map_err(|_| anyhow::anyhow!("Invalid Gitea scope: {}", scope))
                    })
                    .collect::<Result<Vec<_>>>()?,
                None => vec![GiteaScope::ReadRepository, GiteaScope::WriteRepository],
            };
            let request = CreateGiteaTokenRequest {
                username: username.clone(),
                password,
                token_name: token_name.clone(),
                scopes: scopes_vec.clone(),
            };
            match gitea.create_token(&request).await {
                Ok(response) => {
                    // Record the token locally; only its last eight characters
                    // are passed to the database.
                    if let Err(e) = db
                        .create_gitea_token(
                            response.id,
                            &username,
                            &token_name,
                            &response.token_last_eight,
                            &serde_json::json!(scopes_vec
                                .iter()
                                .map(|s| s.as_str())
                                .collect::<Vec<_>>()),
                            None,
                        )
                        .await
                    {
                        eprintln!("\n⚠ Token created but failed to store: {}", e);
                    }
                    println!("\n✅ Gitea Token created successfully!");
                    println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
                    println!("│ ⚠️ IMPORTANT: Save this token now - it will not be shown again! │");
                    println!("└─────────────────────────────────────────────────────────────────────────────┘");
                    println!("\nToken ID: {}", response.id);
                    println!("Token Name: {}", response.name);
                    println!("SHA1: {}", response.sha1);
                    println!("Last 8: {}", response.token_last_eight);
                    println!("\nAuthorization Header:");
                    println!(" Authorization: token {}", response.sha1);
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to create Gitea token: {}", e);
                }
            }
        }
        GiteaAction::List => {
            let username =
                username.ok_or_else(|| anyhow::anyhow!("--username required for list"))?;
            let password =
                password.ok_or_else(|| anyhow::anyhow!("--password required for list"))?;
            match gitea.list_tokens(&username, &password).await {
                Ok(tokens) => {
                    println!("\n📋 Gitea Tokens for user: {}", username);
                    if tokens.is_empty() {
                        println!(" (no tokens found)");
                    } else {
                        println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
                        // Header uses the same column widths as the data rows below.
                        println!(
                            "│ {:8} │ {:20} │ {:9} │ {:27} │",
                            "ID", "Name", "Last 8", "Registered"
                        );
                        println!("├────────────────────────────────────────────────────────────────────────────┤");
                        for token in &tokens {
                            // "✓" when the token also exists in the local
                            // database; lookup errors are shown as "-".
                            let registered = db
                                .get_gitea_token_by_name(&username, &token.name)
                                .await
                                .ok()
                                .flatten()
                                .map(|_| "✓")
                                .unwrap_or("-");
                            println!(
                                "│ {:8} │ {:20} │ {:9} │ {:27} │",
                                token.id,
                                truncate_chars(&token.name, 20, 17),
                                token.token_last_eight,
                                registered
                            );
                        }
                        println!("└────────────────────────────────────────────────────────────────────────────┘");
                        println!("\nTotal: {} token(s)", tokens.len());
                    }
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to list Gitea tokens: {}", e);
                }
            }
        }
        GiteaAction::Delete => {
            let username = username
                .ok_or_else(|| anyhow::anyhow!("--username required for delete"))?;
            let password = password
                .ok_or_else(|| anyhow::anyhow!("--password required for delete"))?;
            let token_name = token_name
                .ok_or_else(|| anyhow::anyhow!("--token-name required for delete"))?;
            match gitea.delete_token(&username, &password, &token_name).await {
                Ok(_) => {
                    // The Gitea token is already gone; a stale local record is
                    // reported rather than silently ignored.
                    if let Err(e) = db.delete_gitea_token(&username, &token_name).await {
                        eprintln!(
                            "\n⚠ Token deleted from Gitea but failed to remove local record: {}",
                            e
                        );
                    }
                    println!("\n🗑 Token '{}' deleted successfully", token_name);
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to delete Gitea token: {}", e);
                }
            }
        }
        GiteaAction::Verify => {
            let token_name = token_name
                .ok_or_else(|| anyhow::anyhow!("--token-name required for verify"))?;
            // Local records are keyed by (user, token name), so the user is
            // required; a placeholder user could never match a real record.
            let username = username
                .ok_or_else(|| anyhow::anyhow!("--username required for verify"))?;
            let record = db.get_gitea_token_by_name(&username, &token_name).await?;
            match record {
                Some(r) => {
                    println!("\n📋 Gitea Token: {}", r.token_name);
                    println!(" User: {}", r.gitea_user);
                    println!(" Token ID: {}", r.gitea_token_id);
                    println!(" Last 8: {}", r.token_last_eight);
                    println!(" Scopes: {}", r.scopes);
                    println!(" Created: {}", r.created_at);
                    if let Some(verified) = r.last_verified {
                        println!(" Last Verified: {}", verified);
                    } else {
                        println!(" Last Verified: never");
                    }
                }
                None => {
                    println!("\n❌ Token not found in local database");
                }
            }
        }
    }
    Ok(())
}
Commands::N8n {
    action,
    api_key,
    label,
    expires_in_days,
} => {
    use momentry_core::core::api_key::n8n::{
        extract_last_eight, CreateN8nApiKeyRequest, N8nClient,
    };

    /// Returns the first `keep` characters of `s` when `s` has more than `max`
    /// characters, otherwise `s` unchanged. Cuts on char boundaries, so
    /// multi-byte UTF-8 labels cannot cause a byte-slicing panic.
    fn truncate_chars(s: &str, max: usize, keep: usize) -> &str {
        if s.chars().count() <= max {
            return s;
        }
        match s.char_indices().nth(keep) {
            Some((end, _)) => &s[..end],
            None => s,
        }
    }

    let db = PostgresDb::init().await?;
    match action {
        N8nAction::Create => {
            let api_key_value = api_key.ok_or_else(|| {
                anyhow::anyhow!("--api-key required for create (existing n8n API key)")
            })?;
            let label =
                label.ok_or_else(|| anyhow::anyhow!("--label required for create"))?;
            // A zero or negative lifetime would create a key that is already
            // expired, so it is rejected before contacting n8n.
            let expires_at = match expires_in_days {
                Some(days) if days <= 0 => {
                    anyhow::bail!(
                        "--expires-in-days must be a positive number of days (got {})",
                        days
                    )
                }
                Some(days) => Some(chrono::Utc::now() + chrono::Duration::days(days)),
                None => None,
            };
            let n8n = N8nClient::new(api_key_value)?;
            let request = CreateN8nApiKeyRequest {
                label: label.clone(),
                expires_at,
            };
            match n8n.create_api_key(&request).await {
                Ok(response) => {
                    // Record the key locally; only its last eight characters
                    // are passed to the database.
                    if let Err(e) = db
                        .create_n8n_api_key(
                            &response.id,
                            &label,
                            &extract_last_eight(&response.api_key),
                            None,
                            response.expires_at,
                        )
                        .await
                    {
                        eprintln!("\n⚠ API key created but failed to store: {}", e);
                    }
                    println!("\n✅ n8n API Key created successfully!");
                    println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
                    println!("│ ⚠️ IMPORTANT: Save this API key now - it will not be shown again! │");
                    println!("└─────────────────────────────────────────────────────────────────────────────┘");
                    println!("\nKey ID: {}", response.id);
                    println!("Label: {}", response.label);
                    println!("API Key: {}", response.api_key);
                    println!("\nUsage:");
                    println!(" curl -H 'X-N8N-API-KEY: {}' https://n8n.momentry.ddns.net/api/v1/workflows", response.api_key);
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to create n8n API key: {}", e);
                }
            }
        }
        N8nAction::List => {
            let api_key_value =
                api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for list"))?;
            let n8n = N8nClient::new(api_key_value)?;
            match n8n.list_api_keys().await {
                Ok(keys) => {
                    println!("\n📋 n8n API Keys");
                    if keys.is_empty() {
                        println!(" (no API keys found)");
                    } else {
                        println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
                        // Header uses the same column widths as the data rows below.
                        println!("│ {:27} │ {:39} │", "Label", "ID");
                        println!("├────────────────────────────────────────────────────────────────────────────┤");
                        for key in &keys {
                            println!(
                                "│ {:27} │ {:39} │",
                                truncate_chars(&key.label, 27, 24),
                                key.id
                            );
                        }
                        println!("└────────────────────────────────────────────────────────────────────────────┘");
                        println!("\nTotal: {} key(s)", keys.len());
                    }
                }
                Err(e) => {
                    eprintln!("\n❌ Failed to list n8n API keys: {}", e);
                }
            }
        }
        N8nAction::Delete => {
            let api_key_value =
                api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for delete"))?;
            let label =
                label.ok_or_else(|| anyhow::anyhow!("--label required for delete"))?;
            // The n8n key id is resolved from the local record by label.
            let record = db.get_n8n_api_key_by_label(&label).await?;
            if let Some(r) = record {
                let n8n = N8nClient::new(api_key_value)?;
                match n8n.delete_api_key(&r.n8n_key_id).await {
                    Ok(_) => {
                        // The remote key is already gone; a stale local record
                        // is reported rather than silently ignored.
                        if let Err(e) = db.delete_n8n_api_key(&label).await {
                            eprintln!(
                                "\n⚠ API key deleted from n8n but failed to remove local record: {}",
                                e
                            );
                        }
                        println!("\n🗑 API key '{}' deleted successfully", label);
                    }
                    Err(e) => {
                        eprintln!("\n❌ Failed to delete n8n API key: {}", e);
                    }
                }
            } else {
                println!("\n❌ API key '{}' not found in local database", label);
            }
        }
        N8nAction::Verify => {
            let label =
                label.ok_or_else(|| anyhow::anyhow!("--label required for verify"))?;
            let record = db.get_n8n_api_key_by_label(&label).await?;
            match record {
                Some(r) => {
                    println!("\n📋 n8n API Key: {}", r.label);
                    println!(" Key ID: {}", r.n8n_key_id);
                    println!(" Last 8: {}", r.api_key_last_eight);
                    println!(" Created: {}", r.created_at);
                    if let Some(expires) = r.expires_at {
                        println!(" Expires: {}", expires);
                    }
                    if let Some(verified) = r.last_verified {
                        println!(" Last Verified: {}", verified);
                    } else {
                        println!(" Last Verified: never");
                    }
                }
                None => {
                    println!("\n❌ API key not found in local database");
                }
            }
        }
    }
    Ok(())
}
}
}