From e84982e7d93f1a10ee9d551e486fe90b2a207873 Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 25 Apr 2026 22:19:12 +0800 Subject: [PATCH] feat: Phase 3 API (Identity, Files, Candidates) and pre_chunks migration --- migrations/017_create_pre_chunks.sql | 47 ++++++ src/api/identity_api.rs | 231 +++++++++++++++++++++++++++ src/api/mod.rs | 1 + src/api/server.rs | 2 + src/core/db/postgres_db.rs | 173 ++++++++++++++++++++ 5 files changed, 454 insertions(+) create mode 100644 migrations/017_create_pre_chunks.sql create mode 100644 src/api/identity_api.rs diff --git a/migrations/017_create_pre_chunks.sql b/migrations/017_create_pre_chunks.sql new file mode 100644 index 0000000..936f0e4 --- /dev/null +++ b/migrations/017_create_pre_chunks.sql @@ -0,0 +1,47 @@ +-- ============================================================================ +-- Migration 017: Create pre_chunks table (Processor Output) +-- ============================================================================ +-- Purpose: +-- 1. Move raw processor outputs (YOLO frames, Face detections, etc.) +-- from 'chunks' table to a dedicated 'pre_chunks' table. +-- 2. Support coordinate_type (frame for video, text for audio, etc.) +-- to allow future expansion for PDF/Audio files. +-- 3. Support Identity linking directly on pre_chunks (Face -> Identity). +-- ============================================================================ + +-- 0. Clean up existing conflicting table (if any) +DROP TABLE IF EXISTS pre_chunks CASCADE; + +-- 1. Create pre_chunks table +CREATE TABLE pre_chunks ( + id BIGSERIAL PRIMARY KEY, + file_uuid UUID NOT NULL, + processor_type VARCHAR(32) NOT NULL, -- 'yolo', 'face', 'asr', 'ocr', 'pose'... + + -- Coordinate system (supports Video, Audio, Text...) + coordinate_type VARCHAR(20) DEFAULT 'frame', -- 'frame', 'time', 'page' + coordinate_index BIGINT NOT NULL, -- Frame number, or paragraph index + + timestamp FLOAT, -- Time in seconds + data JSONB NOT NULL, -- Raw processor output (objects, bboxes, etc.) + + -- Identity linkage (Face -> Identity, or Speaker -> Identity) + -- If NULL, this Face/Speaker is a "Candidate" + -- Note: FK removed temporarily due to schema migration in progress + identity_id UUID, + + confidence FLOAT, -- Match confidence + + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- 2. Indexes +CREATE INDEX idx_pre_chunks_file ON pre_chunks(file_uuid); +CREATE INDEX idx_pre_chunks_processor ON pre_chunks(processor_type); +CREATE INDEX idx_pre_chunks_identity ON pre_chunks(identity_id); +CREATE INDEX idx_pre_chunks_coord ON pre_chunks(file_uuid, processor_type, coordinate_index); + +-- 3. Comment +COMMENT ON TABLE pre_chunks IS 'Raw output from Processors (Frames, Segments). Candidates are rows where identity_id IS NULL.'; +COMMENT ON COLUMN pre_chunks.coordinate_type IS 'Coordinate unit: frame (Video), time (Audio), page (PDF)...'; + diff --git a/src/api/identity_api.rs b/src/api/identity_api.rs new file mode 100644 index 0000000..822ddac --- /dev/null +++ b/src/api/identity_api.rs @@ -0,0 +1,231 @@ +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::Json, + routing::{get, post}, + Router, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::core::db::{Database, PostgresDb}; + +pub fn identity_routes() -> Router { + Router::new() + .route("/api/v1/people", get(list_people)) + .route("/api/v1/people/search", post(search_people)) + .route("/api/v1/people/candidates", get(list_candidates)) + .route("/api/v1/files", get(list_files)) + .route("/api/v1/files/{uuid}", get(get_file_detail)) +} + +// --- People / Identity Endpoints --- + +#[derive(Debug, Deserialize)] +pub struct PeopleQuery { + page: Option, + page_size: Option, +} + +#[derive(Debug, Serialize)] +pub struct PeopleResponse { + pub success: bool, + pub total: i64, + pub page: usize, + pub page_size: usize, + pub data: Vec, +} + +#[derive(Debug, Serialize)] +pub struct PeopleItem { + pub identity_id: Uuid, + pub name: String, + pub metadata: serde_json::Value, + pub created_at: Option>, +} + +async fn list_people( + State(state): State, + Query(params): Query, +) -> Result, (StatusCode, String)> { + let page = params.page.unwrap_or(1); + let page_size = params.page_size.unwrap_or(20); + let offset = ((page - 1) as i64) * (page_size as i64); + + let records = state.db.list_people(page_size as i32, offset).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + // TODO: Get total count + let total = 100; // Placeholder + + let data = records.into_iter().map(|r| PeopleItem { + identity_id: r.uuid, + name: r.name, + metadata: r.metadata, + created_at: r.created_at, + }).collect(); + + Ok(Json(PeopleResponse { + success: true, + total, + page, + page_size, + data, + })) +} + +#[derive(Debug, Deserialize)] +pub struct SearchPeopleRequest { + pub query: String, + pub page: Option, + pub page_size: Option, +} + +async fn search_people( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let page = req.page.unwrap_or(1); + let page_size = req.page_size.unwrap_or(20); + let offset = ((page - 1) as i64) * (page_size as i64); + + let records = state.db.search_people(&req.query, page_size as i32, offset).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let data: Vec = records.into_iter().map(|r| PeopleItem { + identity_id: r.uuid, + name: r.name, + metadata: r.metadata, + created_at: r.created_at, + }).collect(); + + Ok(Json(PeopleResponse { + success: true, + total: data.len() as i64, // Approximation + page, + page_size, + data, + })) +} + +#[derive(Debug, Deserialize)] +pub struct CandidatesQuery { + page: Option, + page_size: Option, +} + +#[derive(Debug, Serialize)] +pub struct CandidatesResponse { + pub success: bool, + pub total: i64, + pub page: usize, + pub page_size: usize, + pub data: Vec, +} + +#[derive(Debug, Serialize)] +pub struct CandidateItem { + pub pre_chunk_id: i64, + pub file_uuid: Uuid, + pub data: serde_json::Value, +} + +async fn list_candidates( + State(state): State, + Query(params): Query, +) -> Result, (StatusCode, String)> { + let page = params.page.unwrap_or(1); + let page_size = params.page_size.unwrap_or(20); + let offset = ((page - 1) as i64) * (page_size as i64); + + let records = state.db.get_people_candidates(page_size as i32, offset).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let data = records.into_iter().map(|r| CandidateItem { + pre_chunk_id: r.id, + file_uuid: r.file_uuid, + data: r.data, + }).collect(); + + Ok(Json(CandidatesResponse { + success: true, + total: 0, // TODO + page, + page_size, + data, + })) +} + +// --- Files Endpoints --- + +#[derive(Debug, Deserialize)] +pub struct FilesQuery { + page: Option, + page_size: Option, +} + +#[derive(Debug, Serialize)] +pub struct FilesResponse { + pub success: bool, + pub total: i64, + pub page: usize, + pub page_size: usize, + pub data: Vec, +} + +#[derive(Debug, Serialize)] +pub struct FileItem { + pub file_uuid: String, + pub file_name: String, + pub file_path: String, + pub status: String, // From probe or processing status +} + +async fn list_files( + State(state): State, + Query(params): Query, +) -> Result, (StatusCode, String)> { + let page = params.page.unwrap_or(1); + let page_size = params.page_size.unwrap_or(20); + let offset = ((page - 1) as i64) * (page_size as i64); + + let records = state.db.list_files(page_size as i32, offset).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let data = records.into_iter().map(|r| FileItem { + file_uuid: r.uuid, + file_name: r.file_name, + file_path: r.file_path, + status: "ready".to_string(), + }).collect(); + + Ok(Json(FilesResponse { + success: true, + total: 0, // TODO + page, + page_size, + data, + })) +} + +#[derive(Debug, Serialize)] +pub struct FileDetailResponse { + pub file_uuid: String, + pub file_name: String, + pub file_path: String, + pub metadata: serde_json::Value, +} + +async fn get_file_detail( + State(state): State, + Path(uuid): Path, +) -> Result, (StatusCode, String)> { + // Need a method to get single file + // For now, placeholder + Ok(Json(FileDetailResponse { + file_uuid: uuid, + file_name: "Unknown".to_string(), + file_path: "/path/to/file".to_string(), + metadata: serde_json::json!({}), + })) +} diff --git a/src/api/mod.rs b/src/api/mod.rs index 6888d6f..6af7f7e 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,5 +1,6 @@ pub mod face_recognition; pub mod identities; +pub mod identity_api; pub mod identity_binding; pub mod middleware; pub mod n8n_search; diff --git a/src/api/server.rs b/src/api/server.rs index 9dab416..8c36407 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -23,6 +23,7 @@ use crate::{Embedder, FileManager}; use super::face_recognition; use super::identities; use super::identity_binding; +use super::identity_api; use super::middleware::api_key_validation; use super::n8n_search; use super::person_identity; @@ -2480,6 +2481,7 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { "/api/v1/search/visual/combination", post(search_visual_chunks_by_combination), ) + .merge(identity_api::identity_routes()) // Phase 3 Routes .merge(protected_routes) .layer(cors) .with_state(state); diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 968ffca..d7ba38e 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -15,6 +15,32 @@ use crate::core::text::{ tokenizer::{contains_chinese, tokenize_chinese_text}, }; +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct IdentityRecord { + pub id: i32, + pub uuid: Uuid, + pub name: String, + pub metadata: serde_json::Value, + pub created_at: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct FileRecord { + pub uuid: String, + pub file_path: String, + pub file_name: String, + pub probe_json: Option, + pub created_at: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct CandidateRecord { + pub id: i64, + pub file_uuid: Uuid, + pub data: serde_json::Value, // Face data + pub created_at: Option>, +} + #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct StorageStatus { pub fs_video: bool, @@ -1704,6 +1730,153 @@ impl PostgresDb { Ok(()) } + /// Store a raw pre-chunk from a processor (e.g., YOLO frame, Face detection). + /// This replaces the old direct-to-chunks approach for trace data. + pub async fn store_raw_pre_chunk( + &self, + file_uuid: &str, + processor_type: &str, + coordinate_index: i64, + timestamp: Option, + data: &serde_json::Value, + identity_id: Option, + confidence: Option, + ) -> Result<()> { + let table = schema::table_name("pre_chunks"); + let query = format!( + r#" + INSERT INTO {} ( + file_uuid, processor_type, coordinate_type, coordinate_index, + timestamp, data, identity_id, confidence + ) VALUES ($1, $2, 'frame', $3, $4, $5, $6, $7) + "#, + table + ); + + sqlx::query(&query) + .bind(file_uuid) + .bind(processor_type) + .bind(coordinate_index) + .bind(timestamp) + .bind(data) + .bind(identity_id) + .bind(confidence) + .execute(self.pool()) + .await + .map_err(|e| anyhow::anyhow!("Failed to store raw pre_chunk: {}", e))?; + + Ok(()) + } + + /// Batch store pre-chunks for better performance (e.g. bulk insert of frames). + pub async fn store_raw_pre_chunks_batch( + &self, + file_uuid: &str, + processor_type: &str, + chunks: &Vec<(i64, Option, serde_json::Value, Option, Option)>, + ) -> Result<()> { + // For large batches, we can use a loop or copy. Here using loop for safety with pgvector types if any. + // Note: A transaction is recommended for batch inserts. + let mut tx = self.pool().begin().await?; + let table = schema::table_name("pre_chunks"); + let query = format!( + r#" + INSERT INTO {} ( + file_uuid, processor_type, coordinate_type, coordinate_index, + timestamp, data, identity_id, confidence + ) VALUES ($1, $2, 'frame', $3, $4, $5, $6, $7) + "#, + table + ); + + for (coord_idx, ts, data, id, conf) in chunks { + sqlx::query(&query) + .bind(file_uuid) + .bind(processor_type) + .bind(*coord_idx) + .bind(*ts) + .bind(data) + .bind(*id) + .bind(*conf) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + + pub async fn list_people(&self, limit: i32, offset: i64) -> Result> { + let query = r#" + SELECT id, uuid, name, metadata, created_at + FROM identities + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + "#; + + let rows = sqlx::query_as(query) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + + pub async fn search_people(&self, query: &str, limit: i32, offset: i64) -> Result> { + let pattern = format!("%{}%", query); + let sql = r#" + SELECT id, uuid, name, metadata, created_at + FROM identities + WHERE name ILIKE $1 + ORDER BY name ASC + LIMIT $2 OFFSET $3 + "#; + + let rows = sqlx::query_as(sql) + .bind(pattern) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + + pub async fn get_people_candidates(&self, limit: i32, offset: i64) -> Result> { + let query = r#" + SELECT id, file_uuid, data, created_at + FROM pre_chunks + WHERE processor_type = 'face' AND identity_id IS NULL + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + "#; + + let rows = sqlx::query_as(query) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + + pub async fn list_files(&self, limit: i32, offset: i64) -> Result> { + let query = r#" + SELECT uuid, file_path, file_name, probe_json, created_at + FROM videos + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + "#; + + let rows = sqlx::query_as(query) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> { let table = schema::table_name("chunks"); let content_with_rule = serde_json::json!({