fix: ASRX duplication, TKG edges, trace ingest, and add pipeline progress publishing

- ASRX handler no longer stores duplicate 'asr' pre_chunks
- Pre_chunks storage made idempotent (delete-before-insert)
- Rule 1 + trace_ingest changed to query 'asrx' not 'asr'
- Trace chunks removed (dynamic from TKG/Qdrant)
- TKG scroll_face_points fixed: trace_id >= 1 (not == 1)
- TKG AsrxSegmentEntry: start/end -> start_time/end_time (match ASRX JSON)
- Unregister error handling: log instead of silent discard
- Add publish_pipeline_progress calls at each pipeline stage
  (processors, rule1, face_trace, identity_agent, TKG, rule2, completion)
This commit is contained in:
Accusys
2026-07-02 10:43:46 +08:00
parent d791d138f2
commit 3eabd45882
65 changed files with 9481 additions and 3856 deletions

View File

@@ -180,7 +180,7 @@ async fn list_identities(
)
})?;
let sql = format!(
let sql = format!(
r#"SELECT i.id::int, i.uuid, i.name, i.metadata, i.status, i.starred,
COALESCE(
jsonb_agg(jsonb_build_object(
@@ -195,10 +195,19 @@ let sql = format!(
WHERE i.status IS NULL OR i.status != 'merged'
GROUP BY i.id, i.uuid, i.name, i.metadata, i.status, i.starred
ORDER BY i.id DESC LIMIT $1 OFFSET $2"#,
id_table, crate::core::db::schema::table_name("file_identities")
id_table,
crate::core::db::schema::table_name("file_identities")
);
let rows: Vec<(i32, uuid::Uuid, String, Option<serde_json::Value>, Option<String>, Option<bool>, serde_json::Value)> = match sqlx::query_as(&sql)
let rows: Vec<(
i32,
uuid::Uuid,
String,
Option<serde_json::Value>,
Option<String>,
Option<bool>,
serde_json::Value,
)> = match sqlx::query_as(&sql)
.bind(page_size as i64)
.bind(offset)
.fetch_all(db.pool())
@@ -216,10 +225,18 @@ let sql = format!(
let identities: Vec<IdentityResponse> = rows
.into_iter()
.map(|r| {
let file_bindings: Vec<FileBinding> = r.6.as_array()
.map(|arr| arr.iter().filter_map(|v| serde_json::from_value(v.clone()).ok()).collect())
.unwrap_or_default();
let file_uuids: Vec<String> = file_bindings.iter().map(|fb| fb.file_uuid.clone()).collect();
let file_bindings: Vec<FileBinding> =
r.6.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| serde_json::from_value(v.clone()).ok())
.collect()
})
.unwrap_or_default();
let file_uuids: Vec<String> = file_bindings
.iter()
.map(|fb| fb.file_uuid.clone())
.collect();
IdentityResponse {
id: r.0,
identity_uuid: r.1.to_string().replace('-', ""),
@@ -332,149 +349,57 @@ pub struct IdentityListResponse {
async fn list_face_candidates(
Query(query): Query<FaceCandidatesQuery>,
) -> Result<Json<FaceCandidatesResponse>, (StatusCode, String)> {
let db = match PostgresDb::init().await {
Ok(db) => db,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to connect to database: {}", e),
))
}
};
let page = query.page.unwrap_or(1);
let page_size = std::cmp::min(query.page_size.unwrap_or(15), 100);
let offset = (page - 1) * page_size;
let min_confidence = query.min_confidence.unwrap_or(0.5);
let table = crate::core::db::schema::table_name("face_detections");
// Query Qdrant _faces for unbound faces (identity_id IS NULL)
let qdrant = crate::core::db::qdrant_db::QdrantDb::new();
let mut filter_must = vec![
serde_json::json!({"is_null": {"key": "identity_id"}}),
serde_json::json!({"key": "confidence", "range": {"gte": min_confidence}}),
];
if let Some(ref file_uuid) = query.file_uuid {
filter_must.push(serde_json::json!({"key": "file_uuid", "match": {"value": file_uuid}}));
}
let scroll_filter = serde_json::json!({"must": filter_must});
let total: i64 = if let Some(file_uuid) = &query.file_uuid {
let count_sql = format!(
"SELECT COUNT(*) FROM {} WHERE identity_id IS NULL AND confidence >= $1 AND file_uuid = $2",
table
);
match sqlx::query_scalar(&count_sql)
.bind(min_confidence)
.bind(file_uuid)
.fetch_one(db.pool())
.await
{
Ok(count) => count,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Count error: {}", e),
))
}
}
} else {
let count_sql = format!(
"SELECT COUNT(*) FROM {} WHERE identity_id IS NULL AND confidence >= $1",
table
);
match sqlx::query_scalar(&count_sql)
.bind(min_confidence)
.fetch_one(db.pool())
.await
{
Ok(count) => count,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Count error: {}", e),
))
}
}
};
let rows = if let Some(file_uuid) = &query.file_uuid {
let sql = format!(
"SELECT id, face_id, file_uuid, frame_number::bigint, confidence::float4,
jsonb_build_object('x', x, 'y', y, 'width', width, 'height', height) as bbox,
NULL::jsonb as attributes
FROM {}
WHERE identity_id IS NULL AND confidence >= $1 AND file_uuid = $2
ORDER BY confidence DESC
LIMIT $3 OFFSET $4",
table
);
match sqlx::query_as::<
_,
(
i32,
Option<String>,
String,
i64,
f32,
Option<serde_json::Value>,
Option<serde_json::Value>,
),
>(&sql)
.bind(min_confidence)
.bind(file_uuid)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(db.pool())
let all_points = qdrant
.scroll_all_points("_faces", scroll_filter, 1000)
.await
{
Ok(rows) => rows,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
))
}
}
} else {
let sql = format!(
"SELECT id, face_id, file_uuid, frame_number::bigint, confidence::float4,
jsonb_build_object('x', x, 'y', y, 'width', width, 'height', height) as bbox,
NULL::jsonb as attributes
FROM {}
WHERE identity_id IS NULL AND confidence >= $1
ORDER BY confidence DESC
LIMIT $2 OFFSET $3",
table
);
match sqlx::query_as::<
_,
.map_err(|e| {
(
i32,
Option<String>,
String,
i64,
f32,
Option<serde_json::Value>,
Option<serde_json::Value>,
),
>(&sql)
.bind(min_confidence)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(db.pool())
.await
{
Ok(rows) => rows,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Query error: {}", e),
))
}
}
};
StatusCode::INTERNAL_SERVER_ERROR,
format!("Qdrant scroll failed: {}", e),
)
})?;
let candidates: Vec<FaceCandidate> = rows
let total = all_points.len() as i64;
// Sort by confidence DESC then paginate
let mut sorted: Vec<&serde_json::Value> = all_points.iter().collect();
sorted.sort_by(|a, b| {
let ca = a["payload"]["confidence"].as_f64().unwrap_or(0.0);
let cb = b["payload"]["confidence"].as_f64().unwrap_or(0.0);
cb.partial_cmp(&ca).unwrap_or(std::cmp::Ordering::Equal)
});
let paginated: Vec<&&serde_json::Value> = sorted.iter().skip(offset).take(page_size).collect();
let candidates: Vec<FaceCandidate> = paginated
.into_iter()
.map(|r| FaceCandidate {
id: r.0,
face_id: r.1,
file_uuid: r.2,
frame_number: r.3,
confidence: r.4,
bbox: r.5,
attributes: r.6,
.map(|p| {
let payload = &p["payload"];
let point_id = p["id"].as_u64().unwrap_or(0);
FaceCandidate {
id: point_id as i32,
face_id: Some(format!("{:x}", point_id)),
file_uuid: payload["file_uuid"].as_str().unwrap_or("").to_string(),
frame_number: payload["frame"].as_i64().unwrap_or(0),
confidence: payload["confidence"].as_f64().unwrap_or(0.0) as f32,
bbox: payload.get("bbox").cloned(),
attributes: None,
}
})
.collect();
@@ -518,133 +443,98 @@ pub struct UnassignedTracesResponse {
async fn list_unassigned_traces(
Query(query): Query<UnassignedTracesQuery>,
) -> Result<Json<UnassignedTracesResponse>, (StatusCode, String)> {
let db = match PostgresDb::init().await {
Ok(db) => db,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to connect to database: {}", e),
))
}
};
let page = query.page.unwrap_or(1);
let page_size = std::cmp::min(query.page_size.unwrap_or(20), 100);
let offset = (page - 1) * page_size;
let table = crate::core::db::schema::table_name("face_detections");
// Query Qdrant _faces for unbound traces (identity_id IS NULL, trace_id > 0)
let qdrant = crate::core::db::qdrant_db::QdrantDb::new();
let mut filter_must: Vec<serde_json::Value> = vec![
serde_json::json!({"is_null": {"key": "identity_id"}}),
serde_json::json!({"key": "trace_id", "range": {"gt": 0}}),
];
if let Some(ref file_uuid) = query.file_uuid {
filter_must.push(serde_json::json!({"key": "file_uuid", "match": {"value": file_uuid}}));
}
let scroll_filter = serde_json::json!({"must": filter_must});
let total: i64 = if let Some(file_uuid) = &query.file_uuid {
let count_sql = format!(
"SELECT COUNT(DISTINCT trace_id) FROM {} WHERE identity_id IS NULL AND trace_id IS NOT NULL AND file_uuid = $1",
table
);
sqlx::query_scalar(&count_sql)
.bind(file_uuid)
.fetch_one(db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Count error: {}", e)))?
} else {
let count_sql = format!(
"SELECT COUNT(DISTINCT trace_id) FROM {} WHERE identity_id IS NULL AND trace_id IS NOT NULL",
table
);
sqlx::query_scalar(&count_sql)
.fetch_one(db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Count error: {}", e)))?
};
let sql = if let Some(file_uuid) = &query.file_uuid {
format!(
"WITH trace_agg AS (
SELECT trace_id, file_uuid,
COUNT(*) as frame_count,
MIN(frame_number) as start_frame,
MAX(frame_number) as end_frame
FROM {}
WHERE identity_id IS NULL AND trace_id IS NOT NULL AND file_uuid = $1
GROUP BY trace_id, file_uuid
),
best_face AS (
SELECT DISTINCT ON (fd.trace_id, fd.file_uuid)
fd.trace_id, fd.file_uuid, fd.id as best_face_id,
fd.frame_number as best_face_frame,
fd.confidence as best_face_confidence,
jsonb_build_object('x', fd.x, 'y', fd.y, 'width', fd.width, 'height', fd.height) as best_face_bbox
FROM {} fd
WHERE fd.identity_id IS NULL AND fd.trace_id IS NOT NULL AND fd.file_uuid = $1
ORDER BY fd.trace_id, fd.file_uuid, fd.confidence DESC
let all_points = qdrant
.scroll_all_points("_faces", scroll_filter, 1000)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Qdrant scroll failed: {}", e),
)
SELECT ta.trace_id, ta.file_uuid, ta.frame_count, ta.start_frame, ta.end_frame,
bf.best_face_id, bf.best_face_frame, bf.best_face_confidence, bf.best_face_bbox
FROM trace_agg ta
JOIN best_face bf ON ta.trace_id = bf.trace_id AND ta.file_uuid = bf.file_uuid
ORDER BY ta.frame_count DESC
LIMIT $2 OFFSET $3",
table, table
)
} else {
format!(
"WITH trace_agg AS (
SELECT trace_id, file_uuid,
COUNT(*) as frame_count,
MIN(frame_number) as start_frame,
MAX(frame_number) as end_frame
FROM {}
WHERE identity_id IS NULL AND trace_id IS NOT NULL
GROUP BY trace_id, file_uuid
),
best_face AS (
SELECT DISTINCT ON (fd.trace_id, fd.file_uuid)
fd.trace_id, fd.file_uuid, fd.id as best_face_id,
fd.frame_number as best_face_frame,
fd.confidence as best_face_confidence,
jsonb_build_object('x', fd.x, 'y', fd.y, 'width', fd.width, 'height', fd.height) as best_face_bbox
FROM {} fd
WHERE fd.identity_id IS NULL AND fd.trace_id IS NOT NULL
ORDER BY fd.trace_id, fd.file_uuid, fd.confidence DESC
)
SELECT ta.trace_id, ta.file_uuid, ta.frame_count, ta.start_frame, ta.end_frame,
bf.best_face_id, bf.best_face_frame, bf.best_face_confidence, bf.best_face_bbox
FROM trace_agg ta
JOIN best_face bf ON ta.trace_id = bf.trace_id AND ta.file_uuid = bf.file_uuid
ORDER BY ta.frame_count DESC
LIMIT $1 OFFSET $2",
table, table
)
};
})?;
let rows: Vec<(i32, String, i64, i64, i64, i32, i64, f64, Option<serde_json::Value>)> =
if let Some(file_uuid) = &query.file_uuid {
sqlx::query_as(&sql)
.bind(file_uuid)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Query error: {}", e)))?
} else {
sqlx::query_as(&sql)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(db.pool())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Query error: {}", e)))?
// Group by (file_uuid, trace_id) and aggregate
use std::collections::BTreeMap;
// Running aggregate for one (file_uuid, trace_id) group of face points.
// `Default` zero-initialises every numeric field and sets `best_bbox` to `None`;
// the accumulation loop below depends on those starting values.
#[derive(Default)]
struct TraceAgg {
// Number of points folded into this group (incremented once per point).
frame_count: i64,
// Smallest `frame` seen so far. The loop treats 0 as "not set yet".
// NOTE(review): a genuine frame 0 is therefore indistinguishable from "unset" — confirm frames are 1-based.
start_frame: i64,
// Largest `frame` seen so far.
end_frame: i64,
// Highest `confidence` seen so far; replaced only by a strictly greater value.
// NOTE(review): starting at 0.0 means a group whose points all report 0.0 (or lack
// the field) never records a best point — the best_* fields stay at their defaults.
best_confidence: f64,
// Point id of the highest-confidence point; 0 when the id could not be read as an integer.
best_point_id: i64,
// `frame` of the highest-confidence point.
best_frame: i64,
// `bbox` payload of the highest-confidence point, if that payload had one.
best_bbox: Option<serde_json::Value>,
}
let mut trace_map: BTreeMap<(String, i32), TraceAgg> = BTreeMap::new();
for point in &all_points {
let payload = &point["payload"];
let file_uuid = match payload["file_uuid"].as_str() {
Some(f) => f.to_string(),
None => continue,
};
let trace_id = payload["trace_id"].as_i64().unwrap_or(0) as i32;
if trace_id <= 0 {
continue;
}
let frame = payload["frame"].as_i64().unwrap_or(0);
let confidence = payload["confidence"].as_f64().unwrap_or(0.0);
let point_id = point["id"].as_i64().unwrap_or(0);
let traces: Vec<UnassignedTrace> = rows
let entry = trace_map.entry((file_uuid, trace_id)).or_default();
entry.frame_count += 1;
if frame < entry.start_frame || entry.start_frame == 0 {
entry.start_frame = frame;
}
if frame > entry.end_frame {
entry.end_frame = frame;
}
if confidence > entry.best_confidence {
entry.best_confidence = confidence;
entry.best_point_id = point_id;
entry.best_frame = frame;
entry.best_bbox = payload.get("bbox").cloned();
}
}
let total = trace_map.len() as i64;
// Sort by frame_count DESC, paginate
let mut sorted_traces: Vec<((String, i32), TraceAgg)> = trace_map.into_iter().collect();
sorted_traces.sort_by(|a, b| b.1.frame_count.cmp(&a.1.frame_count));
let paginated: Vec<_> = sorted_traces
.into_iter()
.map(|r| UnassignedTrace {
trace_id: r.0,
file_uuid: r.1,
frame_count: r.2,
start_frame: r.3,
end_frame: r.4,
best_face_id: r.5,
best_face_frame: r.6,
best_face_confidence: r.7,
best_face_bbox: r.8,
.skip(offset)
.take(page_size)
.collect();
let traces: Vec<UnassignedTrace> = paginated
.into_iter()
.map(|((file_uuid, trace_id), agg)| UnassignedTrace {
trace_id,
file_uuid,
frame_count: agg.frame_count,
start_frame: agg.start_frame,
end_frame: agg.end_frame,
best_face_id: agg.best_point_id as i32,
best_face_frame: agg.best_frame,
best_face_confidence: agg.best_confidence,
best_face_bbox: agg.best_bbox,
})
.collect();