use anyhow::{Context, Result}; use serde::Deserialize; use tracing::{info, warn}; use crate::core::config; use crate::core::db::PostgresDb; #[derive(Debug, Deserialize)] struct TmdbSearchResult { results: Vec, } #[derive(Debug, Deserialize)] struct TmdbApiMovie { id: u64, title: String, release_date: Option, overview: Option, poster_path: Option, } #[derive(Debug, Deserialize)] struct TmdbCredits { cast: Vec, } #[derive(Debug, Deserialize)] struct TmdbApiCastMember { id: u64, name: String, character: String, profile_path: Option, order: u32, } pub struct TmdbProbeResult { pub tmdb_id: u64, pub title: String, pub cast_count: usize, pub identities_created: usize, } fn extract_movie_name(filename: &str) -> Option { let name = std::path::Path::new(filename) .file_stem() .and_then(|s| s.to_str())?; let cleaned = name.replace(['.', '_'], " ").trim().to_string(); if cleaned.is_empty() || cleaned.len() < 3 { return None; } Some(cleaned) } pub async fn probe_from_cache( db: &PostgresDb, file_uuid: &str, ) -> Result { let cache = crate::core::tmdb::cache::read_tmdb_cache(file_uuid)?; if cache.identities.is_empty() && !cache.cast.is_empty() { return create_identities_from_data(db, file_uuid, &cache.movie, &cache.cast).await; } upsert_identities_from_disk(db, &cache, file_uuid).await } async fn upsert_identities_from_disk( db: &PostgresDb, cache: &crate::core::tmdb::cache::TmdbCache, file_uuid: &str, ) -> Result { info!( "[TMDB] Upserting identities from disk for: {} (TMDB id={})", cache.movie.title, cache.movie.tmdb_id ); let mut identities_created = 0usize; for entry in &cache.identities { let path = crate::core::identity::storage::identity_file_path(&entry.identity_uuid); if !path.exists() { warn!("[TMDB] Identity file not found on disk: {}", path.display()); continue; } match std::fs::read_to_string(&path) { Ok(content) => { match serde_json::from_str::(&content) { Ok(identity_file) => { let identities_table = crate::core::db::schema::table_name("identities"); let result = sqlx::query(&format!( "INSERT INTO {} (uuid, name, identity_type, source, status, tmdb_id, tmdb_profile, metadata) \ VALUES ($1::uuid, $2, 'people', 'tmdb', 'confirmed', $3, $4, $5::jsonb) \ ON CONFLICT (name) DO UPDATE SET \ uuid = COALESCE({}.uuid, $1::uuid), \ tmdb_id = COALESCE(EXCLUDED.tmdb_id, {}.tmdb_id), \ tmdb_profile = COALESCE(EXCLUDED.tmdb_profile, {}.tmdb_profile), \ metadata = {}.metadata || $5::jsonb", identities_table, identities_table, identities_table, identities_table, identities_table )) .bind(&identity_file.identity_uuid) .bind(&identity_file.name) .bind(identity_file.tmdb_id) .bind(&identity_file.tmdb_profile) .bind(&identity_file.metadata) .execute(db.pool()) .await; match result { Ok(_) => { info!("[TMDB] Upserted identity: {} (uuid={})", identity_file.name, identity_file.identity_uuid); identities_created += 1; } Err(e) => { warn!("[TMDB] Failed to upsert identity '{}': {}", identity_file.name, e); } } } Err(e) => { warn!("[TMDB] Failed to parse identity file {}: {}", path.display(), e); } } } Err(e) => { warn!("[TMDB] Failed to read identity file {}: {}", path.display(), e); } } } drop_identities_cache(db, file_uuid, &cache.movie, identities_created).await; Ok(TmdbProbeResult { tmdb_id: cache.movie.tmdb_id, title: cache.movie.title.clone(), cast_count: cache.cast_count, identities_created, }) } async fn drop_identities_cache( db: &PostgresDb, file_uuid: &str, movie: &crate::core::tmdb::cache::TmdbMovie, identities_created: usize, ) { let videos_table = crate::core::db::schema::table_name("videos"); let tmdb_label = "tmdb"; let _ = sqlx::query(&format!( "UPDATE {} SET birth_registration = \ jsonb_set(COALESCE(birth_registration, '{{}}'::jsonb), '{{{}}}'::text[], $1::jsonb) \ WHERE file_uuid = $2", videos_table, tmdb_label )) .bind(serde_json::json!({ "movie_id": movie.tmdb_id, "movie_title": movie.title, "release_date": movie.release_date, "poster": movie.poster_path, "cast_count": movie.tmdb_id, "identities_created": identities_created, })) .bind(file_uuid) .execute(db.pool()) .await .ok(); } pub async fn create_identities_from_data( db: &PostgresDb, file_uuid: &str, movie: &crate::core::tmdb::cache::TmdbMovie, cast: &[crate::core::tmdb::cache::TmdbCastMember], ) -> Result { info!( "[TMDB] Creating identities for: {} (TMDB id={})", movie.title, movie.tmdb_id ); let identities_table = crate::core::db::schema::table_name("identities"); let mut identities_created = 0usize; for member in cast.iter() { if member.name.trim().is_empty() { continue; } let profile_url = member.profile_path.as_ref() .map(|p| format!("https://image.tmdb.org/t/p/w185{}", p)); let metadata = serde_json::json!({ "tmdb_character": member.character, "tmdb_cast_order": member.order, "tmdb_movie_id": movie.tmdb_id, "tmdb_movie_title": movie.title, "tmdb_biography": member.biography, "tmdb_birthday": member.birthday, "tmdb_place_of_birth": member.place_of_birth, "tmdb_aliases": member.also_known_as, "tmdb_imdb_id": member.imdb_id, "tmdb_department": member.known_for_department, "tmdb_popularity": member.popularity, "tmdb_deathday": member.deathday, "tmdb_gender": member.gender, "tmdb_homepage": member.homepage, }); let result = sqlx::query_as::<_, (uuid::Uuid,)>(&format!( "INSERT INTO {} (name, identity_type, source, status, tmdb_id, tmdb_profile, metadata) \ VALUES ($1, 'people', 'tmdb', 'confirmed', $2, $3, $4::jsonb) \ ON CONFLICT (name) DO UPDATE SET \ tmdb_id = COALESCE(EXCLUDED.tmdb_id, {}.tmdb_id), \ tmdb_profile = COALESCE(EXCLUDED.tmdb_profile, {}.tmdb_profile), \ metadata = {}.metadata || $4::jsonb \ RETURNING uuid", identities_table, identities_table, identities_table, identities_table )) .bind(&member.name) .bind(member.id as i64) .bind(&profile_url) .bind(&metadata) .fetch_optional(db.pool()) .await; match result { Ok(Some((identity_uuid,))) => { let uuid_str = identity_uuid.to_string().replace('-', ""); info!( "[TMDB] Created/updated identity: {} as {} (uuid={})", member.name, member.character, uuid_str ); identities_created += 1; if let Err(e) = crate::core::identity::storage::save_identity_file(db, &uuid_str).await { warn!("[TMDB] Failed to save identity file for {}: {}", member.name, e); } // Download and save TMDb profile image locally if let Some(url) = &profile_url { let dir = crate::core::identity::storage::identity_dir(&uuid_str); std::fs::create_dir_all(&dir).ok(); let img_path = dir.join("profile.jpg"); if !img_path.exists() { if let Ok(resp) = reqwest::get(url).await { if let Ok(bytes) = resp.bytes().await { std::fs::write(&img_path, &bytes).ok(); } } } } } Ok(None) => { warn!("[TMDB] INSERT returned no uuid for: {}", member.name); } Err(e) => { warn!("[TMDB] Failed to create identity '{}': {}", member.name, e); } } } // Step 4: Trigger background embedding extraction if identities_created > 0 { let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string()); let python_path = std::env::var("MOMENTRY_PYTHON_PATH") .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); let schema = crate::core::config::DATABASE_SCHEMA.clone(); tokio::spawn(async move { let output = tokio::process::Command::new(&python_path) .arg(&format!("{}/tmdb_embed_extractor.py", scripts_dir)) .arg("--schema") .arg(&schema) .output() .await; match output { Ok(o) => { if !o.status.success() { let stderr = String::from_utf8_lossy(&o.stderr); warn!("[TMDB] Embed extraction script failed: {}", stderr); } else { info!("[TMDB] Background face embedding extraction complete"); } } Err(e) => warn!("[TMDB] Failed to run embed extraction script: {}", e), } }); } // Step 5: Store tmdb_id on the video record for later use let videos_table = crate::core::db::schema::table_name("videos"); let tmdb_label = "tmdb"; let _ = sqlx::query(&format!( "UPDATE {} SET birth_registration = \ jsonb_set(COALESCE(birth_registration, '{{}}'::jsonb), '{{{}}}'::text[], $1::jsonb) \ WHERE file_uuid = $2", videos_table, tmdb_label )) .bind(serde_json::json!({ "movie_id": movie.tmdb_id, "movie_title": movie.title, "release_date": movie.release_date, "poster": movie.poster_path, "cast_count": cast.len(), "identities_created": identities_created, })) .bind(file_uuid) .execute(db.pool()) .await .ok(); info!( "[TMDB] Probe complete: {} cast members, {} identities created/updated", cast.len(), identities_created ); Ok(TmdbProbeResult { tmdb_id: movie.tmdb_id, title: movie.title.clone(), cast_count: cast.len(), identities_created, }) } pub async fn probe_movie( db: &PostgresDb, filename: &str, file_uuid: &str, ) -> Result> { let api_key = match config::tmdb::API_KEY.as_ref() { Some(k) => k.clone(), None => { info!("[TMDB] No API key configured, skipping TMDb probe"); return Ok(None); } }; let movie_name = match extract_movie_name(filename) { Some(n) => n, None => { info!("[TMDB] Could not extract movie name from: {}", filename); return Ok(None); } }; info!("[TMDB] Searching for movie: {}", movie_name); let client = reqwest::Client::new(); // Step 1: Search movie let search_url = format!( "https://api.themoviedb.org/3/search/movie?api_key={}&query={}", api_key, urlencoding(&movie_name) ); let search_resp = client .get(&search_url) .send() .await .context("TMDb search request failed")? .json::() .await .context("Failed to parse TMDb search response")?; let movie = match search_resp.results.into_iter().next() { Some(m) => m, None => { info!("[TMDB] No movie found for: {}", movie_name); return Ok(None); } }; info!("[TMDB] Matched: {} (TMDB id={})", movie.title, movie.id); // Step 2: Fetch cast let credits_url = format!( "https://api.themoviedb.org/3/movie/{}/credits?api_key={}", movie.id, api_key ); let credits = client .get(&credits_url) .send() .await .context("TMDb credits request failed")? .json::() .await .context("Failed to parse TMDb credits response")?; // Step 3: Convert API types to cache types and use shared logic use crate::core::tmdb::cache; let cache_movie = cache::TmdbMovie { tmdb_id: movie.id, title: movie.title.clone(), release_date: movie.release_date.clone(), overview: movie.overview.clone(), poster_path: movie.poster_path.clone(), }; let cache_cast: Vec = credits.cast.iter().map(|m| { cache::TmdbCastMember { id: m.id, name: m.name.clone(), character: m.character.clone(), profile_path: m.profile_path.clone(), order: m.order, biography: None, birthday: None, place_of_birth: None, also_known_as: vec![], imdb_id: None, known_for_department: None, popularity: None, deathday: None, gender: None, homepage: None, } }).collect(); // Write TMDb cache so probe_from_cache can be used next time let cache_obj = cache::TmdbCache { file_uuid: file_uuid.to_string(), fetched_at: chrono::Utc::now().to_rfc3339(), source: "probe_movie".to_string(), movie: cache_movie.clone(), cast: cache_cast.clone(), cast_count: credits.cast.len(), identities_created: 0, identities: vec![], }; cache::write_tmdb_cache(&cache_obj).ok(); let result = create_identities_from_data(db, file_uuid, &cache_movie, &cache_cast).await?; // Update cache with actual identities_created count if let Ok(mut cache_obj) = cache::read_tmdb_cache(file_uuid) { cache_obj.identities_created = result.identities_created; cache::write_tmdb_cache(&cache_obj).ok(); } Ok(Some(result)) } fn urlencoding(s: &str) -> String { s.chars() .map(|c| match c { 'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' => c.to_string(), ' ' => '+'.to_string(), _ => format!("%{:02X}", c as u8), }) .collect() }