fix: identity agent writes Round 1 matches to DB immediately
This commit is contained in:
@@ -637,13 +637,39 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
"[FaceMatch] Round 1: {} matched ({}%)",
|
||||
"[FaceMatch] Round 1: {} matched ({}%) — writing to DB",
|
||||
matched.len(),
|
||||
matched.len() * 100 / total_traces
|
||||
);
|
||||
|
||||
// Round 2+: 用已匹配的 face 作為 seed 傳播
|
||||
for round_n in 2..=10 {
|
||||
// Step 5: 寫入 DB — Round 1 結果先存
|
||||
let identities_table = schema::table_name("identities");
|
||||
let fd_table = schema::table_name("face_detections");
|
||||
let mut updated = 0usize;
|
||||
for (tid, name) in &matched {
|
||||
let id_opt = sqlx::query_scalar::<_, Option<i32>>(
|
||||
&format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table),
|
||||
)
|
||||
.bind(name)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
if let Some(identity_id) = id_opt {
|
||||
let _ = sqlx::query(
|
||||
&format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table),
|
||||
)
|
||||
.bind(identity_id)
|
||||
.bind(file_uuid)
|
||||
.bind(tid)
|
||||
.execute(pool)
|
||||
.await;
|
||||
updated += 1;
|
||||
}
|
||||
}
|
||||
tracing::info!("[FaceMatch] Round 1: updated {} face_detections", updated);
|
||||
|
||||
// Round 2+: 用已匹配的 face 作為 seed 傳播(剩餘未匹配的 trace)
|
||||
let initial_matched = matched.len();
|
||||
for round_n in 2..=5 {
|
||||
let prev = matched.len();
|
||||
// 建立 seed pool: name → Vec<embedding>
|
||||
let mut seed_pool: HashMap<String, Vec<&Vec<f32>>> = HashMap::new();
|
||||
@@ -698,30 +724,6 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5: 寫入 DB — 已匹配的設 identity_id
|
||||
let identities_table = schema::table_name("identities");
|
||||
let fd_table = schema::table_name("face_detections");
|
||||
let mut updated = 0usize;
|
||||
for (tid, name) in &matched {
|
||||
let id_opt = sqlx::query_scalar::<_, Option<i32>>(
|
||||
&format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table),
|
||||
)
|
||||
.bind(name)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
if let Some(identity_id) = id_opt {
|
||||
let _ = sqlx::query(
|
||||
&format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table),
|
||||
)
|
||||
.bind(identity_id)
|
||||
.bind(file_uuid)
|
||||
.bind(tid)
|
||||
.execute(pool)
|
||||
.await;
|
||||
updated += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 6: 未匹配的 trace 設 stranger_id = trace_id
|
||||
// trace_id 在同一個 file 內是 sequential integer,直接複用為 stranger_id
|
||||
let stranger_update = sqlx::query(
|
||||
@@ -915,6 +917,12 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res
|
||||
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry/output".to_string());
|
||||
|
||||
let pool = db.pool();
|
||||
|
||||
// Step 1: 先跑 face matching(不需 face_clustered.json)
|
||||
let matched = match_faces_iterative(pool, file_uuid).await.unwrap_or(0);
|
||||
|
||||
// Step 2: 試著載入 face_clustered.json 建立新 identities
|
||||
let video_dir = PathBuf::from(&output_dir).join(file_uuid);
|
||||
let face_clustered_path = video_dir.join(format!("{}.face_clustered.json", file_uuid));
|
||||
let face_clustered_path = if face_clustered_path.exists() {
|
||||
@@ -923,57 +931,60 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res
|
||||
PathBuf::from(&output_dir).join(format!("{}.face_clustered.json", file_uuid))
|
||||
};
|
||||
|
||||
if !face_clustered_path.exists() {
|
||||
tracing::warn!(
|
||||
"[IdentityAgent] face_clustered.json not found for {}",
|
||||
if face_clustered_path.exists() {
|
||||
let face_data: serde_json::Value =
|
||||
std::fs::read_to_string(&face_clustered_path)?.parse()?;
|
||||
let asrx_path = video_dir.join(format!("{}.asrx.json", file_uuid));
|
||||
let asrx_data: Option<serde_json::Value> = if asrx_path.exists() {
|
||||
Some(std::fs::read_to_string(&asrx_path)?.parse()?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let persons = extract_persons_from_face_data(&face_data);
|
||||
let speakers = extract_speakers_from_asrx_data(&asrx_data);
|
||||
let identities = analyze_person_speaker_overlap(&persons, &speakers);
|
||||
|
||||
let uuid_short = &file_uuid[..8.min(file_uuid.len())];
|
||||
for (idx, id_result) in identities.iter().enumerate() {
|
||||
let identity_name = format!("stranger_{}_{}", uuid_short, idx);
|
||||
let metadata = serde_json::json!({
|
||||
"source": "identity_agent",
|
||||
"trace_ids": id_result.person_ids,
|
||||
"speaker_ids": id_result.speaker_ids,
|
||||
"confidence": id_result.confidence,
|
||||
"evidence": {
|
||||
"speaker_overlap": id_result.evidence.speaker_overlap,
|
||||
"frame_ratio": id_result.evidence.frame_ratio,
|
||||
},
|
||||
"reasoning": id_result.reasoning,
|
||||
});
|
||||
let _ = sqlx::query(
|
||||
&format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities"))
|
||||
)
|
||||
.bind(&identity_name)
|
||||
.bind(&metadata)
|
||||
.execute(pool)
|
||||
.await;
|
||||
}
|
||||
let _created = identities.len();
|
||||
tracing::info!(
|
||||
"[IdentityAgent] Created {} auto identities from face_clustered for {}",
|
||||
_created,
|
||||
file_uuid
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let face_data: serde_json::Value = std::fs::read_to_string(&face_clustered_path)?.parse()?;
|
||||
let asrx_path = video_dir.join(format!("{}.asrx.json", file_uuid));
|
||||
let asrx_data: Option<serde_json::Value> = if asrx_path.exists() {
|
||||
Some(std::fs::read_to_string(&asrx_path)?.parse()?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let persons = extract_persons_from_face_data(&face_data);
|
||||
let speakers = extract_speakers_from_asrx_data(&asrx_data);
|
||||
let identities = analyze_person_speaker_overlap(&persons, &speakers);
|
||||
|
||||
let pool = db.pool();
|
||||
let uuid_short = &file_uuid[..8.min(file_uuid.len())];
|
||||
for (idx, id_result) in identities.iter().enumerate() {
|
||||
let identity_name = format!("stranger_{}_{}", uuid_short, idx);
|
||||
let metadata = serde_json::json!({
|
||||
"source": "identity_agent",
|
||||
"trace_ids": id_result.person_ids,
|
||||
"speaker_ids": id_result.speaker_ids,
|
||||
"confidence": id_result.confidence,
|
||||
"evidence": {
|
||||
"speaker_overlap": id_result.evidence.speaker_overlap,
|
||||
"frame_ratio": id_result.evidence.frame_ratio,
|
||||
},
|
||||
"reasoning": id_result.reasoning,
|
||||
});
|
||||
let _ = sqlx::query(
|
||||
&format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities"))
|
||||
)
|
||||
.bind(&identity_name)
|
||||
.bind(&metadata)
|
||||
.execute(pool)
|
||||
.await;
|
||||
tracing::warn!(
|
||||
"[IdentityAgent] face_clustered.json not found for {}, skipping identity creation",
|
||||
file_uuid
|
||||
);
|
||||
}
|
||||
|
||||
let matched = match_faces_iterative(pool, file_uuid).await.unwrap_or(0);
|
||||
let bound = bind_speakers(pool, file_uuid).await.unwrap_or(0);
|
||||
|
||||
tracing::info!(
|
||||
"[IdentityAgent] Done for {}: {} identities, {} face matches, {} speaker bindings",
|
||||
"[IdentityAgent] Done for {}: {} face matches, {} speaker bindings",
|
||||
file_uuid,
|
||||
identities.len(),
|
||||
matched,
|
||||
bound
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user