//! Release Manager — deploy/undeploy/list video packages. //! Binary: `cargo run --bin release -- ` use anyhow::{Context, Result}; use chrono::Utc; use clap::{Parser, Subcommand}; use momentry_core::core::config; use momentry_core::core::db::PostgresDb; use sqlx::Row; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; use std::process::Command; const DEMO_DIR: &str = "/Users/accusys/momentry/var/sftpgo/data/demo"; const OUTPUT_DIR: &str = "/Users/accusys/momentry/output_dev"; const RELEASE_DIR: &str = "/Users/accusys/momentry_core_0.1/release/files"; const PG_BIN: &str = "/Users/accusys/pgsql/18.3/bin"; #[derive(Parser)] #[command(name = "release", about = "Release Manager — deploy/undeploy video packages")] struct Cli { #[command(subcommand)] command: Commands, } #[derive(Subcommand)] enum Commands { /// Deploy a release package (.tar.gz) Deploy { /// Path to .tar.gz package tarball: String, }, /// Undeploy (remove all data for a video UUID) Undeploy { /// File UUID uuid: String, /// Skip confirmation #[arg(short = 'y', long)] yes: bool, }, /// List deployed videos List, /// Create release package for a deployed video Package { /// File UUID uuid: String, }, /// Show package contents and statistics Stats, /// Generate visual reports from video data Visualize { /// File UUID uuid: String, /// Visualization type: heatmap, timeline #[arg(short, long, default_value = "heatmap")] typ: String, /// Output path (default: output_dev/_heatmap.html) #[arg(short, long)] output: Option, /// Filter by identity_id #[arg(short = 'i', long)] identity: Option, }, /// Generate offline report from .sqlite file (no PostgreSQL needed) VisualizeOffline { /// Path to .sqlite file sqlite_path: String, /// Output path #[arg(short, long)] output: Option, /// Filter by identity_id #[arg(short = 'i', long)] identity: Option, }, } /// Run psql command and return stdout fn psql_exec(sql: &str) -> Result { let output = Command::new(format!("{}/psql", PG_BIN)) .args(["-U", "accusys", "-d", "momentry", "-t", "-A", "-c", sql]) .output() .context("psql command failed")?; Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) } /// Run a SQL file via psql fn psql_file(path: &Path) -> Result<()> { let status = Command::new(format!("{}/psql", PG_BIN)) .args(["-U", "accusys", "-d", "momentry", "-f"]) .arg(path) .status() .context("psql file execution failed")?; if !status.success() { anyhow::bail!("psql returned non-zero exit code"); } Ok(()) } /// Extract tar.gz archive to a temp directory, return the top-level dir fn extract_tarball(tarball: &Path) -> Result { let tmpdir = std::env::temp_dir().join(format!("release_{}", Utc::now().timestamp())); fs::create_dir_all(&tmpdir)?; let status = Command::new("tar") .args(["-xzf", tarball.to_str().unwrap(), "-C", tmpdir.to_str().unwrap()]) .status() .context("tar extraction failed")?; if !status.success() { anyhow::bail!("tar returned non-zero"); } // Find the UUID directory (first subdir) for entry in fs::read_dir(&tmpdir)? { let entry = entry?; if entry.file_type()?.is_dir() { return Ok(entry.path()); } } anyhow::bail!("no directory found in tarball"); } /// Get file_info.json from package directory fn read_file_info(pkg_dir: &Path) -> Result { let info_path = pkg_dir.join("file_info.json"); let content = fs::read_to_string(&info_path) .with_context(|| format!("Cannot read {:?}", info_path))?; serde_json::from_str(&content).context("Invalid file_info.json") } // ---- Deploy ---- async fn cmd_deploy(db: &PostgresDb, tarball: &str) -> Result<()> { let tarball_path = Path::new(tarball); if !tarball_path.exists() { anyhow::bail!("File not found: {}", tarball); } println!("=== Deploy: {} ===", tarball_path.file_name().unwrap().to_str().unwrap()); // Extract let pkg_dir = extract_tarball(tarball_path)?; println!("Extracted to {:?}", pkg_dir); // Read file_info let info = read_file_info(&pkg_dir)?; let uuid = info["file_uuid"].as_str().context("Missing file_uuid in file_info.json")?; let file_name = info["file_name"].as_str().unwrap_or("?"); println!("UUID: {}\nVideo: {}", uuid, file_name); // Import data.sql let sql_path = pkg_dir.join("data.sql"); if sql_path.exists() { let size = fs::metadata(&sql_path)?.len(); println!("Importing data.sql ({} MB)...", size / 1024 / 1024); psql_file(&sql_path)?; println!(" SQL imported OK"); } else { println!(" No data.sql in package"); } // Copy video to demo dir for entry in fs::read_dir(&pkg_dir)? { let entry = entry?; let fname = entry.file_name(); let fname_str = fname.to_str().unwrap_or(""); if fname_str.ends_with(".mp4") || fname_str.ends_with(".mov") || fname_str.ends_with(".avi") { let dest = Path::new(DEMO_DIR).join(&fname); if !dest.exists() { fs::copy(entry.path(), &dest)?; println!("Video: {} → {}", fname_str, DEMO_DIR); } else { println!("Video: {} already in demo dir", fname_str); } } } // Copy output JSONs for entry in fs::read_dir(&pkg_dir)? { let entry = entry?; let fname = entry.file_name(); let fname_str = fname.to_str().unwrap_or(""); if fname_str.ends_with(".json") && fname_str != "file_info.json" { let dest = Path::new(OUTPUT_DIR).join(&fname); fs::copy(entry.path(), &dest)?; } } println!("Output files copied to {}", OUTPUT_DIR); // Verify let chunk_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM dev.chunk WHERE file_uuid = $1" ).bind(uuid).fetch_one(db.pool()).await?; let face_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid = $1" ).bind(uuid).fetch_one(db.pool()).await?; // Cleanup fs::remove_dir_all(&pkg_dir.parent().unwrap_or(&pkg_dir))?; println!("\n=== Deploy Complete ==="); println!(" Video: {}", file_name); println!(" Chunks: {}", chunk_count.0); println!(" Face detections: {}", face_count.0); Ok(()) } // ---- Undeploy ---- async fn cmd_undeploy(db: &PostgresDb, uuid: &str, skip_confirm: bool) -> Result<()> { // Get video info let rows: Vec<(String, String)> = sqlx::query_as( "SELECT file_name, file_path FROM dev.videos WHERE file_uuid = $1" ).bind(uuid).fetch_all(db.pool()).await?; if rows.is_empty() { anyhow::bail!("UUID {} not found in DB", uuid); } let (file_name, file_path) = &rows[0]; println!("=== Undeploy: {} ===", uuid); println!("Video: {}", file_name); println!("This will DELETE all data for this video."); if !skip_confirm { print!("Continue? (y/N): "); std::io::stdout().flush()?; let mut input = String::new(); std::io::stdin().read_line(&mut input)?; if input.trim().to_lowercase() != "y" { println!("Cancelled"); return Ok(()); } } // Delete DB data let tables = [ ("dev.chunk", "file_uuid"), ("dev.chunk_vectors", "uuid"), ("dev.face_detections", "file_uuid"), ("dev.processor_results", "file_uuid"), ("dev.monitor_jobs", "uuid"), ("dev.pre_chunks", "file_uuid"), ]; for (tbl, col) in &tables { let sql = format!("DELETE FROM {} WHERE {} = $1", tbl, col); let result = sqlx::query(&sql).bind(uuid).execute(db.pool()).await?; println!(" {}: {} rows deleted", tbl, result.rows_affected()); } sqlx::query("DELETE FROM dev.videos WHERE file_uuid = $1") .bind(uuid).execute(db.pool()).await?; println!(" dev.videos: removed"); // Delete output files for entry in fs::read_dir(OUTPUT_DIR)? { let entry = entry?; let fname = entry.file_name().to_string_lossy().to_string(); if fname.starts_with(uuid) { fs::remove_file(entry.path())?; } } println!(" Output files: removed"); // Delete video file if !file_path.is_empty() { let vp = Path::new(file_path); if vp.exists() { fs::remove_file(vp)?; println!(" Video file: removed ({})", vp.file_name().unwrap().to_str().unwrap_or("?")); } } // Delete release directory let release_path = Path::new(RELEASE_DIR).join(uuid); if release_path.exists() { fs::remove_dir_all(&release_path)?; println!(" Release dir: removed"); } println!("\n=== Undeploy Complete ==="); Ok(()) } // ---- List ---- async fn cmd_list(db: &PostgresDb) -> Result<()> { let rows = sqlx::query( "SELECT file_uuid, file_name, duration, status, (SELECT COUNT(*) FROM dev.chunk WHERE file_uuid = v.file_uuid) as chunks, (SELECT COUNT(*) FROM dev.face_detections WHERE file_uuid = v.file_uuid) as faces FROM dev.videos v ORDER BY id DESC" ).fetch_all(db.pool()).await?; println!("{:<36} {:<44} {:>8} {:>10} {:>6} {:>6}", "UUID", "Name", "Duration", "Status", "Chunks", "Faces"); println!("{}", "-".repeat(116)); for row in &rows { let uuid: String = row.get(0); let name: String = row.get::, _>(1).unwrap_or_default(); let duration: Option = row.get(2); let status: Option = row.get(3); let chunks: Option = row.get(4); let faces: Option = row.get(5); let dur_str = match duration { Some(d) if d > 60.0 => format!("{:5.0}min", d / 60.0), Some(d) => format!("{:5.0}s", d), None => "?".to_string(), }; let short_name = if name.chars().count() > 42 { format!("{}..", name.chars().take(40).collect::()) } else { name.clone() }; println!("{:<36} {:<44} {:>8} {:>10} {:>6} {:>6}", uuid, short_name, dur_str, status.as_deref().unwrap_or("?"), chunks.unwrap_or(0), faces.unwrap_or(0)); } Ok(()) } // ---- Package ---- async fn cmd_package(db: &PostgresDb, uuid: &str) -> Result<()> { println!("=== Package: {} ===", uuid); // Verify video exists let row = sqlx::query( "SELECT file_uuid, file_name, file_path, duration, fps, width, height FROM dev.videos WHERE file_uuid = $1" ).bind(uuid).fetch_optional(db.pool()).await?; let (_, file_name, file_path, duration, fps, width, height): ( String, String, String, Option, Option, Option, Option ) = match row { Some(r) => (r.get(0), r.get(1), r.get(2), r.get(3), r.get(4), r.get(5), r.get(6)), None => anyhow::bail!("UUID {} not found", uuid), }; let outdir = Path::new(RELEASE_DIR).join(uuid); if outdir.exists() { fs::remove_dir_all(&outdir)?; } fs::create_dir_all(&outdir)?; // Write file_info.json let info = serde_json::json!({ "file_uuid": uuid, "file_name": file_name, "duration": duration, "fps": fps, "width": width, "height": height, "status": "completed", }); fs::write(outdir.join("file_info.json"), serde_json::to_string_pretty(&info)?)?; // Export data.sql let sql_path = outdir.join("data.sql"); let tables = [ ("dev.videos", "file_uuid"), ("dev.chunk", "file_uuid"), ("dev.chunk_vectors", "uuid"), ("dev.face_detections", "file_uuid"), ]; { let mut f = fs::File::create(&sql_path)?; writeln!(f, "-- Release package: {}", uuid)?; writeln!(f, "BEGIN;")?; writeln!(f)?; for (tbl, col) in &tables { writeln!(f, "-- {} WHERE {} = '{}'", tbl, col, uuid)?; // Get columns let parts: Vec<&str> = tbl.split('.').collect(); let cols = psql_exec(&format!( "SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) FROM information_schema.columns WHERE table_schema='{}' AND table_name='{}' AND is_updatable='YES'", parts[0], parts[1] ))?; // COPY let data = psql_exec(&format!( "COPY (SELECT * FROM {} WHERE {} = '{}') TO STDOUT WITH CSV HEADER", tbl, col, uuid ))?; if !data.is_empty() { writeln!(f, "COPY {} ({}) FROM STDIN WITH CSV HEADER;", tbl, cols)?; writeln!(f, "{}", data)?; writeln!(f, "\\.")?; writeln!(f)?; } } // Export identities referenced by this file writeln!(f, "-- dev.identities (referenced by face_detections)")?; let cols = psql_exec("SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) FROM information_schema.columns WHERE table_schema='dev' AND table_name='identities' AND is_updatable='YES'")?; let data = psql_exec(&format!( "COPY (SELECT DISTINCT i.* FROM dev.identities i INNER JOIN dev.face_detections fd ON fd.identity_id = i.id WHERE fd.file_uuid = '{}') TO STDOUT WITH CSV HEADER", uuid ))?; if !data.is_empty() { writeln!(f, "COPY dev.identities ({}) FROM STDIN WITH CSV HEADER;", cols)?; writeln!(f, "{}", data)?; writeln!(f, "\\.")?; writeln!(f)?; } // Export identity_bindings for identities referenced by this file writeln!(f, "-- dev.identity_bindings (for identities in face_detections)")?; let cols = psql_exec("SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) FROM information_schema.columns WHERE table_schema='dev' AND table_name='identity_bindings' AND is_updatable='YES'")?; let data = psql_exec(&format!( "COPY (SELECT DISTINCT ib.* FROM dev.identity_bindings ib INNER JOIN dev.face_detections fd ON fd.identity_id = ib.identity_id WHERE fd.file_uuid = '{}') TO STDOUT WITH CSV HEADER", uuid ))?; if !data.is_empty() { writeln!(f, "COPY dev.identity_bindings ({}) FROM STDIN WITH CSV HEADER;", cols)?; writeln!(f, "{}", data)?; writeln!(f, "\\.")?; writeln!(f)?; } writeln!(f, "COMMIT;")?; } let sql_size = fs::metadata(&sql_path)?.len(); println!(" data.sql ({} MB)", sql_size / 1024 / 1024); // Copy video file if !file_path.is_empty() { let vp = Path::new(&file_path); if vp.exists() { let dest = outdir.join(vp.file_name().unwrap()); fs::copy(vp, &dest)?; let vsize = fs::metadata(&dest)?.len(); println!(" {} ({} MB)", vp.file_name().unwrap().to_str().unwrap_or("?"), vsize / 1024 / 1024); } } // Generate identities.json for offline analysis let id_script = "/Users/accusys/momentry_core_0.1/scripts/export_identities.py"; let id_out = format!("{}/{}.identities.json", OUTPUT_DIR, uuid); let _ = Command::new("/opt/homebrew/bin/python3.11") .args([id_script, uuid, &id_out]) .status(); if Path::new(&id_out).exists() { println!(" Identities JSON generated"); } // Generate SQLite database for offline app use let sqlite_script = "/Users/accusys/momentry_core_0.1/scripts/export_sqlite.py"; let sqlite_out = format!("{}/{}.sqlite", OUTPUT_DIR, uuid); let _ = Command::new("/opt/homebrew/bin/python3.11") .args([sqlite_script, uuid, &sqlite_out]) .status(); if Path::new(&sqlite_out).exists() { let sz = fs::metadata(&sqlite_out)?.len(); println!(" SQLite database: {}MB", sz / 1048576); } // Copy output files (JSONs + SQLite + any data files) for entry in fs::read_dir(OUTPUT_DIR)? { let entry = entry?; let fname = entry.file_name().to_string_lossy().to_string(); if fname.starts_with(uuid) { fs::copy(entry.path(), outdir.join(&fname))?; } } println!(" Output files copied"); // Copy deploy + verify scripts into package let deploy_src = "/Users/accusys/momentry_core_0.1/scripts/deploy_package.sh"; let verify_src = "/Users/accusys/momentry_core_0.1/scripts/verify_package.sh"; if Path::new(deploy_src).exists() { fs::copy(deploy_src, outdir.join("deploy.sh"))?; } if Path::new(verify_src).exists() { fs::copy(verify_src, outdir.join("verify.sh"))?; } // Create tar.gz let tarball = Path::new(RELEASE_DIR).join(format!("{}_v{}.tar.gz", uuid, Utc::now().format("%Y%m%d_%H%M%S"))); let status = Command::new("tar") .args(["-czf", tarball.to_str().unwrap(), "-C", RELEASE_DIR, uuid]) .status()?; if !status.success() { anyhow::bail!("tar creation failed"); } let tsize = fs::metadata(&tarball)?.len(); println!("\n Package: {} ({} MB)", tarball.display(), tsize / 1024 / 1024); Ok(()) } fn cmd_visualize_offline(sqlite_path: &str, output: Option<&str>, identity: Option) -> Result<()> { let outpath = match output { Some(p) => p.to_string(), None => sqlite_path.replace(".sqlite", "_report.html"), }; let script = "/Users/accusys/momentry_core_0.1/scripts/render_offline_report.py"; let mut args: Vec = vec![script.to_string(), sqlite_path.to_string(), outpath.clone()]; if let Some(id) = identity { args.push("--identity".to_string()); args.push(id.to_string()); } let output = Command::new("/opt/homebrew/bin/python3.11") .args(&args) .output() .context("Offline report script failed")?; if !output.status.success() { anyhow::bail!("Offline report: {}", String::from_utf8_lossy(&output.stderr)); } println!("{}", String::from_utf8_lossy(&output.stdout)); println!("\n Open: {}", outpath); Ok(()) } // ---- Visualize ---- fn cmd_visualize(uuid: &str, typ: &str, output: Option<&str>, identity: Option) -> Result<()> { let outpath = match output { Some(p) => p.to_string(), None => format!("/Users/accusys/momentry/output_dev/{}_heatmap.html", uuid), }; match typ { "heatmap" | "density" => generate_face_heatmap(uuid, &outpath, identity)?, "timeline" => generate_face_timeline(uuid, &outpath, identity)?, _ => anyhow::bail!("Unknown visualization type: {}. Try: heatmap, density, timeline", typ), } Ok(()) } fn generate_face_heatmap(uuid: &str, outpath: &str, identity: Option) -> Result<()> { let script = "/Users/accusys/momentry_core_0.1/scripts/render_face_heatmap.py"; let mut args: Vec = vec![script.to_string(), uuid.to_string(), outpath.to_string()]; if let Some(id) = identity { args.push("--identity".to_string()); args.push(id.to_string()); } let output = Command::new("/opt/homebrew/bin/python3.11") .args(&args) .output() .context("Python heatmap script failed")?; if !output.status.success() { anyhow::bail!("Heatmap: {}", String::from_utf8_lossy(&output.stderr)); } println!("{}", String::from_utf8_lossy(&output.stdout)); println!("\n Open: {}", outpath); Ok(()) } fn generate_face_timeline(uuid: &str, outpath: &str, identity: Option) -> Result<()> { generate_face_heatmap(uuid, outpath, identity) } // ---- Stats ---- fn cmd_stats() -> Result<()> { let pkg_dir = Path::new(RELEASE_DIR); if !pkg_dir.exists() { println!("No release packages found at {}", pkg_dir.display()); return Ok(()); } let mut packages: Vec = Vec::new(); for entry in fs::read_dir(&pkg_dir)? { let entry = entry?; let name = entry.file_name().to_string_lossy().to_string(); if name.ends_with(".tar.gz") { packages.push(entry.path()); } } packages.sort_by(|a, b| b.cmp(a)); // newest first if packages.is_empty() { println!("No .tar.gz packages found."); return Ok(()); } for pkg_path in &packages { let pkg_name = pkg_path.file_name().unwrap().to_str().unwrap_or("?"); let pkg_size = fs::metadata(pkg_path)?.len(); println!("📦 {} ({} MB)", pkg_name, pkg_size / 1024 / 1024); // List contents via tar -tvzf (shows sizes without extraction) let output = Command::new("tar") .args(["-tvzf", pkg_path.to_str().unwrap()]) .output() .context("tar list failed")?; let listing = String::from_utf8_lossy(&output.stdout); let mut total_sql = 0u64; let mut total_video = 0u64; let mut total_json = 0u64; let mut sql_count = 0u64; let mut video_count = 0u64; let mut json_count = 0u64; for line in listing.lines() { let trimmed = line.trim(); if trimmed.is_empty() || trimmed.ends_with('/') { continue; } // tar -tvzf format: perms link owner group size date_month date_day time path... // Fields are space-separated; size is 5th field, path starts at 8th field let parts: Vec<&str> = trimmed.split_whitespace().collect(); if parts.len() < 8 { continue; } let fsize = parts[4].parse::().unwrap_or(0); let fpath = parts[8..].join(" "); let fname = Path::new(&fpath).file_name().unwrap_or_default().to_str().unwrap_or("?"); let ext = Path::new(&fpath).extension().unwrap_or_default().to_str().unwrap_or(""); match ext { "sql" => { println!(" 📄 {} ({:.0} MB)", fname, fsize as f64 / 1048576.0); total_sql += fsize; sql_count += 1; } "mp4" | "mov" | "avi" | "mkv" => { println!(" 🎬 {} ({:.0} MB)", fname, fsize as f64 / 1048576.0); total_video += fsize; video_count += 1; } "json" => { if fname != "file_info.json" { println!(" 📋 {} ({:.0} MB)", fname, fsize as f64 / 1048576.0); } total_json += fsize; json_count += 1; } _ => {} } } println!(" ─────────────────────────────"); println!(" SQL: {} files, {:.0} MB", sql_count, total_sql as f64 / 1048576.0); println!(" Video: {} files, {:.0} MB", video_count, total_video as f64 / 1048576.0); println!(" JSON: {} files, {:.0} MB", json_count, total_json as f64 / 1048576.0); println!(" Total: {:.0} MB (compressed: {:.0} MB)", (total_sql + total_video + total_json) as f64 / 1048576.0, pkg_size as f64 / 1048576.0); println!(); } Ok(()) } // ---- Main ---- #[tokio::main] async fn main() -> Result<()> { dotenv::from_filename(".env.development").ok(); let cli = Cli::parse(); let db = PostgresDb::new(&config::DATABASE_URL).await?; match cli.command { Commands::Deploy { tarball } => cmd_deploy(&db, &tarball).await?, Commands::Undeploy { uuid, yes } => cmd_undeploy(&db, &uuid, yes).await?, Commands::List => cmd_list(&db).await?, Commands::Package { uuid } => cmd_package(&db, &uuid).await?, Commands::Stats => cmd_stats()?, Commands::Visualize { uuid, typ, output, identity } => cmd_visualize(&uuid, &typ, output.as_deref(), identity)?, Commands::VisualizeOffline { sqlite_path, output, identity } => cmd_visualize_offline(&sqlite_path, output.as_deref(), identity)?, } Ok(()) }