From e6fd170da209c8aa910eccd5600f6c7285fd5984 Mon Sep 17 00:00:00 2001 From: Accusys Date: Mon, 18 May 2026 03:46:33 +0800 Subject: [PATCH] fix: identity agent writes Round 1 matches to DB immediately --- src/api/identity_agent_api.rs | 149 ++++++++++++++++++---------------- 1 file changed, 80 insertions(+), 69 deletions(-) diff --git a/src/api/identity_agent_api.rs b/src/api/identity_agent_api.rs index 551db9c..9f63255 100644 --- a/src/api/identity_agent_api.rs +++ b/src/api/identity_agent_api.rs @@ -637,13 +637,39 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: } } tracing::info!( - "[FaceMatch] Round 1: {} matched ({}%)", + "[FaceMatch] Round 1: {} matched ({}%) — writing to DB", matched.len(), matched.len() * 100 / total_traces ); - // Round 2+: 用已匹配的 face 作為 seed 傳播 - for round_n in 2..=10 { + // Step 5: 寫入 DB — Round 1 結果先存 + let identities_table = schema::table_name("identities"); + let fd_table = schema::table_name("face_detections"); + let mut updated = 0usize; + for (tid, name) in &matched { + let id_opt = sqlx::query_scalar::<_, Option>( + &format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table), + ) + .bind(name) + .fetch_optional(pool) + .await?; + if let Some(identity_id) = id_opt { + let _ = sqlx::query( + &format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table), + ) + .bind(identity_id) + .bind(file_uuid) + .bind(tid) + .execute(pool) + .await; + updated += 1; + } + } + tracing::info!("[FaceMatch] Round 1: updated {} face_detections", updated); + + // Round 2+: 用已匹配的 face 作為 seed 傳播(剩餘未匹配的 trace) + let initial_matched = matched.len(); + for round_n in 2..=5 { let prev = matched.len(); // 建立 seed pool: name → Vec let mut seed_pool: HashMap>> = HashMap::new(); @@ -698,30 +724,6 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: } } - // Step 5: 寫入 DB — 已匹配的設 identity_id - let identities_table = schema::table_name("identities"); - let fd_table = schema::table_name("face_detections"); - let mut updated = 0usize; - for (tid, name) in &matched { - let id_opt = sqlx::query_scalar::<_, Option>( - &format!("SELECT id FROM {} WHERE name=$1 AND source='tmdb'", identities_table), - ) - .bind(name) - .fetch_optional(pool) - .await?; - if let Some(identity_id) = id_opt { - let _ = sqlx::query( - &format!("UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table), - ) - .bind(identity_id) - .bind(file_uuid) - .bind(tid) - .execute(pool) - .await; - updated += 1; - } - } - // Step 6: 未匹配的 trace 設 stranger_id = trace_id // trace_id 在同一個 file 內是 sequential integer,直接複用為 stranger_id let stranger_update = sqlx::query( @@ -915,6 +917,12 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") .unwrap_or_else(|_| "/Users/accusys/momentry/output".to_string()); + let pool = db.pool(); + + // Step 1: 先跑 face matching(不需 face_clustered.json) + let matched = match_faces_iterative(pool, file_uuid).await.unwrap_or(0); + + // Step 2: 試著載入 face_clustered.json 建立新 identities let video_dir = PathBuf::from(&output_dir).join(file_uuid); let face_clustered_path = video_dir.join(format!("{}.face_clustered.json", file_uuid)); let face_clustered_path = if face_clustered_path.exists() { @@ -923,57 +931,60 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res PathBuf::from(&output_dir).join(format!("{}.face_clustered.json", file_uuid)) }; - if !face_clustered_path.exists() { - tracing::warn!( - "[IdentityAgent] face_clustered.json not found for {}", + if face_clustered_path.exists() { + let face_data: serde_json::Value = + std::fs::read_to_string(&face_clustered_path)?.parse()?; + let asrx_path = video_dir.join(format!("{}.asrx.json", file_uuid)); + let asrx_data: Option = if asrx_path.exists() { + Some(std::fs::read_to_string(&asrx_path)?.parse()?) + } else { + None + }; + + let persons = extract_persons_from_face_data(&face_data); + let speakers = extract_speakers_from_asrx_data(&asrx_data); + let identities = analyze_person_speaker_overlap(&persons, &speakers); + + let uuid_short = &file_uuid[..8.min(file_uuid.len())]; + for (idx, id_result) in identities.iter().enumerate() { + let identity_name = format!("stranger_{}_{}", uuid_short, idx); + let metadata = serde_json::json!({ + "source": "identity_agent", + "trace_ids": id_result.person_ids, + "speaker_ids": id_result.speaker_ids, + "confidence": id_result.confidence, + "evidence": { + "speaker_overlap": id_result.evidence.speaker_overlap, + "frame_ratio": id_result.evidence.frame_ratio, + }, + "reasoning": id_result.reasoning, + }); + let _ = sqlx::query( + &format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities")) + ) + .bind(&identity_name) + .bind(&metadata) + .execute(pool) + .await; + } + let _created = identities.len(); + tracing::info!( + "[IdentityAgent] Created {} auto identities from face_clustered for {}", + _created, file_uuid ); - return Ok(()); - } - - let face_data: serde_json::Value = std::fs::read_to_string(&face_clustered_path)?.parse()?; - let asrx_path = video_dir.join(format!("{}.asrx.json", file_uuid)); - let asrx_data: Option = if asrx_path.exists() { - Some(std::fs::read_to_string(&asrx_path)?.parse()?) } else { - None - }; - - let persons = extract_persons_from_face_data(&face_data); - let speakers = extract_speakers_from_asrx_data(&asrx_data); - let identities = analyze_person_speaker_overlap(&persons, &speakers); - - let pool = db.pool(); - let uuid_short = &file_uuid[..8.min(file_uuid.len())]; - for (idx, id_result) in identities.iter().enumerate() { - let identity_name = format!("stranger_{}_{}", uuid_short, idx); - let metadata = serde_json::json!({ - "source": "identity_agent", - "trace_ids": id_result.person_ids, - "speaker_ids": id_result.speaker_ids, - "confidence": id_result.confidence, - "evidence": { - "speaker_overlap": id_result.evidence.speaker_overlap, - "frame_ratio": id_result.evidence.frame_ratio, - }, - "reasoning": id_result.reasoning, - }); - let _ = sqlx::query( - &format!("INSERT INTO {} (name, identity_type, source, metadata, status) VALUES ($1, 'people', 'auto', $2::jsonb, 'pending') ON CONFLICT DO NOTHING", schema::table_name("identities")) - ) - .bind(&identity_name) - .bind(&metadata) - .execute(pool) - .await; + tracing::warn!( + "[IdentityAgent] face_clustered.json not found for {}, skipping identity creation", + file_uuid + ); } - let matched = match_faces_iterative(pool, file_uuid).await.unwrap_or(0); let bound = bind_speakers(pool, file_uuid).await.unwrap_or(0); tracing::info!( - "[IdentityAgent] Done for {}: {} identities, {} face matches, {} speaker bindings", + "[IdentityAgent] Done for {}: {} face matches, {} speaker bindings", file_uuid, - identities.len(), matched, bound );