refactor: remove face embedding architecture - single Qdrant _faces collection

- Delete FaceEmbeddingDb module (face_embedding_db.rs)
- Stub match_faces_iterative, generate_seed_embeddings, tmdb_match_handler
- Remove sync_trace_embeddings, populate_face_embeddings_to_qdrant
- Remove embedding from face.json output (face_processor.py)
- Remove embedding from PG UPDATE (store_traced_faces.py)
- Remove workspace traces staging (checkin.rs, qdrant_workspace.rs)
- Fix tests: add pose_angle to Face, hand_nodes to TkgResult

Disabled functions (need reimplement with _faces):
- match_faces_iterative (identity agent)
- generate_seed_embeddings (TMDb seeds)
- tmdb_match_handler (TMDb matching)
- cluster_face_embeddings, search_similar_faces
- merge_traces_within_cuts
This commit is contained in:
Accusys
2026-06-24 22:27:09 +08:00
parent 360cb991e1
commit 074cdcdbed
60 changed files with 657 additions and 9454 deletions

View File

@@ -12,7 +12,7 @@ use std::collections::HashMap;
use super::types::AppState;
use crate::core::config;
use crate::core::db::schema;
use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient};
use crate::core::db::{Database, PostgresDb, QdrantDb, QdrantWorkspace, RedisClient};
use crate::core::storage::content_hash;
use crate::FileManager;
@@ -463,7 +463,6 @@ async fn register_single_file(
.execute(db.pool()).await;
let mut cut_done = false;
let mut scene_done = false;
if has_video && total_frames > 0 && fps > 0.0 {
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string());
@@ -511,31 +510,6 @@ async fn register_single_file(
}
}
let scene_path =
std::path::Path::new(&output_dir).join(format!("{}.scene.json", file_uuid));
if !scene_path.exists() {
let scene_script = std::path::Path::new(&scripts_dir).join("scene_classifier.py");
if scene_script.exists() {
let scene_output = std::process::Command::new(&python_path)
.arg(&scene_script)
.arg(&canonical_path)
.arg(&scene_path)
.arg("--sample-interval")
.arg("2")
.output();
if let Ok(output) = scene_output {
if output.status.success() {
scene_done = true;
tracing::info!(
"[REGISTER] Scene classification completed for {}",
file_uuid
);
}
}
}
} else {
scene_done = true;
}
}
let audio_tracks: Vec<serde_json::Value> = temp_probe_json
@@ -584,9 +558,9 @@ async fn register_single_file(
}
}
let _ = sqlx::query(
&format!("UPDATE {} SET cut_done = $1, scene_done = $2, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
&format!("UPDATE {} SET cut_done = $1, scene_done = false, audio_tracks = $3, cut_count = $4, cut_max_duration = $5 WHERE file_uuid = $6", videos_table)
)
.bind(cut_done).bind(scene_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
.bind(cut_done).bind(&audio_tracks_json).bind(cut_count).bind(cut_max_duration).bind(&file_uuid)
.execute(db.pool()).await;
if let Some(json_val) = probe_json {
@@ -599,41 +573,6 @@ async fn register_single_file(
let _ = std::fs::write(&probe_path, json_str);
}
if final_file_type.as_deref() == Some("video") {
let auto_file_uuid = file_uuid.clone();
let auto_db = db.clone();
tokio::spawn(async move {
let identities_dir =
std::path::Path::new(&*crate::core::config::OUTPUT_DIR).join("identities");
let index_path = identities_dir.join("_index.json");
let cache_path = format!(
"{}/{}.tmdb.json",
*crate::core::config::OUTPUT_DIR,
auto_file_uuid
);
let cache_file = std::path::Path::new(&cache_path);
if index_path.exists() && cache_file.exists() {
tracing::info!(
"[AUTO-TMDB] Offline cache found for {}, running probe",
auto_file_uuid
);
if let Err(e) =
crate::core::tmdb::probe::probe_from_cache(&auto_db, &auto_file_uuid).await
{
tracing::warn!("[AUTO-TMDB] Probe failed for {}: {}", auto_file_uuid, e);
} else {
tracing::info!("[AUTO-TMDB] Probe completed for {}", auto_file_uuid);
}
} else {
tracing::info!(
"[AUTO-TMDB] No offline cache for {}, skipping",
auto_file_uuid
);
}
});
}
RegisterFileResponse {
success: true,
file_uuid,
@@ -978,8 +917,16 @@ struct UnregisterResponse {
deleted_chunks: u64,
deleted_tkg_nodes: u64,
deleted_qdrant_vectors: Option<u64>,
deleted_qdrant_workspace: Option<u64>,
deleted_redis_keys: Option<u64>,
deleted_output_files: u64,
deleted_file_identities: u64,
deleted_speaker_detections: u64,
deleted_face_clusters: u64,
deleted_face_recognition_results: u64,
deleted_characters: u64,
deleted_chunks_rule1: u64,
deleted_processor_alerts: u64,
}
#[derive(Debug, Deserialize)]
@@ -1011,6 +958,15 @@ fn delete_output_files(uuid: &str) -> u64 {
}
}
}
let workspace_sqlite = format!("{}.workspace.sqlite", uuid);
for output_dir in &output_dirs {
let path = std::path::Path::new(output_dir).join(&workspace_sqlite);
if path.exists() && std::fs::remove_file(&path).is_ok() {
deleted_count += 1;
tracing::info!("[UNREGISTER] Deleted workspace SQLite: {}", path.display());
}
}
deleted_count
}
@@ -1037,6 +993,13 @@ async fn unregister(
let chunk_vectors_table = schema::table_name("chunk_vectors");
let monitor_jobs_table = schema::table_name("monitor_jobs");
let frames_table = schema::table_name("frames");
let file_identities_table = schema::table_name("file_identities");
let speaker_detections_table = schema::table_name("speaker_detections");
let face_clusters_table = schema::table_name("face_clusters");
let face_recognition_results_table = schema::table_name("face_recognition_results");
let characters_table = schema::table_name("characters");
let chunks_rule1_table = schema::table_name("chunks_rule1");
let processor_alerts_table = schema::table_name("processor_alerts");
let mut tx = state.db.pool().begin().await.map_err(|e| {
tracing::error!("[unregister] Failed to start transaction: {}", e);
@@ -1082,6 +1045,21 @@ async fn unregister(
})?
.rows_affected() as i64;
let deleted_file_identities =
delete_safe!(file_identities_table, "file_uuid = $1", &uuid, "file identities");
let deleted_speaker_detections =
delete_safe!(speaker_detections_table, "file_uuid = $1", &uuid, "speaker detections");
let deleted_face_clusters =
delete_safe!(face_clusters_table, "file_uuid = $1", &uuid, "face clusters");
let deleted_face_recognition =
delete_safe!(face_recognition_results_table, "file_uuid = $1", &uuid, "face recognition results");
let deleted_characters =
delete_safe!(characters_table, "file_uuid = $1", &uuid, "characters");
let deleted_chunks_rule1 =
delete_safe!(chunks_rule1_table, "uuid = $1", &uuid, "chunks rule1");
let deleted_processor_alerts =
delete_safe!(processor_alerts_table, "file_uuid = $1", &uuid, "processor alerts");
sqlx::query(&format!(
"DELETE FROM {} WHERE file_uuid = $1",
videos_table
@@ -1100,10 +1078,13 @@ async fn unregister(
})?;
tracing::info!(
"[UNREGISTER] Deleted: {} faces, {} processors, {} parent_chunks, {} chunks, {} pre_chunks, {} tkg_nodes, {} cuts, {} strangers, {} chunk_vectors, {} monitor_jobs, {} frames",
"[UNREGISTER] Deleted: {} faces, {} processors, {} parent_chunks, {} chunks, {} pre_chunks, {} tkg_nodes, {} cuts, {} strangers, {} chunk_vectors, {} monitor_jobs, {} frames, {} file_identities, {} speaker_detections, {} face_clusters, {} face_recognition_results, {} characters, {} chunks_rule1, {} processor_alerts",
deleted_faces, deleted_processors, deleted_parent_chunks, deleted_chunks,
deleted_pre_chunks, deleted_tkg_nodes, deleted_cuts, deleted_strangers,
deleted_chunk_vectors, deleted_monitor_jobs, deleted_frames
deleted_chunk_vectors, deleted_monitor_jobs, deleted_frames,
deleted_file_identities, deleted_speaker_detections, deleted_face_clusters,
deleted_face_recognition, deleted_characters, deleted_chunks_rule1,
deleted_processor_alerts
);
let deleted_output_files = delete_output_files(&uuid);
@@ -1141,6 +1122,20 @@ async fn unregister(
}
};
let deleted_qdrant_workspace = {
let workspace = QdrantWorkspace::new();
match workspace.delete_by_file_uuid(&uuid).await {
Ok(_) => {
tracing::info!("[UNREGISTER] Deleted Qdrant workspace vectors for {}", uuid);
Some(1)
}
Err(e) => {
tracing::warn!("[UNREGISTER] Failed to delete Qdrant workspace vectors: {}", e);
None
}
}
};
Ok(Json(UnregisterResponse {
success: true,
message: format!("File {} unregistered successfully.", uuid),
@@ -1150,8 +1145,16 @@ async fn unregister(
deleted_chunks: (deleted_chunks + deleted_parent_chunks + deleted_pre_chunks) as u64,
deleted_tkg_nodes: deleted_tkg_nodes as u64,
deleted_qdrant_vectors,
deleted_qdrant_workspace,
deleted_redis_keys,
deleted_output_files,
deleted_file_identities: deleted_file_identities as u64,
deleted_speaker_detections: deleted_speaker_detections as u64,
deleted_face_clusters: deleted_face_clusters as u64,
deleted_face_recognition_results: deleted_face_recognition as u64,
deleted_characters: deleted_characters as u64,
deleted_chunks_rule1: deleted_chunks_rule1 as u64,
deleted_processor_alerts: deleted_processor_alerts as u64,
}))
}

View File

@@ -1,807 +0,0 @@
use axum::{
extract::State,
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use crate::core::llm::function_calling::LLM_CLIENT;
use sqlx::Row;
use crate::api::types::AppState;
use crate::core::db::qdrant_db::QdrantDb;
use crate::core::db::schema;
use crate::core::db::{PostgresDb, VectorPayload};
use crate::core::embedding::Embedder;
pub fn five_w1h_agent_routes() -> Router<AppState> {
Router::new()
.route("/api/v1/agents/5w1h/analyze", post(analyze_5w1h))
.route("/api/v1/agents/5w1h/batch", post(batch_analyze_5w1h))
.route("/api/v1/agents/5w1h/status", get(get_5w1h_status))
}
// ── Data Structures ──
#[derive(Debug, Deserialize)]
pub struct Analyze5W1HRequest {
pub file_uuid: String,
}
#[derive(Debug, Serialize)]
pub struct Analyze5W1HResponse {
pub success: bool,
pub file_uuid: String,
pub scenes_processed: usize,
pub scenes_total: usize,
}
#[derive(Debug, Deserialize)]
pub struct BatchAnalyze5W1HRequest {
pub file_uuids: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct BatchAnalyze5W1HResponse {
pub success: bool,
pub jobs: Vec<BatchJobStatus>,
}
#[derive(Debug, Serialize)]
pub struct BatchJobStatus {
pub file_uuid: String,
pub status: String,
pub message: String,
}
#[derive(Debug, Clone)]
struct CutScene {
chunk_id: String,
start_frame: i64,
end_frame: i64,
fps: f64,
start_time: f64,
end_time: f64,
content: serde_json::Value,
metadata: serde_json::Value,
summary_text: Option<String>,
}
#[derive(Debug, Clone)]
struct SentenceChunk {
chunk_id: String,
text: String,
start_time: f64,
end_time: f64,
start_frame: i64,
end_frame: i64,
content: serde_json::Value,
}
#[derive(Debug)]
struct ChildSummary {
chunk_id: String,
enhanced: String,
five_w1h: serde_json::Value,
}
#[derive(Debug)]
struct SceneSummaryResult {
parent_summary: String,
five_w1h: serde_json::Value,
child_summaries: Vec<ChildSummary>,
}
// ── LLM Endpoint ──
fn llm_base_url() -> String {
crate::core::config::llm::SUMMARY_URL.clone()
}
fn llm_model() -> String {
crate::core::config::llm::SUMMARY_MODEL.clone()
}
// ── Data Fetching ──
async fn fetch_cut_scenes(db: &PostgresDb, file_uuid: &str) -> anyhow::Result<Vec<CutScene>> {
let table = schema::table_name("chunk");
sqlx::query_as::<_, (String, i64, i64, f64, Option<f64>, Option<f64>, serde_json::Value, Option<serde_json::Value>, Option<String>)>(&format!(
r#"SELECT chunk_id, start_frame, end_frame, fps, start_time, end_time, content, metadata, summary_text
FROM {} WHERE file_uuid = $1 AND chunk_type = 'cut' ORDER BY start_frame"#, table
))
.bind(file_uuid)
.fetch_all(db.pool()).await?
.into_iter().map(|r| Ok(CutScene {
chunk_id: r.0, start_frame: r.1, end_frame: r.2,
fps: r.3, start_time: r.4.unwrap_or(0.0), end_time: r.5.unwrap_or(0.0),
content: r.6, metadata: r.7.unwrap_or(serde_json::json!({})), summary_text: r.8,
})).collect()
}
async fn fetch_sentences_in_scene(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
) -> anyhow::Result<Vec<SentenceChunk>> {
let table = schema::table_name("chunk");
sqlx::query_as::<_, (String, String, Option<f64>, Option<f64>, i64, i64, serde_json::Value)>(&format!(
r#"SELECT chunk_id, COALESCE(text_content,''), start_time, end_time, start_frame, end_frame, content
FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence'
AND start_time >= $2 AND end_time <= $3 ORDER BY start_time"#, table
))
.bind(file_uuid).bind(cut.start_time).bind(cut.end_time)
.fetch_all(db.pool()).await?
.into_iter().map(|r| Ok(SentenceChunk {
chunk_id: r.0, text: r.1, start_time: r.2.unwrap_or(0.0), end_time: r.3.unwrap_or(0.0),
start_frame: r.4, end_frame: r.5, content: r.6,
})).collect()
}
/// Fetch actor names present in this scene from face_detections + identity_bindings + identities
async fn fetch_identity_names_for_scene(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
) -> anyhow::Result<Vec<String>> {
let fd_table = schema::table_name("face_detections");
let ib_table = schema::table_name("identity_bindings");
let id_table = schema::table_name("identities");
let rows = sqlx::query_scalar::<_, String>(&format!(
r#"SELECT DISTINCT i.name
FROM {} fd
JOIN {} ib ON ib.identity_value = fd.trace_id::text AND ib.identity_type = 'trace'
JOIN {} i ON i.id = ib.identity_id
WHERE fd.file_uuid = $1 AND fd.frame_number >= $2 AND fd.frame_number <= $3
AND fd.trace_id IS NOT NULL
ORDER BY i.name"#,
fd_table, ib_table, id_table
))
.bind(file_uuid)
.bind(cut.start_frame)
.bind(cut.end_frame)
.fetch_all(db.pool())
.await?;
Ok(rows)
}
/// Fetch YOLO object labels detected in this scene from pre_chunks
async fn fetch_yolo_objects_for_scene(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
) -> anyhow::Result<Vec<String>> {
let table = schema::table_name("pre_chunks");
let rows = sqlx::query_scalar::<_, String>(&format!(
r#"SELECT DISTINCT data->>'label'
FROM {} WHERE file_uuid = $1 AND processor_type = 'yolo'
AND frame_number >= $2 AND frame_number <= $3
AND data->>'label' IS NOT NULL
ORDER BY data->>'label'"#,
table
))
.bind(file_uuid)
.bind(cut.start_frame)
.bind(cut.end_frame)
.fetch_all(db.pool())
.await?;
Ok(rows)
}
/// Fetch active speakers + their actor names for a scene's frame range
/// Uses identity_bindings to map SPEAKER_X to actor names
async fn fetch_speakers_for_scene(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
) -> anyhow::Result<Vec<String>> {
let pc_table = schema::table_name("pre_chunks");
let speakers = sqlx::query_scalar::<_, String>(&format!(
r#"SELECT DISTINCT data->>'speaker_id'
FROM {} WHERE file_uuid = $1 AND processor_type = 'asrx'
AND data->>'speaker_id' IS NOT NULL
AND start_frame <= $3 AND end_frame >= $2
ORDER BY data->>'speaker_id'"#,
pc_table
))
.bind(file_uuid)
.bind(cut.start_frame)
.bind(cut.end_frame)
.fetch_all(db.pool())
.await?;
if speakers.is_empty() {
return Ok(vec![]);
}
// Map speaker_ids to actor names via identity_bindings
let ib_table = schema::table_name("identity_bindings");
let id_table = schema::table_name("identities");
let mut result = Vec::new();
for spk in &speakers {
let name: Option<String> = sqlx::query_scalar(&format!(
r#"SELECT i.name FROM {} ib JOIN {} i ON i.id = ib.identity_id
WHERE ib.identity_type = 'speaker' AND ib.identity_value = $1 AND i.name IS NOT NULL
LIMIT 1"#,
ib_table, id_table
))
.bind(spk)
.fetch_optional(db.pool())
.await?;
match name {
Some(n) => result.push(format!("{} ({})", spk, n)),
None => result.push(spk.clone()),
}
}
Ok(result)
}
/// Fetch trace IDs with identity names for a scene's frame range
async fn fetch_trace_info(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
) -> anyhow::Result<Vec<String>> {
let fd_table = schema::table_name("face_detections");
let ib_table = schema::table_name("identity_bindings");
let id_table = schema::table_name("identities");
let rows = sqlx::query_as::<_, (i32, Option<String>)>(&format!(
r#"SELECT DISTINCT fd.trace_id, i.name
FROM {} fd
LEFT JOIN {} ib ON ib.identity_value = fd.trace_id::text AND ib.identity_type = 'trace'
LEFT JOIN {} i ON i.id = ib.identity_id
WHERE fd.file_uuid = $1 AND fd.frame_number >= $2 AND fd.frame_number <= $3
AND fd.trace_id IS NOT NULL
ORDER BY fd.trace_id"#,
fd_table, ib_table, id_table
))
.bind(file_uuid)
.bind(cut.start_frame)
.bind(cut.end_frame)
.fetch_all(db.pool())
.await?;
Ok(rows
.iter()
.map(|(trace, name)| {
if let Some(n) = name {
format!("trace_{} ({})", trace, n)
} else {
format!("trace_{}", trace)
}
})
.collect())
}
// ── LLM Prompt (Embedding-Optimized) ──
async fn summarize_one_scene(
db: &PostgresDb,
file_uuid: &str,
cut: &CutScene,
sentences: &[SentenceChunk],
prev_context: &str,
) -> anyhow::Result<SceneSummaryResult> {
if sentences.is_empty() {
return Ok(SceneSummaryResult {
parent_summary: String::new(),
five_w1h: serde_json::Value::Null,
child_summaries: vec![],
});
}
let faces = fetch_identity_names_for_scene(db, file_uuid, cut)
.await
.unwrap_or_default();
let objects = fetch_yolo_objects_for_scene(db, file_uuid, cut)
.await
.unwrap_or_default();
let traces = fetch_trace_info(db, file_uuid, cut)
.await
.unwrap_or_default();
let speakers = fetch_speakers_for_scene(db, file_uuid, cut)
.await
.unwrap_or_default();
let mut dialogue = String::new();
for (i, s) in sentences.iter().enumerate() {
let t = s.text.trim();
if !t.is_empty() {
dialogue.push_str(&format!("[{}] {}\n", i + 1, t));
}
}
let story_so_far = if prev_context.is_empty() {
String::new()
} else {
format!("\nStory so far (previous scenes):\n{}\n", prev_context)
};
let prompt = format!(
r#"Analyze this movie scene and produce a structured summary. Be specific — quote actual dialogue. Avoid template phrases like "within the established dramatic setting."
Scene time: {:.0}s{:.0}s
Dialogue:
{}Actors: {}
Objects: {}
Face traces: {}
Speakers: {}
{}
Output EXACTLY this JSON format:
{{
"scene_summary": "5 flowing sentences: who+what+where+when+why+how. Quote actual lines.",
"5w1h": {{
"who": "1 sentence with actor/character name",
"what": "1 sentence describing the action, quote the line",
"where": "1 sentence about setting",
"when": "1 sentence about timing in story",
"why": "1 sentence explaining why this moment matters",
"how": "1 sentence about delivery, emotion, tone"
}},
"sentences": [
{{
"index": 1,
"who": "1 sentence",
"what": "1 sentence referencing the actual line",
"where": "1 sentence",
"when": "1 sentence",
"why": "1 sentence why this is said",
"how": "1 sentence describing delivery",
"enhanced": "1 sentence with actual dialogue, self-contained for search"
}}
]
}}
Rules:
- scene_summary: 5 sentences, natural paragraph. Use quotes. No template phrases.
- Each 5w1h field: exactly 1 sentence. Specific details. Character names. Quotes.
- Each sentence.enhanced: self-contained for search, include actual spoken words.
- Return ONLY valid JSON. No markdown.
- A short scene with 1-2 lines should have a short summary."#,
cut.start_time,
cut.end_time,
dialogue,
faces.join(", "),
objects.join(", "),
traces.join(", "),
speakers.join(", "),
story_so_far,
);
let body = serde_json::json!({
"model": llm_model(),
"messages": [
{"role": "system", "content": "You output JSON only. Be specific. Quote actual dialogue. Avoid template phrases."},
{"role": "user", "content": prompt}
],
"temperature": 0.1,
"max_tokens": 4096,
"stream": false
});
let resp = LLM_CLIENT
.post(llm_base_url())
.json(&body)
.timeout(std::time::Duration::from_secs(180))
.send()
.await?
.json::<serde_json::Value>()
.await?;
let content = resp["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("{}");
// Strip markdown code fences if present
let cleaned = content
.trim_start_matches("```json")
.trim_start_matches("```")
.trim_end_matches("```")
.trim();
let parsed: serde_json::Value =
serde_json::from_str(cleaned).unwrap_or(serde_json::Value::Null);
let parent_summary = parsed["scene_summary"].as_str().unwrap_or("").to_string();
let five_w1h = parsed
.get("5w1h")
.cloned()
.unwrap_or(serde_json::Value::Null);
let mut child_summaries = Vec::new();
if let Some(arr) = parsed["sentences"].as_array() {
for entry in arr {
let idx = entry["index"].as_u64().unwrap_or(0).saturating_sub(1) as usize;
if let Some(enhanced) = entry["enhanced"].as_str() {
if idx < sentences.len() {
let child_5w1h = serde_json::json!({
"who": entry["who"].as_str().unwrap_or(""),
"what": entry["what"].as_str().unwrap_or(""),
"where": entry["where"].as_str().unwrap_or(""),
"when": entry["when"].as_str().unwrap_or(""),
"why": entry["why"].as_str().unwrap_or(""),
"how": entry["how"].as_str().unwrap_or(""),
});
child_summaries.push(ChildSummary {
chunk_id: sentences[idx].chunk_id.clone(),
enhanced: enhanced.to_string(),
five_w1h: child_5w1h,
});
}
}
}
}
// Fallback
if child_summaries.is_empty() && !parent_summary.is_empty() {
for s in sentences {
let text = s.text.trim();
if !text.is_empty() {
child_summaries.push(ChildSummary {
chunk_id: s.chunk_id.clone(),
enhanced: format!("{} Scene: {}", text, parent_summary),
five_w1h: serde_json::Value::Null,
});
}
}
}
Ok(SceneSummaryResult {
parent_summary,
five_w1h,
child_summaries,
})
}
// ── DB Storage ──
async fn store_parent_summary(
db: &PostgresDb,
cut_chunk_id: &str,
file_uuid: &str,
summary: &str,
five_w1h: &serde_json::Value,
sentences: &[SentenceChunk],
) -> anyhow::Result<()> {
let table = schema::table_name("chunk");
let meta = serde_json::json!({
"5w1h": five_w1h,
"sentence_ids": sentences.iter().map(|s| s.chunk_id.clone()).collect::<Vec<_>>(),
"sentence_count": sentences.len(),
});
sqlx::query(&format!(
r#"UPDATE {} SET summary_text = $1, metadata = jsonb_deep_merge(COALESCE(metadata, '{{}}'::jsonb), $2::jsonb)
WHERE chunk_id = $3 AND file_uuid = $4"#,
table
))
.bind(summary)
.bind(&meta)
.bind(cut_chunk_id)
.bind(file_uuid)
.execute(db.pool())
.await?;
Ok(())
}
async fn store_child_summaries(
db: &PostgresDb,
file_uuid: &str,
children: &[ChildSummary],
) -> anyhow::Result<()> {
let table = schema::table_name("chunk");
for c in children {
let text = c.enhanced.trim();
if text.is_empty() || text.len() < 10 {
continue;
}
// Update text_content (for embedding) + merge 5w1h into content
let merge = serde_json::json!({ "5w1h": c.five_w1h });
sqlx::query(&format!(
r#"UPDATE {} SET text_content = $1, content = content || $2::jsonb, embedding = NULL
WHERE chunk_id = $3 AND file_uuid = $4"#,
table
))
.bind(text)
.bind(&merge)
.bind(&c.chunk_id)
.bind(file_uuid)
.execute(db.pool())
.await?;
}
Ok(())
}
// ── API Handlers ──
async fn analyze_5w1h(
State(state): State<AppState>,
Json(req): Json<Analyze5W1HRequest>,
) -> Result<Json<Analyze5W1HResponse>, (StatusCode, String)> {
let db = PostgresDb::from_pool(state.db.pool().clone());
let cuts = fetch_cut_scenes(&db, &req.file_uuid)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let total = cuts.len();
let mut processed = 0usize;
let mut prev_context: Vec<String> = Vec::new();
for cut in &cuts {
// Skip already-summarized scenes but preserve context
if let Some(ref t) = cut.summary_text {
if t.len() > 20 {
processed += 1;
prev_context.push(format!("Scene (t={:.0}s): {}", cut.start_time, t));
continue;
}
}
let sentences = match fetch_sentences_in_scene(&db, &req.file_uuid, cut).await {
Ok(s) => s,
Err(e) => {
tracing::error!("[5W1H] fetch sentences failed: {}", e);
continue;
}
};
if sentences.is_empty() {
continue;
}
let context = prev_context.join("\n");
let result = match summarize_one_scene(&db, &req.file_uuid, cut, &sentences, &context).await
{
Ok(r) => r,
Err(e) => {
tracing::error!("[5W1H] scene {} failed: {}", cut.chunk_id, e);
processed += 1;
continue;
}
};
if !result.parent_summary.is_empty() {
if let Err(e) = store_parent_summary(
&db,
&cut.chunk_id,
&req.file_uuid,
&result.parent_summary,
&result.five_w1h,
&sentences,
)
.await
{
tracing::error!("[5W1H] parent: {}", e);
}
if let Err(e) =
store_child_summaries(&db, &req.file_uuid, &result.child_summaries).await
{
tracing::error!("[5W1H] child: {}", e);
}
prev_context.push(format!(
"Scene (t={:.0}s): {}",
cut.start_time, result.parent_summary
));
}
processed += 1;
}
Ok(Json(Analyze5W1HResponse {
success: true,
file_uuid: req.file_uuid,
scenes_processed: processed,
scenes_total: total,
}))
}
async fn batch_analyze_5w1h(
State(state): State<AppState>,
Json(req): Json<BatchAnalyze5W1HRequest>,
) -> Result<Json<BatchAnalyze5W1HResponse>, (StatusCode, String)> {
let db = PostgresDb::from_pool(state.db.pool().clone());
let mut jobs = Vec::new();
for uuid in &req.file_uuids {
let cuts = fetch_cut_scenes(&db, uuid).await.unwrap_or_default();
let total = cuts.len();
let mut processed = 0usize;
let mut prev_context: Vec<String> = Vec::new();
for cut in &cuts {
if let Some(ref t) = cut.summary_text {
if t.len() > 20 {
processed += 1;
prev_context.push(format!("Scene (t={:.0}s): {}", cut.start_time, t));
continue;
}
}
let sentences = fetch_sentences_in_scene(&db, uuid, cut)
.await
.unwrap_or_default();
if sentences.is_empty() {
continue;
}
let context = prev_context.join("\n");
if let Ok(result) = summarize_one_scene(&db, uuid, cut, &sentences, &context).await {
if !result.parent_summary.is_empty() {
let _ = store_parent_summary(
&db,
&cut.chunk_id,
uuid,
&result.parent_summary,
&result.five_w1h,
&sentences,
)
.await;
let _ = store_child_summaries(&db, uuid, &result.child_summaries).await;
prev_context.push(format!(
"Scene (t={:.0}s): {}",
cut.start_time, result.parent_summary
));
}
}
processed += 1;
}
jobs.push(BatchJobStatus {
file_uuid: uuid.clone(),
status: if processed > 0 {
"completed".to_string()
} else {
"no_cut_scenes".to_string()
},
message: format!("{}/{} scenes processed", processed, total),
});
}
Ok(Json(BatchAnalyze5W1HResponse {
success: true,
jobs,
}))
}
async fn get_5w1h_status(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let table = schema::table_name("videos");
let rows = sqlx::query(&format!(
r#"SELECT file_uuid, processing_status->'agents'->'five_w1h' as s
FROM {} WHERE processing_status->'agents'->'five_w1h' IS NOT NULL
ORDER BY updated_at DESC LIMIT 50"#,
table
))
.fetch_all(state.db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let videos: Vec<serde_json::Value> = rows
.iter()
.map(|r| {
serde_json::json!({
"uuid": r.try_get::<String,_>("file_uuid").unwrap_or_default(),
"five_w1h_status": r.try_get::<Option<serde_json::Value>,_>("s").ok().flatten(),
})
})
.collect();
Ok(Json(
serde_json::json!({ "success": true, "videos": videos }),
))
}
/// Pipeline-triggered entry point: run 5W1H agent for a file.
pub async fn run_5w1h_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Result<()> {
let cuts = fetch_cut_scenes(db, file_uuid).await?;
let total = cuts.len();
let mut processed = 0usize;
let mut prev_context: Vec<String> = Vec::new();
for cut in &cuts {
if let Some(ref t) = cut.summary_text {
if t.len() > 20 {
processed += 1;
prev_context.push(format!("Scene (t={:.0}s): {}", cut.start_time, t));
continue;
}
}
let sentences = fetch_sentences_in_scene(db, file_uuid, cut).await?;
if sentences.is_empty() {
continue;
}
let context = prev_context.join("\n");
match summarize_one_scene(db, file_uuid, cut, &sentences, &context).await {
Ok(result) => {
if !result.parent_summary.is_empty() {
let _ = store_parent_summary(
db,
&cut.chunk_id,
file_uuid,
&result.parent_summary,
&result.five_w1h,
&sentences,
)
.await;
let _ = store_child_summaries(db, file_uuid, &result.child_summaries).await;
prev_context.push(format!(
"Scene (t={:.0}s): {}",
cut.start_time, result.parent_summary
));
}
processed += 1;
}
Err(e) => tracing::error!("[5W1H] Scene {} failed: {}", cut.chunk_id, e),
}
}
tracing::info!(
"[5W1H] Done for {}: {}/{} scenes",
file_uuid,
processed,
total
);
// Auto-vectorize sentences with EmbeddingGemma (768D)
tracing::info!("[5W1H] Starting vectorize for sentence chunks...");
let embedder = Embedder::new("embeddinggemma-300m".to_string());
let qdrant = QdrantDb::new();
qdrant.init_collection(768).await?;
let chunk_table = schema::table_name("chunk");
let rows = sqlx::query_as::<_, (String, String, String, i64, i64, f64, f64)>(&format!(
"SELECT chunk_id, chunk_type, text_content, start_frame, end_frame, start_time, end_time \
FROM {} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NULL \
AND (text_content IS NOT NULL AND text_content != '') ORDER BY id",
chunk_table
))
.bind(file_uuid)
.fetch_all(db.pool())
.await?;
let total_vec = rows.len();
let mut stored = 0usize;
for (chunk_id, _ctype, text, start_frame, end_frame, start_time, end_time) in &rows {
let text = text.trim();
if text.is_empty() || text.len() < 5 {
continue;
}
match embedder.embed_document(text).await {
Ok(vector) => {
if let Err(e) = sqlx::query(&format!(
"UPDATE {} SET embedding = $1::vector WHERE chunk_id = $2 AND file_uuid = $3",
chunk_table
))
.bind(&vector as &[f32])
.bind(chunk_id)
.bind(file_uuid)
.execute(db.pool())
.await
{
tracing::error!("[Vectorize] PG failed for {}: {}", chunk_id, e);
continue;
}
let payload = VectorPayload {
file_uuid: file_uuid.to_string(),
chunk_id: chunk_id.clone(),
chunk_type: "sentence".to_string(),
start_frame: *start_frame,
end_frame: *end_frame,
start_time: *start_time,
end_time: *end_time,
text: Some(text.to_string()),
};
if let Err(e) = qdrant.upsert_vector(chunk_id, &vector, payload).await {
tracing::error!("[Vectorize] Qdrant failed for {}: {}", chunk_id, e);
continue;
}
stored += 1;
if stored % 50 == 0 {
tracing::info!("[Vectorize] {}/{}", stored, total_vec);
}
}
Err(e) => tracing::error!("[Vectorize] Embed failed for {}: {}", chunk_id, e),
}
}
tracing::info!("[5W1H] Vectorize done: {}/{} stored", stored, total_vec);
Ok(())
}

View File

@@ -180,11 +180,11 @@ async fn list_identities(
})?;
let sql = format!(
"SELECT id::int, uuid, name, metadata FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2",
"SELECT id::int, uuid, name, metadata, status, starred FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2",
id_table
);
let rows: Vec<(i32, uuid::Uuid, String, Option<serde_json::Value>)> = match sqlx::query_as(&sql)
let rows: Vec<(i32, uuid::Uuid, String, Option<serde_json::Value>, Option<String>, Option<bool>)> = match sqlx::query_as(&sql)
.bind(page_size as i64)
.bind(offset)
.fetch_all(db.pool())
@@ -201,11 +201,16 @@ let sql = format!(
let identities: Vec<IdentityResponse> = rows
.into_iter()
.map(|r| IdentityResponse {
id: r.0,
identity_uuid: r.1.to_string().replace('-', ""),
name: r.2,
metadata: r.3,
.map(|r| {
IdentityResponse {
id: r.0,
identity_uuid: r.1.to_string().replace('-', ""),
name: r.2,
metadata: r.3,
status: r.4,
starred: r.5.unwrap_or(false),
file_uuids: vec![], // Removed N+1 query
}
})
.collect();
@@ -281,6 +286,9 @@ pub struct IdentityResponse {
pub identity_uuid: String,
pub name: String,
pub metadata: Option<serde_json::Value>,
pub status: Option<String>,
pub starred: bool,
pub file_uuids: Vec<String>,
}
#[derive(Debug, Serialize)]

View File

@@ -661,597 +661,21 @@ fn average_embeddings<'a>(embeddings: impl Iterator<Item = &'a Vec<f32>>) -> Vec
/// Unknown: greedy stranger clustering (TH=0.40)
/// Writes identity_ref/stranger_ref to Qdrant payload, TKG nodes, and face_detections.
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
use std::collections::HashMap;
let face_db = FaceEmbeddingDb::new();
// Step 1: Load seeds from Qdrant (type=identity_seed)
let seeds = face_db.get_seed_embeddings().await?;
tracing::info!(
"[FaceMatch] Loaded {} seeds from Qdrant",
seeds.len()
);
// Step 2: Preload identity internal IDs (uuid → (id, name))
let id_table = schema::table_name("identities");
let seed_identity_map: HashMap<String, (i32, String)> = if !seeds.is_empty() {
let uuids: Vec<String> = seeds.iter().map(|(uuid, _, _)| uuid.clone()).collect();
if uuids.is_empty() {
HashMap::new()
} else {
let rows = sqlx::query_as::<_, (i32, String, String)>(&format!(
"SELECT id, uuid::text, name FROM {} WHERE uuid::text = ANY($1)",
id_table
))
.bind(&uuids)
.fetch_all(pool)
.await?
.into_iter()
.map(|(id, uuid, name)| (uuid, (id, name)))
.collect();
rows
}
} else {
HashMap::new()
};
// Step 3: Load face embeddings from Qdrant for this file
let qdrant_embeddings = face_db.get_all_embeddings_for_file(file_uuid).await?;
if qdrant_embeddings.is_empty() {
tracing::warn!("[FaceMatch] No face embeddings in Qdrant for {}", file_uuid);
return Ok(0);
}
// Step 4: Group embeddings by trace_id, keeping confidence
let mut trace_faces: HashMap<i32, Vec<(i64, Vec<f32>, f64)>> = HashMap::new();
for (_, emb, payload) in &qdrant_embeddings {
trace_faces
.entry(payload.trace_id)
.or_default()
.push((payload.frame, emb.clone(), payload.confidence));
}
// Step 5: Progressive multi-round matching with derived seeds
// Each round: choose a face with best seed sim for matching; separately,
// collect the highest-confidence face per trace for building derived seeds.
const TH_MIN: f32 = 0.35;
const DERIVED_CONF: f64 = 0.90;
const MAX_DERIVED_PER_ID: usize = 9;
const MAX_FACES_PER_TRACE: usize = 3;
const ANGLE_SIM_THRESHOLD: f32 = 0.90;
const TH_STRANGER: f32 = 0.40;
let total_traces = trace_faces.len();
let total_embeddings: usize = trace_faces.values().map(|v| v.len()).sum();
tracing::info!(
"[FaceMatch] Loaded {} traces ({} face embeddings) from Qdrant for {}",
total_traces,
total_embeddings,
tracing::warn!(
"[FaceMatch] Face matching disabled - FaceEmbeddingDb removed. \
TODO: Reimplement with _faces collection for {}",
file_uuid
);
let mut matched: HashMap<i32, (String, i32)> = HashMap::new();
let mut trace_face_count: HashMap<i32, usize> = HashMap::new();
// All reference embeddings: start with original TMDb seeds
let mut all_refs: Vec<(String, String, Vec<f32>)> = seeds.clone();
let thresholds = [0.55f32, 0.50, 0.45, 0.40, 0.35];
let mut prev_total = 0usize;
for (round_idx, &th) in thresholds.iter().enumerate() {
if th < TH_MIN {
break;
}
let mut new_matches: HashMap<i32, (String, i32)> = HashMap::new();
let mut seed_candidates: Vec<(i32, String, i32, Vec<f32>, f64)> = Vec::new();
for (&tid, faces) in &trace_faces {
if matched.contains_key(&tid) {
continue;
}
trace_face_count.entry(tid).or_insert(faces.len());
let mut best_sim = 0.0f32;
let mut best_name = String::new();
let mut best_id = 0i32;
// Collect all high-confidence faces in this trace for derived seeds
let mut trace_candidates: Vec<(Vec<f32>, f64)> = Vec::new();
for (_, emb, conf) in faces {
for (ref_uuid, ref_name, ref_emb) in &all_refs {
let s = cosine_similarity(emb, ref_emb);
if s > best_sim {
best_sim = s;
best_name = ref_name.clone();
if let Some(id_str) = ref_uuid.strip_prefix("derived:") {
if let Ok(parsed) = id_str.parse::<i32>() {
best_id = parsed;
}
} else if let Some((id, _)) = seed_identity_map.get(ref_uuid) {
best_id = *id;
}
}
}
if *conf >= DERIVED_CONF {
trace_candidates.push((emb.clone(), *conf));
}
}
if best_sim >= th && best_id > 0 {
new_matches.insert(tid, (best_name.clone(), best_id));
// Top MAX_FACES_PER_TRACE highest-confidence faces with angular diversity
trace_candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
let mut selected: Vec<Vec<f32>> = Vec::new();
for (emb, conf) in trace_candidates {
if selected.len() >= MAX_FACES_PER_TRACE {
break;
}
if selected.iter().any(|e| cosine_similarity(e, &emb) >= ANGLE_SIM_THRESHOLD) {
continue;
}
selected.push(emb.clone());
seed_candidates.push((best_id, best_name.clone(), tid, emb, conf));
}
}
}
let new_count = new_matches.len();
if new_count == 0 && round_idx > 0 {
break;
}
matched.extend(new_matches);
// Build derived seeds: pick up to MAX_DERIVED_PER_ID per identity
// (max MAX_FACES_PER_TRACE from each trace), sorted by confidence descending
seed_candidates.sort_by(|a, b| b.4.partial_cmp(&a.4).unwrap());
let mut per_id: HashMap<i32, usize> = HashMap::new();
let mut trace_used_faces: HashMap<i32, usize> = HashMap::new();
let mut added_seeds = 0usize;
for (id, name, tid, emb, _) in &seed_candidates {
let cnt = per_id.entry(*id).or_insert(0);
if *cnt >= MAX_DERIVED_PER_ID {
continue;
}
let trace_cnt = trace_used_faces.entry(*tid).or_insert(0);
if *trace_cnt >= MAX_FACES_PER_TRACE {
continue;
}
*trace_cnt += 1;
*cnt += 1;
all_refs.push((format!("derived:{}", id), name.clone(), emb.clone()));
added_seeds += 1;
}
tracing::info!(
"[FaceMatch] Round {}: matched {}+{}={} total (TH={}, {} new derived seeds)",
round_idx + 1,
prev_total,
new_count,
matched.len(),
th,
added_seeds
);
prev_total = matched.len();
}
// Step 7: Stranger clustering for unmatched traces
let unmatched_ids: Vec<i32> = trace_faces
.keys()
.filter(|tid| !matched.contains_key(tid))
.copied()
.collect();
let mut stranger_map: HashMap<i32, String> = HashMap::new();
let mut assigned_stranger: std::collections::HashSet<i32> = std::collections::HashSet::new();
let mut stranger_count = 0usize;
// Sort by face count descending (most reliable first)
let mut sorted_unmatched: Vec<i32> = unmatched_ids.clone();
sorted_unmatched.sort_by(|a, b| {
trace_face_count
.get(b)
.unwrap_or(&0)
.cmp(trace_face_count.get(a).unwrap_or(&0))
});
for &tid in &sorted_unmatched {
if assigned_stranger.contains(&tid) {
continue;
}
let centroid_a = if let Some(faces) = trace_faces.get(&tid) {
average_embeddings(faces.iter().map(|(_, emb, _)| emb))
} else {
continue;
};
stranger_count += 1;
let stranger_id = format!("{}:stranger_{}", file_uuid, stranger_count);
assigned_stranger.insert(tid);
stranger_map.insert(tid, stranger_id.clone());
for &other_tid in &sorted_unmatched {
if assigned_stranger.contains(&other_tid) || other_tid == tid {
continue;
}
if let Some(faces_b) = trace_faces.get(&other_tid) {
let centroid_b = average_embeddings(faces_b.iter().map(|(_, emb, _)| emb));
let s = cosine_similarity(&centroid_a, &centroid_b);
if s >= TH_STRANGER {
assigned_stranger.insert(other_tid);
stranger_map.insert(other_tid, stranger_id.clone());
}
}
}
}
let stranger_trace_count = stranger_map.len();
tracing::info!(
"[FaceMatch] Stranger clusters: {} groups, {} traces",
stranger_count,
stranger_trace_count
);
// Step 8: Write results to TKG nodes + Qdrant payload + face_detections
let fd_table = schema::table_name("face_detections");
let nodes_table = schema::table_name("tkg_nodes");
let mut pg_updated = 0usize;
// Clear old identity assignments before writing new ones
let _ = sqlx::query(&format!(
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1",
fd_table
))
.bind(file_uuid)
.execute(pool)
.await;
// 8a: Matched traces → identity_ref
for (&tid, (name, identity_id)) in &matched {
// Skip if identity_id is invalid (FK constraint would fail)
if *identity_id <= 0 {
tracing::warn!(
"[FaceMatch] Skipping trace {}: invalid identity_id={}",
tid, identity_id
);
continue;
}
let identity_ref = format!("{}:{}", file_uuid, identity_id);
// TKG node
let external_id = format!("face_track_{}", tid);
if let Err(e) = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
jsonb_set(properties, '{{identity_ref}}', to_jsonb($1), true),\
'{{identity_name}}', to_jsonb($2), true)\
WHERE file_uuid = $3 AND node_type = 'face_track' AND external_id = $4",
nodes_table
))
.bind(&identity_ref)
.bind(name)
.bind(file_uuid)
.bind(&external_id)
.execute(pool)
.await
{
tracing::warn!("[FaceMatch] TKG update failed for trace {}: {:?}", tid, e);
}
// Qdrant payload
let _ = face_db
.update_identity_ref_by_trace(file_uuid, tid, &identity_ref)
.await;
// PostgreSQL face_detections (backward compat)
let rows = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
fd_table
))
.bind(identity_id)
.bind(file_uuid)
.bind(tid)
.execute(pool)
.await
.map(|r| r.rows_affected())
.unwrap_or(0);
pg_updated += rows as usize;
}
// 8b: Stranger traces → stranger_ref
for (&tid, stranger_ref) in &stranger_map {
// TKG node
let external_id = format!("face_track_{}", tid);
if let Err(e) = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
properties, '{{stranger_ref}}', to_jsonb($1), true)\
WHERE file_uuid = $2 AND node_type = 'face_track' AND external_id = $3",
nodes_table
))
.bind(stranger_ref)
.bind(file_uuid)
.bind(&external_id)
.execute(pool)
.await
{
tracing::warn!("[FaceMatch] TKG stranger update failed for trace {}: {:?}", tid, e);
}
// Qdrant payload
let _ = face_db
.update_stranger_ref_by_trace(file_uuid, tid, stranger_ref)
.await;
}
tracing::info!(
"[FaceMatch] Done: {} matched, {} strangers — {} face_detections updated",
matched.len(),
stranger_trace_count,
pg_updated
);
Ok(pg_updated)
Ok(0)
}
/// Fallback: PostgreSQL-based matching (original implementation)
/// Fallback: PostgreSQL-based matching (disabled - embedding column removed)
async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
let identities_table = schema::table_name("identities");
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
&format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", identities_table)
)
.fetch_all(pool).await?;
if tmdb_rows.is_empty() {
tracing::warn!("[FaceMatch-PG] No TMDb identities with face embeddings");
return Ok(0);
}
tracing::info!(
"[FaceMatch-PG] Loaded {} TMDb seed identities",
tmdb_rows.len()
tracing::warn!(
"[FaceMatch-PG] PostgreSQL matching disabled - embedding column removed for {}",
file_uuid
);
// Step 2: 載入所有 face_detections含 frame_number按 trace_id 分組
let fd_table = schema::table_name("face_detections");
let fd_rows = sqlx::query_as::<_, (i32, i64, Vec<f32>)>(&format!(
"SELECT trace_id, frame_number, embedding FROM {} \
WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
ORDER BY trace_id, frame_number",
fd_table
))
.bind(file_uuid)
.fetch_all(pool)
.await?;
if fd_rows.is_empty() {
tracing::warn!("[FaceMatch-PG] No face detections with embeddings");
return Ok(0);
}
// 分組trace_id → (frame_number, embedding)
use std::collections::HashMap;
let mut face_track_faces_raw: HashMap<i32, Vec<(i64, Vec<f32>)>> = HashMap::new();
for (tid, frame, emb) in &fd_rows {
face_track_faces_raw
.entry(*tid)
.or_insert_with(Vec::new)
.push((*frame, emb.clone()));
}
// 從每個 trace 選取不同角度的 3 個 face embedding
let mut face_track_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
for (tid, mut faces) in face_track_faces_raw {
faces.sort_by_key(|(frame, _)| *frame);
let n = faces.len();
let indices = if n <= 3 {
(0..n).collect()
} else {
let mid = n / 2;
vec![0, mid, n - 1]
};
let samples: Vec<Vec<f32>> = indices.iter().map(|&i| faces[i].1.clone()).collect();
face_track_samples.insert(tid, samples);
}
let total_traces = face_track_samples.len();
let sample_count: usize = face_track_samples.values().map(|v| v.len()).sum();
tracing::info!(
"[FaceMatch-PG] Loaded {} traces, sampled {} embeddings (3-angle)",
total_traces,
sample_count
);
// Step 3: 建立 TMDb 查找表
let tmdb_seeds: Vec<(i32, String, Vec<f32>)> = tmdb_rows;
// Step 4: 迭代匹配
const TH: f32 = 0.50;
let mut matched: HashMap<i32, String> = HashMap::new(); // trace_id → identity_name
// Round 1: 用 3-angle samples 比對 TMDb
for (&tid, samples) in &face_track_samples {
let mut best_name = String::new();
let mut best_sim = 0.0f32;
for (_, ref name, ref tmdb_emb) in &tmdb_seeds {
for face_emb in samples {
let s = cosine_similarity(face_emb, tmdb_emb);
if s > best_sim {
best_sim = s;
best_name = name.clone();
}
}
}
if best_sim >= TH {
matched.insert(tid, best_name);
}
}
tracing::info!(
"[FaceMatch] Round 1: {} matched ({}%) — writing to DB",
matched.len(),
matched.len() * 100 / total_traces
);
// Step 5: 寫入 DB — Round 1 結果先存 (Phase 3: update both face_detections AND tkg_nodes)
let identities_table = schema::table_name("identities");
let strangers_table = schema::table_name("strangers");
let fd_table = schema::table_name("face_detections");
let nodes_table = schema::table_name("tkg_nodes");
let mut updated = 0usize;
for (tid, name) in &matched {
let id_opt = sqlx::query_scalar::<_, Option<i32>>(&format!(
"SELECT id FROM {} WHERE name=$1 AND source='tmdb'",
identities_table
))
.bind(name)
.fetch_optional(pool)
.await?;
if let Some(identity_id) = id_opt {
let _ = sqlx::query(&format!(
"UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3",
fd_table
))
.bind(identity_id)
.bind(file_uuid)
.bind(tid)
.execute(pool)
.await;
// Phase 3: Also update TKG node
let external_id = format!("face_track_{}", tid);
let _ = sqlx::query(&format!(
"UPDATE {} SET properties = jsonb_set(\
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
'{{identity_name}}', $2::jsonb, false)\
WHERE file_uuid = $3 AND node_type = 'face_track' AND external_id = $4",
nodes_table
))
.bind(identity_id)
.bind(name.as_str())
.bind(file_uuid)
.bind(&external_id)
.execute(pool)
.await;
updated += 1;
}
}
tracing::info!("[FaceMatch] Round 1: updated {} face_detections", updated);
// Round 2+: 用已匹配的 face 作為 seed 傳播(剩餘未匹配的 trace
let initial_matched = matched.len();
for round_n in 2..=5 {
let prev = matched.len();
// 建立 seed pool: name → Vec<embedding>
let mut seed_pool: HashMap<String, Vec<&Vec<f32>>> = HashMap::new();
for (&tid, name) in &matched {
if let Some(samples) = face_track_samples.get(&tid) {
seed_pool
.entry(name.clone())
.or_default()
.extend(samples.iter());
}
}
let mut new_matches: Vec<(i32, String)> = Vec::new();
for (&tid, samples) in &face_track_samples {
if matched.contains_key(&tid) {
continue;
}
let mut best_name = String::new();
let mut best_sim = 0.0f32;
if samples.is_empty() {
continue;
}
// 用 3-angle samples 分別比對 seed取最高 similarity
for (name, seed_faces) in &seed_pool {
for face_emb in samples {
for seed in seed_faces {
let s = cosine_similarity(face_emb, seed);
if s > best_sim {
best_sim = s;
best_name = name.clone();
}
}
}
}
if best_sim >= TH {
new_matches.push((tid, best_name));
}
}
for (tid, name) in new_matches {
matched.insert(tid, name);
}
let new = matched.len() - prev;
tracing::info!(
"[FaceMatch] Round {}: +{} matched (total {}, {}%)",
round_n,
new,
matched.len(),
matched.len() * 100 / total_traces
);
if new < 5 {
break;
}
}
// Step 6: 未匹配的 trace 設 stranger_id = strangers.id (FK)
// First: ensure strangers records exist
let _ = sqlx::query(&format!(
"INSERT INTO {} (file_uuid, trace_id) \
SELECT $1, fd.trace_id FROM {} fd \
WHERE fd.file_uuid = $1 AND fd.trace_id IS NOT NULL \
AND fd.identity_id IS NULL \
ON CONFLICT (file_uuid, trace_id) DO NOTHING",
strangers_table, fd_table
))
.bind(file_uuid)
.execute(pool)
.await?;
// Then: update face_detections.stranger_id = strangers.id
let stranger_update = sqlx::query(&format!(
"UPDATE {} fd SET stranger_id = s.id \
FROM {} s \
WHERE s.file_uuid = fd.file_uuid AND s.trace_id = fd.trace_id \
AND fd.file_uuid = $1 AND fd.identity_id IS NULL \
AND fd.trace_id IS NOT NULL AND fd.stranger_id IS NULL",
fd_table, strangers_table
))
.bind(file_uuid)
.execute(pool)
.await?;
let stranger_count = stranger_update.rows_affected();
// Step 7: Save identity files for all affected identities
let affected = sqlx::query_scalar::<_, uuid::Uuid>(&format!(
"SELECT DISTINCT i.uuid FROM {} i \
JOIN {} fd ON fd.identity_id = i.id \
WHERE fd.file_uuid=$1 AND fd.identity_id IS NOT NULL",
identities_table, fd_table
))
.bind(file_uuid)
.fetch_all(pool)
.await
.unwrap_or_default();
for uuid in &affected {
let us = uuid.to_string().replace('-', "");
if let Err(e) = crate::core::identity::storage::save_identity_file_by_pool(pool, &us).await
{
tracing::warn!("[FaceMatch] Failed to save identity file {}: {}", us, e);
}
}
tracing::info!(
"[FaceMatch] Done: {}/{} traces matched ({}%), {} strangers, {} identity files",
matched.len(),
total_traces,
matched.len() * 100 / total_traces,
stranger_count,
affected.len()
);
Ok(updated)
Ok(0)
}
/// Bind ASRX speakers to face traces based on temporal overlap.
@@ -1589,126 +1013,9 @@ async fn run_identity_handler(
/// Read all TMDb identities with profile photos, extract face embeddings, store in Qdrant as seeds.
pub async fn generate_seed_embeddings(db: &PostgresDb) -> anyhow::Result<usize> {
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
use std::path::Path;
let pool = db.pool();
let id_table = schema::table_name("identities");
let rows = sqlx::query_as::<_, (i32, String, String, i32, String)>(&format!(
"SELECT id, name, uuid::text, tmdb_id, tmdb_profile FROM {} \
WHERE source='tmdb' AND tmdb_profile IS NOT NULL",
id_table
))
.fetch_all(pool)
.await?;
if rows.is_empty() {
tracing::warn!("[GenerateSeeds] No TMDb identities with profile photos");
return Ok(0);
}
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
let extract_script = Path::new(&scripts_dir).join("extract_face_embedding.py");
let face_db = FaceEmbeddingDb::new();
let mut success = 0usize;
for (id, name, uuid, tmdb_id, profile_url) in &rows {
tracing::info!("[GenerateSeeds] Processing {} ({})", name, uuid);
// Download profile image
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let resp = client.get(profile_url).send().await;
let image_bytes = match resp {
Ok(r) if r.status().is_success() => r.bytes().await.unwrap_or_default(),
_ => {
tracing::warn!("[GenerateSeeds] Failed to download: {} from {}", name, profile_url);
continue;
}
};
if image_bytes.is_empty() {
tracing::warn!("[GenerateSeeds] Empty image for {}", name);
continue;
}
// Save to temp file
let temp_dir = std::env::temp_dir().join("momentry_seed_faces");
std::fs::create_dir_all(&temp_dir)?;
let temp_img = temp_dir.join(format!("{}.jpg", uuid));
std::fs::write(&temp_img, &image_bytes)?;
// Extract embedding with timeout
use tokio::time::timeout;
let output = timeout(
std::time::Duration::from_secs(180),
tokio::process::Command::new(&python_path)
.arg(&extract_script)
.arg(&temp_img)
.output(),
)
.await
.map_err(|_| anyhow::anyhow!("Extract embedding timed out for {}", name))??;
let _ = std::fs::remove_file(&temp_img);
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::warn!(
"[GenerateSeeds] Extraction failed for {}: {}",
name,
stderr.trim()
);
continue;
}
let stdout = String::from_utf8_lossy(&output.stdout);
let extract_result: serde_json::Value = match serde_json::from_str(&stdout) {
Ok(v) => v,
Err(e) => {
tracing::warn!("[GenerateSeeds] Parse error for {}: {}", name, e);
continue;
}
};
let embedding: Vec<f64> = match serde_json::from_value(
extract_result.get("embedding").ok_or_else(|| anyhow::anyhow!("No embedding"))?.clone(),
) {
Ok(v) => v,
Err(e) => {
tracing::warn!("[GenerateSeeds] Embedding format error for {}: {}", name, e);
continue;
}
};
let embedding_f32: Vec<f32> = embedding.into_iter().map(|v| v as f32).collect();
// Store in Qdrant
match face_db
.upsert_seed_embedding(uuid, name, *tmdb_id, &embedding_f32)
.await
{
Ok(_) => {
success += 1;
tracing::info!("[GenerateSeeds] Stored seed for {}", name);
}
Err(e) => {
tracing::warn!("[GenerateSeeds] Qdrant error for {}: {}", name, e);
}
}
}
tracing::info!(
"[GenerateSeeds] Done: {}/{} seeds generated",
success,
rows.len()
tracing::warn!(
"[GenerateSeeds] Seed embedding generation disabled - FaceEmbeddingDb removed. \
TODO: Reimplement with _faces collection"
);
Ok(success)
Ok(0)
}

View File

@@ -67,11 +67,13 @@ pub async fn bind_identity(
Path(identity_uuid): Path<String>,
Json(req): Json<BindIdentityRequest>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
tracing::info!("[bind_identity] req: {:?}", req);
let table = crate::core::db::schema::table_name("face_detections");
let id_table = crate::core::db::schema::table_name("identities");
let history_table = crate::core::db::schema::table_name("identity_history");
let uuid_clean = identity_uuid.replace('-', "");
tracing::info!("[bind_identity] uuid_clean={}, expand_to_trace={:?}", uuid_clean, req.expand_to_trace);
let identity_row: Option<(i32, String)> = sqlx::query_as(&format!(
"SELECT id, name FROM {} WHERE REPLACE(uuid::text, '-', '') = $1",
id_table
@@ -188,21 +190,32 @@ pub async fn bind_identity(
})?
.flatten();
// Update Qdrant + TKG if trace_id exists
if let Some(tid) = trace_id {
// 1. Update Qdrant payload
let face_db = crate::core::db::FaceEmbeddingDb::new();
if let Err(e) = face_db
.update_identity_by_trace(&req.file_uuid, tid, &uuid_clean)
.await
{
tracing::warn!(
"[bind] Failed to update Qdrant identity_uuid for trace {}: {}",
tid, e
);
// Expand to entire trace if requested
tracing::info!("[bind_identity] trace_id={:?}, expand_to_trace={:?}", trace_id, req.expand_to_trace);
if req.expand_to_trace.unwrap_or(false) && trace_id.is_some() {
let tid = trace_id.unwrap();
tracing::info!("[bind_identity] Expanding to trace {} for file {}", tid, req.file_uuid);
let expand_result = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
table
))
.bind(identity_id)
.bind(&req.file_uuid)
.bind(tid)
.execute(state.db.pool())
.await;
if let Ok(r) = expand_result {
tracing::info!("[bind] Expanded to trace {}: {} rows", tid, r.rows_affected());
} else {
tracing::error!("[bind] Failed to expand to trace {}: {:?}", tid, expand_result.err());
}
} else {
tracing::info!("[bind_identity] NOT expanding: expand_to_trace={:?}, trace_id={:?}", req.expand_to_trace, trace_id);
}
// 2. Update TKG face_track node (dual-field design)
// Update TKG if trace_id exists
if let Some(tid) = trace_id {
// Update TKG face_track node (dual-field design)
let tkg_table = crate::core::db::schema::table_name("tkg_nodes");
let ext_id = format!("face_track_{}", tid);
let identity_ref = format!("{}:identity_{}", req.file_uuid, identity_id);
@@ -380,21 +393,9 @@ pub async fn unbind_identity(
})?
.flatten();
// Clear Qdrant + TKG if trace_id exists
// Clear TKG if trace_id exists
if let Some(tid) = trace_id {
// 1. Clear Qdrant payload
let face_db = crate::core::db::FaceEmbeddingDb::new();
if let Err(e) = face_db
.clear_identity_by_trace(&req.file_uuid, tid)
.await
{
tracing::warn!(
"[unbind] Failed to clear Qdrant identity_uuid for trace {}: {}",
tid, e
);
}
// 2. Update TKG face_track node (restore stranger_ref)
// Update TKG face_track node (restore stranger_ref)
let tkg_table = crate::core::db::schema::table_name("tkg_nodes");
let ext_id = format!("face_track_{}", tid);
let stranger_ref = format!("{}:stranger_trace_{}", req.file_uuid, tid);
@@ -2199,8 +2200,10 @@ pub async fn list_pending_persons(
let fd_table = crate::core::db::schema::table_name("face_detections");
let rows: Vec<(i32, String, String, chrono::NaiveDateTime)> = sqlx::query_as(&format!(
"SELECT id, uuid::text, name, created_at FROM {} WHERE file_uuid = $1 AND status = 'pending' ORDER BY created_at DESC",
id_table
"SELECT DISTINCT i.id, i.uuid::text, i.name, i.created_at FROM {} i \
JOIN {} fd ON fd.identity_id = i.id \
WHERE fd.file_uuid = $1 AND i.status = 'pending' ORDER BY i.created_at DESC",
id_table, fd_table
))
.bind(&file_uuid)
.fetch_all(state.db.pool())

View File

@@ -4,7 +4,6 @@ pub mod auth;
pub mod checkin_api;
pub mod docs;
pub mod files;
pub mod five_w1h_agent_api;
pub mod health;
pub mod identities;
pub mod identity_agent_api;

View File

@@ -260,7 +260,25 @@ async fn trigger_processing(
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if existing_id.is_none() {
if let Some(job_id) = existing_id {
// Clean up stale processor_results from previous runs
// Old entries with status='running' from a dead worker session
// would block the worker from actually running processors.
let pr_table = schema::table_name("processor_results");
sqlx::query(&format!("DELETE FROM {pr_table} WHERE job_id = $1"))
.bind(job_id)
.execute(state.db.pool())
.await
.map_err(|e| {
tracing::error!(
"[TRIGGER] Failed to clean processor_results for job {}: {}",
job_id,
e
);
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::info!("[TRIGGER] Cleaned processor_results for job {}", job_id);
} else {
state
.db
.create_monitor_job(&file_uuid, Some(&file_path))

View File

@@ -14,7 +14,6 @@ use super::auth;
use super::checkin_api;
use super::docs;
use super::files;
use super::five_w1h_agent_api;
use super::health;
use super::identities;
use super::identity_agent_api;
@@ -116,7 +115,6 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
.merge(agent_search::agent_search_routes())
.merge(processing::processing_routes())
.merge(identity_agent_api::identity_agent_routes())
.merge(five_w1h_agent_api::five_w1h_agent_routes())
.merge(media_api::bbox_routes())
.merge(media_api::media_proxy_routes())
.merge(trace_agent_api::trace_agent_routes())

View File

@@ -608,122 +608,17 @@ async fn tmdb_match_handler(
));
}
// Get all TMDb identities with face_embedding
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
&format!(
"SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL",
crate::core::db::schema::table_name("identities")
)
)
.fetch_all(state.db.pool())
.await
.map_err(|e| {
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})))
})?;
if tmdb_rows.is_empty() {
return Ok(Json(TmdbMatchResponse {
success: true,
file_uuid,
bindings_created: 0,
tmdb_identities_available: 0,
message: "No TMDb identities with face embeddings".to_string(),
}));
}
let face_collection = format!(
"{}_faces",
crate::core::config::REDIS_KEY_PREFIX
.as_str()
.trim_end_matches(':')
tracing::warn!(
"[TKG-MATCH] TMDb matching disabled - sync_trace_embeddings removed. \
TODO: Reimplement with _faces collection for {}",
file_uuid
);
let qdrant = QdrantDb::new();
let _ = qdrant.ensure_collection(&face_collection, 512).await;
let trace_collection = format!(
"{}_traces",
crate::core::config::REDIS_KEY_PREFIX
.as_str()
.trim_end_matches(':')
);
let _ = qdrant.ensure_collection(&trace_collection, 512).await;
// Sync trace embeddings (idempotent)
if let Err(e) = crate::core::db::qdrant_db::sync_trace_embeddings(&file_uuid).await {
tracing::error!("[TKG-MATCH] Trace sync failed: {}", e);
}
let mut total_bindings = 0usize;
for (tmdb_id, tmdb_name, tmdb_embedding) in &tmdb_rows {
// Search Qdrant trace collection with this TMDb embedding
let results = match qdrant
.search_face_collection(
&trace_collection,
tmdb_embedding,
100,
"source",
"tmdb",
Some(&file_uuid),
)
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!("[TKG-MATCH] Qdrant search failed for {}: {}", tmdb_name, e);
continue;
}
};
// Filter results by threshold and file_uuid
let filtered: Vec<_> = results
.into_iter()
.filter(|(score, payload)| {
*score >= 0.50
&& payload.get("file_uuid").and_then(|v| v.as_str()) == Some(&file_uuid)
})
.collect();
if filtered.is_empty() {
continue;
}
// Bind matched traces directly
let mut bound_count = 0usize;
for (_score, payload) in &filtered {
if let Some(tid) = payload.get("trace_id").and_then(|v| v.as_i64()) {
let r = sqlx::query(&format!(
"UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3",
crate::core::db::schema::table_name("face_detections")
))
.bind(tmdb_id)
.bind(&file_uuid)
.bind(tid as i32)
.execute(state.db.pool())
.await;
if let Ok(result) = r {
bound_count += result.rows_affected() as usize;
}
}
}
if bound_count > 0 {
tracing::info!(
"[TKG-MATCH] {}: bound {} traces to TMDb identity {}",
tmdb_name,
bound_count,
tmdb_id
);
}
total_bindings += bound_count;
}
Ok(Json(TmdbMatchResponse {
success: true,
file_uuid,
bindings_created: total_bindings,
tmdb_identities_available: tmdb_rows.len(),
message: format!("{} traces matched to TMDb identities", total_bindings),
bindings_created: 0,
tmdb_identities_available: 0,
message: "TMDb matching disabled - needs reimplementation with _faces collection".to_string(),
}))
}