use axum::{ body::Body, extract::{Path, Query, State}, http::{header, StatusCode}, response::IntoResponse, response::Json, routing::{get, patch, post}, Router, }; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::core::db::{Database, PostgresDb}; use crate::core::person_identity::{ ChunkPersonInfo, CreatePersonIdentityRequest, PersonIdentity, PersonIdentityResponse, PersonMatch, PersonStatistics, PersonTimelineEntry, PersonTimelineResponse, UpdatePersonIdentityRequest, }; #[derive(Debug, Deserialize)] pub struct IdentifyPersonsRequest { pub video_uuid: String, pub auto_match: Option, pub match_threshold: Option, } #[derive(Debug, Serialize)] pub struct IdentifyPersonsResponse { pub success: bool, pub message: String, pub persons: Vec, } #[derive(Debug, Deserialize)] pub struct PersonTimelineQuery { pub video_uuid: String, } #[derive(Debug, Deserialize)] pub struct FaceThumbnailQuery { pub video_uuid: String, #[serde(default)] pub index: Option, // Which face detection to use (default: 0) } // Structs for parsing face_clustered.json #[derive(Debug, Deserialize)] struct FaceDetection { #[serde(default)] person_id: Option, x: i32, y: i32, width: i32, height: i32, } #[derive(Debug, Deserialize)] struct FaceFrame { timestamp: f64, faces: Vec, } #[derive(Debug, Deserialize)] struct FaceClusteredData { frames: Vec, } #[derive(Debug, Serialize)] pub struct ChunkPersonsResponse { pub success: bool, pub chunk_id: String, pub persons: Vec, } #[derive(Debug, Deserialize)] pub struct MergePersonsRequest { pub video_uuid: String, pub target_person_id: String, pub source_person_ids: Vec, } #[derive(Debug, Deserialize)] pub struct UndoMergeRequest { pub merge_id: String, } #[derive(Debug, Serialize)] pub struct MergeHistoryEntry { pub merge_id: String, pub target_person_id: String, pub source_person_ids: Vec, pub original_target_stats: serde_json::Value, pub original_source_stats: serde_json::Value, pub merged_at: String, pub is_undone: bool, pub undone_at: Option, } 
#[derive(Debug, Serialize)] pub struct MergeHistoryResponse { pub success: bool, pub history: Vec, } #[derive(Debug, Deserialize)] pub struct PersonListQuery { pub video_uuid: String, pub limit: Option, pub offset: Option, pub min_appearances: Option, pub has_speaker: Option, } #[derive(Debug, Deserialize)] pub struct AutoIdentifyRequest { pub video_uuid: String, pub min_speaker_confidence: Option, } #[derive(Debug, Deserialize)] pub struct SimilarPersonsQuery { pub video_uuid: String, pub threshold: Option, pub limit: Option, } #[derive(Debug, Serialize)] pub struct NamingSuggestion { pub person_id: String, pub current_name: Option, pub suggested_name: String, pub confidence: f64, pub sources: Vec, pub action: String, // "auto_apply" or "needs_review" } #[derive(Debug, Serialize)] pub struct SuggestionSource { pub r#type: String, // "speaker_match", "talent_db", "ocr_context", "face_similarity" pub detail: String, pub weight: f64, } #[derive(Debug, Serialize)] pub struct MergeSuggestion { pub person_id: String, pub merge_with: Vec, pub confidence: f64, pub reasons: Vec, pub action: String, // "auto_apply" or "needs_review" } #[derive(Debug, Serialize)] pub struct SuggestionsResponse { pub success: bool, pub naming_suggestions: Vec, pub merge_suggestions: Vec, pub total_naming: usize, pub total_merge: usize, } #[derive(Debug, Serialize)] pub struct PersonSummary { pub person_id: String, pub name: Option, pub speaker_id: Option, pub appearance_count: i32, pub total_appearance_duration: f64, pub first_appearance_time: Option, pub last_appearance_time: Option, pub is_confirmed: bool, pub speaker_confidence: Option, } #[derive(Debug, Serialize)] pub struct PersonListResponse { pub success: bool, pub persons: Vec, pub total: i64, } #[derive(Debug, Serialize)] pub struct MergePersonsResponse { pub success: bool, pub message: String, pub target_person_id: String, pub merge_id: String, } #[derive(Debug, Serialize)] pub struct AutoIdentifyResponse { pub success: bool, pub 
message: String, pub total_persons: i32, pub matched_speakers: i32, pub persons: Vec, } pub fn person_identity_routes() -> Router { Router::new() .route("/api/v1/person/identify", post(identify_persons)) .route("/api/v1/person/auto-identify", post(auto_identify_persons)) .route("/api/v1/person/suggest", post(get_person_suggestions)) .route("/api/v1/person/list", get(list_persons)) .route("/api/v1/person/merge", post(merge_persons)) .route("/api/v1/person/merge/undo", post(undo_merge)) .route("/api/v1/person/merge/history", get(get_merge_history)) .route( "/api/v1/person/:person_id/unbind-speaker", post(unbind_speaker), ) .route( "/api/v1/person/:person_id/reassign-speaker", post(reassign_speaker), ) .route( "/api/v1/person/:person_id/remove-appearance", post(remove_appearance), ) .route( "/api/v1/person/:person_id/reassign-appearance", post(reassign_appearance), ) .route("/api/v1/person/:person_id/split", post(split_person)) .route( "/api/v1/person/:person_id/similar", get(get_similar_persons), ) .route( "/api/v1/person/:person_id/confirm", patch(confirm_person_suggestion), ) .route("/api/v1/person/:person_id", get(get_person_details)) .route("/api/v1/person/:person_id", patch(update_person_identity)) .route( "/api/v1/person/:person_id/timeline", get(get_person_timeline), ) .route( "/api/v1/person/:person_id/appearances", get(get_person_appearances), ) .route( "/api/v1/person/:person_id/thumbnail", get(get_person_thumbnail), ) .route("/api/v1/chunks/:chunk_id/persons", get(get_chunk_persons)) } async fn identify_persons( State(_state): State, Json(request): Json, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; tracing::info!( "[PERSON_IDENTITY] Identifying persons for video: {}", request.video_uuid ); let auto_match = request.auto_match.unwrap_or(true); let threshold = 
request.match_threshold.unwrap_or(0.5); if auto_match { let matches = match auto_match_face_speaker(&db, &request.video_uuid, threshold).await { Ok(m) => m, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to auto-match: {}", e), )) } }; let mut persons = Vec::new(); for match_result in matches { let person = match create_person_identity( &db, CreatePersonIdentityRequest { video_uuid: request.video_uuid.clone(), face_identity_id: None, speaker_id: Some(match_result.speaker_id.clone()), name: None, metadata: Some(serde_json::json!({ "auto_matched": true, "confidence": match_result.confidence, "match_count": match_result.match_count })), }, ) .await { Ok(p) => p, Err(e) => { tracing::warn!("Failed to create person identity: {}", e); continue; } }; persons.push(PersonIdentityResponse::from(person)); } Ok(Json(IdentifyPersonsResponse { success: true, message: format!("Identified {} persons", persons.len()), persons, })) } else { Ok(Json(IdentifyPersonsResponse { success: true, message: "Auto-match disabled, no persons identified".to_string(), persons: vec![], })) } } async fn get_person_details( State(_state): State, Path(person_id): Path, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; let query = r#" SELECT person_id, name, face_identity_id, speaker_id, confidence, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed, created_at, updated_at FROM person_identities WHERE person_id = $1 "#; let person: Option<( String, Option, Option, Option, f64, i32, f64, Option, Option, bool, chrono::DateTime, chrono::DateTime, )> = match sqlx::query_as(query) .bind(&person_id) .fetch_optional(db.pool()) .await { Ok(person) => person, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch person: {}", e), )) } }; match 
person { Some(person_data) => { let response = serde_json::json!({ "success": true, "person_id": person_data.0, "name": person_data.1, "face_identity_id": person_data.2, "speaker_id": person_data.3, "confidence": person_data.4, "appearance_count": person_data.5, "total_appearance_duration": person_data.6, "first_appearance_time": person_data.7, "last_appearance_time": person_data.8, "is_confirmed": person_data.9, "created_at": person_data.10.to_rfc3339(), "updated_at": person_data.11.to_rfc3339() }); Ok(Json(response)) } None => Err(( StatusCode::NOT_FOUND, format!("Person not found: {}", person_id), )), } } async fn update_person_identity( State(_state): State, Path(person_id): Path, Json(request): Json, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; tracing::info!("[PERSON_IDENTITY] Updating person: {}", person_id); let query = r#" UPDATE person_identities SET name = COALESCE($2, name), metadata = COALESCE($3, metadata), is_confirmed = COALESCE($4, is_confirmed), updated_at = CURRENT_TIMESTAMP WHERE person_id = $1 RETURNING person_id, name "#; let updated: Option<(String, Option)> = match sqlx::query_as(query) .bind(&person_id) .bind(&request.name) .bind(&request.metadata) .bind(&request.is_confirmed) .fetch_optional(db.pool()) .await { Ok(result) => result, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to update person: {}", e), )) } }; match updated { Some((id, name)) => { let response = serde_json::json!({ "success": true, "message": format!("Person '{}' updated successfully", name.unwrap_or_else(|| id.clone())), "person_id": id }); Ok(Json(response)) } None => Err(( StatusCode::NOT_FOUND, format!("Person not found: {}", person_id), )), } } async fn get_person_timeline( State(_state): State, Path(person_id): Path, Query(query): Query, ) -> Result, (StatusCode, String)> { 
let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; let name_query = "SELECT name FROM person_identities WHERE person_id = $1"; let name: Option = match sqlx::query_scalar(name_query) .bind(&person_id) .fetch_optional(db.pool()) .await { Ok(name) => name, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch person name: {}", e), )) } }; let timeline_query = r#" SELECT start_time, end_time, duration, confidence FROM person_appearances WHERE person_id = $1 AND video_uuid = $2 ORDER BY start_time ASC "#; let timeline: Vec<(f64, f64, f64, f64)> = match sqlx::query_as(timeline_query) .bind(&person_id) .bind(&query.video_uuid) .fetch_all(db.pool()) .await { Ok(timeline) => timeline, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch timeline: {}", e), )) } }; let timeline: Vec = timeline .into_iter() .map(|(start, end, duration, confidence)| PersonTimelineEntry { start_time: start, end_time: end, duration, confidence, }) .collect(); let stats_query = r#" SELECT COUNT(*) as total_appearances, SUM(duration) as total_duration, MIN(start_time) as first_appearance, MAX(end_time) as last_appearance, AVG(confidence) as average_confidence FROM person_appearances WHERE person_id = $1 AND video_uuid = $2 "#; let stats: (i64, Option, Option, Option, Option) = match sqlx::query_as(stats_query) .bind(&person_id) .bind(&query.video_uuid) .fetch_one(db.pool()) .await { Ok(stats) => stats, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch statistics: {}", e), )) } }; let statistics = PersonStatistics { total_appearances: stats.0 as i32, total_duration: stats.1.unwrap_or(0.0), first_appearance: stats.2, last_appearance: stats.3, average_confidence: stats.4.unwrap_or(0.0), }; Ok(Json(PersonTimelineResponse { person_id, name, timeline, statistics, })) } async fn 
get_person_appearances( State(_state): State, Path(person_id): Path, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; let query = r#" SELECT person_id, video_uuid, start_time, end_time, duration, face_detection_id, asrx_segment_start, asrx_segment_end, confidence, created_at FROM person_appearances WHERE person_id = $1 ORDER BY start_time DESC LIMIT 100 "#; let appearances: Vec = match sqlx::query(query) .bind(&person_id) .fetch_all(db.pool()) .await { Ok(rows) => { rows.iter() .map(|row| { use sqlx::Row; serde_json::json!({ "person_id": row.get::("person_id"), "video_uuid": row.get::("video_uuid"), "start_time": row.get::("start_time"), "end_time": row.get::("end_time"), "duration": row.get::("duration"), "confidence": row.get::("confidence"), "created_at": row.get::, _>("created_at").to_rfc3339() }) }) .collect() } Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch appearances: {}", e), )) } }; Ok(Json(serde_json::json!({ "success": true, "person_id": person_id, "appearances": appearances }))) } async fn get_chunk_persons( State(_state): State, Path(chunk_id): Path, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to database: {}", e), )) } }; let chunk_query = r#" SELECT uuid, start_time, end_time, metadata FROM chunks WHERE chunk_id = $1 "#; let chunk: Option<(String, f64, f64, Option)> = match sqlx::query_as(chunk_query) .bind(&chunk_id) .fetch_optional(db.pool()) .await { Ok(chunk) => chunk, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch chunk: {}", e), )) } }; let chunk = match chunk { Some(c) => c, None => { return Err(( StatusCode::NOT_FOUND, format!("Chunk not found: {}", chunk_id), )) } }; let 
(video_uuid, start_time, end_time, _metadata) = chunk; let persons_query = r#" SELECT pi.person_id, pi.name, pa.confidence, LEAST(pa.end_time, $3) - GREATEST(pa.start_time, $2) as overlap_duration FROM person_appearances pa JOIN person_identities pi ON pa.person_id = pi.person_id WHERE pa.video_uuid = $1 AND pa.start_time < $3 AND pa.end_time > $2 ORDER BY overlap_duration DESC "#; let persons: Vec = match sqlx::query_as::<_, (String, Option, f64, f64)>(persons_query) .bind(&video_uuid) .bind(start_time) .bind(end_time) .fetch_all(db.pool()) .await { Ok(rows) => rows .into_iter() .map( |(person_id, name, confidence, overlap_duration)| ChunkPersonInfo { person_id, name, confidence, overlap_duration, }, ) .collect(), Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch persons: {}", e), )) } }; Ok(Json(ChunkPersonsResponse { success: true, chunk_id, persons, })) } /// Extracts a face thumbnail for a given person from the video async fn get_person_thumbnail( State(_state): State, Path(person_id): Path, Query(query): Query, ) -> Result { // 1. Locate the face_clustered.json file let json_path = format!( "output/{}/{}_face_clustered.json", query.video_uuid, query.video_uuid ); let json_path2 = format!( "output/{}/{}.face_clustered.json", query.video_uuid, query.video_uuid ); // Fallback path if the naming convention is slightly different let fallback_path = format!("output/{}/face_clustered.json", query.video_uuid); let path = if std::path::Path::new(&json_path).exists() { json_path } else if std::path::Path::new(&json_path2).exists() { json_path2 } else if std::path::Path::new(&fallback_path).exists() { fallback_path } else { return Err(( StatusCode::NOT_FOUND, format!( "Face data not found for video: {}. Tried: {}, {}, {}", query.video_uuid, json_path, json_path2, fallback_path ), )); }; // 2. 
Parse the JSON to find the person's face let content = match tokio::fs::read_to_string(&path).await { Ok(c) => c, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Read error: {}", e), )) } }; let data: FaceClusteredData = match serde_json::from_str(&content) { Ok(d) => d, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Parse error: {}", e), )) } }; let mut detections = Vec::new(); for frame in &data.frames { for face in &frame.faces { if let Some(pid) = &face.person_id { if pid == &person_id { detections.push((frame.timestamp, face)); } } } } if detections.is_empty() { return Err(( StatusCode::NOT_FOUND, format!("No detections found for person: {}", person_id), )); } let index = query.index.unwrap_or(0).min(detections.len() - 1); let (timestamp, face) = detections[index]; // 3. Locate the video file let video_path = format!("output/{}/{}.mp4", query.video_uuid, query.video_uuid); if !std::path::Path::new(&video_path).exists() { return Err(( StatusCode::NOT_FOUND, format!("Video file not found: {}", video_path), )); } // 4. Use ffmpeg to extract and crop the face // ffmpeg -ss {timestamp} -i {video} -vf "crop=w:h:x:y" -frames:v 1 -f image2pipe -vcodec mjpeg - let crop_filter = format!("crop={}:{}:{}:{}", face.width, face.height, face.x, face.y); let output = match tokio::process::Command::new("ffmpeg") .args(&[ "-ss", ×tamp.to_string(), "-i", &video_path, "-vf", &crop_filter, "-frames:v", "1", "-f", "image2pipe", "-vcodec", "mjpeg", "-", ]) .output() .await { Ok(o) => o, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("ffmpeg error: {}", e), )) } }; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("ffmpeg failed: {}", stderr), )); } // 5. 
Return the image let response = axum::response::Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "image/jpeg") .body(Body::from(output.stdout)) .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Response error: {}", e), ) })?; Ok(response) } async fn create_person_identity( db: &PostgresDb, request: CreatePersonIdentityRequest, ) -> Result { let person_id = format!("person_{}", Uuid::new_v4()); let query = r#" INSERT INTO person_identities ( person_id, video_uuid, face_identity_id, speaker_id, name, metadata, confidence ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id, person_id, face_identity_id, speaker_id, video_uuid, confidence, name, metadata, first_appearance_time, last_appearance_time, total_appearance_duration, appearance_count, created_at, updated_at, is_confirmed "#; let person: PersonIdentity = sqlx::query_as(query) .bind(&person_id) .bind(&request.video_uuid) .bind(&request.face_identity_id) .bind(&request.speaker_id) .bind(&request.name) .bind(&request.metadata.unwrap_or(serde_json::json!({}))) .bind(0.0) .fetch_one(db.pool()) .await?; Ok(person) } async fn auto_match_face_speaker( db: &PostgresDb, video_uuid: &str, threshold: f64, ) -> Result, anyhow::Error> { let query = "SELECT * FROM auto_match_face_speaker($1, $2)"; let matches: Vec = sqlx::query_as(query) .bind(video_uuid) .bind(threshold) .fetch_all(db.pool()) .await?; Ok(matches) } /// Auto-identify persons from face_clustered.json + ASRX speaker data async fn auto_identify_persons( State(_state): State, Json(request): Json, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e), )) } }; // 1. 
Load face_clustered.json let clustered_path = format!( "output/{}/{}.face_clustered.json", request.video_uuid, request.video_uuid ); let fallback_path = format!("output/{}/face_clustered.json", request.video_uuid); let path = if std::path::Path::new(&clustered_path).exists() { clustered_path } else if std::path::Path::new(&fallback_path).exists() { fallback_path } else { return Err(( StatusCode::NOT_FOUND, format!( "face_clustered.json not found for video: {}", request.video_uuid ), )); }; let content = match tokio::fs::read_to_string(&path).await { Ok(c) => c, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Read error: {}", e), )) } }; let clustered: FaceClusteredData = match serde_json::from_str(&content) { Ok(d) => d, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("Parse error: {}", e), )) } }; // 2. Build person stats from face_clustered.json use std::collections::HashMap; #[derive(Default)] struct PersonStat { frame_count: i32, first_time: Option, last_time: Option, timestamps: Vec, } let mut person_stats: HashMap = HashMap::new(); for frame in &clustered.frames { for face in &frame.faces { if let Some(ref pid) = face.person_id { let stat = person_stats.entry(pid.clone()).or_default(); stat.frame_count += 1; stat.timestamps.push(frame.timestamp); if stat.first_time.is_none() || Some(frame.timestamp) < stat.first_time { stat.first_time = Some(frame.timestamp); } if stat.last_time.is_none() || Some(frame.timestamp) > stat.last_time { stat.last_time = Some(frame.timestamp); } } } } // 3. 
Load ASRX from chunks let asrx_query = "SELECT chunk_id, content::text FROM chunks WHERE uuid = $1 AND chunk_type = 'trace' AND chunk_id LIKE 'trace_asrx_%'"; let asrx_chunks: Vec<(String, String)> = match sqlx::query_as(asrx_query) .bind(&request.video_uuid) .fetch_all(db.pool()) .await { Ok(rows) => rows, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("ASRX query error: {}", e), )) } }; // Also check sentence chunks for speaker_id let sentence_query = "SELECT content::text FROM chunks WHERE uuid = $1 AND chunk_type = 'sentence' AND content ? 'speaker_id'"; let sentence_chunks: Vec = match sqlx::query_scalar(sentence_query) .bind(&request.video_uuid) .fetch_all(db.pool()) .await { Ok(rows) => rows, Err(_) => vec![], }; // 4. Match speakers to persons by time overlap let mut person_speaker_votes: HashMap> = HashMap::new(); // Check ASRX trace chunks for (_, content_text) in &asrx_chunks { if let Ok(content) = serde_json::from_str::(content_text) { if let (Some(speaker_id), Some(start)) = ( content.get("speaker_id").and_then(|v| v.as_str()), content.get("timestamp").and_then(|v| v.as_f64()), ) { let end = start + 5.0; // Approximate 5s segments for (pid, stat) in &person_stats { for ts in &stat.timestamps { if *ts >= start && *ts <= end { person_speaker_votes .entry(pid.clone()) .or_default() .entry(speaker_id.to_string()) .and_modify(|v| *v += 1.0) .or_insert(1.0); } } } } } } // Check sentence chunks for speaker_id for content_text in &sentence_chunks { if let Ok(content) = serde_json::from_str::(content_text) { if let (Some(_speaker_id), Some(_text)) = ( content.get("speaker_id").and_then(|v| v.as_str()), content.get("text").and_then(|v| v.as_str()), ) { // Timestamps not directly available in sentence chunks for ASRX matching // Rely on ASRX trace chunks for precise matching } } } // 5. 
Insert/update person_identities let min_conf = request.min_speaker_confidence.unwrap_or(0.0); let mut matched_count = 0; let mut persons_result = Vec::new(); // Sort by frame count descending let mut sorted_persons: Vec<_> = person_stats.into_iter().collect(); sorted_persons.sort_by(|a, b| b.1.frame_count.cmp(&a.1.frame_count)); for (pid, stat) in sorted_persons { let speaker_info = person_speaker_votes.get(&pid); let (speaker_id, confidence) = if let Some(votes) = speaker_info { let total: f64 = votes.values().sum(); if total > 0.0 { let (best_speaker, best_votes) = votes .iter() .max_by(|a, b| a.1.partial_cmp(b.1).unwrap()) .unwrap(); let conf = best_votes / total; if conf >= min_conf { (Some(best_speaker.clone()), Some(conf)) } else { (None, None) } } else { (None, None) } } else { (None, None) }; if speaker_id.is_some() { matched_count += 1; } // Upsert into person_identities let upsert_query = r#" INSERT INTO person_identities (person_id, name, speaker_id, first_appearance_time, last_appearance_time, appearance_count, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (person_id) DO UPDATE SET speaker_id = COALESCE(EXCLUDED.speaker_id, person_identities.speaker_id), first_appearance_time = EXCLUDED.first_appearance_time, last_appearance_time = EXCLUDED.last_appearance_time, appearance_count = EXCLUDED.appearance_count, metadata = COALESCE(EXCLUDED.metadata, person_identities.metadata), updated_at = NOW() RETURNING person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed "#; let metadata = if let Some(conf) = confidence { serde_json::json!({"auto_identified": true, "speaker_confidence": conf}) } else { serde_json::json!({"auto_identified": true}) }; let result: Result< Option<( String, Option, Option, i32, f64, Option, Option, bool, )>, _, > = sqlx::query_as(upsert_query) .bind(&pid) .bind(&pid) // Use cluster label as initial name .bind(&speaker_id) .bind(stat.first_time) 
.bind(stat.last_time) .bind(stat.frame_count) .bind(&metadata) .fetch_optional(db.pool()) .await; if let Ok(Some(row)) = result { persons_result.push(PersonSummary { person_id: row.0, name: row.1, speaker_id: row.2, appearance_count: row.3, total_appearance_duration: row.4, first_appearance_time: row.5, last_appearance_time: row.6, is_confirmed: row.7, speaker_confidence: confidence, }); } } Ok(Json(AutoIdentifyResponse { success: true, message: format!( "Identified {} persons, {} matched to speakers", persons_result.len(), matched_count ), total_persons: persons_result.len() as i32, matched_speakers: matched_count, persons: persons_result, })) } /// List all persons with optional filters async fn list_persons( State(_state): State, Query(query): Query, ) -> Result, (StatusCode, String)> { let db = match PostgresDb::init().await { Ok(db) => db, Err(e) => { return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e), )) } }; let limit = query.limit.unwrap_or(50) as i64; let offset = query.offset.unwrap_or(0) as i64; let min_appearances = query.min_appearances.unwrap_or(0); let has_speaker = query.has_speaker.unwrap_or(false); let (sql, count_sql) = if has_speaker { if min_appearances > 0 { ( "SELECT person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed, metadata::text FROM person_identities WHERE speaker_id IS NOT NULL AND appearance_count >= $1 ORDER BY appearance_count DESC LIMIT $2 OFFSET $3".to_string(), "SELECT COUNT(*) FROM person_identities WHERE speaker_id IS NOT NULL AND appearance_count >= $1".to_string(), ) } else { ( "SELECT person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed, metadata::text FROM person_identities WHERE speaker_id IS NOT NULL ORDER BY appearance_count DESC LIMIT $1 OFFSET $2".to_string(), "SELECT COUNT(*) FROM person_identities WHERE speaker_id IS NOT 
NULL".to_string(), ) } } else { if min_appearances > 0 { ( "SELECT person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed, metadata::text FROM person_identities WHERE appearance_count >= $1 ORDER BY appearance_count DESC LIMIT $2 OFFSET $3".to_string(), "SELECT COUNT(*) FROM person_identities WHERE appearance_count >= $1".to_string(), ) } else { ( "SELECT person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, is_confirmed, metadata::text FROM person_identities ORDER BY appearance_count DESC LIMIT $1 OFFSET $2".to_string(), "SELECT COUNT(*) FROM person_identities".to_string(), ) } }; let total: i64 = if min_appearances > 0 { sqlx::query_scalar(&count_sql) .bind(min_appearances) .fetch_one(db.pool()) .await .unwrap_or(0) } else { sqlx::query_scalar(&count_sql) .fetch_one(db.pool()) .await .unwrap_or(0) }; let rows: Vec<( String, Option, Option, i32, f64, Option, Option, bool, Option, )> = if has_speaker && min_appearances > 0 { sqlx::query_as(&sql) .bind(min_appearances) .bind(limit) .bind(offset) .fetch_all(db.pool()) .await .unwrap_or_default() } else if has_speaker { sqlx::query_as(&sql) .bind(limit) .bind(offset) .fetch_all(db.pool()) .await .unwrap_or_default() } else if min_appearances > 0 { sqlx::query_as(&sql) .bind(min_appearances) .bind(limit) .bind(offset) .fetch_all(db.pool()) .await .unwrap_or_default() } else { sqlx::query_as(&sql) .bind(limit) .bind(offset) .fetch_all(db.pool()) .await .unwrap_or_default() }; let persons: Vec = rows .into_iter() .map(|r| { let speaker_confidence = r.8.as_ref().and_then(|m| { serde_json::from_str::(m) .ok() .and_then(|v| v.get("speaker_confidence").and_then(|v| v.as_f64())) }); PersonSummary { person_id: r.0, name: r.1, speaker_id: r.2, appearance_count: r.3, total_appearance_duration: r.4, first_appearance_time: r.5, last_appearance_time: r.6, is_confirmed: r.7, 
speaker_confidence, } }) .collect(); Ok(Json(PersonListResponse { success: true, persons, total, })) }

