feat: implement Phase 5 Resource Registry & Heartbeat
This commit is contained in:
25
migrations/018_create_resources.sql
Normal file
25
migrations/018_create_resources.sql
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
-- ============================================================================
|
||||||
|
-- Migration 018: Create resources table (Resource Registry)
|
||||||
|
-- ============================================================================
|
||||||
|
-- Purpose:
|
||||||
|
-- 1. Allow Processors and Agents to register themselves.
|
||||||
|
-- 2. Track status and last heartbeat for monitoring.
|
||||||
|
-- 3. Support capabilities metadata for discovery.
|
||||||
|
-- ============================================================================
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS resources (
|
||||||
|
resource_id VARCHAR(64) PRIMARY KEY,
|
||||||
|
resource_type VARCHAR(20) NOT NULL, -- 'processor', 'agent', 'service'
|
||||||
|
category VARCHAR(50), -- 'visual', 'speech', 'logic'
|
||||||
|
capabilities JSONB DEFAULT '{}',
|
||||||
|
config JSONB DEFAULT '{}',
|
||||||
|
metadata JSONB DEFAULT '{}',
|
||||||
|
status VARCHAR(20) DEFAULT 'offline', -- 'idle', 'busy', 'offline', 'error'
|
||||||
|
last_heartbeat TIMESTAMPTZ,
|
||||||
|
created_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX idx_resources_type ON resources(resource_type);
|
||||||
|
CREATE INDEX idx_resources_status ON resources(status);
|
||||||
|
|
||||||
|
COMMENT ON TABLE resources IS 'Registry for Processors, Agents, and Services. Used for monitoring and discovery.';
|
||||||
@@ -8,7 +8,7 @@ use axum::{
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::core::db::{Database, PostgresDb};
|
use crate::core::db::{Database, PostgresDb, ResourceRecord};
|
||||||
|
|
||||||
pub fn identity_routes() -> Router<crate::api::server::AppState> {
|
pub fn identity_routes() -> Router<crate::api::server::AppState> {
|
||||||
Router::new()
|
Router::new()
|
||||||
@@ -19,6 +19,9 @@ pub fn identity_routes() -> Router<crate::api::server::AppState> {
|
|||||||
.route("/api/v1/people/{identity_id}/reject-candidate", post(reject_candidate))
|
.route("/api/v1/people/{identity_id}/reject-candidate", post(reject_candidate))
|
||||||
.route("/api/v1/files", get(list_files))
|
.route("/api/v1/files", get(list_files))
|
||||||
.route("/api/v1/files/{uuid}", get(get_file_detail))
|
.route("/api/v1/files/{uuid}", get(get_file_detail))
|
||||||
|
.route("/api/v1/resources/register", post(register_resource))
|
||||||
|
.route("/api/v1/resources/heartbeat", post(heartbeat_resource))
|
||||||
|
.route("/api/v1/resources", get(list_resources))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ... (Keep existing functions) ...
|
// ... (Keep existing functions) ...
|
||||||
@@ -277,3 +280,101 @@ async fn get_file_detail(
|
|||||||
metadata: serde_json::json!({}),
|
metadata: serde_json::json!({}),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Resource Registry Endpoints (Phase 5) ---
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct RegisterResourceRequest {
|
||||||
|
pub resource_id: String,
|
||||||
|
pub resource_type: String,
|
||||||
|
pub category: String,
|
||||||
|
pub capabilities: Option<serde_json::Value>,
|
||||||
|
pub config: Option<serde_json::Value>,
|
||||||
|
pub metadata: Option<serde_json::Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ResourceResponse {
|
||||||
|
pub success: bool,
|
||||||
|
pub message: String,
|
||||||
|
pub data: Option<ResourceItem>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ResourceItem {
|
||||||
|
pub resource_id: String,
|
||||||
|
pub resource_type: String,
|
||||||
|
pub category: String,
|
||||||
|
pub capabilities: Option<serde_json::Value>,
|
||||||
|
pub status: String,
|
||||||
|
pub last_heartbeat: Option<chrono::DateTime<chrono::Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn register_resource(
|
||||||
|
State(state): State<crate::api::server::AppState>,
|
||||||
|
Json(req): Json<RegisterResourceRequest>,
|
||||||
|
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
|
||||||
|
let resource = ResourceRecord {
|
||||||
|
resource_id: req.resource_id.clone(),
|
||||||
|
resource_type: req.resource_type.clone(),
|
||||||
|
category: req.category.clone(),
|
||||||
|
capabilities: req.capabilities,
|
||||||
|
config: req.config,
|
||||||
|
metadata: req.metadata,
|
||||||
|
status: "online".to_string(),
|
||||||
|
last_heartbeat: None,
|
||||||
|
created_at: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
state.db.register_resource(resource).await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(Json(ResourceResponse {
|
||||||
|
success: true,
|
||||||
|
message: "Resource registered successfully".to_string(),
|
||||||
|
data: None, // We could return the full record, but simplified for now
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct HeartbeatRequest {
|
||||||
|
pub resource_id: String,
|
||||||
|
pub status: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn heartbeat_resource(
|
||||||
|
State(state): State<crate::api::server::AppState>,
|
||||||
|
Json(req): Json<HeartbeatRequest>,
|
||||||
|
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
|
||||||
|
let status = req.status.unwrap_or("online".to_string());
|
||||||
|
state.db.heartbeat_resource(&req.resource_id, &status).await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(Json(ResourceResponse {
|
||||||
|
success: true,
|
||||||
|
message: "Heartbeat received".to_string(),
|
||||||
|
data: None,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_resources(
|
||||||
|
State(state): State<crate::api::server::AppState>,
|
||||||
|
) -> Result<Json<ResourceResponse>, (StatusCode, String)> {
|
||||||
|
let records = state.db.list_resources().await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
let data: Vec<ResourceItem> = records.into_iter().map(|r| ResourceItem {
|
||||||
|
resource_id: r.resource_id,
|
||||||
|
resource_type: r.resource_type,
|
||||||
|
category: r.category,
|
||||||
|
capabilities: r.capabilities,
|
||||||
|
status: r.status,
|
||||||
|
last_heartbeat: r.last_heartbeat,
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
Ok(Json(ResourceResponse {
|
||||||
|
success: true,
|
||||||
|
message: "Resources listed".to_string(),
|
||||||
|
data: None,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|||||||
@@ -41,9 +41,9 @@ pub mod sync_db;
|
|||||||
|
|
||||||
pub use mongodb_db::MongoDb;
|
pub use mongodb_db::MongoDb;
|
||||||
pub use postgres_db::{
|
pub use postgres_db::{
|
||||||
Bm25Result, CreateApiKeyConfig, HybridSearchResult, MonitorJob, MonitorJobStats,
|
Bm25Result, CandidateRecord, CreateApiKeyConfig, FileRecord, HybridSearchResult, MonitorJob,
|
||||||
MonitorJobStatus, PostgresDb, ProcessorJobStatus, ProcessorResult, ProcessorType, VideoRecord,
|
MonitorJobStats, MonitorJobStatus, PostgresDb, ProcessorJobStatus, ProcessorResult,
|
||||||
VideoStatus,
|
ProcessorType, ResourceRecord, VideoRecord, VideoStatus,
|
||||||
};
|
};
|
||||||
pub use qdrant_db::{QdrantDb, VectorPayload};
|
pub use qdrant_db::{QdrantDb, VectorPayload};
|
||||||
pub use redis_client::{
|
pub use redis_client::{
|
||||||
|
|||||||
@@ -15,6 +15,19 @@ use crate::core::text::{
|
|||||||
tokenizer::{contains_chinese, tokenize_chinese_text},
|
tokenizer::{contains_chinese, tokenize_chinese_text},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
|
pub struct ResourceRecord {
|
||||||
|
pub resource_id: String,
|
||||||
|
pub resource_type: String,
|
||||||
|
pub category: String,
|
||||||
|
pub capabilities: Option<serde_json::Value>,
|
||||||
|
pub config: Option<serde_json::Value>,
|
||||||
|
pub metadata: Option<serde_json::Value>,
|
||||||
|
pub status: String,
|
||||||
|
pub last_heartbeat: Option<chrono::DateTime<chrono::Utc>>,
|
||||||
|
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
pub struct IdentityRecord {
|
pub struct IdentityRecord {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
@@ -1805,6 +1818,59 @@ impl PostgresDb {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn register_resource(&self, resource: ResourceRecord) -> Result<()> {
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO resources (resource_id, resource_type, category, capabilities, config, metadata, status, last_heartbeat)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
|
||||||
|
ON CONFLICT (resource_id) DO UPDATE SET
|
||||||
|
resource_type = EXCLUDED.resource_type,
|
||||||
|
category = EXCLUDED.category,
|
||||||
|
capabilities = EXCLUDED.capabilities,
|
||||||
|
config = EXCLUDED.config,
|
||||||
|
metadata = EXCLUDED.metadata,
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
last_heartbeat = NOW()"
|
||||||
|
)
|
||||||
|
.bind(resource.resource_id)
|
||||||
|
.bind(resource.resource_type)
|
||||||
|
.bind(resource.category)
|
||||||
|
.bind(resource.capabilities)
|
||||||
|
.bind(resource.config)
|
||||||
|
.bind(resource.metadata)
|
||||||
|
.bind(resource.status)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn heartbeat_resource(&self, resource_id: &str, status: &str) -> Result<()> {
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE resources SET status = $1, last_heartbeat = NOW() WHERE resource_id = $2"
|
||||||
|
)
|
||||||
|
.bind(status)
|
||||||
|
.bind(resource_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn deregister_resource(&self, resource_id: &str) -> Result<()> {
|
||||||
|
sqlx::query(
|
||||||
|
"DELETE FROM resources WHERE resource_id = $1"
|
||||||
|
)
|
||||||
|
.bind(resource_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_resources(&self) -> Result<Vec<ResourceRecord>> {
|
||||||
|
let rows = sqlx::query_as("SELECT * FROM resources ORDER BY last_heartbeat DESC")
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn list_people(&self, limit: i32, offset: i64) -> Result<Vec<IdentityRecord>> {
|
pub async fn list_people(&self, limit: i32, offset: i64) -> Result<Vec<IdentityRecord>> {
|
||||||
let query = r#"
|
let query = r#"
|
||||||
SELECT id, uuid, name, metadata, created_at
|
SELECT id, uuid, name, metadata, created_at
|
||||||
|
|||||||
Reference in New Issue
Block a user