feat: Identity JSON sync mechanism

- storage.rs: add local_profile field, check disk for profile.jpg in save_identity_file_by_pool
- tmdb_api.rs: trigger JSON sync after TMDb probe
- identity_api.rs: upload_profile_image triggers JSON sync
- identity_binding.rs: bind/unbind/merge trigger JSON sync
- get_identity_json: replace DB fallback with Lazy Sync (generates JSON from DB if missing)
- Fixes missing/obsolete JSON files for all identity mutations
This commit is contained in:
Accusys
2026-05-19 22:20:19 +08:00
parent 7680c202ef
commit 0eb08acaae
4 changed files with 129 additions and 54 deletions

View File

@@ -766,6 +766,11 @@ async fn upload_profile_image(
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"success": false, "message": format!("Failed to write file: {}", e)}))) (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"success": false, "message": format!("Failed to write file: {}", e)})))
})?; })?;
// Sync identity JSON to reflect new profile image
let pool = state.db.pool().clone();
let uuid_clone = uuid_clean.clone();
let _ = crate::core::identity::storage::save_identity_file_by_pool(&pool, &uuid_clone).await;
Ok(Json(ProfileImageResponse { Ok(Json(ProfileImageResponse {
success: true, success: true,
identity_uuid: uuid_clean, identity_uuid: uuid_clean,
@@ -815,45 +820,24 @@ async fn get_identity_json(
} }
} }
// 2. Fallback: Generate JSON from DB // 2. Lazy Sync: If file missing, generate from DB and save
use crate::core::identity::storage::{IdentityFile, FileBinding}; if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(state.db.pool(), &clean).await {
let record = state.db.get_identity_by_uuid(&clean).await tracing::warn!("[identity-json] Lazy sync failed for {}: {}", clean, e);
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? return Err(StatusCode::NOT_FOUND);
.ok_or(StatusCode::NOT_FOUND)?; }
let id = record.id as i64; // 3. Read the newly generated file
let bindings: Vec<FileBinding> = { let p = crate::core::identity::storage::identity_file_path(&clean);
let fd_table = crate::core::db::schema::table_name("face_detections"); if p.exists() {
let rows = sqlx::query_as::<_, (String, Vec<i32>, i64)>( let data = std::fs::read(&p).map_err(|_| StatusCode::NOT_FOUND)?;
&format!("SELECT fd.file_uuid, COALESCE(array_agg(DISTINCT fd.trace_id) FILTER (WHERE fd.trace_id IS NOT NULL), '{{}}'::int[]), COUNT(*)::bigint FROM {} fd WHERE fd.identity_id = $1 GROUP BY fd.file_uuid ORDER BY fd.file_uuid", fd_table) return Ok((
).bind(id).fetch_all(state.db.pool()).await.unwrap_or_default(); StatusCode::OK,
rows.into_iter().map(|(fu, tids, cnt)| FileBinding { [("content-type".to_string(), "application/json".to_string())],
file_uuid: fu, trace_ids: tids, face_count: cnt, data,
}).collect() ));
}; }
let fmt_time = |dt: Option<chrono::DateTime<chrono::Utc>>| -> String { Err(StatusCode::NOT_FOUND)
dt.map(|d| d.to_rfc3339()).unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
};
let file = IdentityFile {
version: 1,
identity_uuid: record.uuid.clone(),
name: record.name.clone(),
identity_type: record.identity_type.clone(),
source: record.source.clone(),
status: record.status.clone(),
tmdb_id: record.tmdb_id,
tmdb_profile: record.tmdb_profile.clone(),
metadata: record.metadata.clone(),
file_bindings: bindings,
created_at: fmt_time(record.created_at),
updated_at: fmt_time(record.updated_at),
};
let json = serde_json::to_string_pretty(&file).unwrap_or_default();
let bytes = json.into_bytes();
Ok((StatusCode::OK, [("content-type".to_string(), "application/json".to_string())], bytes))
} }
// ── Experiment: Identity Text Search ────────────────────────── // ── Experiment: Identity Text Search ──────────────────────────

View File

@@ -115,10 +115,9 @@ pub async fn bind_identity(
})?; })?;
let uuid_clean = identity_uuid.replace('-', ""); let uuid_clean = identity_uuid.replace('-', "");
if let Ok(ref db) = PostgresDb::init().await { // Sync identity JSON file
if let Err(e) = crate::core::identity::storage::save_identity_file(db, &uuid_clean).await { if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(&db, &uuid_clean).await {
tracing::warn!("[bind] Failed to save identity file for {}: {}", uuid_clean, e); tracing::warn!("[bind] Failed to sync identity file for {}: {}", uuid_clean, e);
}
} }
Ok(Json(ApiResponse { Ok(Json(ApiResponse {
@@ -136,6 +135,7 @@ pub async fn unbind_identity(
Json(req): Json<UnbindIdentityRequest>, Json(req): Json<UnbindIdentityRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> { ) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
let table = crate::core::db::schema::table_name("face_detections"); let table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let db = sqlx::PgPool::connect(&crate::core::config::DATABASE_URL) let db = sqlx::PgPool::connect(&crate::core::config::DATABASE_URL)
.await .await
@@ -146,6 +146,22 @@ pub async fn unbind_identity(
) )
})?; })?;
// Find the identity_id before unbinding to sync it later
let identity_id: Option<i64> = sqlx::query_scalar(&format!(
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2 AND identity_id IS NOT NULL",
table
))
.bind(&req.file_uuid)
.bind(&req.face_id)
.fetch_optional(&db)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
})?;
let result = sqlx::query(&format!( let result = sqlx::query(&format!(
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND face_id = $2", "UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND face_id = $2",
table table
@@ -161,6 +177,24 @@ pub async fn unbind_identity(
) )
})?; })?;
// Sync the identity JSON if we found an identity
if let Some(id) = identity_id {
let uuid: Option<String> = sqlx::query_scalar(&format!(
"SELECT uuid::text FROM {} WHERE id = $1",
id_table
))
.bind(id)
.fetch_optional(&db)
.await
.ok()
.flatten();
if let Some(identity_uuid) = uuid {
if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(&db, &identity_uuid).await {
tracing::warn!("[unbind] Failed to sync identity file for {}: {}", identity_uuid, e);
}
}
}
Ok(Json(ApiResponse { Ok(Json(ApiResponse {
success: true, success: true,
message: format!("Unbound face {} from {}", req.face_id, req.file_uuid), message: format!("Unbound face {} from {}", req.face_id, req.file_uuid),
@@ -263,6 +297,18 @@ pub async fn merge_identities(
})?; })?;
} }
// Sync target identity JSON
let into_uuid_clean = req.into_uuid.replace('-', "");
if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(&db, &into_uuid_clean).await {
tracing::warn!("[merge] Failed to sync target identity file for {}: {}", into_uuid_clean, e);
}
// Delete source identity JSON if not keeping history
if !keep {
let from_uuid_clean = identity_uuid.replace('-', "");
let _ = crate::core::identity::storage::delete_identity_file(&from_uuid_clean);
}
Ok(Json(ApiResponse { Ok(Json(ApiResponse {
success: true, success: true,
message: format!( message: format!(

View File

@@ -197,18 +197,41 @@ async fn tmdb_probe_handler(
} }
match tmdb::probe::probe_from_cache(&state.db, &file_uuid).await { match tmdb::probe::probe_from_cache(&state.db, &file_uuid).await {
Ok(result) => Ok(Json(TmdbProbeResponse { Ok(result) => {
success: true, // Sync identity JSON files for newly created/updated identities
file_uuid, let pool = state.db.pool().clone();
tmdb_id: Some(result.tmdb_id), let file_uuid_clone = file_uuid.clone();
movie_title: Some(result.title), tokio::spawn(async move {
cast_count: Some(result.cast_count), // Query identities linked to this file
identities_created: Some(result.identities_created), let fi_table = crate::core::db::schema::table_name("file_identities");
message: format!( let query = format!(
"Created/updated {} identities for movie ID {}", "SELECT i.uuid::text FROM {} fi JOIN {} i ON fi.identity_id = i.id WHERE fi.file_uuid = $1",
result.identities_created, result.tmdb_id fi_table, crate::core::db::schema::table_name("identities")
), );
})), if let Ok(rows) = sqlx::query_scalar::<_, String>(&query)
.bind(&file_uuid_clone)
.fetch_all(&pool)
.await
{
for uuid in rows {
let _ = crate::core::identity::storage::save_identity_file_by_pool(&pool, &uuid).await;
}
}
});
Ok(Json(TmdbProbeResponse {
success: true,
file_uuid,
tmdb_id: Some(result.tmdb_id),
movie_title: Some(result.title),
cast_count: Some(result.cast_count),
identities_created: Some(result.identities_created),
message: format!(
"Created/updated {} identities for movie ID {}",
result.identities_created, result.tmdb_id
),
}))
}
Err(e) => { Err(e) => {
let msg = e.to_string(); let msg = e.to_string();
if msg.contains("not found") { if msg.contains("not found") {

View File

@@ -18,6 +18,9 @@ pub struct IdentityFile {
pub status: Option<String>, pub status: Option<String>,
pub tmdb_id: Option<i32>, pub tmdb_id: Option<i32>,
pub tmdb_profile: Option<String>, pub tmdb_profile: Option<String>,
/// Local profile image filename (e.g., "profile.jpg") if uploaded by user.
/// Overrides tmdb_profile if present.
pub local_profile: Option<String>,
pub metadata: serde_json::Value, pub metadata: serde_json::Value,
pub file_bindings: Vec<FileBinding>, pub file_bindings: Vec<FileBinding>,
pub created_at: String, pub created_at: String,
@@ -187,7 +190,7 @@ pub async fn save_identity_file_by_pool(pool: &sqlx::PgPool, uuid: &str) -> Resu
let clean = uuid.replace('-', ""); let clean = uuid.replace('-', "");
let record = sqlx::query_as::<_, crate::core::db::IdentityDetailRecord>( let record = sqlx::query_as::<_, crate::core::db::IdentityDetailRecord>(
&format!( &format!(
"SELECT id, uuid::text, name, identity_type, source, status, metadata, reference_data, \ "SELECT id, uuid::text, COALESCE(real_name, actor_name, name) AS name, identity_type, source, status, metadata, reference_data, \
NULL::real[] as voice_embedding, NULL::real[] as identity_embedding, \ NULL::real[] as voice_embedding, NULL::real[] as identity_embedding, \
face_embedding::real[] as face_embedding, \ face_embedding::real[] as face_embedding, \
tmdb_id, tmdb_profile, created_at::timestamptz as created_at, NULL::timestamptz as updated_at \ tmdb_id, tmdb_profile, created_at::timestamptz as created_at, NULL::timestamptz as updated_at \
@@ -222,6 +225,14 @@ pub async fn save_identity_file_by_pool(pool: &sqlx::PgPool, uuid: &str) -> Resu
}) })
.collect(); .collect();
// Check for local profile image
let profile_path = identity_dir(&clean).join("profile.jpg");
let local_profile = if profile_path.exists() {
Some("profile.jpg".to_string())
} else {
None
};
let fmt_time = |dt: Option<chrono::DateTime<chrono::Utc>>| -> String { let fmt_time = |dt: Option<chrono::DateTime<chrono::Utc>>| -> String {
dt.map(|d| d.to_rfc3339()) dt.map(|d| d.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
@@ -236,6 +247,7 @@ pub async fn save_identity_file_by_pool(pool: &sqlx::PgPool, uuid: &str) -> Resu
status: record.status, status: record.status,
tmdb_id: record.tmdb_id, tmdb_id: record.tmdb_id,
tmdb_profile: record.tmdb_profile, tmdb_profile: record.tmdb_profile,
local_profile,
metadata: record.metadata, metadata: record.metadata,
file_bindings, file_bindings,
created_at: fmt_time(record.created_at), created_at: fmt_time(record.created_at),
@@ -349,6 +361,15 @@ pub async fn save_identity_file(db: &PostgresDb, uuid: &str) -> Result<()> {
}) })
.collect(); .collect();
// Check for local profile image
let clean = uuid.replace('-', "");
let profile_path = identity_dir(&clean).join("profile.jpg");
let local_profile = if profile_path.exists() {
Some("profile.jpg".to_string())
} else {
None
};
let fmt_time = |dt: Option<chrono::DateTime<chrono::Utc>>| -> String { let fmt_time = |dt: Option<chrono::DateTime<chrono::Utc>>| -> String {
dt.map(|d| d.to_rfc3339()) dt.map(|d| d.to_rfc3339())
.unwrap_or_else(|| chrono::Utc::now().to_rfc3339()) .unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
@@ -363,6 +384,7 @@ pub async fn save_identity_file(db: &PostgresDb, uuid: &str) -> Result<()> {
status: record.status, status: record.status,
tmdb_id: record.tmdb_id, tmdb_id: record.tmdb_id,
tmdb_profile: record.tmdb_profile, tmdb_profile: record.tmdb_profile,
local_profile,
metadata: record.metadata, metadata: record.metadata,
file_bindings, file_bindings,
created_at: fmt_time(record.created_at), created_at: fmt_time(record.created_at),