// NOTE(review): the text under review had lost most generic arguments. They are
// restored below wherever the visible code determines them. The parameter of the
// `State` extractor and the request/query types of `get_person_suggestions`,
// `get_similar_persons` and `confirm_person_suggestion` cannot be determined
// from this section and are left exactly as they appeared. The
// `chrono::DateTime<chrono::Utc>` used in `get_merge_history` is an assumption
// — TODO confirm against the `merge_history` column types.

/// Builds the `500 Internal Server Error` response used by every handler in
/// this section: the body is `"<context>: <err>"`.
fn internal_error_with_context(
    context: &str,
    err: impl std::fmt::Display,
) -> (StatusCode, String) {
    (
        StatusCode::INTERNAL_SERVER_ERROR,
        format!("{}: {}", context, err),
    )
}

/// Reads an integer stat from a stored stats object, falling back to `0` when
/// the key is missing, not an integer, or does not fit in `i32`.
fn json_i32_field(stats: &serde_json::Value, key: &str) -> i32 {
    stats
        .get(key)
        .and_then(|v| v.as_i64())
        .and_then(|v| i32::try_from(v).ok())
        .unwrap_or(0)
}

/// Reads a floating point stat from a stored stats object (`None` when the key
/// is missing or not a number).
fn json_f64_field(stats: &serde_json::Value, key: &str) -> Option<f64> {
    stats.get(key).and_then(|v| v.as_f64())
}

