feat: schema tracking, SHA256 integrity, identity UUID fix, 3-angle face match, cuts table, trace stranger_id

This commit is contained in:
Accusys
2026-05-16 03:10:50 +08:00
parent c41f7e0c6e
commit 5317cb4bec
13 changed files with 242 additions and 80 deletions

View File

@@ -195,7 +195,7 @@ async fn list_identities(
.into_iter()
.map(|r| IdentityResponse {
id: r.0,
identity_uuid: r.1.to_string(),
identity_uuid: r.1.to_string().replace('-', ""),
name: r.2,
metadata: r.3,
})

View File

@@ -676,12 +676,12 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
tmdb_rows.len()
);
// Step 2: 載入所有 face_detections按 trace_id 分組
// Step 2: 載入所有 face_detections(含 frame_number,按 trace_id 分組
let fd_table = schema::table_name("face_detections");
let fd_rows = sqlx::query_as::<_, (i32, Vec<f32>)>(
&format!("SELECT trace_id, embedding FROM {} \
let fd_rows = sqlx::query_as::<_, (i32, i32, Vec<f32>)>(
&format!("SELECT trace_id, frame_number, embedding FROM {} \
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
ORDER BY trace_id", fd_table),
ORDER BY trace_id, frame_number", fd_table),
)
.bind(file_uuid)
.fetch_all(pool)
@@ -692,27 +692,38 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
return Ok(0);
}
// 分組trace_id → Vec<embedding>
// 分組trace_id → (frame_number, embedding)
use std::collections::HashMap;
let mut trace_faces: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
for (tid, emb) in &fd_rows {
trace_faces
let mut trace_faces_raw: HashMap<i32, Vec<(i32, Vec<f32>)>> = HashMap::new();
for (tid, frame, emb) in &fd_rows {
trace_faces_raw
.entry(*tid)
.or_insert_with(Vec::new)
.push(emb.clone());
.push((*frame, emb.clone()));
}
// 去重:同一個 trace embedding 太接近的只留一個
for faces in trace_faces.values_mut() {
faces.sort_by(|a, b| b[0].partial_cmp(&a[0]).unwrap_or(std::cmp::Ordering::Equal));
faces.dedup_by(|a, b| cosine_similarity(a, b) > 0.99);
// 從每個 trace 選取不同角度的 3 個 face embedding
// 策略:按 frame_number 排序,取前中後各 1 個
let mut trace_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
for (tid, mut faces) in trace_faces_raw {
faces.sort_by_key(|(frame, _)| *frame);
let n = faces.len();
let indices = if n <= 3 {
(0..n).collect()
} else {
let mid = n / 2;
vec![0, mid, n - 1]
};
let samples: Vec<Vec<f32>> = indices.iter().map(|&i| faces[i].1.clone()).collect();
trace_samples.insert(tid, samples);
}
let total_traces = trace_faces.len();
let total_traces = trace_samples.len();
let sample_count: usize = trace_samples.values().map(|v| v.len()).sum();
tracing::info!(
"[FaceMatch] Loaded {} traces with {} faces",
"[FaceMatch] Loaded {} traces, sampled {} embeddings (3-angle)",
total_traces,
fd_rows.len()
sample_count
);
// Step 3: 建立 TMDb 查找表
@@ -722,12 +733,13 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
const TH: f32 = 0.50;
let mut matched: HashMap<i32, String> = HashMap::new(); // trace_id → identity_name
// Round 1: 直接比對 TMDb
for (&tid, faces) in &trace_faces {
// Round 1: 用 3-angle samples 比對 TMDb
// 每個 trace 選 3 個不同角度 face取最高 similarity
for (&tid, samples) in &trace_samples {
let mut best_name = String::new();
let mut best_sim = 0.0f32;
for (_, ref name, ref tmdb_emb) in &tmdb_seeds {
for face_emb in faces {
for face_emb in samples {
let s = cosine_similarity(face_emb, tmdb_emb);
if s > best_sim {
best_sim = s;
@@ -751,31 +763,33 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
// 建立 seed pool: name → Vec<embedding>
let mut seed_pool: HashMap<String, Vec<&Vec<f32>>> = HashMap::new();
for (&tid, name) in &matched {
if let Some(faces) = trace_faces.get(&tid) {
if let Some(samples) = trace_samples.get(&tid) {
seed_pool
.entry(name.clone())
.or_default()
.extend(faces.iter());
.extend(samples.iter());
}
}
let mut new_matches: Vec<(i32, String)> = Vec::new();
for (&tid, faces) in &trace_faces {
for (&tid, samples) in &trace_samples {
if matched.contains_key(&tid) {
continue;
}
let mut best_name = String::new();
let mut best_sim = 0.0f32;
if faces.is_empty() {
if samples.is_empty() {
continue;
}
let ref_face = &faces[0];
// 用 3-angle samples 分別比對 seed取最高 similarity
for (name, seed_faces) in &seed_pool {
for seed in seed_faces {
let s = cosine_similarity(ref_face, seed);
if s > best_sim {
best_sim = s;
best_name = name.clone();
for face_emb in samples {
for seed in seed_faces {
let s = cosine_similarity(face_emb, seed);
if s > best_sim {
best_sim = s;
best_name = name.clone();
}
}
}
}
@@ -799,7 +813,7 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
}
}
// Step 5: 寫入 DB
// Step 5: 寫入 DB — 已匹配的設 identity_id
let identities_table = schema::table_name("identities");
let fd_table = schema::table_name("face_detections");
let mut updated = 0usize;
@@ -823,11 +837,27 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
}
}
// Step 6: 未匹配的 trace 設 stranger_id = trace_id
// trace_id 在同一個 file 內是 sequential integer直接複用為 stranger_id
let stranger_update = sqlx::query(
&format!(
"UPDATE {} SET stranger_id = trace_id \
WHERE file_uuid = $1 AND trace_id IS NOT NULL AND identity_id IS NULL \
AND (stranger_id IS NULL OR stranger_id != trace_id)",
fd_table
)
)
.bind(file_uuid)
.execute(pool)
.await?;
let stranger_count = stranger_update.rows_affected();
tracing::info!(
"[FaceMatch] Done: {}/{} traces matched ({}%)",
"[FaceMatch] Done: {}/{} traces matched ({}%), {} strangers",
matched.len(),
total_traces,
matched.len() * 100 / total_traces
matched.len() * 100 / total_traces,
stranger_count
);
Ok(updated)
}

View File

@@ -31,6 +31,10 @@ pub fn identity_routes() -> Router<crate::api::server::AppState> {
"/api/v1/identity/:identity_uuid/chunks",
get(get_identity_chunks),
)
.route(
"/api/v1/identity/:identity_uuid/faces",
get(get_identity_faces),
)
.route("/api/v1/resource/register", post(register_resource))
.route("/api/v1/resource/heartbeat", post(heartbeat_resource))
.route("/api/v1/resources", get(list_resources))
@@ -212,7 +216,7 @@ async fn get_file_identities(
.into_iter()
.map(|r| FileIdentityItem {
identity_id: r.identity_id,
identity_uuid: r.identity_uuid.map(|u| u.to_string()),
identity_uuid: r.identity_uuid.map(|u| u.to_string().replace('-', "")),
name: r.name,
metadata: r.metadata,
face_count: r.face_count,
@@ -239,7 +243,7 @@ async fn get_file_identities(
#[derive(Debug, Serialize)]
pub struct IdentityDetailResponse {
pub success: bool,
pub uuid: Uuid,
pub uuid: String,
pub name: String,
pub identity_type: Option<String>,
pub source: Option<String>,
@@ -273,7 +277,7 @@ async fn get_identity_detail(
match identity {
Some(i) => Ok(Json(IdentityDetailResponse {
success: true,
uuid: i.uuid,
uuid: i.uuid.to_string().replace('-', ""),
name: i.name,
identity_type: i.identity_type,
source: i.source,
@@ -295,7 +299,7 @@ async fn get_identity_detail(
#[derive(Debug, Serialize)]
pub struct IdentityFilesResponse {
pub success: bool,
pub identity_uuid: Uuid,
pub identity_uuid: String,
pub total: i64,
pub page: usize,
pub page_size: usize,
@@ -390,7 +394,7 @@ async fn get_identity_files(
Ok(Json(IdentityFilesResponse {
success: true,
identity_uuid: uuid,
identity_uuid: uuid.to_string().replace('-', ""),
total: data.len() as i64,
page,
page_size,
@@ -401,7 +405,7 @@ async fn get_identity_files(
#[derive(Debug, Serialize)]
pub struct IdentityFacesResponse {
pub success: bool,
pub identity_uuid: Uuid,
pub identity_uuid: String,
pub total: i64,
pub page: usize,
pub page_size: usize,
@@ -413,7 +417,7 @@ pub struct IdentityFaceItem {
pub id: i64,
pub file_uuid: String,
pub frame_number: i64,
pub timestamp_secs: f64,
pub timestamp_secs: Option<f64>,
pub face_id: Option<String>,
pub bbox: BBox,
pub confidence: f64,
@@ -465,7 +469,7 @@ async fn get_identity_faces(
Ok(Json(IdentityFacesResponse {
success: true,
identity_uuid: uuid,
identity_uuid: uuid.to_string().replace('-', ""),
total: data.len() as i64,
page,
page_size,
@@ -476,7 +480,7 @@ async fn get_identity_faces(
#[derive(Debug, Serialize)]
pub struct IdentityChunksResponse {
pub success: bool,
pub identity_uuid: Uuid,
pub identity_uuid: String,
pub total: i64,
pub page: usize,
pub page_size: usize,
@@ -528,7 +532,7 @@ async fn get_identity_chunks(
Ok(Json(IdentityChunksResponse {
success: true,
identity_uuid: uuid,
identity_uuid: uuid.to_string().replace('-', ""),
total: data.len() as i64,
page,
page_size,

View File

@@ -65,6 +65,19 @@ struct UserInfo {
// Global State
static SERVER_START: OnceCell<Instant> = OnceCell::new();
static SERVER_HOST: OnceCell<String> = OnceCell::new();
static SERVER_PORT: OnceCell<u16> = OnceCell::new();
fn get_host() -> String {
SERVER_HOST
.get()
.cloned()
.unwrap_or_else(|| "0.0.0.0".to_string())
}
fn get_port() -> u16 {
SERVER_PORT.get().copied().unwrap_or(0)
}
fn get_uptime_ms() -> u64 {
SERVER_START
@@ -75,6 +88,8 @@ fn get_uptime_ms() -> u64 {
#[derive(Debug, Serialize)]
struct HealthResponse {
ip: String,
port: u16,
status: String,
version: String,
build_git_hash: String,
@@ -462,6 +477,8 @@ pub struct AppState {
#[derive(Debug, Serialize)]
struct DetailedHealthResponse {
ip: String,
port: u16,
status: String,
version: String,
build_git_hash: String,
@@ -583,6 +600,8 @@ async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
}
Json(HealthResponse {
ip: get_host(),
port: get_port(),
status: status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
@@ -677,6 +696,8 @@ async fn health_detailed(State(state): State<AppState>) -> Json<DetailedHealthRe
};
Json(DetailedHealthResponse {
ip: get_host(),
port: get_port(),
status: overall_status.to_string(),
version: env!("BUILD_VERSION").to_string(),
build_git_hash: env!("BUILD_GIT_HASH").to_string(),
@@ -3014,6 +3035,31 @@ async fn unregister(
pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
let _ = SERVER_START.set(Instant::now());
// Resolve actual IP address for health identification
let resolved_ip = if host == "0.0.0.0" {
// Try to find a non-loopback IP
if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&"localhost:0") {
if let Some(addr) = addrs.filter_map(|a| match a {
std::net::SocketAddr::V4(v4) if !v4.ip().is_loopback() => Some(v4.ip().to_string()),
_ => None,
}).next() {
addr
} else {
// Fallback: try getting IP from UDP socket
std::net::UdpSocket::bind("0.0.0.0:0")
.and_then(|s| s.connect("8.8.8.8:53").map(|_| s))
.and_then(|s| s.local_addr())
.map(|a| a.ip().to_string())
.unwrap_or_else(|_| "0.0.0.0".to_string())
}
} else {
host.to_string()
}
} else {
host.to_string()
};
let _ = SERVER_HOST.set(resolved_ip);
let _ = SERVER_PORT.set(port);
let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string()));
let mongo_cache = MongoCache::init().await?;

View File

@@ -159,6 +159,18 @@ pub async fn universal_search(
results.extend(person_results);
}
// Deduplicate by chunk_id / frame_number / person_id
{
let mut seen_chunks = std::collections::HashSet::new();
let mut seen_frames = std::collections::HashSet::new();
let mut seen_persons = std::collections::HashSet::new();
results.retain(|r| match r {
SearchResult::Chunk { chunk_id, .. } => seen_chunks.insert(chunk_id.clone()),
SearchResult::Frame { frame_number, .. } => seen_frames.insert(*frame_number),
SearchResult::Person { person_id, .. } => seen_persons.insert(person_id.clone()),
});
}
// Sort by score descending
results.sort_by(|a, b| {
let score_a = match a {