feat: add queued status + FIFO queue ordering

- Add Queued variant to VideoStatus enum
- Trigger sets videos.status='queued' instead of staying 'pending'
- Worker sets videos.status='processing' on pickup
- list_monitor_jobs_by_status ORDER BY created_at ASC (FIFO)
- queue_position counts both 'pending' and 'queued' jobs
This commit is contained in:
Accusys
2026-06-24 05:18:40 +08:00
parent 14e886cc08
commit 360cb991e1
6 changed files with 78 additions and 48 deletions

View File

@@ -179,7 +179,7 @@ async fn list_identities(
) )
})?; })?;
let sql = format!( let sql = format!(
"SELECT id::int, uuid, name, metadata FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2", "SELECT id::int, uuid, name, metadata FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2",
id_table id_table
); );

View File

@@ -208,7 +208,7 @@ async fn match_from_photo(
// 4. Find best matching trace (highest similarity, no threshold) // 4. Find best matching trace (highest similarity, no threshold)
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let best_match: Option<(i32, i32, f64)> = sqlx::query_as(&format!( let best_match: Option<(i32, i32, f64)> = sqlx::query_as(&format!(
r#"SELECT id, face_track_id, r#"SELECT id, trace_id,
1 - (embedding::vector <=> $1::vector) as similarity 1 - (embedding::vector <=> $1::vector) as similarity
FROM {} FROM {}
WHERE file_uuid = $2 AND embedding IS NOT NULL WHERE file_uuid = $2 AND embedding IS NOT NULL
@@ -250,7 +250,7 @@ async fn match_from_photo(
matches: 1, matches: 1,
traces_matched, traces_matched,
message: format!( message: format!(
"Best trace: face_track_id={}, similarity={:.4}", "Best trace: trace_id={}, similarity={:.4}",
fb_trace, fb_sim fb_trace, fb_sim
), ),
})) }))
@@ -284,7 +284,7 @@ async fn match_from_trace(
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let all_faces: Vec<(Vec<f32>, i64)> = sqlx::query_as::<_, (Vec<f32>, i64)>(&format!( let all_faces: Vec<(Vec<f32>, i64)> = sqlx::query_as::<_, (Vec<f32>, i64)>(&format!(
"SELECT embedding, frame_number FROM {} \ "SELECT embedding, frame_number FROM {} \
WHERE file_uuid = $1 AND face_track_id = $2 AND embedding IS NOT NULL \ WHERE file_uuid = $1 AND trace_id = $2 AND embedding IS NOT NULL \
ORDER BY frame_number ASC", ORDER BY frame_number ASC",
fd_table fd_table
)) ))
@@ -321,7 +321,7 @@ async fn match_from_trace(
// Get width*height info if available (not all pipelines store it) // Get width*height info if available (not all pipelines store it)
let face_sizes: Vec<(i64, i32)> = sqlx::query_as::<_, (i64, i32)>(&format!( let face_sizes: Vec<(i64, i32)> = sqlx::query_as::<_, (i64, i32)>(&format!(
"SELECT frame_number, COALESCE(width, 0) * COALESCE(height, 0) AS area \ "SELECT frame_number, COALESCE(width, 0) * COALESCE(height, 0) AS area \
FROM {} WHERE file_uuid = $1 AND face_track_id = $2 AND embedding IS NOT NULL \ FROM {} WHERE file_uuid = $1 AND trace_id = $2 AND embedding IS NOT NULL \
ORDER BY frame_number ASC", ORDER BY frame_number ASC",
fd_table fd_table
)) ))
@@ -360,7 +360,7 @@ async fn match_from_trace(
for qemb in &query_embeddings { for qemb in &query_embeddings {
let top = sqlx::query_as::<_, (i32, i32, f64)>(&format!( let top = sqlx::query_as::<_, (i32, i32, f64)>(&format!(
r#"SELECT id, face_track_id, r#"SELECT id, trace_id,
1 - (embedding::vector <=> $1::vector) as similarity 1 - (embedding::vector <=> $1::vector) as similarity
FROM {} FROM {}
WHERE file_uuid = $2 WHERE file_uuid = $2
@@ -382,9 +382,9 @@ async fn match_from_trace(
) )
})?; })?;
if let Some((cface_id, c_face_track_id, c_sim)) = top { if let Some((cface_id, c_trace_id, c_sim)) = top {
if seen_trace_ids.insert(c_face_track_id) { if seen_trace_ids.insert(c_trace_id) {
validated.push((cface_id, c_face_track_id, c_sim)); validated.push((cface_id, c_trace_id, c_sim));
} }
} }
} }
@@ -419,7 +419,7 @@ async fn match_from_trace(
// 4. Update matched face_detections // 4. Update matched face_detections
let mut traces_matched: Vec<i32> = Vec::new(); let mut traces_matched: Vec<i32> = Vec::new();
for (id, face_track_id, _similarity) in &validated { for (id, trace_id, _similarity) in &validated {
if let Err(e) = sqlx::query(&format!( if let Err(e) = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE id = $2", "UPDATE {} SET identity_id = $1 WHERE id = $2",
fd_table fd_table
@@ -435,15 +435,15 @@ async fn match_from_trace(
e e
); );
} else { } else {
if !traces_matched.contains(face_track_id) { if !traces_matched.contains(trace_id) {
traces_matched.push(*face_track_id); traces_matched.push(*trace_id);
} }
} }
} }
// 5. Also bind the source trace itself // 5. Also bind the source trace itself
let _ = sqlx::query(&format!( let _ = sqlx::query(&format!(
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_track_id = $3", "UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
fd_table fd_table
)) ))
.bind(identity_id) .bind(identity_id)
@@ -1014,12 +1014,12 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
tmdb_rows.len() tmdb_rows.len()
); );
// Step 2: 載入所有 face_detections含 frame_numberface_track_id 分組 // Step 2: 載入所有 face_detections含 frame_number按 trace_id 分組
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let fd_rows = sqlx::query_as::<_, (i32, i64, Vec<f32>)>(&format!( let fd_rows = sqlx::query_as::<_, (i32, i64, Vec<f32>)>(&format!(
"SELECT face_track_id, frame_number, embedding FROM {} \ "SELECT trace_id, frame_number, embedding FROM {} \
WHERE file_uuid=$1 AND face_track_id IS NOT NULL AND embedding IS NOT NULL \ WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \
ORDER BY face_track_id, frame_number", ORDER BY trace_id, frame_number",
fd_table fd_table
)) ))
.bind(file_uuid) .bind(file_uuid)
@@ -1031,7 +1031,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
return Ok(0); return Ok(0);
} }
// 分組:face_track_id → (frame_number, embedding) // 分組trace_id → (frame_number, embedding)
use std::collections::HashMap; use std::collections::HashMap;
let mut face_track_faces_raw: HashMap<i32, Vec<(i64, Vec<f32>)>> = HashMap::new(); let mut face_track_faces_raw: HashMap<i32, Vec<(i64, Vec<f32>)>> = HashMap::new();
for (tid, frame, emb) in &fd_rows { for (tid, frame, emb) in &fd_rows {
@@ -1069,7 +1069,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
// Step 4: 迭代匹配 // Step 4: 迭代匹配
const TH: f32 = 0.50; const TH: f32 = 0.50;
let mut matched: HashMap<i32, String> = HashMap::new(); // face_track_id → identity_name let mut matched: HashMap<i32, String> = HashMap::new(); // trace_id → identity_name
// Round 1: 用 3-angle samples 比對 TMDb // Round 1: 用 3-angle samples 比對 TMDb
for (&tid, samples) in &face_track_samples { for (&tid, samples) in &face_track_samples {
@@ -1110,7 +1110,7 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
.await?; .await?;
if let Some(identity_id) = id_opt { if let Some(identity_id) = id_opt {
let _ = sqlx::query(&format!( let _ = sqlx::query(&format!(
"UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND face_track_id=$3", "UPDATE {} SET identity_id=$1 WHERE file_uuid=$2 AND trace_id=$3",
fd_table fd_table
)) ))
.bind(identity_id) .bind(identity_id)
@@ -1200,11 +1200,11 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
// Step 6: 未匹配的 trace 設 stranger_id = strangers.id (FK) // Step 6: 未匹配的 trace 設 stranger_id = strangers.id (FK)
// First: ensure strangers records exist // First: ensure strangers records exist
let _ = sqlx::query(&format!( let _ = sqlx::query(&format!(
"INSERT INTO {} (file_uuid, face_track_id) \ "INSERT INTO {} (file_uuid, trace_id) \
SELECT $1, fd.face_track_id FROM {} fd \ SELECT $1, fd.trace_id FROM {} fd \
WHERE fd.file_uuid = $1 AND fd.face_track_id IS NOT NULL \ WHERE fd.file_uuid = $1 AND fd.trace_id IS NOT NULL \
AND fd.identity_id IS NULL \ AND fd.identity_id IS NULL \
ON CONFLICT (file_uuid, face_track_id) DO NOTHING", ON CONFLICT (file_uuid, trace_id) DO NOTHING",
strangers_table, fd_table strangers_table, fd_table
)) ))
.bind(file_uuid) .bind(file_uuid)
@@ -1215,9 +1215,9 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
let stranger_update = sqlx::query(&format!( let stranger_update = sqlx::query(&format!(
"UPDATE {} fd SET stranger_id = s.id \ "UPDATE {} fd SET stranger_id = s.id \
FROM {} s \ FROM {} s \
WHERE s.file_uuid = fd.file_uuid AND s.face_track_id = fd.face_track_id \ WHERE s.file_uuid = fd.file_uuid AND s.trace_id = fd.trace_id \
AND fd.file_uuid = $1 AND fd.identity_id IS NULL \ AND fd.file_uuid = $1 AND fd.identity_id IS NULL \
AND fd.face_track_id IS NOT NULL AND fd.stranger_id IS NULL", AND fd.trace_id IS NOT NULL AND fd.stranger_id IS NULL",
fd_table, strangers_table fd_table, strangers_table
)) ))
.bind(file_uuid) .bind(file_uuid)
@@ -1255,16 +1255,16 @@ async fn match_faces_iterative_pg(pool: &sqlx::PgPool, file_uuid: &str) -> anyho
} }
/// Bind ASRX speakers to face traces based on temporal overlap. /// Bind ASRX speakers to face traces based on temporal overlap.
/// Reads face_detections (face_track_id, identity_id, frame_number) and ASRX /// Reads face_detections (trace_id, identity_id, frame_number) and ASRX
/// segments (speaker_id, start_time, end_time), computes overlap, /// segments (speaker_id, start_time, end_time), computes overlap,
/// and stores bindings in identity_bindings table. /// and stores bindings in identity_bindings table.
pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> { pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
// Load face traces with identity_id and frame numbers // Load face traces with identity_id and frame numbers
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let traces = sqlx::query_as::<_, (i32, Vec<i32>)>(&format!( let traces = sqlx::query_as::<_, (i32, Vec<i32>)>(&format!(
"SELECT face_track_id, array_agg(frame_number ORDER BY frame_number) \ "SELECT trace_id, array_agg(frame_number ORDER BY frame_number) \
FROM {} WHERE file_uuid=$1 AND face_track_id IS NOT NULL AND identity_id IS NOT NULL \ FROM {} WHERE file_uuid=$1 AND trace_id IS NOT NULL AND identity_id IS NOT NULL \
GROUP BY face_track_id", GROUP BY trace_id",
fd_table fd_table
)) ))
.bind(file_uuid) .bind(file_uuid)
@@ -1327,7 +1327,7 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu
// For each trace, compute overlap with each speaker // For each trace, compute overlap with each speaker
let mut bindings = 0usize; let mut bindings = 0usize;
for (face_track_id, frames) in &traces { for (trace_id, frames) in &traces {
if frames.is_empty() { if frames.is_empty() {
continue; continue;
} }
@@ -1335,9 +1335,9 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu
// Get identity_id for this trace // Get identity_id for this trace
let fd_table = schema::table_name("face_detections"); let fd_table = schema::table_name("face_detections");
let identity_id: Option<i32> = sqlx::query_scalar( let identity_id: Option<i32> = sqlx::query_scalar(
&format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND face_track_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table) &format!("SELECT identity_id FROM {} WHERE file_uuid=$1 AND trace_id=$2 AND identity_id IS NOT NULL LIMIT 1", fd_table)
) )
.bind(file_uuid).bind(face_track_id) .bind(file_uuid).bind(trace_id)
.fetch_optional(pool).await?.flatten(); .fetch_optional(pool).await?.flatten();
if identity_id.is_none() { if identity_id.is_none() {
@@ -1370,7 +1370,7 @@ pub async fn bind_speakers(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Resu
let overlap_ratio = best_overlap as f64 / frames.len() as f64; let overlap_ratio = best_overlap as f64 / frames.len() as f64;
if overlap_ratio > 0.3 && !best_speaker.is_empty() { if overlap_ratio > 0.3 && !best_speaker.is_empty() {
let metadata = serde_json::json!({ let metadata = serde_json::json!({
"trace_id": face_track_id, "trace_id": trace_id,
"overlap_frames": best_overlap, "overlap_frames": best_overlap,
"total_frames": frames.len(), "total_frames": frames.len(),
"overlap_ratio": overlap_ratio, "overlap_ratio": overlap_ratio,
@@ -1464,7 +1464,7 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res
"reasoning": identities[0].reasoning, "reasoning": identities[0].reasoning,
}); });
let _ = sqlx::query(&format!( let _ = sqlx::query(&format!(
"INSERT INTO {} (file_uuid, face_track_id, metadata) \ "INSERT INTO {} (file_uuid, trace_id, metadata) \
VALUES ($1, NULL, $2::jsonb) ON CONFLICT DO NOTHING", VALUES ($1, NULL, $2::jsonb) ON CONFLICT DO NOTHING",
schema::table_name("strangers") schema::table_name("strangers")
)) ))

View File

@@ -1289,6 +1289,8 @@ pub struct SetProfileFromFaceRequest {
pub file_uuid: String, pub file_uuid: String,
pub face_id: Option<String>, pub face_id: Option<String>,
pub id: Option<i64>, pub id: Option<i64>,
pub trace_id: Option<i32>,
pub frame_number: Option<i64>,
} }
async fn set_profile_from_face( async fn set_profile_from_face(
@@ -1302,20 +1304,40 @@ async fn set_profile_from_face(
let uuid_clean = identity_uuid.replace('-', ""); let uuid_clean = identity_uuid.replace('-', "");
let face_identifier = match (&req.face_id, req.id) { let (face_identifier, use_trace, use_frame) = match (&req.face_id, req.id, req.trace_id) {
(Some(fid), _) => fid.clone(), (Some(fid), _, _) => (fid.clone(), false, None),
(None, Some(id)) => id.to_string(), (None, Some(id), _) => (id.to_string(), false, None),
(None, None) => { (None, None, Some(trace_id)) => (trace_id.to_string(), true, req.frame_number),
(None, None, None) => {
return Err(( return Err((
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
Json(serde_json::json!({"success": false, "message": "Either face_id or id is required"})), Json(serde_json::json!({"success": false, "message": "Either face_id, id, or trace_id is required"})),
)); ));
} }
}; };
let use_id_field = req.id.is_some(); let row: Option<(i64, i32, i32, i32, i32, f64)> = if use_trace {
if let Some(frame) = use_frame {
let row: Option<(i64, i32, i32, i32, i32, f64)> = if use_id_field { sqlx::query_as(&format!(
"SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND trace_id = $2 AND frame_number = $3 LIMIT 1",
fd_table
))
.bind(&req.file_uuid)
.bind(use_trace)
.bind(frame as i32)
.fetch_optional(state.db.pool())
.await
} else {
sqlx::query_as(&format!(
"SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND trace_id = $2 ORDER BY confidence DESC LIMIT 1",
fd_table
))
.bind(&req.file_uuid)
.bind(use_trace)
.fetch_optional(state.db.pool())
.await
}
} else if req.id.is_some() {
sqlx::query_as(&format!( sqlx::query_as(&format!(
"SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND id = $2", "SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND id = $2",
fd_table fd_table

View File

@@ -302,7 +302,7 @@ async fn trigger_processing(
"progress": progress "progress": progress
}); });
sqlx::query(&format!( sqlx::query(&format!(
"UPDATE {videos_table} SET processing_status = $1, updated_at = CURRENT_TIMESTAMP WHERE file_uuid = $2" "UPDATE {videos_table} SET status = 'queued', processing_status = $1, updated_at = CURRENT_TIMESTAMP WHERE file_uuid = $2"
)) ))
.bind(&status) .bind(&status)
.bind(&file_uuid) .bind(&file_uuid)
@@ -558,10 +558,10 @@ async fn get_job(Path(uuid): Path<String>) -> Result<Json<JobDetailResponse>, St
updated_at, updated_at,
) = job.ok_or(StatusCode::NOT_FOUND)?; ) = job.ok_or(StatusCode::NOT_FOUND)?;
// Calculate queue position if status is 'pending' // Calculate queue position (pending or queued jobs ahead of this one)
let queue_position = if status == "pending" { let queue_position = if status == "pending" || status == "queued" {
sqlx::query_scalar::<_, i64>(&format!( sqlx::query_scalar::<_, i64>(&format!(
"SELECT COUNT(*) + 1 FROM {} WHERE status = 'pending' AND created_at < (SELECT created_at FROM {} WHERE uuid = $1)", "SELECT COUNT(*) + 1 FROM {} WHERE status IN ('pending', 'queued') AND created_at < (SELECT created_at FROM {} WHERE uuid = $1)",
jobs_table, jobs_table jobs_table, jobs_table
)) ))
.bind(&uuid) .bind(&uuid)

View File

@@ -194,6 +194,7 @@ pub enum VideoStatus {
Unregistered, Unregistered,
Registered, Registered,
Pending, Pending,
Queued,
Processing, Processing,
Processed, Processed,
Indexed, Indexed,
@@ -208,6 +209,7 @@ impl VideoStatus {
VideoStatus::Unregistered => "unregistered", VideoStatus::Unregistered => "unregistered",
VideoStatus::Registered => "registered", VideoStatus::Registered => "registered",
VideoStatus::Pending => "pending", VideoStatus::Pending => "pending",
VideoStatus::Queued => "queued",
VideoStatus::Processing => "processing", VideoStatus::Processing => "processing",
VideoStatus::Processed => "processed", VideoStatus::Processed => "processed",
VideoStatus::Indexed => "indexed", VideoStatus::Indexed => "indexed",
@@ -222,6 +224,7 @@ impl VideoStatus {
"unregistered" => Some(VideoStatus::Unregistered), "unregistered" => Some(VideoStatus::Unregistered),
"registered" => Some(VideoStatus::Registered), "registered" => Some(VideoStatus::Registered),
"pending" => Some(VideoStatus::Pending), "pending" => Some(VideoStatus::Pending),
"queued" => Some(VideoStatus::Queued),
"processing" => Some(VideoStatus::Processing), "processing" => Some(VideoStatus::Processing),
"processed" => Some(VideoStatus::Processed), "processed" => Some(VideoStatus::Processed),
"indexed" => Some(VideoStatus::Indexed), "indexed" => Some(VideoStatus::Indexed),
@@ -2030,7 +2033,7 @@ sqlx::query(
&format!( &format!(
r#" r#"
SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at::TEXT, updated_at::TEXT, created_at::TEXT, processors, completed_processors, failed_processors, video_id
FROM {} WHERE status = $1 ORDER BY created_at DESC FROM {} WHERE status = $1 ORDER BY created_at ASC
"#, "#,
table table
) )

View File

@@ -294,6 +294,11 @@ impl JobWorker {
.update_job_status(job.id, MonitorJobStatus::Running) .update_job_status(job.id, MonitorJobStatus::Running)
.await?; .await?;
// Update video status to processing once worker picks it up
self.db
.update_video_status(&job.uuid, VideoStatus::Processing)
.await?;
self.redis self.redis
.update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processor_types) .update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processor_types)
.await?; .await?;