/// Maps a suggestion confidence to the action label returned to the client:
/// `"auto_apply"` at 0.7 and above, `"needs_review"` otherwise.
fn suggestion_action(confidence: f64) -> String {
    if confidence >= 0.7 {
        "auto_apply".to_string()
    } else {
        "needs_review".to_string()
    }
}

/// Merge duplicate persons into one.
///
/// Inside a single transaction this snapshots the current stats of the target
/// and of every source, moves the sources' `person_appearances` rows to the
/// target, deletes the source `person_identities` rows, adds the sources'
/// summed count/duration to the target and records a `merge_history` row.
///
/// # Errors
/// * `400` when `source_person_ids` is empty or contains `target_person_id`.
/// * `404` when the target person does not exist.
/// * `500` for any database failure (except a failed history insert, which is
///   only logged).
async fn merge_persons(
    State(_state): State,
    Json(request): Json<MergePersonsRequest>,
) -> Result<Json<MergePersonsResponse>, (StatusCode, String)> {
    if request.source_person_ids.is_empty() {
        return Err((
            StatusCode::BAD_REQUEST,
            "source_person_ids cannot be empty".into(),
        ));
    }
    // Merging a person into itself would delete the target in step C and leave
    // its appearances pointing at a row that no longer exists.
    if request.source_person_ids.contains(&request.target_person_id) {
        return Err((
            StatusCode::BAD_REQUEST,
            "target_person_id cannot also appear in source_person_ids".into(),
        ));
    }

    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;
    let mut tx = db
        .pool()
        .begin()
        .await
        .map_err(|e| internal_error_with_context("Tx error", e))?;

    // 0. Save original stats for undo capability
    let target_snapshot: Option<(i32, f64, Option<f64>, Option<f64>)> = sqlx::query_as(
        "SELECT appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time FROM person_identities WHERE person_id = $1",
    )
    .bind(&request.target_person_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Target query error", e))?;

    let (tgt_count, tgt_duration, tgt_first, tgt_last) = target_snapshot.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Target person not found: {}", request.target_person_id),
        )
    })?;

    let orig_target_stats = serde_json::json!({
        "appearance_count": tgt_count,
        "total_appearance_duration": tgt_duration,
        "first_appearance_time": tgt_first,
        "last_appearance_time": tgt_last,
    });

    let source_snapshots: Vec<(String, i32, f64, Option<f64>, Option<f64>)> = sqlx::query_as(
        "SELECT person_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time FROM person_identities WHERE person_id = ANY($1)",
    )
    .bind(&request.source_person_ids)
    .fetch_all(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Source query error", e))?;

    let orig_source_stats: Vec<serde_json::Value> = source_snapshots
        .into_iter()
        .map(|(pid, count, dur, first, last)| {
            serde_json::json!({
                "person_id": pid,
                "appearance_count": count,
                "total_appearance_duration": dur,
                "first_appearance_time": first,
                "last_appearance_time": last,
            })
        })
        .collect();

    // Generate merge_id
    let merge_id = Uuid::new_v4().to_string();

    // A. Calculate sum of stats from sources
    let stats_query = r#"
        SELECT
            COALESCE(SUM(appearance_count), 0)::integer as count,
            COALESCE(SUM(total_appearance_duration), 0.0)::double precision as duration
        FROM person_identities
        WHERE person_id = ANY($1)
    "#;
    let (add_count, add_duration): (i32, f64) = sqlx::query_as(stats_query)
        .bind(&request.source_person_ids)
        .fetch_one(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Stats error", e))?;

    // B. Transfer person_appearances
    sqlx::query("UPDATE person_appearances SET person_id = $1 WHERE person_id = ANY($2)")
        .bind(&request.target_person_id)
        .bind(&request.source_person_ids)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Move error", e))?;

    // C. Delete source person_identities
    sqlx::query("DELETE FROM person_identities WHERE person_id = ANY($1)")
        .bind(&request.source_person_ids)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Delete error", e))?;

    // D. Update target stats
    let update_query = r#"
        UPDATE person_identities
        SET appearance_count = appearance_count + $1,
            total_appearance_duration = total_appearance_duration + $2,
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $3
    "#;
    sqlx::query(update_query)
        .bind(add_count)
        .bind(add_duration)
        .bind(&request.target_person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Update error", e))?;

    // E. Record merge history for undo capability.
    //
    // A failed statement aborts the surrounding PostgreSQL transaction, so the
    // best-effort insert is wrapped in a savepoint: on failure only the insert
    // is rolled back and the merge itself can still be committed.
    sqlx::query("SAVEPOINT merge_history_insert")
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Savepoint error", e))?;

    let history_query = r#"
        INSERT INTO merge_history (merge_id, target_person_id, source_person_ids, original_target_stats, original_source_stats)
        VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb)
    "#;
    let target_stats_json =
        serde_json::to_string(&orig_target_stats).unwrap_or_else(|_| "{}".to_string());
    let source_stats_json =
        serde_json::to_string(&orig_source_stats).unwrap_or_else(|_| "[]".to_string());

    let history_result = sqlx::query(history_query)
        .bind(&merge_id)
        .bind(&request.target_person_id)
        .bind(&request.source_person_ids)
        .bind(&target_stats_json)
        .bind(&source_stats_json)
        .execute(&mut *tx)
        .await;

    match history_result {
        Ok(_) => {
            sqlx::query("RELEASE SAVEPOINT merge_history_insert")
                .execute(&mut *tx)
                .await
                .map_err(|e| internal_error_with_context("Savepoint error", e))?;
        }
        Err(e) => {
            // Don't fail the merge if history recording fails
            tracing::warn!("[MERGE] Failed to record merge history: {}", e);
            sqlx::query("ROLLBACK TO SAVEPOINT merge_history_insert")
                .execute(&mut *tx)
                .await
                .map_err(|e| internal_error_with_context("Savepoint error", e))?;
        }
    }

    // F. Commit
    tx.commit()
        .await
        .map_err(|e| internal_error_with_context("Commit error", e))?;

    Ok(Json(MergePersonsResponse {
        success: true,
        message: format!(
            "Merged {} persons into {}",
            request.source_person_ids.len(),
            request.target_person_id
        ),
        target_person_id: request.target_person_id,
        merge_id,
    }))
}

/// Undo a previous merge.
///
/// Restores the target's saved stats, re-creates (or overwrites) one
/// `person_identities` row per saved source snapshot and flags the
/// `merge_history` row as undone, all in one transaction.
///
/// This handler does not touch `person_appearances`: appearances moved by the
/// merge stay attached to the target.
///
/// # Errors
/// * `400` when `merge_id` is not a UUID or the merge was already undone.
/// * `404` when no history row matches `merge_id`.
/// * `500` for any database failure.
async fn undo_merge(
    State(_state): State,
    Json(request): Json<UndoMergeRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    // Reject malformed ids up front instead of letting the `::uuid` cast fail
    // inside the database and surface as a 500.
    if Uuid::parse_str(&request.merge_id).is_err() {
        return Err((
            StatusCode::BAD_REQUEST,
            format!("Invalid merge_id: {}", request.merge_id),
        ));
    }

    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;
    let mut tx = db
        .pool()
        .begin()
        .await
        .map_err(|e| internal_error_with_context("Tx error", e))?;

    // 1. Get merge history. The row is locked so that two concurrent undo
    //    requests cannot both pass the `is_undone` check.
    let history: Option<(i32, String, String, Vec<String>, String, String, bool)> = sqlx::query_as(
        "SELECT id, merge_id::text, target_person_id, source_person_ids, original_target_stats::text, original_source_stats::text, is_undone FROM merge_history WHERE merge_id = $1::uuid FOR UPDATE",
    )
    .bind(&request.merge_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("History query error", e))?;

    let (history_id, merge_id, target_id, source_ids, tgt_stats_str, src_stats_str, is_undone) =
        history.ok_or_else(|| {
            (
                StatusCode::NOT_FOUND,
                format!("Merge history not found: {}", request.merge_id),
            )
        })?;

    if is_undone {
        return Err((
            StatusCode::BAD_REQUEST,
            "This merge has already been undone".into(),
        ));
    }

    let orig_target_stats: serde_json::Value =
        serde_json::from_str(&tgt_stats_str).unwrap_or_else(|_| serde_json::json!({}));
    let orig_source_stats: serde_json::Value =
        serde_json::from_str(&src_stats_str).unwrap_or_else(|_| serde_json::json!({}));

    // 2. Restore target stats
    sqlx::query(
        "UPDATE person_identities SET appearance_count = $1, total_appearance_duration = $2, first_appearance_time = $3, last_appearance_time = $4, updated_at = CURRENT_TIMESTAMP WHERE person_id = $5",
    )
    .bind(json_i32_field(&orig_target_stats, "appearance_count"))
    .bind(json_f64_field(&orig_target_stats, "total_appearance_duration").unwrap_or(0.0))
    .bind(json_f64_field(&orig_target_stats, "first_appearance_time"))
    .bind(json_f64_field(&orig_target_stats, "last_appearance_time"))
    .bind(&target_id)
    .execute(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Restore target error", e))?;

    // 3. Recreate source person_identities
    let restore_source_query = r#"
        INSERT INTO person_identities (person_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, metadata, is_confirmed)
        VALUES ($1, $2, $3, $4, $5, $6, FALSE)
        ON CONFLICT (person_id) DO UPDATE SET
            appearance_count = EXCLUDED.appearance_count,
            total_appearance_duration = EXCLUDED.total_appearance_duration,
            first_appearance_time = EXCLUDED.first_appearance_time,
            last_appearance_time = EXCLUDED.last_appearance_time,
            updated_at = CURRENT_TIMESTAMP
    "#;
    let restored_metadata = serde_json::json!({ "restored_from_merge": merge_id });
    let source_snapshots = orig_source_stats
        .as_array()
        .map(Vec::as_slice)
        .unwrap_or(&[]);

    for snapshot in source_snapshots {
        // A snapshot without an id cannot be restored; inserting it would
        // create a person whose id is the empty string.
        let source_id = match snapshot.get("person_id").and_then(|v| v.as_str()) {
            Some(id) if !id.is_empty() => id,
            _ => {
                tracing::warn!("[UNDO] Skipping source snapshot without person_id");
                continue;
            }
        };

        // A failed statement aborts the transaction, so there is nothing to
        // gain from carrying on: report the real cause immediately.
        sqlx::query(restore_source_query)
            .bind(source_id)
            .bind(json_i32_field(snapshot, "appearance_count"))
            .bind(json_f64_field(snapshot, "total_appearance_duration").unwrap_or(0.0))
            .bind(json_f64_field(snapshot, "first_appearance_time"))
            .bind(json_f64_field(snapshot, "last_appearance_time"))
            .bind(&restored_metadata)
            .execute(&mut *tx)
            .await
            .map_err(|e| {
                tracing::warn!("[UNDO] Failed to restore {}: {}", source_id, e);
                internal_error_with_context("Restore source error", e)
            })?;
    }

    // 4. Mark merge history as undone
    sqlx::query(
        "UPDATE merge_history SET is_undone = TRUE, undone_at = CURRENT_TIMESTAMP WHERE id = $1",
    )
    .bind(history_id)
    .execute(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Mark undone error", e))?;

    tx.commit()
        .await
        .map_err(|e| internal_error_with_context("Commit error", e))?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Undo merge completed. Restored {} source persons", source_ids.len()),
        "merge_id": merge_id,
        "target_person_id": target_id,
        "restored_persons": source_ids
    })))
}

