Files
momentry_core/src/api/identity_binding.rs.bak
Accusys 2cfcfdd1af feat: Phase 2.6 edges migration to Qdrant (TKG-only architecture)
Phase 2.6.1: co_occurrence_edges migration
- build_co_occurrence_edges_from_qdrant()
- Qdrant embeddings → frame grouping → YOLO objects
- Result: 6679 edges (vs 6701 PostgreSQL)

Phase 2.6.2: face_face_edges migration
- build_face_face_edges_from_qdrant()
- Qdrant embeddings → frame grouping → face pairs
- mutual_gaze detection preserved
- Result: 6 edges (exact match)

Phase 2.6.3: speaker_face_edges migration
- build_speaker_face_edges_from_qdrant()
- Qdrant embeddings → trace_id frame ranges
- SPEAKS_AS edge creation

Architecture:
- All edges use Qdrant payload (no face_detections queries)
- PostgreSQL fallback for empty Qdrant
- Estimated 3.6x performance improvement

Testing:
- Playground (3003): ✓ All Phase 2.6 logs verified
- Edge counts: ✓ Close match with PostgreSQL
- Fallback: ✓ Working

Docs:
- docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md
- docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
2026-06-21 04:47:49 +08:00

1841 lines
60 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use axum::{
extract::{Extension, Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use crate::core::db::{Database, PostgresDb};
use crate::core::person_identity::{
BindIdentityRequest, BindIdentityTraceRequest, Identity, MergeIdentitiesRequest,
UnbindIdentityRequest,
};
#[derive(Debug, Clone, Serialize)]
pub struct ApiResponse<T: Serialize> {
pub success: bool,
pub message: String,
pub data: Option<T>,
}
// ============================================================================
// API Handlers
// ============================================================================
async fn get_db() -> Result<PostgresDb, (StatusCode, Json<serde_json::Value>)> {
PostgresDb::init().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("DB init failed: {}", e) })),
)
})
}
/// 獲取 Identity (人物) 列表
pub async fn list_identities(
Query(params): Query<ListIdentitiesParams>,
) -> Result<Json<ApiResponse<Vec<Identity>>>, (StatusCode, Json<serde_json::Value>)> {
let db = get_db().await?;
let limit = params.limit.unwrap_or(100);
let offset = params.offset.unwrap_or(0);
let search = params.search.unwrap_or_default();
let identities = db
.list_identities(&search, limit, offset)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": e.to_string() })),
)
})?;
Ok(Json(ApiResponse {
success: true,
message: format!("Found {} identities", identities.len()),
data: Some(identities),
}))
}
/// V4.0 直接綁定face_detections.identity_id = identities.id
pub async fn bind_identity(
State(state): State<crate::api::types::AppState>,
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(identity_uuid): Path<String>,
Json(req): Json<BindIdentityRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let uuid_clean = identity_uuid.replace('-', "");
let identity_row: Option<(i32, String)> = sqlx::query_as(&format!(
"SELECT id, name FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let (identity_id, name) = identity_row.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": format!("Identity not found: {}", identity_uuid)})),
)
})?;
// Capture old identity_id before bind
let old_identity_id: Option<i32> = sqlx::query_scalar(&format!(
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
table
))
.bind(&req.file_uuid)
.bind(&req.face_id)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
.flatten();
// Direct UPDATE face_detections.identity_id
let result = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_id = $3",
table
))
.bind(identity_id)
.bind(&req.file_uuid)
.bind(&req.face_id)
.execute(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Clear bind redo stack
let _ = sqlx::query(&format!(
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
history_table
))
.bind(identity_id)
.execute(state.db.pool())
.await;
// Insert history record
let uid = auth.user_id.to_string();
let usrc = match auth.source {
crate::api::middleware::AuthSource::Jwt => "jwt",
crate::api::middleware::AuthSource::Session => "session",
crate::api::middleware::AuthSource::ApiKey => "api_key",
};
let before = serde_json::json!({
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_before": old_identity_id
});
let after = serde_json::json!({
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_after": identity_id
});
let _ = sqlx::query(&format!(
"INSERT INTO {} (identity_id, operation, before_snapshot, after_snapshot, is_undone, user_id, user_source) VALUES ($1, 'bind', $2, $3, false, $4, $5)",
history_table
))
.bind(identity_id)
.bind(before)
.bind(after)
.bind(&uid)
.bind(usrc)
.execute(state.db.pool())
.await;
// Sync identity JSON file
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(state.db.pool(), &uuid_clean)
.await
{
tracing::warn!(
"[bind] Failed to sync identity file for {}: {}",
uuid_clean,
e
);
}
Ok(Json(ApiResponse {
success: true,
message: format!(
"Bound face {} of {} to {}",
req.face_id, req.file_uuid, name
),
data: Some(serde_json::json!({"rows_affected": result.rows_affected()})),
}))
}
/// V4.0 直接解綁SET face_detections.identity_id = NULL
pub async fn unbind_identity(
State(state): State<crate::api::types::AppState>,
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Json(req): Json<UnbindIdentityRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
// Capture old identity_id before unbind
let old_identity_id: Option<i32> = sqlx::query_scalar(&format!(
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
table
))
.bind(&req.file_uuid)
.bind(&req.face_id)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?
.flatten();
let result = sqlx::query(&format!(
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND face_id = $2",
table
))
.bind(&req.file_uuid)
.bind(&req.face_id)
.execute(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Record history if there was a binding
if let Some(identity_id) = old_identity_id {
// Clear bind redo stack
let _ = sqlx::query(&format!(
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
history_table
))
.bind(identity_id)
.execute(state.db.pool())
.await;
// Insert history record
let uid = auth.user_id.to_string();
let usrc = match auth.source {
crate::api::middleware::AuthSource::Jwt => "jwt",
crate::api::middleware::AuthSource::Session => "session",
crate::api::middleware::AuthSource::ApiKey => "api_key",
};
let before = serde_json::json!({
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_before": old_identity_id
});
let after = serde_json::json!({
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_after": null
});
let _ = sqlx::query(&format!(
"INSERT INTO {} (identity_id, operation, before_snapshot, after_snapshot, is_undone, user_id, user_source) VALUES ($1, 'unbind', $2, $3, false, $4, $5)",
history_table
))
.bind(identity_id)
.bind(before)
.bind(after)
.bind(&uid)
.bind(usrc)
.execute(state.db.pool())
.await;
// Sync the identity JSON
let uuid: Option<String> = sqlx::query_scalar(&format!(
"SELECT uuid::text FROM {} WHERE id = $1",
id_table
))
.bind(identity_id)
.fetch_optional(state.db.pool())
.await
.ok()
.flatten();
if let Some(identity_uuid) = uuid {
if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(
state.db.pool(),
&identity_uuid,
)
.await
{
tracing::warn!(
"[unbind] Failed to sync identity file for {}: {}",
identity_uuid,
e
);
}
}
}
Ok(Json(ApiResponse {
success: true,
message: format!("Unbound face {} from {}", req.face_id, req.file_uuid),
data: Some(serde_json::json!({"rows_affected": result.rows_affected()})),
}))
}
fn deep_merge_metadata_fields(
from: &serde_json::Map<String, serde_json::Value>,
into: &mut serde_json::Map<String, serde_json::Value>,
added: &mut Vec<String>,
prefix: &str,
) {
for (key, value) in from.iter() {
if key == "aliases" {
continue;
}
let path = if prefix.is_empty() {
key.clone()
} else {
format!("{}.{}", prefix, key)
};
if !into.contains_key(key) {
into.insert(key.clone(), value.clone());
added.push(path);
} else if let (Some(from_obj), Some(into_obj)) = (value.as_object(), into[key].as_object())
{
let mut merged = into_obj.clone();
deep_merge_metadata_fields(from_obj, &mut merged, added, &path);
into.insert(key.clone(), serde_json::Value::Object(merged));
}
}
}
fn remove_nested_meta_field(value: &mut serde_json::Value, parts: &[&str]) {
if parts.is_empty() {
return;
}
if parts.len() == 1 {
if let Some(obj) = value.as_object_mut() {
obj.remove(parts[0]);
}
return;
}
if let Some(child) = value.get_mut(parts[0]) {
remove_nested_meta_field(child, &parts[1..]);
if let Some(obj) = value.get(parts[0]).and_then(|v| v.as_object()) {
if obj.is_empty() {
if let Some(parent_obj) = value.as_object_mut() {
parent_obj.remove(parts[0]);
}
}
}
}
}
/// V4.0 合併:將 identity A 合併入 identity BA 被刪除或標記為 merged
pub async fn merge_identities(
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(identity_uuid): Path<String>,
Json(req): Json<MergeIdentitiesRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let face_table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let db = sqlx::PgPool::connect(&crate::core::config::DATABASE_URL)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let uuid_clean = identity_uuid.replace('-', "");
let into_uuid_clean = req.into_uuid.replace('-', "");
// Get full source identity snapshot
let source_row: Option<(
i32,
String,
Option<String>,
Option<String>,
String,
Option<i32>,
Option<String>,
Option<serde_json::Value>,
Option<chrono::NaiveDateTime>,
)> = sqlx::query_as(&format!(
"SELECT id, name, identity_type, source, status, tmdb_id, tmdb_profile, metadata, created_at FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let (
from_id,
from_name,
from_type,
from_source,
from_status,
from_tmdb_id,
from_tmdb_profile,
from_metadata,
from_created_at,
) = source_row.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Source identity not found"})),
))?;
// Get full target identity snapshot
let target_row: Option<(i32, String, Option<serde_json::Value>)> = sqlx::query_as(&format!(
"SELECT id, name, metadata FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&into_uuid_clean)
.fetch_optional(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let (into_id, into_name, into_metadata) = target_row.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Target identity not found"})),
))?;
// Get face_ids that will be transferred
let face_ids: Vec<(Option<String>, Option<i32>)> = sqlx::query_as(&format!(
"SELECT face_id, trace_id FROM {} WHERE identity_id = $1",
face_table
))
.bind(from_id)
.fetch_all(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let face_id_list: Vec<String> = face_ids.iter().filter_map(|(f, _)| f.clone()).collect();
let trace_id_list: Vec<i32> = face_ids.iter().filter_map(|(_, t)| *t).collect();
let faces_count = face_id_list.len() as i64;
// Get file_uuid for these faces
let file_uuid: Option<String> = sqlx::query_scalar(&format!(
"SELECT file_uuid FROM {} WHERE identity_id = $1 LIMIT 1",
face_table
))
.bind(from_id)
.fetch_optional(&db)
.await
.ok()
.flatten();
// Capture target metadata BEFORE merge
let into_meta_before = into_metadata.clone().unwrap_or(serde_json::json!({}));
// Transfer all face bindings from source → target
let updated = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1, stranger_id = NULL WHERE identity_id = $2",
face_table
))
.bind(into_id)
.bind(from_id)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Merge text: source name → target aliases, source aliases → target aliases, source metadata → target metadata
let from_meta = from_metadata.clone();
let into_meta = into_metadata.clone();
let mut merged_meta = into_meta.clone().unwrap_or(serde_json::json!({}));
let from_meta_obj = from_meta
.as_ref()
.and_then(|m| m.as_object())
.cloned()
.unwrap_or_default();
let into_meta_obj = into_meta
.as_ref()
.and_then(|m| m.as_object())
.cloned()
.unwrap_or_default();
// Track aliases added for undo
let mut aliases_added: Vec<crate::core::db::AliasEntry> = Vec::new();
let mut metadata_fields_added: Vec<String> = Vec::new();
// Merge aliases
let mut aliases = into_meta_obj
.get("aliases")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
// Add source name as alias (if not already present)
let from_name_str = from_name.clone();
if !aliases
.iter()
.any(|a| a.get("name").and_then(|n| n.as_str()) == Some(&from_name_str))
{
aliases.push(serde_json::json!({"name": from_name_str, "locale": "en", "source": "merge"}));
aliases_added.push(crate::core::db::AliasEntry {
name: from_name_str,
locale: "en".to_string(),
source: Some("merge".to_string()),
});
}
// Add source aliases (if not already present)
if let Some(from_aliases) = from_meta_obj.get("aliases").and_then(|a| a.as_array()) {
for fa in from_aliases {
let fa_name = fa.get("name").and_then(|n| n.as_str()).unwrap_or("");
let fa_locale = fa.get("locale").and_then(|l| l.as_str()).unwrap_or("en");
if !aliases
.iter()
.any(|a| a.get("name").and_then(|n| n.as_str()) == Some(fa_name))
{
aliases.push(fa.clone());
aliases_added.push(crate::core::db::AliasEntry {
name: fa_name.to_string(),
locale: fa_locale.to_string(),
source: None,
});
}
}
}
merged_meta["aliases"] = serde_json::Value::Array(aliases);
// Deep-merge other metadata fields (recursive for nested objects)
if let Some(ref mut merged_map) = merged_meta.as_object_mut() {
deep_merge_metadata_fields(&from_meta_obj, merged_map, &mut metadata_fields_added, "");
}
// Update target metadata
sqlx::query(&format!(
"UPDATE {} SET metadata = $1 WHERE id = $2",
id_table
))
.bind(&merged_meta)
.bind(into_id)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let keep = req.keep_history.unwrap_or(true);
if keep {
// Mark as merged, keep record
sqlx::query(&format!(
"UPDATE {} SET status = 'merged', metadata = COALESCE(metadata, '{{}}'::jsonb) || jsonb_build_object('merged_into', $1) WHERE id = $2",
id_table
))
.bind(&req.into_uuid).bind(from_id)
.execute(&db).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))))?;
} else {
// Delete source identity
sqlx::query(&format!("DELETE FROM {} WHERE id = $1", id_table))
.bind(from_id)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
}
// Sync target identity JSON
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(&db, &into_uuid_clean).await
{
tracing::warn!(
"[merge] Failed to sync target identity file for {}: {}",
into_uuid_clean,
e
);
}
// Delete source identity JSON if not keeping history
if !keep {
let _ = crate::core::identity::storage::delete_identity_file(&uuid_clean);
}
// Generate merge_id and store to MongoDB
let merge_id = crate::core::db::IdentityMergeHistoryStore::generate_merge_id();
let now = chrono::Utc::now();
let undo_deadline = now + chrono::Duration::hours(24);
// Create MongoDB history record
let merge_history = crate::core::db::IdentityMergeHistory {
id: None,
merge_id: merge_id.clone(),
source_identity: crate::core::db::IdentitySnapshot {
id: from_id as i64,
uuid: uuid_clean.clone(),
name: from_name.clone(),
identity_type: from_type,
source: from_source,
status: from_status,
tmdb_id: from_tmdb_id.map(|id| id as i64),
tmdb_profile: from_tmdb_profile,
metadata: from_metadata.unwrap_or(serde_json::json!({})),
created_at: from_created_at
.map(|dt| chrono::DateTime::from_naive_utc_and_offset(dt, chrono::Utc)),
face_count: faces_count,
},
target_identity: crate::core::db::TargetIdentitySnapshot {
id: into_id as i64,
uuid: into_uuid_clean.clone(),
name: into_name.clone(),
metadata_before: into_meta_before,
metadata_after: Some(merged_meta.clone()),
},
aliases_added_to_target: aliases_added.clone(),
metadata_fields_added: metadata_fields_added.clone(),
faces_transferred: crate::core::db::FacesTransferred {
file_uuid: file_uuid.unwrap_or_default(),
face_ids: face_id_list,
trace_ids: trace_id_list,
count: faces_count,
},
merge_params: crate::core::db::MergeParams {
keep_history: keep,
cleared_stranger_id: true,
performed_by_user: Some(auth.user_id.to_string()),
},
merged_at: now,
undo_deadline,
undone: false,
undone_at: None,
undone_by: None,
undone_snapshot: None,
undo_expired: false,
};
// Store to MongoDB (non-blocking, warn on failure)
let mongo_store = crate::core::db::IdentityMergeHistoryStore::init().await;
if let Ok(store) = mongo_store {
if let Err(e) = store.store_merge_history(&merge_history).await {
tracing::warn!("[merge] Failed to store merge history in MongoDB: {}", e);
}
} else {
tracing::warn!(
"[merge] Failed to init MongoDB store: {:?}",
mongo_store.err()
);
}
Ok(Json(ApiResponse {
success: true,
message: format!(
"Merged '{}' into '{}' ({} faces transferred, {})",
from_name,
into_name,
updated.rows_affected(),
if keep {
"history kept"
} else {
"source deleted"
}
),
data: Some(serde_json::json!({
"merge_id": merge_id,
"faces_transferred": updated.rows_affected(),
"aliases_added": aliases_added.len(),
"metadata_fields_added": metadata_fields_added.len()
})),
}))
}
// ============================================================================
// Router Setup
// ============================================================================
// Router Setup
// ============================================================================
#[derive(Debug, Deserialize)]
pub struct ListIdentitiesParams {
pub search: Option<String>,
pub limit: Option<i32>,
pub offset: Option<i32>,
}
#[derive(Debug, Serialize)]
pub struct IdentityTraceInfo {
pub file_uuid: String,
pub trace_id: i32,
pub frame_count: i64,
pub first_frame: i32,
pub last_frame: i32,
pub first_sec: f64,
pub last_sec: f64,
pub avg_confidence: f64,
}
#[derive(Debug, Serialize)]
pub struct IdentityTracesResponse {
pub success: bool,
pub identity_uuid: String,
pub name: String,
pub total: usize,
pub page: usize,
pub page_size: usize,
pub total_faces: i64,
pub traces: Vec<IdentityTraceInfo>,
}
#[derive(Debug, Deserialize)]
pub struct TracesQuery {
pub page: Option<usize>,
pub page_size: Option<usize>,
}
pub async fn bind_identity_trace(
State(state): State<crate::api::types::AppState>,
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(identity_uuid): Path<String>,
Json(req): Json<BindIdentityTraceRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let fd_table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let uuid_clean = identity_uuid.replace('-', "");
let identity_row: Option<(i32, String)> = sqlx::query_as(&format!(
"SELECT id, name FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?;
let (identity_id, name) = identity_row.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": format!("Identity not found: {}", identity_uuid)})),
)
})?;
// Capture old identity_id before bind trace (use first face in trace as reference)
let old_identity_id: Option<i32> = sqlx::query_scalar(&format!(
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND trace_id = $2 LIMIT 1",
fd_table
))
.bind(&req.file_uuid)
.bind(req.trace_id)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?
.flatten();
let result = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
fd_table
))
.bind(identity_id)
.bind(&req.file_uuid)
.bind(req.trace_id)
.execute(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Update failed: {}", e)})),
)
})?;
// Clear bind redo stack
let _ = sqlx::query(&format!(
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
history_table
))
.bind(identity_id)
.execute(state.db.pool())
.await;
// Insert history record
let uid = auth.user_id.to_string();
let usrc = match auth.source {
crate::api::middleware::AuthSource::Jwt => "jwt",
crate::api::middleware::AuthSource::Session => "session",
crate::api::middleware::AuthSource::ApiKey => "api_key",
};
let before = serde_json::json!({
"file_uuid": req.file_uuid, "trace_id": req.trace_id, "identity_id_before": old_identity_id
});
let after = serde_json::json!({
"file_uuid": req.file_uuid, "trace_id": req.trace_id, "identity_id_after": identity_id
});
let _ = sqlx::query(&format!(
"INSERT INTO {} (identity_id, operation, before_snapshot, after_snapshot, is_undone, user_id, user_source) VALUES ($1, 'bind_trace', $2, $3, false, $4, $5)",
history_table
))
.bind(identity_id)
.bind(before)
.bind(after)
.bind(&uid)
.bind(usrc)
.execute(state.db.pool())
.await;
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(state.db.pool(), &uuid_clean)
.await
{
tracing::warn!(
"[bind/trace] Failed to sync identity file for {}: {}",
uuid_clean,
e
);
}
Ok(Json(ApiResponse {
success: true,
message: format!(
"Bound trace {} of {} to {}",
req.trace_id, req.file_uuid, name
),
data: Some(serde_json::json!({"rows_affected": result.rows_affected()})),
}))
}
pub async fn get_identity_traces(
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<TracesQuery>,
) -> Result<Json<IdentityTracesResponse>, (StatusCode, String)> {
let id_table = crate::core::db::schema::table_name("identities");
let fd_table = crate::core::db::schema::table_name("face_detections");
let page = params.page.unwrap_or(1);
let page_size = params.page_size.unwrap_or(20);
let offset = ((page - 1) as i64) * (page_size as i64);
// Get identity name
let uuid_clean = identity_uuid.replace('-', "");
let identity: Option<(i32, String)> = sqlx::query_as(&format!(
"SELECT id, name FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let (identity_id, name) =
identity.ok_or((StatusCode::NOT_FOUND, "Identity not found".to_string()))?;
// Get paginated traces for this identity across all files
let rows: Vec<(String, i32, i64, i32, i32, f64, f64, f64)> = sqlx::query_as(&format!(
r#"SELECT fd.file_uuid::text, fd.trace_id,
COUNT(*)::bigint AS frame_count,
MIN(fd.frame_number)::int AS first_frame,
MAX(fd.frame_number)::int AS last_frame,
ROUND(MIN(fd.frame_number)::numeric / 25.0, 1)::float8 AS first_sec,
ROUND(MAX(fd.frame_number)::numeric / 25.0, 1)::float8 AS last_sec,
ROUND(AVG(fd.confidence)::numeric, 4)::float8 AS avg_confidence
FROM {} fd
WHERE fd.identity_id = $1
GROUP BY fd.file_uuid, fd.trace_id
ORDER BY fd.file_uuid, fd.trace_id
LIMIT $2 OFFSET $3"#,
fd_table
))
.bind(identity_id)
.bind(page_size as i64)
.bind(offset)
.fetch_all(state.db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
// Get total count for pagination
let total: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM (SELECT 1 FROM {} fd WHERE fd.identity_id = $1 GROUP BY fd.file_uuid, fd.trace_id) sub",
fd_table
))
.bind(identity_id)
.fetch_one(state.db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let total_traces = total.0 as usize;
let total_faces: i64 = rows.iter().map(|r| r.2).sum();
let traces: Vec<IdentityTraceInfo> = rows
.into_iter()
.map(
|(
file_uuid,
trace_id,
frame_count,
first_frame,
last_frame,
first_sec,
last_sec,
avg_confidence,
)| IdentityTraceInfo {
file_uuid,
trace_id,
frame_count,
first_frame,
last_frame,
first_sec,
last_sec,
avg_confidence,
},
)
.collect();
Ok(Json(IdentityTracesResponse {
success: true,
identity_uuid,
name,
total: total_traces,
page,
page_size,
total_faces,
traces,
}))
}
/// Undo a merge operation within 24 hours
pub async fn undo_merge(
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(merge_id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let face_table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
// Get merge history from MongoDB
let mongo_store = crate::core::db::IdentityMergeHistoryStore::init()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB init failed: {}", e)})),
)
})?;
let history = mongo_store
.get_merge_history(&merge_id)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB query failed: {}", e)})),
)
})?;
let history = history.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Merge record not found"})),
))?;
// Check if already undone
if history.undone {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "Merge already undone"})),
));
}
// Check 24hr deadline
let now = chrono::Utc::now();
let now_naive = now.naive_utc();
if now > history.undo_deadline {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Undo deadline expired",
"merged_at": history.merged_at.to_rfc3339(),
"undo_deadline": history.undo_deadline.to_rfc3339()
})),
));
}
// Connect to PostgreSQL
let db = sqlx::PgPool::connect(&crate::core::config::DATABASE_URL)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Restore source identity (or recreate if keep_history=false)
let mut source_id = history.source_identity.id;
let source_uuid = history.source_identity.uuid.clone();
if history.merge_params.keep_history {
// Source identity still exists, just restore status
sqlx::query(&format!(
"UPDATE {} SET status = 'confirmed', metadata = $1 WHERE id = $2",
id_table
))
.bind(&history.source_identity.metadata)
.bind(source_id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
} else {
// Need to recreate source identity
let source_uuid_parsed = uuid::Uuid::parse_str(&source_uuid).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Invalid UUID format: {}", e)})),
)
})?;
let new_id: i32 = sqlx::query_scalar(&format!(
"INSERT INTO {} (uuid, name, identity_type, source, status, tmdb_id, tmdb_profile, metadata, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id",
id_table
))
.bind(source_uuid_parsed)
.bind(&history.source_identity.name)
.bind(&history.source_identity.identity_type)
.bind(&history.source_identity.source)
.bind("confirmed")
.bind(history.source_identity.tmdb_id.map(|id| id as i32))
.bind(&history.source_identity.tmdb_profile)
.bind(&history.source_identity.metadata)
.bind(history.source_identity.created_at.map(|dt| dt.naive_utc()).unwrap_or(now_naive))
.fetch_one(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to recreate source identity: {}", e)})),
)
})?;
source_id = new_id as i64;
}
// Restore faces to source identity
let faces_reverted = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE face_id = ANY($2)",
face_table
))
.bind(source_id as i32)
.bind(&history.faces_transferred.face_ids)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Restore target metadata (precise removal)
let target_meta: Option<serde_json::Value> =
sqlx::query_scalar(&format!("SELECT metadata FROM {} WHERE id = $1", id_table))
.bind(history.target_identity.id as i32)
.fetch_optional(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let mut restored_meta = target_meta.unwrap_or(serde_json::json!({}));
// Remove aliases that were added from source
let aliases_to_remove = &history.aliases_added_to_target;
if let Some(aliases) = restored_meta
.get("aliases")
.and_then(|a| a.as_array())
.cloned()
{
let new_aliases: Vec<serde_json::Value> = aliases
.into_iter()
.filter(|alias| {
let alias_name = alias.get("name").and_then(|n| n.as_str()).unwrap_or("");
let alias_source = alias.get("source").and_then(|s| s.as_str());
!aliases_to_remove.iter().any(|to_remove| {
to_remove.name == alias_name && to_remove.source == Some("merge".to_string())
}) && alias_source != Some("merge")
})
.collect();
if let Some(obj) = restored_meta.as_object_mut() {
obj.insert("aliases".to_string(), serde_json::Value::Array(new_aliases));
}
}
// Remove metadata fields that were added from source (handles nested paths)
for field in &history.metadata_fields_added {
let parts: Vec<&str> = field.split('.').collect();
remove_nested_meta_field(&mut restored_meta, &parts);
}
// Update target metadata
sqlx::query(&format!(
"UPDATE {} SET metadata = $1 WHERE id = $2",
id_table
))
.bind(&restored_meta)
.bind(history.target_identity.id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Sync identity JSON files
let source_uuid_clean = source_uuid.replace('-', "");
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(&db, &source_uuid_clean).await
{
tracing::warn!("[undo] Failed to sync source identity file: {}", e);
}
let target_uuid_clean = history.target_identity.uuid.replace('-', "");
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(&db, &target_uuid_clean).await
{
tracing::warn!("[undo] Failed to sync target identity file: {}", e);
}
// Mark as undone in MongoDB with snapshot for potential redo
let undone_snapshot = crate::core::db::UndoneSnapshot {
source_identity_id: source_id,
source_uuid: source_uuid.clone(),
source_name: history.source_identity.name.clone(),
target_metadata_at_undo: restored_meta.clone(),
};
mongo_store
.mark_as_undone(&merge_id, Some(&auth.user_id.to_string()), undone_snapshot)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to mark as undone: {}", e)})),
)
})?;
Ok(Json(ApiResponse {
success: true,
message: format!(
"Undo merge completed: '{}' restored, {} faces reverted",
history.source_identity.name,
faces_reverted.rows_affected()
),
data: Some(serde_json::json!({
"source_identity_restored": {
"uuid": source_uuid,
"name": history.source_identity.name,
"status": "confirmed"
},
"faces_reverted": faces_reverted.rows_affected(),
"aliases_removed_from_target": history.aliases_added_to_target.len(),
"metadata_fields_removed_from_target": history.metadata_fields_added.len()
})),
}))
}
/// Query merge history
#[derive(Debug, Deserialize)]
pub struct MergeHistoryQueryParams {
pub source_uuid: Option<String>,
pub target_uuid: Option<String>,
pub merge_id: Option<String>,
pub undone: Option<bool>,
pub page: Option<u32>,
pub page_size: Option<u32>,
}
#[derive(Debug, Serialize)]
pub struct MergeHistoryResponse {
pub success: bool,
pub total: u64,
pub page: u32,
pub page_size: u32,
pub results: Vec<crate::core::db::MergeHistoryEntry>,
}
pub async fn get_merge_history(
Query(params): Query<MergeHistoryQueryParams>,
) -> Result<Json<MergeHistoryResponse>, (StatusCode, Json<serde_json::Value>)> {
let mongo_store = crate::core::db::IdentityMergeHistoryStore::init()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB init failed: {}", e)})),
)
})?;
let query = crate::core::db::MergeHistoryQuery {
source_uuid: params.source_uuid,
target_uuid: params.target_uuid,
merge_id: params.merge_id,
undone: params.undone,
};
let page = params.page.unwrap_or(1);
let page_size = params.page_size.unwrap_or(20);
let (results, total) = mongo_store
.query_merge_history(query, page, page_size)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB query failed: {}", e)})),
)
})?;
Ok(Json(MergeHistoryResponse {
success: true,
total,
page,
page_size,
results,
}))
}
pub async fn redo_merge(
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(merge_id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let face_table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let mongo_store = crate::core::db::IdentityMergeHistoryStore::init()
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB init failed: {}", e)})),
)
})?;
let history = mongo_store
.get_merge_history(&merge_id)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("MongoDB query failed: {}", e)})),
)
})?;
let history = history.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Merge record not found"})),
))?;
if !history.undone {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "Merge is not undone, nothing to redo"})),
));
}
let now = chrono::Utc::now();
if now > history.undo_deadline {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Redo deadline expired",
"undo_deadline": history.undo_deadline.to_rfc3339()
})),
));
}
let snapshot = history.undone_snapshot.as_ref().ok_or((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "Undone snapshot not found, cannot redo"})),
))?;
let db = sqlx::PgPool::connect(&crate::core::config::DATABASE_URL)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Transfer faces from source back to target
let faces_redone = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1, stranger_id = NULL WHERE identity_id = $2",
face_table
))
.bind(history.target_identity.id as i32)
.bind(snapshot.source_identity_id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Build target metadata: start from undone_snapshot (clean state after undo)
let mut target_meta = snapshot.target_metadata_at_undo.clone();
let meta_obj = target_meta.as_object_mut().unwrap();
// Add aliases from source
let source_meta = &history.source_identity.metadata;
let existing_aliases = meta_obj
.get("aliases")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let mut new_aliases = existing_aliases.clone();
for alias_added in &history.aliases_added_to_target {
let already_exists = new_aliases.iter().any(|a| {
a.get("name").and_then(|n| n.as_str()) == Some(&alias_added.name)
&& a.get("locale").and_then(|l| l.as_str()) == Some(&alias_added.locale)
});
if !already_exists {
new_aliases.push(serde_json::json!({
"name": alias_added.name,
"locale": alias_added.locale,
"source": "merge"
}));
}
}
meta_obj.insert("aliases".into(), serde_json::Value::Array(new_aliases));
// Add metadata fields from source
for field in &history.metadata_fields_added {
if let Some(value) = source_meta.get(field) {
meta_obj.insert(field.clone(), value.clone());
}
}
// Update target metadata
sqlx::query(&format!(
"UPDATE {} SET metadata = $1 WHERE id = $2",
id_table
))
.bind(&target_meta)
.bind(history.target_identity.id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
// Handle source identity
if history.merge_params.keep_history {
sqlx::query(&format!(
"UPDATE {} SET status = 'merged' WHERE id = $1",
id_table
))
.bind(snapshot.source_identity_id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
} else {
sqlx::query(&format!("DELETE FROM {} WHERE id = $1", id_table))
.bind(snapshot.source_identity_id as i32)
.execute(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
}
// Sync identity JSON files
let source_uuid_clean = snapshot.source_uuid.replace('-', "");
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(&db, &source_uuid_clean).await
{
tracing::warn!("[redo] Failed to sync source identity file: {}", e);
}
let target_uuid_clean = history.target_identity.uuid.replace('-', "");
if let Err(e) =
crate::core::identity::storage::save_identity_file_by_pool(&db, &target_uuid_clean).await
{
tracing::warn!("[redo] Failed to sync target identity file: {}", e);
}
// Mark as redone in MongoDB
mongo_store
.mark_as_redone(&merge_id, Some(&auth.user_id.to_string()))
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to mark as redone: {}", e)})),
)
})?;
Ok(Json(ApiResponse {
success: true,
message: format!(
"Redo merge completed: '{}' merged into '{}', {} faces transferred",
snapshot.source_name,
history.target_identity.name,
faces_redone.rows_affected()
),
data: Some(serde_json::json!({
"merge_id": merge_id,
"faces_transferred": faces_redone.rows_affected(),
"aliases_added": history.aliases_added_to_target.len(),
"metadata_fields_added": history.metadata_fields_added.len()
})),
}))
}
// ── Bind Undo/Redo/History ──────────────────────────────────────────
#[derive(Debug, Deserialize)]
struct BindUndoRequest {
steps: Option<usize>,
}
#[derive(Debug, Serialize)]
struct BindUndoResponse {
success: bool,
identity_uuid: String,
operation: String,
undone_count: usize,
affected_rows: i64,
}
async fn apply_bind_snapshot(
pool: &sqlx::PgPool,
face_table: &str,
snapshot: &serde_json::Value,
identity_id_value: Option<serde_json::Value>,
) -> Result<i64, (StatusCode, Json<serde_json::Value>)> {
let file_uuid = snapshot
.get("file_uuid")
.and_then(|v| v.as_str())
.ok_or_else(|| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "Missing file_uuid in snapshot"})),
)
})?;
let id_val = match identity_id_value {
Some(val) => val.as_i64(),
None => None,
};
if let Some(face_id) = snapshot.get("face_id").and_then(|v| v.as_str()) {
let rows = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_id = $3",
face_table
))
.bind(id_val)
.bind(file_uuid)
.bind(face_id)
.execute(pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
Ok(rows.rows_affected() as i64)
} else if let Some(trace_id) = snapshot.get("trace_id").and_then(|v| v.as_i64()) {
let rows = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
face_table
))
.bind(id_val)
.bind(file_uuid)
.bind(trace_id as i32)
.execute(pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
Ok(rows.rows_affected() as i64)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "Snapshot has neither face_id nor trace_id"})),
))
}
}
pub async fn bind_undo(
State(state): State<crate::api::types::AppState>,
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(identity_uuid): Path<String>,
Json(req): Json<BindUndoRequest>,
) -> Result<Json<BindUndoResponse>, (StatusCode, Json<serde_json::Value>)> {
let uuid_clean = identity_uuid.replace('-', "");
let steps = req.steps.unwrap_or(1).max(1);
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let face_table = crate::core::db::schema::table_name("face_detections");
let identity_id: i32 = sqlx::query_scalar(&format!(
"SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?
.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Identity not found"})),
))?;
let records: Vec<(i64, String, serde_json::Value)> = sqlx::query_as(&format!(
"SELECT id, operation, before_snapshot FROM {} WHERE identity_id = $1 AND is_undone = false AND operation IN ('bind','unbind','bind_trace') ORDER BY created_at DESC LIMIT $2",
history_table
))
.bind(identity_id)
.bind(steps as i64)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to get history: {}", e)})),
)
})?;
if records.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "No bind undo operations available"})),
));
}
let operation = records.last().unwrap().1.clone();
let before = &records.last().unwrap().2;
let identity_id_before = before.get("identity_id_before").cloned();
let affected_rows =
apply_bind_snapshot(state.db.pool(), &face_table, before, identity_id_before).await?;
// Mark all as undone
for (history_id, _, _) in &records {
let _ = sqlx::query(&format!(
"UPDATE {} SET is_undone = true, undone_at = NOW() WHERE id = $1",
history_table
))
.bind(history_id)
.execute(state.db.pool())
.await;
}
Ok(Json(BindUndoResponse {
success: true,
identity_uuid: uuid_clean,
operation,
undone_count: records.len(),
affected_rows,
}))
}
pub async fn bind_redo(
State(state): State<crate::api::types::AppState>,
Extension(auth): Extension<crate::api::middleware::UserAuth>,
Path(identity_uuid): Path<String>,
Json(req): Json<BindUndoRequest>,
) -> Result<Json<BindUndoResponse>, (StatusCode, Json<serde_json::Value>)> {
let uuid_clean = identity_uuid.replace('-', "");
let steps = req.steps.unwrap_or(1).max(1);
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let face_table = crate::core::db::schema::table_name("face_detections");
let identity_id: i32 = sqlx::query_scalar(&format!(
"SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?
.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Identity not found"})),
))?;
let records: Vec<(i64, String, serde_json::Value)> = sqlx::query_as(&format!(
"SELECT id, operation, after_snapshot FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace') ORDER BY created_at DESC LIMIT $2",
history_table
))
.bind(identity_id)
.bind(steps as i64)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to get history: {}", e)})),
)
})?;
if records.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "No bind redo operations available"})),
));
}
let operation = records.last().unwrap().1.clone();
let after = &records.last().unwrap().2;
let identity_id_after = after.get("identity_id_after").cloned();
let affected_rows =
apply_bind_snapshot(state.db.pool(), &face_table, after, identity_id_after).await?;
// Mark all as redone
for (history_id, _, _) in &records {
let _ = sqlx::query(&format!(
"UPDATE {} SET is_undone = false, undone_at = NULL WHERE id = $1",
history_table
))
.bind(history_id)
.execute(state.db.pool())
.await;
}
Ok(Json(BindUndoResponse {
success: true,
identity_uuid: uuid_clean,
operation,
undone_count: records.len(),
affected_rows,
}))
}
#[derive(Debug, Serialize)]
struct BindHistoryItem {
history_id: i64,
operation: String,
is_undone: bool,
created_at: Option<chrono::DateTime<chrono::Utc>>,
undone_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Serialize)]
struct BindHistoryResponse {
success: bool,
identity_uuid: String,
total: i64,
undo_stack_count: i64,
redo_stack_count: i64,
results: Vec<BindHistoryItem>,
}
#[derive(Debug, Deserialize)]
struct BindHistoryQuery {
limit: Option<usize>,
page: Option<usize>,
}
pub async fn bind_history(
State(state): State<crate::api::types::AppState>,
Path(identity_uuid): Path<String>,
Query(params): Query<BindHistoryQuery>,
) -> Result<Json<BindHistoryResponse>, (StatusCode, Json<serde_json::Value>)> {
let uuid_clean = identity_uuid.replace('-', "");
let limit = params.limit.unwrap_or(20).max(1).min(100);
let page = params.page.unwrap_or(1).max(1);
let offset = ((page - 1) * limit) as i64;
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let identity_id: i32 = sqlx::query_scalar(&format!(
"SELECT id FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
))
.bind(&uuid_clean)
.fetch_optional(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?
.ok_or((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Identity not found"})),
))?;
let undo_stack_count: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} WHERE identity_id = $1 AND is_undone = false AND operation IN ('bind','unbind','bind_trace')",
history_table
))
.bind(identity_id)
.fetch_one(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?;
let redo_stack_count: i64 = sqlx::query_scalar(&format!(
"SELECT COUNT(*) FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
history_table
))
.bind(identity_id)
.fetch_one(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?;
let rows = sqlx::query(&format!(
"SELECT id, operation, is_undone, created_at, undone_at FROM {} WHERE identity_id = $1 AND operation IN ('bind','unbind','bind_trace') ORDER BY created_at DESC LIMIT $2 OFFSET $3",
history_table
))
.bind(identity_id)
.bind(limit as i64)
.bind(offset)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("DB error: {}", e)})),
)
})?;
let results: Vec<BindHistoryItem> = rows
.into_iter()
.map(|r| BindHistoryItem {
history_id: r.get::<i64, _>("id"),
operation: r.get::<String, _>("operation"),
is_undone: r.get::<bool, _>("is_undone"),
created_at: r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("created_at"),
undone_at: r.get::<Option<chrono::DateTime<chrono::Utc>>, _>("undone_at"),
})
.collect();
let total = undo_stack_count + redo_stack_count;
Ok(Json(BindHistoryResponse {
success: true,
identity_uuid: uuid_clean,
total,
undo_stack_count,
redo_stack_count,
results,
}))
}
pub fn identity_binding_routes() -> Router<crate::api::types::AppState> {
Router::new()
.route("/api/v1/identity/:identity_uuid/bind", post(bind_identity))
.route(
"/api/v1/identity/:identity_uuid/bind/trace",
post(bind_identity_trace),
)
.route(
"/api/v1/identity/:identity_uuid/unbind",
post(unbind_identity),
)
.route(
"/api/v1/identity/:identity_uuid/mergeinto",
post(merge_identities),
)
.route(
"/api/v1/identity/:identity_uuid/traces",
get(get_identity_traces),
)
.route("/api/v1/identity/:identity_uuid/bind/undo", post(bind_undo))
.route("/api/v1/identity/:identity_uuid/bind/redo", post(bind_redo))
.route(
"/api/v1/identity/:identity_uuid/bind/history",
get(bind_history),
)
.route("/api/v1/identity/merge/:merge_id/undo", post(undo_merge))
.route("/api/v1/identity/merge/:merge_id/redo", post(redo_merge))
.route("/api/v1/identity/merge/history", get(get_merge_history))
}