diff --git a/src/api/identities.rs b/src/api/identities.rs index aed186f..ce00ad4 100644 --- a/src/api/identities.rs +++ b/src/api/identities.rs @@ -179,7 +179,7 @@ async fn list_identities( ) })?; - let sql = format!( +let sql = format!( "SELECT id::int, uuid, name, metadata FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2", id_table ); diff --git a/src/api/identity_agent_api.rs b/src/api/identity_agent_api.rs index c0f3d89..290ceee 100644 --- a/src/api/identity_agent_api.rs +++ b/src/api/identity_agent_api.rs @@ -208,7 +208,7 @@ async fn match_from_photo( // 4. Find best matching trace (highest similarity, no threshold) let fd_table = schema::table_name("face_detections"); let best_match: Option<(i32, i32, f64)> = sqlx::query_as(&format!( - r#"SELECT id, face_track_id, + r#"SELECT id, trace_id, 1 - (embedding::vector <=> $1::vector) as similarity FROM {} WHERE file_uuid = $2 AND embedding IS NOT NULL @@ -250,7 +250,7 @@ async fn match_from_photo( matches: 1, traces_matched, message: format!( - "Best trace: face_track_id={}, similarity={:.4}", + "Best trace: trace_id={}, similarity={:.4}", fb_trace, fb_sim ), })) @@ -284,7 +284,7 @@ async fn match_from_trace( let fd_table = schema::table_name("face_detections"); let all_faces: Vec<(Vec, i64)> = sqlx::query_as::<_, (Vec, i64)>(&format!( "SELECT embedding, frame_number FROM {} \ - WHERE file_uuid = $1 AND face_track_id = $2 AND embedding IS NOT NULL \ + WHERE file_uuid = $1 AND trace_id = $2 AND embedding IS NOT NULL \ ORDER BY frame_number ASC", fd_table )) @@ -321,7 +321,7 @@ async fn match_from_trace( // Get width*height info if available (not all pipelines store it) let face_sizes: Vec<(i64, i32)> = sqlx::query_as::<_, (i64, i32)>(&format!( "SELECT frame_number, COALESCE(width, 0) * COALESCE(height, 0) AS area \ - FROM {} WHERE file_uuid = $1 AND face_track_id = $2 AND embedding IS NOT NULL \ + FROM {} WHERE file_uuid = $1 AND trace_id = $2 AND embedding IS NOT NULL \ ORDER BY frame_number ASC", fd_table )) @@ -360,7 +360,7 @@ async fn match_from_trace( for qemb in &query_embeddings { let top = sqlx::query_as::<_, (i32, i32, f64)>(&format!( - r#"SELECT id, face_track_id, + r#"SELECT id, trace_id, 1 - (embedding::vector <=> $1::vector) as similarity FROM {} WHERE file_uuid = $2 @@ -382,9 +382,9 @@ async fn match_from_trace( ) })?; - if let Some((cface_id, c_face_track_id, c_sim)) = top { - if seen_trace_ids.insert(c_face_track_id) { - validated.push((cface_id, c_face_track_id, c_sim)); + if let Some((cface_id, c_trace_id, c_sim)) = top { + if seen_trace_ids.insert(c_trace_id) { + validated.push((cface_id, c_trace_id, c_sim)); } } } @@ -419,7 +419,7 @@ async fn match_from_trace( // 4. Update matched face_detections let mut traces_matched: Vec = Vec::new(); - for (id, face_track_id, _similarity) in &validated { + for (id, trace_id, _similarity) in &validated { if let Err(e) = sqlx::query(&format!( "UPDATE {} SET identity_id = $1 WHERE id = $2", fd_table @@ -435,15 +435,15 @@ async fn match_from_trace( e ); } else { - if !traces_matched.contains(face_track_id) { - traces_matched.push(*face_track_id); + if !traces_matched.contains(trace_id) { + traces_matched.push(*trace_id); } } } // 5. Also bind the source trace itself let _ = sqlx::query(&format!( - "UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_track_id = $3", + "UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3", fd_table )) .bind(identity_id) @@ -1014,12 +1014,12 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho tmdb_rows.len() ); - // Step 2: 載入所有 face_detections(含 frame_number),按 face_track_id 分組 + // Step 2: 載入所有 face_detections(含 frame_number),按 trace_id 分組 let fd_table = schema::table_name("face_detections"); let fd_rows = sqlx::query_as::<_, (i32, i64, Vec)>(&format!( - "SELECT face_track_id, frame_number, embedding FROM {} \ - WHERE file_uuid=$1 AND face_track_id IS NOT NULL AND embedding IS NOT NULL \ - ORDER BY face_track_id, frame_number", + "SELECT trace_id, frame_number, embedding FROM {} \ + WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \ + ORDER BY trace_id, frame_number", fd_table )) .bind(file_uuid) @@ -1031,7 +1031,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho return Ok(0); } - // 分組:face_track_id → (frame_number, embedding) + // 分組:trace_id → (frame_number, embedding) use std::collections::HashMap; let mut face_track_faces_raw: HashMap)>> = HashMap::new(); for (tid, frame, emb) in &fd_rows { @@ -1069,7 +1069,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho // Step 4: 迭代匹配 const TH: f32 = 0.50; - let mut matched: HashMap = HashMap::new(); // face_track_id → identity_name + let mut matched: HashMap = HashMap::new(); // trace_id → identity_name // Round 1: 用 3-angle samples 比對 TMDb for (&tid, samples) in &face_track_samples { @@ -1110,7 +1110,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho .await?; if let Some(identity_id) = id_opt { let _ = sqlx::query(&format!( - "UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND face_track_id=$3", + "UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3", fd_table )) .bind(identity_id) @@ -1200,11 +1200,11 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho // Step 6: 未匹配的 trace 設 stranger_id = strangers.id (FK) // First: ensure strangers records exist let _ = sqlx::query(&format!( - "INSERT INTO {} (file_uuid, face_track_id) \ - SELECT $1, fd.face_track_id FROM {} fd \ - WHERE fd.file_uuid = $1 AND fd.face_track_id IS NOT NULL \ + "INSERT INTO {} (file_uuid, trace_id) \ + SELECT $1, fd.trace_id FROM {} fd \ + WHERE fd.file_uuid = $1 AND fd.trace_id IS NOT NULL \ AND fd.identity_id IS NULL \ - ON CONFLICT (file_uuid, face_track_id) DO NOTHING", + ON CONFLICT (file_uuid, trace_id) DO NOTHING", strangers_table, fd_table )) .bind(file_uuid) @@ -1215,9 +1215,9 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho let stranger_update = sqlx::query(&format!( "UPDATE {} fd SET stranger_id = s.id \ FROM {} s \ - WHERE s.file_uuid = fd.file_uuid AND s.face_track_id = fd.face_track_id \ + WHERE s.file_uuid = fd.file_uuid AND s.trace_id = fd.trace_id \ AND fd.file_uuid = $1 AND fd.identity_id IS NULL \ - AND fd.face_track_id IS NOT NULL AND fd.stranger_id IS NULL", + AND fd.trace_id IS NOT NULL AND fd.stranger_id IS NULL", fd_table, strangers_table )) .bind(file_uuid) @@ -1255,16 +1255,16 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho } /// Bind ASRX speakers to face traces based on temporal overlap. -/// Reads face_detections (face_track_id, identity_id, frame_number) and ASRX +/// Reads face_detections (trace_id, identity_id, frame_number) and ASRX /// segments (speaker_id, start_time, end_time), computes overlap, /// and stores bindings in identity_bindings table. pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result { // Load face traces with identity_id and frame numbers let fd_table = schema::table_name("face_detections"); let traces = sqlx::query_as::<_, (i32, Vec)>(&format!( - "SELECT face_track_id, array_agg(frame_number ORDER BY frame_number) \ - FROM {} WHERE file_uuid=$1 AND face_track_id IS NOT NULL AND identity_id IS NOT NULL \ - GROUP BY face_track_id", + "SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \ + FROM {} WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \ + GROUP BY trace_id", fd_table )) .bind(file_uuid) @@ -1327,7 +1327,7 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu // For each trace, compute overlap with each speaker let mut bindings = 0usize; - for (face_track_id, frames) in &traces { + for (trace_id, frames) in &traces { if frames.is_empty() { continue; } @@ -1335,9 +1335,9 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu // Get identity_id for this trace let fd_table = schema::table_name("face_detections"); let identity_id: Option = sqlx::query_scalar( - &format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND face_track_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table) + &format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table) ) - .bind(file_uuid).bind(face_track_id) + .bind(file_uuid).bind(trace_id) .fetch_optional(pool).await?.flatten(); if identity_id.is_none() { @@ -1370,7 +1370,7 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu let overlap_ratio = best_overlap as f64 / frames.len() as f64; if overlap_ratio > 0.3 && !best_speaker.is_empty() { let metadata = serde_json::json!({ - "trace_id": face_track_id, + "trace_id": trace_id, "overlap_frames": best_overlap, "total_frames": frames.len(), "overlap_ratio": overlap_ratio, @@ -1464,7 +1464,7 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res "reasoning": identities[0].reasoning, }); let _ = sqlx::query(&format!( - "INSERT INTO {} (file_uuid, face_track_id, metadata) \ + "INSERT INTO {} (file_uuid, trace_id, metadata) \ VALUES ($1, NULL, $2::jsonb) ON CONFLICT DO NOTHING", schema::table_name("strangers") )) diff --git a/src/api/identity_api.rs b/src/api/identity_api.rs index ebb173a..49b3d73 100644 --- a/src/api/identity_api.rs +++ b/src/api/identity_api.rs @@ -1289,6 +1289,8 @@ pub struct SetProfileFromFaceRequest { pub file_uuid: String, pub face_id: Option, pub id: Option, + pub trace_id: Option, + pub frame_number: Option, } async fn set_profile_from_face( @@ -1302,20 +1304,40 @@ async fn set_profile_from_face( let uuid_clean = identity_uuid.replace('-', ""); - let face_identifier = match (&req.face_id, req.id) { - (Some(fid), _) => fid.clone(), - (None, Some(id)) => id.to_string(), - (None, None) => { + let (face_identifier, use_trace, use_frame) = match (&req.face_id, req.id, req.trace_id) { + (Some(fid), _, _) => (fid.clone(), false, None), + (None, Some(id), _) => (id.to_string(), false, None), + (None, None, Some(trace_id)) => (trace_id.to_string(), true, req.frame_number), + (None, None, None) => { return Err(( StatusCode::BAD_REQUEST, - Json(serde_json::json!({"success": false, "message": "Either face_id or id is required"})), + Json(serde_json::json!({"success": false, "message": "Either face_id, id, or trace_id is required"})), )); } }; - let use_id_field = req.id.is_some(); - - let row: Option<(i64, i32, i32, i32, i32, f64)> = if use_id_field { + let row: Option<(i64, i32, i32, i32, i32, f64)> = if use_trace { + if let Some(frame) = use_frame { + sqlx::query_as(&format!( + "SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND trace_id = $2 AND frame_number = $3 LIMIT 1", + fd_table + )) + .bind(&req.file_uuid) + .bind(use_trace) + .bind(frame as i32) + .fetch_optional(state.db.pool()) + .await + } else { + sqlx::query_as(&format!( + "SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND trace_id = $2 ORDER BY confidence DESC LIMIT 1", + fd_table + )) + .bind(&req.file_uuid) + .bind(use_trace) + .fetch_optional(state.db.pool()) + .await + } + } else if req.id.is_some() { sqlx::query_as(&format!( "SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND id = $2", fd_table diff --git a/src/api/processing.rs b/src/api/processing.rs index 472b355..f33bfa9 100644 --- a/src/api/processing.rs +++ b/src/api/processing.rs @@ -302,7 +302,7 @@ async fn trigger_processing( "progress": progress }); sqlx::query(&format!( - "UPDATE {videos_table} SET processing_status = $1, updated_at = CURRENT_TIMESTAMP WHERE file_uuid = $2" + "UPDATE {videos_table} SET status = 'queued', processing_status = $1, updated_at = CURRENT_TIMESTAMP WHERE file_uuid = $2" )) .bind(&status) .bind(&file_uuid) @@ -558,10 +558,10 @@ async fn get_job(Path(uuid): Path) -> Result, St updated_at, ) = job.ok_or(StatusCode::NOT_FOUND)?; - // Calculate queue position if status is 'pending' - let queue_position = if status == "pending" { + // Calculate queue position (pending or queued jobs ahead of this one) + let queue_position = if status == "pending" || status == "queued" { sqlx::query_scalar::<_, i64>(&format!( - "SELECT COUNT(*) + 1 FROM {} WHERE status = 'pending' AND created_at < (SELECT created_at FROM {} WHERE uuid = $1)", + "SELECT COUNT(*) + 1 FROM {} WHERE status IN ('pending', 'queued') AND created_at < (SELECT created_at FROM {} WHERE uuid = $1)", jobs_table, jobs_table )) .bind(&uuid) diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index cdad2a2..abd15f5 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -194,6 +194,7 @@ pub enum VideoStatus { Unregistered, Registered, Pending, + Queued, Processing, Processed, Indexed, @@ -208,6 +209,7 @@ impl VideoStatus { VideoStatus::Unregistered => "unregistered", VideoStatus::Registered => "registered", VideoStatus::Pending => "pending", + VideoStatus::Queued => "queued", VideoStatus::Processing => "processing", VideoStatus::Processed => "processed", VideoStatus::Indexed => "indexed", @@ -222,6 +224,7 @@ impl VideoStatus { "unregistered" => Some(VideoStatus::Unregistered), "registered" => Some(VideoStatus::Registered), "pending" => Some(VideoStatus::Pending), + "queued" => Some(VideoStatus::Queued), "processing" => Some(VideoStatus::Processing), "processed" => Some(VideoStatus::Processed), "indexed" => Some(VideoStatus::Indexed), @@ -2030,7 +2033,7 @@ sqlx::query( &format!( r#" SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id - FROM {} WHERE status = $1 ORDER BY created_at DESC + FROM {} WHERE status = $1 ORDER BY created_at ASC "#, table ) diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index 8595d62..0e24884 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -294,6 +294,11 @@ impl JobWorker { .update_job_status(job.id, MonitorJobStatus::Running) .await?; + // Update video status to processing once worker picks it up + self.db + .update_video_status(&job.uuid, VideoStatus::Processing) + .await?; + self.redis .update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processor_types) .await?;