From c15f7cd4af53a4c7cbfe4c787ad113aa01e10cba Mon Sep 17 00:00:00 2001 From: Warren Date: Sat, 25 Apr 2026 23:12:15 +0800 Subject: [PATCH] feat: implement Phase 5 Resource Registry & Heartbeat --- migrations/018_create_resources.sql | 25 +++++++ src/api/identity_api.rs | 103 +++++++++++++++++++++++++++- src/core/db/mod.rs | 6 +- src/core/db/postgres_db.rs | 66 ++++++++++++++++++ 4 files changed, 196 insertions(+), 4 deletions(-) create mode 100644 migrations/018_create_resources.sql diff --git a/migrations/018_create_resources.sql b/migrations/018_create_resources.sql new file mode 100644 index 0000000..aec94a1 --- /dev/null +++ b/migrations/018_create_resources.sql @@ -0,0 +1,25 @@ +-- ============================================================================ +-- Migration 018: Create resources table (Resource Registry) +-- ============================================================================ +-- Purpose: +-- 1. Allow Processors and Agents to register themselves. +-- 2. Track status and last heartbeat for monitoring. +-- 3. Support capabilities metadata for discovery. +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS resources ( + resource_id VARCHAR(64) PRIMARY KEY, + resource_type VARCHAR(20) NOT NULL, -- 'processor', 'agent', 'service' + category VARCHAR(50), -- 'visual', 'speech', 'logic' + capabilities JSONB DEFAULT '{}', + config JSONB DEFAULT '{}', + metadata JSONB DEFAULT '{}', + status VARCHAR(20) DEFAULT 'offline', -- 'idle', 'busy', 'offline', 'error' + last_heartbeat TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX idx_resources_type ON resources(resource_type); +CREATE INDEX idx_resources_status ON resources(status); + +COMMENT ON TABLE resources IS 'Registry for Processors, Agents, and Services. Used for monitoring and discovery.'; diff --git a/src/api/identity_api.rs b/src/api/identity_api.rs index cbe5d23..e8c5d98 100644 --- a/src/api/identity_api.rs +++ b/src/api/identity_api.rs @@ -8,7 +8,7 @@ use axum::{ use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::core::db::{Database, PostgresDb}; +use crate::core::db::{Database, PostgresDb, ResourceRecord}; pub fn identity_routes() -> Router { Router::new() @@ -19,6 +19,9 @@ pub fn identity_routes() -> Router { .route("/api/v1/people/{identity_id}/reject-candidate", post(reject_candidate)) .route("/api/v1/files", get(list_files)) .route("/api/v1/files/{uuid}", get(get_file_detail)) + .route("/api/v1/resources/register", post(register_resource)) + .route("/api/v1/resources/heartbeat", post(heartbeat_resource)) + .route("/api/v1/resources", get(list_resources)) } // ... (Keep existing functions) ... @@ -277,3 +280,101 @@ async fn get_file_detail( metadata: serde_json::json!({}), })) } + +// --- Resource Registry Endpoints (Phase 5) --- + +#[derive(Debug, Deserialize)] +pub struct RegisterResourceRequest { + pub resource_id: String, + pub resource_type: String, + pub category: String, + pub capabilities: Option, + pub config: Option, + pub metadata: Option, +} + +#[derive(Debug, Serialize)] +pub struct ResourceResponse { + pub success: bool, + pub message: String, + pub data: Option, +} + +#[derive(Debug, Serialize)] +pub struct ResourceItem { + pub resource_id: String, + pub resource_type: String, + pub category: String, + pub capabilities: Option, + pub status: String, + pub last_heartbeat: Option>, +} + +async fn register_resource( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let resource = ResourceRecord { + resource_id: req.resource_id.clone(), + resource_type: req.resource_type.clone(), + category: req.category.clone(), + capabilities: req.capabilities, + config: req.config, + metadata: req.metadata, + status: "online".to_string(), + last_heartbeat: None, + created_at: None, + }; + + state.db.register_resource(resource).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + Ok(Json(ResourceResponse { + success: true, + message: "Resource registered successfully".to_string(), + data: None, // We could return the full record, but simplified for now + })) +} + +#[derive(Debug, Deserialize)] +pub struct HeartbeatRequest { + pub resource_id: String, + pub status: Option, +} + +async fn heartbeat_resource( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let status = req.status.unwrap_or("online".to_string()); + state.db.heartbeat_resource(&req.resource_id, &status).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + Ok(Json(ResourceResponse { + success: true, + message: "Heartbeat received".to_string(), + data: None, + })) +} + +async fn list_resources( + State(state): State, +) -> Result, (StatusCode, String)> { + let records = state.db.list_resources().await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let data: Vec = records.into_iter().map(|r| ResourceItem { + resource_id: r.resource_id, + resource_type: r.resource_type, + category: r.category, + capabilities: r.capabilities, + status: r.status, + last_heartbeat: r.last_heartbeat, + }).collect(); + + Ok(Json(ResourceResponse { + success: true, + message: "Resources listed".to_string(), + data: None, + })) +} diff --git a/src/core/db/mod.rs b/src/core/db/mod.rs index c2d0bfb..7b120a8 100644 --- a/src/core/db/mod.rs +++ b/src/core/db/mod.rs @@ -41,9 +41,9 @@ pub mod sync_db; pub use mongodb_db::MongoDb; pub use postgres_db::{ - Bm25Result, CreateApiKeyConfig, HybridSearchResult, MonitorJob, MonitorJobStats, - MonitorJobStatus, PostgresDb, ProcessorJobStatus, ProcessorResult, ProcessorType, VideoRecord, - VideoStatus, + Bm25Result, CandidateRecord, CreateApiKeyConfig, FileRecord, HybridSearchResult, MonitorJob, + MonitorJobStats, MonitorJobStatus, PostgresDb, ProcessorJobStatus, ProcessorResult, + ProcessorType, ResourceRecord, VideoRecord, VideoStatus, }; pub use qdrant_db::{QdrantDb, VectorPayload}; pub use redis_client::{ diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index babb424..759ef67 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -15,6 +15,19 @@ use crate::core::text::{ tokenizer::{contains_chinese, tokenize_chinese_text}, }; +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct ResourceRecord { + pub resource_id: String, + pub resource_type: String, + pub category: String, + pub capabilities: Option, + pub config: Option, + pub metadata: Option, + pub status: String, + pub last_heartbeat: Option>, + pub created_at: Option>, +} + #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct IdentityRecord { pub id: i32, @@ -1805,6 +1818,59 @@ impl PostgresDb { Ok(()) } + pub async fn register_resource(&self, resource: ResourceRecord) -> Result<()> { + sqlx::query( + "INSERT INTO resources (resource_id, resource_type, category, capabilities, config, metadata, status, last_heartbeat) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + ON CONFLICT (resource_id) DO UPDATE SET + resource_type = EXCLUDED.resource_type, + category = EXCLUDED.category, + capabilities = EXCLUDED.capabilities, + config = EXCLUDED.config, + metadata = EXCLUDED.metadata, + status = EXCLUDED.status, + last_heartbeat = NOW()" + ) + .bind(resource.resource_id) + .bind(resource.resource_type) + .bind(resource.category) + .bind(resource.capabilities) + .bind(resource.config) + .bind(resource.metadata) + .bind(resource.status) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn heartbeat_resource(&self, resource_id: &str, status: &str) -> Result<()> { + sqlx::query( + "UPDATE resources SET status = $1, last_heartbeat = NOW() WHERE resource_id = $2" + ) + .bind(status) + .bind(resource_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn deregister_resource(&self, resource_id: &str) -> Result<()> { + sqlx::query( + "DELETE FROM resources WHERE resource_id = $1" + ) + .bind(resource_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn list_resources(&self) -> Result> { + let rows = sqlx::query_as("SELECT * FROM resources ORDER BY last_heartbeat DESC") + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + pub async fn list_people(&self, limit: i32, offset: i64) -> Result> { let query = r#" SELECT id, uuid, name, metadata, created_at