/// Get merge history.
///
/// Returns the 50 most recent `merge_history` rows (newest first). Stored
/// stats that fail to parse as JSON are reported as `{}`.
async fn get_merge_history(
    State(_state): State,
) -> Result<Json<MergeHistoryResponse>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    let rows: Vec<(
        String,
        String,
        Vec<String>,
        String,
        String,
        chrono::DateTime<chrono::Utc>,
        bool,
        Option<chrono::DateTime<chrono::Utc>>,
    )> = sqlx::query_as(
        "SELECT merge_id::text, target_person_id, source_person_ids, original_target_stats::text, original_source_stats::text, merged_at, is_undone, undone_at FROM merge_history ORDER BY merged_at DESC LIMIT 50",
    )
    .fetch_all(db.pool())
    .await
    .map_err(|e| internal_error_with_context("History query error", e))?;

    let mut history: Vec<MergeHistoryEntry> = Vec::with_capacity(rows.len());
    for (merge_id, target, sources, tgt_stats, src_stats, merged_at, is_undone, undone_at) in rows
    {
        history.push(MergeHistoryEntry {
            merge_id,
            target_person_id: target,
            source_person_ids: sources,
            original_target_stats: serde_json::from_str(&tgt_stats)
                .unwrap_or_else(|_| serde_json::json!({})),
            original_source_stats: serde_json::from_str(&src_stats)
                .unwrap_or_else(|_| serde_json::json!({})),
            merged_at: merged_at.to_rfc3339(),
            is_undone,
            undone_at: undone_at.map(|t| t.to_rfc3339()),
        });
    }

    Ok(Json(MergeHistoryResponse {
        success: true,
        history,
    }))
}

