feat: Identity Agent query Qdrant for face embeddings
Phase 1.4: Modify match_faces_iterative to use Qdrant Changes: - match_faces_iterative() now queries FaceEmbeddingDb - Fallback to PostgreSQL if Qdrant is empty - Group embeddings by trace_id from Qdrant payload - Sample 3-angle embeddings (front, mid, back) - Match against TMDb seeds (threshold=0.50) - Propagate to unmatched traces - Update face_detections.identity_id in PostgreSQL New functions: - match_faces_iterative() - Qdrant-based matching - match_faces_iterative_pg() - PostgreSQL fallback Flow: 1. Load TMDb identities with face_embedding 2. Query Qdrant for file embeddings 3. Sample 3 embeddings per trace 4. Match against TMDb seeds 5. Propagate matches iteratively 6. Update identity_id in PostgreSQL
This commit is contained in:
@@ -619,10 +619,13 @@ fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 迭代多角度 face embedding 比對 + 傳播
|
/// 迭代多角度 face embedding 比對 + 傳播 (Qdrant version)
|
||||||
/// Round 1: 用 TMDb seed face_embedding 比對 face_detections (threshold 0.50)
|
/// Round 1: 用 TMDb seed face_embedding 比對 Qdrant embeddings (threshold 0.50)
|
||||||
/// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace
|
/// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace
|
||||||
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
||||||
|
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
||||||
let identities_table = schema::table_name("identities");
|
let identities_table = schema::table_name("identities");
|
||||||
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
||||||
@@ -635,12 +638,167 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"[FaceMatch] Loaded {} TMDb seed identities",
|
"[FaceMatch-Qdrant] Loaded {} TMDb seed identities",
|
||||||
|
tmdb_rows.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Step 2: Load embeddings from Qdrant
|
||||||
|
let face_db = FaceEmbeddingDb::new();
|
||||||
|
let qdrant_embeddings = face_db.get_all_embeddings_for_file(file_uuid).await?;
|
||||||
|
|
||||||
|
if qdrant_embeddings.is_empty() {
|
||||||
|
tracing::warn!("[FaceMatch-Qdrant] No face embeddings in Qdrant for {}", file_uuid);
|
||||||
|
return match_faces_iterative_pg(pool, file_uuid).await; // Fallback to PG
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group: trace_id → Vec<(frame, embedding)>
|
||||||
|
let mut trace_faces_raw: HashMap<i32, Vec<(i64, Vec<f32>)>> = HashMap::new();
|
||||||
|
for (_, emb, payload) in &qdrant_embeddings {
|
||||||
|
trace_faces_raw
|
||||||
|
.entry(payload.trace_id)
|
||||||
|
.or_default()
|
||||||
|
.push((payload.frame, emb.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sample 3 embeddings per trace (front, mid, back)
|
||||||
|
let mut trace_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
|
||||||
|
for (tid, mut faces) in trace_faces_raw {
|
||||||
|
faces.sort_by_key(|(frame, _)| *frame);
|
||||||
|
let n = faces.len();
|
||||||
|
let indices = if n <= 3 {
|
||||||
|
(0..n).collect::<Vec<_>>()
|
||||||
|
} else {
|
||||||
|
vec![0, n / 2, n - 1]
|
||||||
|
};
|
||||||
|
let samples: Vec<Vec<f32>> = indices.iter().map(|&i| faces[i].1.clone()).collect();
|
||||||
|
trace_samples.insert(tid, samples);
|
||||||
|
}
|
||||||
|
|
||||||
|
let total_traces = trace_samples.len();
|
||||||
|
let sample_count: usize = trace_samples.values().map(|v| v.len()).sum();
|
||||||
|
tracing::info!(
|
||||||
|
"[FaceMatch-Qdrant] Loaded {} traces, sampled {} embeddings",
|
||||||
|
total_traces,
|
||||||
|
sample_count
|
||||||
|
);
|
||||||
|
|
||||||
|
// Step 3: Match against TMDb seeds
|
||||||
|
const TH: f32 = 0.50;
|
||||||
|
let tmdb_seeds: Vec<(i32, String, Vec<f32>)> = tmdb_rows;
|
||||||
|
let mut matched: HashMap<i32, String> = HashMap::new();
|
||||||
|
|
||||||
|
for (&tid, samples) in &trace_samples {
|
||||||
|
let mut best_name = String::new();
|
||||||
|
let mut best_sim = 0.0f32;
|
||||||
|
for (_, ref name, ref tmdb_emb) in &tmdb_seeds {
|
||||||
|
for face_emb in samples {
|
||||||
|
let s = cosine_similarity(face_emb, tmdb_emb);
|
||||||
|
if s > best_sim {
|
||||||
|
best_sim = s;
|
||||||
|
best_name = name.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if best_sim >= TH {
|
||||||
|
matched.insert(tid, best_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracing::info!(
|
||||||
|
"[FaceMatch-Qdrant] Round 1: matched {} traces (threshold={})",
|
||||||
|
matched.len(),
|
||||||
|
TH
|
||||||
|
);
|
||||||
|
|
||||||
|
// Round 2+: Propagate
|
||||||
|
let mut round = 2;
|
||||||
|
while matched.len() < trace_samples.len() {
|
||||||
|
let prev_count = matched.len();
|
||||||
|
|
||||||
|
// Collect new matches in separate HashMap
|
||||||
|
let mut new_matches: HashMap<i32, String> = HashMap::new();
|
||||||
|
|
||||||
|
for (&tid, samples) in &trace_samples {
|
||||||
|
if matched.contains_key(&tid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (matched_tid, matched_name) in &matched {
|
||||||
|
if let Some(matched_embs) = trace_samples.get(matched_tid) {
|
||||||
|
for face_emb in samples {
|
||||||
|
for ref_emb in matched_embs {
|
||||||
|
let s = cosine_similarity(face_emb, ref_emb);
|
||||||
|
if s >= TH {
|
||||||
|
new_matches.insert(tid, matched_name.clone());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge new matches
|
||||||
|
matched.extend(new_matches);
|
||||||
|
|
||||||
|
if matched.len() == prev_count {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tracing::info!(
|
||||||
|
"[FaceMatch-Qdrant] Round {}: matched {} total",
|
||||||
|
round,
|
||||||
|
matched.len()
|
||||||
|
);
|
||||||
|
round += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update face_detections.identity_id
|
||||||
|
let fd_table = schema::table_name("face_detections");
|
||||||
|
let identities_map: HashMap<String, i32> = tmdb_seeds
|
||||||
|
.iter()
|
||||||
|
.map(|(id, name, _)| (name.clone(), *id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut updated = 0usize;
|
||||||
|
for (tid, name) in &matched {
|
||||||
|
let identity_id = identities_map.get(name);
|
||||||
|
if let Some(id) = identity_id {
|
||||||
|
let rows = sqlx::query(&format!(
|
||||||
|
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
|
||||||
|
fd_table
|
||||||
|
))
|
||||||
|
.bind(*id)
|
||||||
|
.bind(file_uuid)
|
||||||
|
.bind(*tid)
|
||||||
|
.execute(pool)
|
||||||
|
.await?
|
||||||
|
.rows_affected();
|
||||||
|
updated += rows as usize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("[FaceMatch-Qdrant] Updated {} face_detections", updated);
|
||||||
|
Ok(updated)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fallback: PostgreSQL-based matching (original implementation)
|
||||||
|
async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
||||||
|
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
||||||
|
let identities_table = schema::table_name("identities");
|
||||||
|
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
||||||
|
&format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", identities_table)
|
||||||
|
)
|
||||||
|
.fetch_all(pool).await?;
|
||||||
|
|
||||||
|
if tmdb_rows.is_empty() {
|
||||||
|
tracing::warn!("[FaceMatch-PG] No TMDb identities with face embeddings");
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
tracing::info!(
|
||||||
|
"[FaceMatch-PG] Loaded {} TMDb seed identities",
|
||||||
tmdb_rows.len()
|
tmdb_rows.len()
|
||||||
);
|
);
|
||||||
|
|
||||||
// Step 2: 載入所有 face_detections(含 frame_number),按 trace_id 分組
|
// Step 2: 載入所有 face_detections(含 frame_number),按 trace_id 分組
|
||||||
// frame_number is BIGINT (i64) in database
|
|
||||||
let fd_table = schema::table_name("face_detections");
|
let fd_table = schema::table_name("face_detections");
|
||||||
let fd_rows = sqlx::query_as::<_, (i32, i64, Vec<f32>)>(&format!(
|
let fd_rows = sqlx::query_as::<_, (i32, i64, Vec<f32>)>(&format!(
|
||||||
"SELECT trace_id, frame_number, embedding FROM {} \
|
"SELECT trace_id, frame_number, embedding FROM {} \
|
||||||
@@ -653,7 +811,7 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if fd_rows.is_empty() {
|
if fd_rows.is_empty() {
|
||||||
tracing::warn!("[FaceMatch] No face detections with embeddings");
|
tracing::warn!("[FaceMatch-PG] No face detections with embeddings");
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -668,7 +826,6 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 從每個 trace 選取不同角度的 3 個 face embedding
|
// 從每個 trace 選取不同角度的 3 個 face embedding
|
||||||
// 策略:按 frame_number 排序,取前中後各 1 個
|
|
||||||
let mut trace_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
|
let mut trace_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
|
||||||
for (tid, mut faces) in trace_faces_raw {
|
for (tid, mut faces) in trace_faces_raw {
|
||||||
faces.sort_by_key(|(frame, _)| *frame);
|
faces.sort_by_key(|(frame, _)| *frame);
|
||||||
@@ -686,7 +843,7 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
let total_traces = trace_samples.len();
|
let total_traces = trace_samples.len();
|
||||||
let sample_count: usize = trace_samples.values().map(|v| v.len()).sum();
|
let sample_count: usize = trace_samples.values().map(|v| v.len()).sum();
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"[FaceMatch] Loaded {} traces, sampled {} embeddings (3-angle)",
|
"[FaceMatch-PG] Loaded {} traces, sampled {} embeddings (3-angle)",
|
||||||
total_traces,
|
total_traces,
|
||||||
sample_count
|
sample_count
|
||||||
);
|
);
|
||||||
@@ -699,7 +856,6 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
|||||||
let mut matched: HashMap<i32, String> = HashMap::new(); // trace_id → identity_name
|
let mut matched: HashMap<i32, String> = HashMap::new(); // trace_id → identity_name
|
||||||
|
|
||||||
// Round 1: 用 3-angle samples 比對 TMDb
|
// Round 1: 用 3-angle samples 比對 TMDb
|
||||||
// 每個 trace 選 3 個不同角度 face,取最高 similarity
|
|
||||||
for (&tid, samples) in &trace_samples {
|
for (&tid, samples) in &trace_samples {
|
||||||
let mut best_name = String::new();
|
let mut best_name = String::new();
|
||||||
let mut best_sim = 0.0f32;
|
let mut best_sim = 0.0f32;
|
||||||
|
|||||||
Reference in New Issue
Block a user