use anyhow::{Context, Result}; use chrono::Utc; use sqlx; use std::path::Path; use tracing::{info, warn}; use crate::core::db::{schema, PostgresDb, VideoRecord, VideoStatus}; use crate::core::probe; use crate::core::storage::uuid as uuid_utils; use crate::core::storage::FileManager; pub struct IngestionService { db: PostgresDb, } impl IngestionService { pub fn new(db: PostgresDb) -> Self { Self { db } } /// Resolve name collision: if file_name exists with different content, append ` (N)` suffix. async fn resolve_filename(&self, file_name: &str, content_hash: &str) -> String { let table = schema::table_name("videos"); let base = file_name.to_string(); let dot_pos = base.rfind('.'); let (stem, ext) = match dot_pos { Some(p) => (base[..p].to_string(), base[p..].to_string()), None => (base.clone(), String::new()), }; let mut candidate = base.clone(); let mut attempt = 0usize; loop { let conflict: Option = sqlx::query_scalar( &format!("SELECT file_uuid FROM {} WHERE file_name = $1 AND (content_hash IS DISTINCT FROM $2 OR content_hash IS NULL)", table) ) .bind(&candidate) .bind(content_hash) .fetch_optional(self.db.pool()) .await .unwrap_or(None); if conflict.is_none() { return candidate; } attempt += 1; candidate = format!("{} ({}){}", stem, attempt, ext); } } pub async fn ingest(&self, file_path: &str) -> Result> { let path = Path::new(file_path); let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf()); let filename = path .file_name() .unwrap_or_default() .to_string_lossy() .to_string(); // 1. Compute SHA256 for dedup let content_hash = crate::core::storage::content_hash::compute_sha256(&canonical_path).ok().unwrap_or_default(); // 2. Hash check — same content = already registered let videos_table = schema::table_name("videos"); if !content_hash.is_empty() { if let Ok(Some(existing_uuid)) = sqlx::query_scalar::<_, String>( &format!("SELECT file_uuid FROM {} WHERE content_hash = $1 LIMIT 1", videos_table) ) .bind(&content_hash) .fetch_optional(self.db.pool()) .await { info!("Content already registered: {} ({})", filename, existing_uuid); return Ok(Some(existing_uuid)); } } // 3. Resolve name conflict (same name, different content → auto-rename) let final_name = self.resolve_filename(&filename, &content_hash).await; // 4. Compute UUID with resolved name let videos_table = schema::table_name("videos"); let birthday = sqlx::query_scalar::<_, chrono::DateTime>( &format!("SELECT registration_time FROM {} WHERE file_name = $1 AND registration_time IS NOT NULL LIMIT 1", videos_table) ) .bind(&final_name) .fetch_optional(self.db.pool()) .await .ok() .flatten() .map(|t| t.to_rfc3339()) .unwrap_or_else(|| Utc::now().to_rfc3339()); let parent = canonical_path .parent() .map(|p| p.to_string_lossy().to_string()) .unwrap_or_default(); let uuid = uuid_utils::compute_birth_uuid( &uuid_utils::get_mac_address(), &birthday, &canonical_path.to_string_lossy(), &final_name, ); let username = uuid_utils::extract_username_from_path(&parent); info!("Starting ingestion for: {} ({})", path.display(), uuid); let probe_result = probe::probe_video(file_path).ok(); let file_meta = std::fs::metadata(&canonical_path).ok(); let duration = probe_result.as_ref() .and_then(|r| r.format.duration.as_ref()) .and_then(|s| s.parse::().ok()) .unwrap_or(0.0); let mut width = 0u32; let mut height = 0u32; let mut fps = 0.0; if let Some(ref probe) = probe_result { for stream in &probe.streams { if stream.codec_type.as_deref() == Some("video") { width = stream.width.unwrap_or(0); height = stream.height.unwrap_or(0); if let Some(fps_str) = &stream.r_frame_rate { if let Some((num, den)) = fps_str.split_once('/') { if let (Ok(n), Ok(d)) = (num.parse::(), den.parse::()) { if d > 0.0 { fps = n / d; } } } } } } } let file_manager = FileManager::new(std::path::PathBuf::from(".")); let probe_json_val: serde_json::Value = probe_result.as_ref().map(|r| serde_json::to_value(r)).and_then(|r| r.ok()).unwrap_or_else(|| { serde_json::json!({"format": {"size": file_meta.map(|m| m.len().to_string()).unwrap_or_default(), "filename": &canonical_path.to_string_lossy().to_string(), "format_name": "unknown"}, "streams": []}) }); let probe_json_str = serde_json::to_string_pretty(&probe_json_val)?; if let Err(e) = file_manager.save_json(&uuid, "probe", &probe_json_str) { warn!("Failed to save probe JSON for {}: {}", uuid, e); } else { info!("Probe JSON saved for {}", uuid); } let total_frames = { let video_stream = probe_result.as_ref().and_then(|pr| pr.streams.iter().find(|s| s.codec_type.as_deref() == Some("video"))); if let Some(stream) = video_stream { if let Some(nb_frames_str) = &stream.nb_frames { if let Ok(nb_frames) = nb_frames_str.parse::() { info!( "Using nb_frames from ffprobe: {} frames for {}", nb_frames, path.display() ); Some(nb_frames) } else { warn!( "Failed to parse nb_frames, using duration * fps fallback for {}", path.display() ); Some((duration * fps).floor() as u64) } } else { warn!( "nb_frames not available, using duration * fps fallback for {}", path.display() ); Some((duration * fps).floor() as u64) } } else { warn!("No video stream found for {}", path.display()); Some(0) } }; let birth_registration = serde_json::json!({ "registration_source": { "username": username, "original_path": parent, "original_filename": filename } }); let record = VideoRecord { id: 0, file_uuid: uuid.clone(), file_path: canonical_path.to_string_lossy().to_string(), file_name: filename, file_type: None, duration, width, height, fps, probe_json: serde_json::from_str(&probe_json_str).ok(), storage: Default::default(), status: VideoStatus::Pending, processing_status: Some(serde_json::json!({"phase": "REGISTERED"})), birth_registration: None, user_id: None, job_id: None, created_at: String::new(), registration_time: None, total_frames: total_frames.unwrap_or(0), parent_uuid: None, cut_done: false, cut_count: 0, cut_max_duration: 0.0, scene_done: false, audio_tracks: None, }; self.db .register_video(&record) .await .with_context(|| "Failed to register video in database")?; // Store content_hash for dedup if !content_hash.is_empty() { let vt = schema::table_name("videos"); let _ = sqlx::query(&format!("UPDATE {} SET content_hash = $1 WHERE file_uuid = $2", vt)) .bind(&content_hash) .bind(&uuid) .execute(self.db.pool()) .await; } self.db .set_registration_time(&uuid) .await .with_context(|| "Failed to set registration_time")?; self.db .update_birth_registration(&uuid, &birth_registration) .await .with_context(|| "Failed to set birth_registration")?; Ok(Some(uuid)) } }