/// Get AI suggestions for naming and merging persons.
///
/// Considers up to 100 persons (all persons when `video_uuid` is empty,
/// otherwise those with an appearance in that video):
/// * unnamed persons get a naming suggestion whose confidence is the sum of
///   0.4 (has a `speaker_id`), `0.3 * metadata.speaker_confidence`, and 0.2
///   (more than 1000 appearances);
/// * persons sharing a `speaker_id` get a merge suggestion into the member
///   with the highest appearance count, with confidence equal to that member's
///   share of the group's appearances.
///
/// # Errors
/// `500` when the database cannot be reached or the person query fails.
async fn get_person_suggestions(
    State(_state): State,
    Json(request): Json,
) -> Result<Json<SuggestionsResponse>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    // Get all persons for this video (or all if no video_uuid). A failing
    // query is reported instead of being presented as "no suggestions".
    let persons: Vec<(
        String,
        Option<String>,
        Option<String>,
        i32,
        f64,
        Option<f64>,
        Option<f64>,
        Option<serde_json::Value>,
    )> = if request.video_uuid.is_empty() {
        sqlx::query_as(
            "SELECT person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, metadata FROM person_identities ORDER BY appearance_count DESC LIMIT 100",
        )
        .fetch_all(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Persons query error", e))?
    } else {
        // Filter by video_uuid via person_appearances
        sqlx::query_as(
            "SELECT DISTINCT pi.person_id, pi.name, pi.speaker_id, pi.appearance_count, pi.total_appearance_duration, pi.first_appearance_time, pi.last_appearance_time, pi.metadata FROM person_identities pi JOIN person_appearances pa ON pi.person_id = pa.person_id WHERE pa.video_uuid = $1 ORDER BY pi.appearance_count DESC LIMIT 100",
        )
        .bind(&request.video_uuid)
        .fetch_all(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Persons query error", e))?
    };

    let mut naming_suggestions = Vec::new();
    let mut merge_suggestions = Vec::new();

    // Group by speaker_id to find merge candidates. An ordered map keeps the
    // order of the merge suggestions stable between requests.
    let mut speaker_groups: std::collections::BTreeMap<String, Vec<(String, i32)>> =
        std::collections::BTreeMap::new();

    for (person_id, name, speaker_id, appearance_count, _duration, _first, _last, metadata) in
        &persons
    {
        // Naming suggestions
        if name.is_none() {
            let mut sources = Vec::new();
            let mut suggested_name = person_id.clone();
            let mut confidence = 0.0;

            // Check speaker_id
            if let Some(sid) = speaker_id {
                sources.push(SuggestionSource {
                    r#type: "speaker_match".to_string(),
                    detail: format!("Linked to speaker {}", sid),
                    weight: 0.4,
                });
                suggested_name = sid.clone();
                confidence += 0.4;
            }

            // Check metadata for speaker_confidence
            let speaker_confidence = metadata
                .as_ref()
                .and_then(|meta| meta.get("speaker_confidence"))
                .and_then(|v| v.as_f64());
            if let Some(sc) = speaker_confidence {
                confidence += sc * 0.3;
                sources.push(SuggestionSource {
                    r#type: "speaker_confidence".to_string(),
                    detail: format!("Speaker match confidence: {:.0}%", sc * 100.0),
                    weight: sc * 0.3,
                });
            }

            // High appearance count suggests main character
            if *appearance_count > 1000 {
                confidence += 0.2;
                sources.push(SuggestionSource {
                    r#type: "high_appearance".to_string(),
                    detail: format!(
                        "Appears in {} frames (likely main character)",
                        appearance_count
                    ),
                    weight: 0.2,
                });
            }

            if confidence > 0.0 {
                naming_suggestions.push(NamingSuggestion {
                    person_id: person_id.clone(),
                    current_name: name.clone(),
                    suggested_name,
                    confidence,
                    sources,
                    action: suggestion_action(confidence),
                });
            }
        }

        // Group by speaker_id for merge detection
        if let Some(sid) = speaker_id {
            speaker_groups
                .entry(sid.clone())
                .or_default()
                .push((person_id.clone(), *appearance_count));
        }
    }

    // Find merge candidates: multiple persons with same speaker_id
    for (speaker_id, group) in &speaker_groups {
        if group.len() < 2 {
            continue;
        }
        let primary = match group.iter().max_by_key(|(_, count)| *count) {
            Some(p) => p,
            None => continue,
        };
        let others: Vec<String> = group
            .iter()
            .filter(|(pid, _)| pid != &primary.0)
            .map(|(pid, _)| pid.clone())
            .collect();

        // Calculate confidence based on appearance ratio
        let primary_count = primary.1 as f64;
        let total_count: f64 = group.iter().map(|(_, c)| *c as f64).sum();
        let confidence = if total_count > 0.0 {
            primary_count / total_count
        } else {
            0.0
        };

        let reasons = vec![
            format!("All share speaker_id: {}", speaker_id),
            format!(
                "Primary {} has {} appearances ({}% of group)",
                primary.0,
                primary.1,
                (confidence * 100.0) as i32
            ),
            format!("{} persons to merge into primary", others.len()),
        ];

        merge_suggestions.push(MergeSuggestion {
            person_id: primary.0.clone(),
            merge_with: others,
            confidence,
            reasons,
            action: suggestion_action(confidence),
        });
    }

    let total_naming = naming_suggestions.len();
    let total_merge = merge_suggestions.len();

    Ok(Json(SuggestionsResponse {
        success: true,
        naming_suggestions,
        merge_suggestions,
        total_naming,
        total_merge,
    }))
}

/// Find similar persons that could be merged.
///
/// Every other person with at least one appearance is scored by the SQL `CASE`
/// below: 0.7 when it shares the requested person's `speaker_id`, 0.3 when it
/// has no `speaker_id`, 0.5 otherwise. The top `limit` rows (default 10) are
/// fetched and those below `threshold` (default 0.5) are dropped.
///
/// # Errors
/// `404` when `person_id` does not exist, `500` for database failures.
async fn get_similar_persons(
    State(_state): State,
    Path(person_id): Path<String>,
    Query(query): Query,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    let threshold = query.threshold.unwrap_or(0.5);
    let limit = query.limit.unwrap_or(10);

    // Get the target person's speaker_id and time range (only the speaker_id
    // takes part in the scoring below).
    let person_info: Option<(Option<String>, Option<f64>, Option<f64>)> = sqlx::query_as(
        "SELECT speaker_id, first_appearance_time, last_appearance_time FROM person_identities WHERE person_id = $1",
    )
    .bind(&person_id)
    .fetch_optional(db.pool())
    .await
    .map_err(|e| internal_error_with_context("Query error", e))?;

    let (target_speaker_id, _first, _last) = person_info.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Person not found: {}", person_id),
        )
    })?;

    // Find similar persons by speaker_id overlap
    let similar_query = r#"
        SELECT
            pi.person_id,
            pi.name,
            pi.speaker_id,
            pi.appearance_count,
            pi.first_appearance_time,
            pi.last_appearance_time,
            CASE
                WHEN pi.speaker_id IS NOT NULL AND $2 IS NOT NULL AND pi.speaker_id = $2 THEN 0.7::double precision
                WHEN pi.speaker_id IS NULL THEN 0.3::double precision
                ELSE 0.5::double precision
            END as similarity
        FROM person_identities pi
        WHERE pi.person_id != $1
          AND pi.appearance_count > 0
        ORDER BY similarity DESC, pi.appearance_count DESC
        LIMIT $3
    "#;

    let similar: Vec<(
        String,
        Option<String>,
        Option<String>,
        i32,
        Option<f64>,
        Option<f64>,
        f64,
    )> = sqlx::query_as(similar_query)
        .bind(&person_id)
        .bind(&target_speaker_id)
        .bind(limit)
        .fetch_all(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Query error", e))?;

    let mut results: Vec<serde_json::Value> = Vec::new();
    for (pid, name, speaker_id, count, first, last, similarity) in similar {
        if similarity < threshold {
            continue;
        }
        results.push(serde_json::json!({
            "person_id": pid,
            "name": name,
            "speaker_id": speaker_id,
            "appearance_count": count,
            "first_appearance_time": first,
            "last_appearance_time": last,
            "similarity": similarity
        }));
    }

    Ok(Json(serde_json::json!({
        "success": true,
        "person_id": person_id,
        "similar_persons": results,
        "threshold": threshold
    })))
}

/// Confirm an AI suggestion (auto-apply naming).
///
/// Sets `is_confirmed = true` on the person and overwrites `name` / `metadata`
/// only when the request supplies them (`COALESCE` keeps the stored value
/// otherwise).
///
/// # Errors
/// `404` when `person_id` does not exist, `500` for database failures.
async fn confirm_person_suggestion(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    let query = r#"
        UPDATE person_identities
        SET name = COALESCE($2, name),
            metadata = COALESCE($3, metadata),
            is_confirmed = true,
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $1
        RETURNING person_id, name
    "#;

    let updated: Option<(String, Option<String>)> = sqlx::query_as(query)
        .bind(&person_id)
        .bind(&request.name)
        .bind(&request.metadata)
        .fetch_optional(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Update error", e))?;

    let (id, name) = updated.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Person not found: {}", person_id),
        )
    })?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Person '{}' confirmed", name.unwrap_or_else(|| id.clone())),
        "person_id": id
    })))
}

// ============================================================
// Correction APIs - For fixing incorrect person bindings
// ============================================================

/// Request to unbind speaker from person
#[derive(Debug, Deserialize)]
pub struct UnbindSpeakerRequest {
    pub video_uuid: String,
    /// Free-text reason; echoed back in the response.
    pub reason: Option<String>,
}

/// Request to reassign speaker to person
#[derive(Debug, Deserialize)]
pub struct ReassignSpeakerRequest {
    pub video_uuid: String,
    /// Speaker id to store on the person.
    pub speaker_id: String,
    /// Free-text reason; echoed back in the response.
    pub reason: Option<String>,
}

/// Request to remove a specific appearance
#[derive(Debug, Deserialize)]
pub struct RemoveAppearanceRequest {
    pub video_uuid: String,
    /// `person_appearances.id` of the row to delete.
    pub appearance_id: i32,
    /// Free-text reason; echoed back in the response.
    pub reason: Option<String>,
}

/// Request to reassign appearance to another person
#[derive(Debug, Deserialize)]
pub struct ReassignAppearanceRequest {
    pub video_uuid: String,
    /// `person_appearances.id` of the row to move.
    pub appearance_id: i32,
    /// Person that receives the appearance.
    pub target_person_id: String,
    /// Free-text reason; echoed back in the response.
    pub reason: Option<String>,
}

/// Request to split a person into two
#[derive(Debug, Deserialize)]
pub struct SplitPersonRequest {
    pub video_uuid: String,
    /// Id of the person that receives the moved appearances.
    pub new_person_id: String,
    /// `person_appearances.id` values to move to the new person.
    pub appearance_ids_to_move: Vec<i32>,
    /// Name for the new person; defaults to `"<new_person_id>-split"`.
    pub new_person_name: Option<String>,
    /// Free-text reason; echoed back in the response.
    pub reason: Option<String>,
}

/// Unbind speaker from person.
///
/// Clears `speaker_id` and sets `metadata.speaker_unbound = true`.
///
/// # Errors
/// `404` when `person_id` does not exist, `500` for database failures.
async fn unbind_speaker(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json<UnbindSpeakerRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    let update_query = r#"
        UPDATE person_identities
        SET speaker_id = NULL,
            updated_at = CURRENT_TIMESTAMP,
            metadata = jsonb_set(
                COALESCE(metadata, '{}'::jsonb),
                '{speaker_unbound}',
                'true'::jsonb
            )
        WHERE person_id = $1
        RETURNING person_id
    "#;

    let updated: Option<String> = sqlx::query_scalar(update_query)
        .bind(&person_id)
        .fetch_optional(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Unbind error", e))?;

    let id = updated.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Person not found: {}", person_id),
        )
    })?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Speaker unbound from person '{}'", id),
        "person_id": id,
        "reason": request.reason.unwrap_or_default()
    })))
}

/// Reassign speaker to person.
///
/// Stores `request.speaker_id` on the person and sets
/// `metadata.speaker_reassigned = true`.
///
/// # Errors
/// `404` when `person_id` does not exist, `500` for database failures.
async fn reassign_speaker(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json<ReassignSpeakerRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;

    let update_query = r#"
        UPDATE person_identities
        SET speaker_id = $2,
            updated_at = CURRENT_TIMESTAMP,
            metadata = jsonb_set(
                COALESCE(metadata, '{}'::jsonb),
                '{speaker_reassigned}',
                'true'::jsonb
            )
        WHERE person_id = $1
        RETURNING person_id
    "#;

    let updated: Option<String> = sqlx::query_scalar(update_query)
        .bind(&person_id)
        .bind(&request.speaker_id)
        .fetch_optional(db.pool())
        .await
        .map_err(|e| internal_error_with_context("Reassign error", e))?;

    let id = updated.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Person not found: {}", person_id),
        )
    })?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Speaker '{}' assigned to person '{}'", request.speaker_id, id),
        "person_id": id,
        "new_speaker_id": request.speaker_id,
        "reason": request.reason.unwrap_or_default()
    })))
}

/// Remove a specific appearance.
///
/// Deletes the appearance (it must belong to `person_id`) and subtracts it
/// from the person's `appearance_count` and `total_appearance_duration`, both
/// clamped at zero. First/last appearance times are not recomputed here.
///
/// # Errors
/// `404` when the appearance does not exist for that person, `500` for
/// database failures.
async fn remove_appearance(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json<RemoveAppearanceRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;
    let mut tx = db
        .pool()
        .begin()
        .await
        .map_err(|e| internal_error_with_context("Tx error", e))?;

    // Get appearance info before deleting
    let found: Option<f64> = sqlx::query_scalar(
        "SELECT duration FROM person_appearances WHERE id = $1 AND person_id = $2",
    )
    .bind(request.appearance_id)
    .bind(&person_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Appearance query error", e))?;

    let duration = found.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Appearance not found: {}", request.appearance_id),
        )
    })?;

    // Delete appearance
    sqlx::query("DELETE FROM person_appearances WHERE id = $1 AND person_id = $2")
        .bind(request.appearance_id)
        .bind(&person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Delete error", e))?;

    // Update person stats; the count is clamped like the duration so that
    // already-inconsistent stats cannot go negative.
    let update_stats_query = r#"
        UPDATE person_identities
        SET appearance_count = GREATEST(0, appearance_count - 1),
            total_appearance_duration = GREATEST(0, total_appearance_duration - $1),
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $2
    "#;
    sqlx::query(update_stats_query)
        .bind(duration)
        .bind(&person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Stats update error", e))?;

    tx.commit()
        .await
        .map_err(|e| internal_error_with_context("Commit error", e))?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Appearance {} removed from person '{}'", request.appearance_id, person_id),
        "appearance_id": request.appearance_id,
        "person_id": person_id,
        "removed_duration": duration,
        "reason": request.reason.unwrap_or_default()
    })))
}

/// Reassign appearance to another person.
///
/// Moves one appearance from `person_id` to `request.target_person_id` and
/// shifts its count/duration from the old person's stats to the new one's, in
/// a single transaction.
///
/// # Errors
/// `404` when the appearance does not belong to `person_id` or the target
/// person does not exist, `500` for database failures.
async fn reassign_appearance(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json<ReassignAppearanceRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;
    let mut tx = db
        .pool()
        .begin()
        .await
        .map_err(|e| internal_error_with_context("Tx error", e))?;

    // Get appearance info
    let found: Option<f64> = sqlx::query_scalar(
        "SELECT duration FROM person_appearances WHERE id = $1 AND person_id = $2",
    )
    .bind(request.appearance_id)
    .bind(&person_id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Appearance query error", e))?;

    let duration = found.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Appearance not found: {}", request.appearance_id),
        )
    })?;

    // Update new person stats (increment). Done first so that a missing
    // target is detected before the appearance is moved to it.
    let update_new_query = r#"
        UPDATE person_identities
        SET appearance_count = appearance_count + 1,
            total_appearance_duration = total_appearance_duration + $1,
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $2
    "#;
    let target_update = sqlx::query(update_new_query)
        .bind(duration)
        .bind(&request.target_person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("New person stats error", e))?;

    if target_update.rows_affected() == 0 {
        // Dropping `tx` on return rolls the transaction back.
        return Err((
            StatusCode::NOT_FOUND,
            format!("Target person not found: {}", request.target_person_id),
        ));
    }

    // Reassign to new person
    sqlx::query("UPDATE person_appearances SET person_id = $1 WHERE id = $2")
        .bind(&request.target_person_id)
        .bind(request.appearance_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Reassign error", e))?;

    // Update old person stats (decrement)
    let update_old_query = r#"
        UPDATE person_identities
        SET appearance_count = GREATEST(0, appearance_count - 1),
            total_appearance_duration = GREATEST(0, total_appearance_duration - $1),
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $2
    "#;
    sqlx::query(update_old_query)
        .bind(duration)
        .bind(&person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Old person stats error", e))?;

    tx.commit()
        .await
        .map_err(|e| internal_error_with_context("Commit error", e))?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Appearance {} reassigned from '{}' to '{}'", request.appearance_id, person_id, request.target_person_id),
        "appearance_id": request.appearance_id,
        "from_person_id": person_id,
        "to_person_id": request.target_person_id,
        "reason": request.reason.unwrap_or_default()
    })))
}

/// Split a person into two.
///
/// Creates `request.new_person_id` (copying the original's `speaker_id` and
/// metadata plus a `split_from` marker), moves the listed appearances that
/// currently belong to `person_id` over to it, and adjusts both persons'
/// stats. Ids in `appearance_ids_to_move` that do not belong to `person_id`
/// are ignored; the stats and the `appearances_moved` field of the response
/// reflect the appearances that were actually moved.
///
/// NOTE(review): when `new_person_id` already exists its name, speaker and
/// metadata are overwritten and its stats are replaced by those of the moved
/// appearances only (as in the previous implementation) — confirm that this
/// is intended.
///
/// # Errors
/// * `400` when `new_person_id` equals `person_id`.
/// * `404` when `person_id` does not exist.
/// * `500` for any database failure.
async fn split_person(
    State(_state): State,
    Path(person_id): Path<String>,
    Json(request): Json<SplitPersonRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    // Splitting a person into itself would first overwrite its stats with the
    // moved subset and then subtract that subset again.
    if request.new_person_id == person_id {
        return Err((
            StatusCode::BAD_REQUEST,
            "new_person_id must differ from the person being split".into(),
        ));
    }

    let db = PostgresDb::init()
        .await
        .map_err(|e| internal_error_with_context("DB error", e))?;
    let mut tx = db
        .pool()
        .begin()
        .await
        .map_err(|e| internal_error_with_context("Tx error", e))?;

    // Get original person info
    let orig_info: Option<(Option<String>, Option<serde_json::Value>)> =
        sqlx::query_as("SELECT speaker_id, metadata FROM person_identities WHERE person_id = $1")
            .bind(&person_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(|e| internal_error_with_context("Person query error", e))?;

    let (speaker_id, orig_metadata) = orig_info.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("Person not found: {}", person_id),
        )
    })?;

    // Create new person. Its stats start at zero and are filled in once the
    // appearances have been moved.
    let create_query = r#"
        INSERT INTO person_identities (person_id, name, speaker_id, appearance_count, total_appearance_duration, first_appearance_time, last_appearance_time, metadata, is_confirmed)
        VALUES ($1, $2, $3, 0, 0, NULL, NULL, $4, FALSE)
        ON CONFLICT (person_id) DO UPDATE SET
            name = EXCLUDED.name,
            speaker_id = EXCLUDED.speaker_id,
            metadata = EXCLUDED.metadata
        RETURNING person_id
    "#;

    let new_name = request
        .new_person_name
        .unwrap_or_else(|| format!("{}-split", request.new_person_id));

    // Indexing a non-object JSON value by key panics, so metadata that is not
    // an object is replaced by a fresh object instead.
    let mut metadata_map = match orig_metadata {
        Some(serde_json::Value::Object(map)) => map,
        _ => serde_json::Map::new(),
    };
    metadata_map.insert(
        "split_from".to_string(),
        serde_json::Value::String(person_id.clone()),
    );
    let new_metadata = serde_json::Value::Object(metadata_map);

    sqlx::query(create_query)
        .bind(&request.new_person_id)
        .bind(new_name)
        .bind(&speaker_id)
        .bind(&new_metadata)
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Create person error", e))?;

    // Move appearances to new person and calculate new stats. One statement
    // moves every requested appearance that belongs to the original person and
    // returns what was moved.
    let moved: Vec<(f64, f64, f64)> = sqlx::query_as(
        "UPDATE person_appearances SET person_id = $1 WHERE id = ANY($2) AND person_id = $3 RETURNING duration, start_time, end_time",
    )
    .bind(&request.new_person_id)
    .bind(&request.appearance_ids_to_move)
    .bind(&person_id)
    .fetch_all(&mut *tx)
    .await
    .map_err(|e| internal_error_with_context("Move appearances error", e))?;

    let moved_count = i32::try_from(moved.len()).unwrap_or(i32::MAX);
    let mut new_duration: f64 = 0.0;
    let mut new_first: Option<f64> = None;
    let mut new_last: Option<f64> = None;
    for (dur, start, end) in &moved {
        new_duration += *dur;
        new_first = Some(new_first.map_or(*start, |cur| cur.min(*start)));
        new_last = Some(new_last.map_or(*end, |cur| cur.max(*end)));
    }

    // Update new person stats
    let update_new_query = r#"
        UPDATE person_identities
        SET total_appearance_duration = $1,
            first_appearance_time = $2,
            last_appearance_time = $3,
            appearance_count = $4,
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $5
    "#;
    sqlx::query(update_new_query)
        .bind(new_duration)
        .bind(new_first)
        .bind(new_last)
        .bind(moved_count)
        .bind(&request.new_person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Update new person error", e))?;

    // Update original person stats (decrement)
    let update_orig_query = r#"
        UPDATE person_identities
        SET appearance_count = GREATEST(0, appearance_count - $1),
            total_appearance_duration = GREATEST(0, total_appearance_duration - $2),
            updated_at = CURRENT_TIMESTAMP
        WHERE person_id = $3
    "#;
    sqlx::query(update_orig_query)
        .bind(moved_count)
        .bind(new_duration)
        .bind(&person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| internal_error_with_context("Update original person error", e))?;

    // Update original person's first/last appearance times. A failure here
    // aborts the transaction, so it is reported rather than only logged.
    let recalc_query = r#"
        UPDATE person_identities
        SET first_appearance_time = (SELECT MIN(start_time) FROM person_appearances WHERE person_id = $1),
            last_appearance_time = (SELECT MAX(end_time) FROM person_appearances WHERE person_id = $1)
        WHERE person_id = $1
    "#;
    sqlx::query(recalc_query)
        .bind(&person_id)
        .execute(&mut *tx)
        .await
        .map_err(|e| {
            tracing::warn!("[SPLIT] Failed to recalc original person times: {}", e);
            internal_error_with_context("Recalc original person error", e)
        })?;

    tx.commit()
        .await
        .map_err(|e| internal_error_with_context("Commit error", e))?;

    Ok(Json(serde_json::json!({
        "success": true,
        "message": format!("Person '{}' split into '{}' with {} appearances moved", person_id, request.new_person_id, moved_count),
        "original_person_id": person_id,
        "new_person_id": request.new_person_id,
        "appearances_moved": moved_count,
        "new_person_duration": new_duration,
        "new_person_first": new_first,
        "new_person_last": new_last,
        "reason": request.reason.unwrap_or_default()
    })))
}