diff --git a/Cargo.lock b/Cargo.lock index 5200c3a..5a9e990 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2219,6 +2219,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.10" @@ -4840,10 +4849,14 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index 91a799a..9c13632 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" thiserror = "1.0" tokio = { version = "1", features = ["full"] } tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } once_cell = "1.19" libc = "0.2" dotenv = "0.15" diff --git a/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md b/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md index 2a1bbb6..11d6d61 100644 --- a/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md +++ b/docs_v1.0/API_V1.0.0/API_REFERENCE_v1.0.0.md @@ -275,10 +275,22 @@ Green bbox per face detection: actual frames `thickness=4`, interpolated `thickn curl "http://localhost:3002/api/v1/identities?page=1&page_size=3" -H "X-API-Key: muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69" ``` ```json -{"count":3,"page":1,"page_size":3,"identities":[ - {"name":"Cary Grant","tmdb_id":2102}, - {"name":"Audrey Hepburn","tmdb_id":187}, - {"name":"Walter Matthau","tmdb_id":2091} +{"count":3852,"page":1,"page_size":3,"identities":[ + {"id":18299,"identity_uuid":"76f85ee6-bc47-4a1a-9878-1beb67851ec5","name":"PERSON_aeed7134_390","metadata":{}}, + {"id":18298,"identity_uuid":"f4d4ccbf-fccb-4f62-8806-2b7f4a706edb","name":"PERSON_aeed7134_389","metadata":{}}, + {"id":18297,"identity_uuid":"e8a1b2c3-d4e5-4f67-8901-23456789abcd","name":"PERSON_aeed7134_388","metadata":{}} +]} +``` + +### GET /api/v1/file/:file_uuid/identities — identities with frame/time ranges + +```bash +curl "http://localhost:3002/api/v1/file/aeed71342a899fe4b4c57b7d41bcb692/identities?limit=2" -H "X-API-Key: muser_68600856036340bcafc01930eb4bd839_1774418104_97221b69" +``` +```json +{"success":true,"file_uuid":"aeed71342a899fe4b4c57b7d41bcb692","fps":25.0,"total":20,"page":1,"page_size":20,"data":[ + {"identity_id":18276,"identity_uuid":"77d895cc-bc2e-4f5a-84b3-3c1f0e2a5b6a","name":"PERSON_aeed7134_367","face_count":86,"start_frame":150744,"end_frame":152895,"start_time":6029.76,"end_time":6115.8,"confidence":0.855}, + {"identity_id":18179,"identity_uuid":"90fc04cd-003b-4a1b-9f7d-8c3e1d2f4a5b","name":"PERSON_aeed7134_270","face_count":13,"start_frame":77418,"end_frame":77454,"start_time":3096.72,"end_time":3098.16,"confidence":0.851} ]} ``` diff --git a/momentry_runtime/plist/com.momentry.sftpgo.plist b/momentry_runtime/plist/com.momentry.sftpgo.plist index 5c40c0b..e6c3c60 100644 --- a/momentry_runtime/plist/com.momentry.sftpgo.plist +++ b/momentry_runtime/plist/com.momentry.sftpgo.plist @@ -13,7 +13,7 @@ ProgramArguments - /opt/homebrew/opt/sftpgo/bin/sftpgo + /Users/accusys/bin/sftpgo serve --config-file /Users/accusys/momentry/etc/sftpgo/sftpgo.json @@ -22,7 +22,7 @@ EnvironmentVariables PATH - /opt/homebrew/bin:/opt/homebrew/sbin:/usr/bin:/bin + /Users/accusys/bin:/opt/homebrew/bin:/usr/bin:/bin HOME /Users/accusys SFTPGO_DEFAULT_ADMIN_USERNAME diff --git a/release/migrate_fix_chunk_id_format.sql b/release/migrate_fix_chunk_id_format.sql index c571b27..a061bcc 100644 --- a/release/migrate_fix_chunk_id_format.sql +++ b/release/migrate_fix_chunk_id_format.sql @@ -1,10 +1,7 @@ --- Migration: Normalize chunk_id format for all chunks --- Converts integer-format chunk_ids ('0', '1', '2', ...) to {file_uuid}_{id} --- Date: 2026-05-15 +-- Migration: Normalize chunk_id format to compact integer +-- Converts all chunk_ids to just {id} (the serial primary key) +-- The unique constraint is on (file_uuid, chunk_id), not chunk_id alone. +-- Date: 2026-05-16 -- Usage: psql -U accusys -d momentry -f migrate_fix_chunk_id_format.sql --- Note: runs with current search_path; use SET search_path TO ; for target schema -UPDATE chunk -SET chunk_id = file_uuid || '_' || id::text -WHERE chunk_id ~ '^[0-9]+$' - AND chunk_id != file_uuid || '_' || id::text; +UPDATE chunk SET chunk_id = id::text WHERE chunk_id != id::text; diff --git a/src/api/identities.rs b/src/api/identities.rs index afdb981..a01b0cf 100644 --- a/src/api/identities.rs +++ b/src/api/identities.rs @@ -195,7 +195,7 @@ async fn list_identities( .into_iter() .map(|r| IdentityResponse { id: r.0, - identity_uuid: r.1.to_string(), + identity_uuid: r.1.to_string().replace('-', ""), name: r.2, metadata: r.3, }) diff --git a/src/api/identity_agent_api.rs b/src/api/identity_agent_api.rs index 81737b3..85a6b4d 100644 --- a/src/api/identity_agent_api.rs +++ b/src/api/identity_agent_api.rs @@ -676,12 +676,12 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: tmdb_rows.len() ); - // Step 2: 載入所有 face_detections,按 trace_id 分組 + // Step 2: 載入所有 face_detections(含 frame_number),按 trace_id 分組 let fd_table = schema::table_name("face_detections"); - let fd_rows = sqlx::query_as::<_, (i32, Vec)>( - &format!("SELECT trace_id, embedding FROM {} \ + let fd_rows = sqlx::query_as::<_, (i32, i32, Vec)>( + &format!("SELECT trace_id, frame_number, embedding FROM {} \ WHERE file_uuid=$1 AND trace_id IS NOT NULL AND embedding IS NOT NULL \ - ORDER BY trace_id", fd_table), + ORDER BY trace_id, frame_number", fd_table), ) .bind(file_uuid) .fetch_all(pool) @@ -692,27 +692,38 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: return Ok(0); } - // 分組:trace_id → Vec + // 分組:trace_id → (frame_number, embedding) use std::collections::HashMap; - let mut trace_faces: HashMap>> = HashMap::new(); - for (tid, emb) in &fd_rows { - trace_faces + let mut trace_faces_raw: HashMap)>> = HashMap::new(); + for (tid, frame, emb) in &fd_rows { + trace_faces_raw .entry(*tid) .or_insert_with(Vec::new) - .push(emb.clone()); + .push((*frame, emb.clone())); } - // 去重:同一個 trace 內,embedding 太接近的只留一個 - for faces in trace_faces.values_mut() { - faces.sort_by(|a, b| b[0].partial_cmp(&a[0]).unwrap_or(std::cmp::Ordering::Equal)); - faces.dedup_by(|a, b| cosine_similarity(a, b) > 0.99); + // 從每個 trace 選取不同角度的 3 個 face embedding + // 策略:按 frame_number 排序,取前中後各 1 個 + let mut trace_samples: HashMap>> = HashMap::new(); + for (tid, mut faces) in trace_faces_raw { + faces.sort_by_key(|(frame, _)| *frame); + let n = faces.len(); + let indices = if n <= 3 { + (0..n).collect() + } else { + let mid = n / 2; + vec![0, mid, n - 1] + }; + let samples: Vec> = indices.iter().map(|&i| faces[i].1.clone()).collect(); + trace_samples.insert(tid, samples); } - let total_traces = trace_faces.len(); + let total_traces = trace_samples.len(); + let sample_count: usize = trace_samples.values().map(|v| v.len()).sum(); tracing::info!( - "[FaceMatch] Loaded {} traces with {} faces", + "[FaceMatch] Loaded {} traces, sampled {} embeddings (3-angle)", total_traces, - fd_rows.len() + sample_count ); // Step 3: 建立 TMDb 查找表 @@ -722,12 +733,13 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: const TH: f32 = 0.50; let mut matched: HashMap = HashMap::new(); // trace_id → identity_name - // Round 1: 直接比對 TMDb - for (&tid, faces) in &trace_faces { + // Round 1: 用 3-angle samples 比對 TMDb + // 每個 trace 選 3 個不同角度 face,取最高 similarity + for (&tid, samples) in &trace_samples { let mut best_name = String::new(); let mut best_sim = 0.0f32; for (_, ref name, ref tmdb_emb) in &tmdb_seeds { - for face_emb in faces { + for face_emb in samples { let s = cosine_similarity(face_emb, tmdb_emb); if s > best_sim { best_sim = s; @@ -751,31 +763,33 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: // 建立 seed pool: name → Vec let mut seed_pool: HashMap>> = HashMap::new(); for (&tid, name) in &matched { - if let Some(faces) = trace_faces.get(&tid) { + if let Some(samples) = trace_samples.get(&tid) { seed_pool .entry(name.clone()) .or_default() - .extend(faces.iter()); + .extend(samples.iter()); } } let mut new_matches: Vec<(i32, String)> = Vec::new(); - for (&tid, faces) in &trace_faces { + for (&tid, samples) in &trace_samples { if matched.contains_key(&tid) { continue; } let mut best_name = String::new(); let mut best_sim = 0.0f32; - if faces.is_empty() { + if samples.is_empty() { continue; } - let ref_face = &faces[0]; + // 用 3-angle samples 分別比對 seed,取最高 similarity for (name, seed_faces) in &seed_pool { - for seed in seed_faces { - let s = cosine_similarity(ref_face, seed); - if s > best_sim { - best_sim = s; - best_name = name.clone(); + for face_emb in samples { + for seed in seed_faces { + let s = cosine_similarity(face_emb, seed); + if s > best_sim { + best_sim = s; + best_name = name.clone(); + } } } } @@ -799,7 +813,7 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: } } - // Step 5: 寫入 DB + // Step 5: 寫入 DB — 已匹配的設 identity_id let identities_table = schema::table_name("identities"); let fd_table = schema::table_name("face_detections"); let mut updated = 0usize; @@ -823,11 +837,27 @@ async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow:: } } + // Step 6: 未匹配的 trace 設 stranger_id = trace_id + // trace_id 在同一個 file 內是 sequential integer,直接複用為 stranger_id + let stranger_update = sqlx::query( + &format!( + "UPDATE {} SET stranger_id = trace_id \ + WHERE file_uuid = $1 AND trace_id IS NOT NULL AND identity_id IS NULL \ + AND (stranger_id IS NULL OR stranger_id != trace_id)", + fd_table + ) + ) + .bind(file_uuid) + .execute(pool) + .await?; + let stranger_count = stranger_update.rows_affected(); + tracing::info!( - "[FaceMatch] Done: {}/{} traces matched ({}%)", + "[FaceMatch] Done: {}/{} traces matched ({}%), {} strangers", matched.len(), total_traces, - matched.len() * 100 / total_traces + matched.len() * 100 / total_traces, + stranger_count ); Ok(updated) } diff --git a/src/api/identity_api.rs b/src/api/identity_api.rs index aa7b424..78b0d50 100644 --- a/src/api/identity_api.rs +++ b/src/api/identity_api.rs @@ -31,6 +31,10 @@ pub fn identity_routes() -> Router { "/api/v1/identity/:identity_uuid/chunks", get(get_identity_chunks), ) + .route( + "/api/v1/identity/:identity_uuid/faces", + get(get_identity_faces), + ) .route("/api/v1/resource/register", post(register_resource)) .route("/api/v1/resource/heartbeat", post(heartbeat_resource)) .route("/api/v1/resources", get(list_resources)) @@ -212,7 +216,7 @@ async fn get_file_identities( .into_iter() .map(|r| FileIdentityItem { identity_id: r.identity_id, - identity_uuid: r.identity_uuid.map(|u| u.to_string()), + identity_uuid: r.identity_uuid.map(|u| u.to_string().replace('-', "")), name: r.name, metadata: r.metadata, face_count: r.face_count, @@ -239,7 +243,7 @@ async fn get_file_identities( #[derive(Debug, Serialize)] pub struct IdentityDetailResponse { pub success: bool, - pub uuid: Uuid, + pub uuid: String, pub name: String, pub identity_type: Option, pub source: Option, @@ -273,7 +277,7 @@ async fn get_identity_detail( match identity { Some(i) => Ok(Json(IdentityDetailResponse { success: true, - uuid: i.uuid, + uuid: i.uuid.to_string().replace('-', ""), name: i.name, identity_type: i.identity_type, source: i.source, @@ -295,7 +299,7 @@ async fn get_identity_detail( #[derive(Debug, Serialize)] pub struct IdentityFilesResponse { pub success: bool, - pub identity_uuid: Uuid, + pub identity_uuid: String, pub total: i64, pub page: usize, pub page_size: usize, @@ -390,7 +394,7 @@ async fn get_identity_files( Ok(Json(IdentityFilesResponse { success: true, - identity_uuid: uuid, + identity_uuid: uuid.to_string().replace('-', ""), total: data.len() as i64, page, page_size, @@ -401,7 +405,7 @@ async fn get_identity_files( #[derive(Debug, Serialize)] pub struct IdentityFacesResponse { pub success: bool, - pub identity_uuid: Uuid, + pub identity_uuid: String, pub total: i64, pub page: usize, pub page_size: usize, @@ -413,7 +417,7 @@ pub struct IdentityFaceItem { pub id: i64, pub file_uuid: String, pub frame_number: i64, - pub timestamp_secs: f64, + pub timestamp_secs: Option, pub face_id: Option, pub bbox: BBox, pub confidence: f64, @@ -465,7 +469,7 @@ async fn get_identity_faces( Ok(Json(IdentityFacesResponse { success: true, - identity_uuid: uuid, + identity_uuid: uuid.to_string().replace('-', ""), total: data.len() as i64, page, page_size, @@ -476,7 +480,7 @@ async fn get_identity_faces( #[derive(Debug, Serialize)] pub struct IdentityChunksResponse { pub success: bool, - pub identity_uuid: Uuid, + pub identity_uuid: String, pub total: i64, pub page: usize, pub page_size: usize, @@ -528,7 +532,7 @@ async fn get_identity_chunks( Ok(Json(IdentityChunksResponse { success: true, - identity_uuid: uuid, + identity_uuid: uuid.to_string().replace('-', ""), total: data.len() as i64, page, page_size, diff --git a/src/api/server.rs b/src/api/server.rs index 5b065b8..a69ea7c 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -65,6 +65,19 @@ struct UserInfo { // Global State static SERVER_START: OnceCell = OnceCell::new(); +static SERVER_HOST: OnceCell = OnceCell::new(); +static SERVER_PORT: OnceCell = OnceCell::new(); + +fn get_host() -> String { + SERVER_HOST + .get() + .cloned() + .unwrap_or_else(|| "0.0.0.0".to_string()) +} + +fn get_port() -> u16 { + SERVER_PORT.get().copied().unwrap_or(0) +} fn get_uptime_ms() -> u64 { SERVER_START @@ -75,6 +88,8 @@ fn get_uptime_ms() -> u64 { #[derive(Debug, Serialize)] struct HealthResponse { + ip: String, + port: u16, status: String, version: String, build_git_hash: String, @@ -462,6 +477,8 @@ pub struct AppState { #[derive(Debug, Serialize)] struct DetailedHealthResponse { + ip: String, + port: u16, status: String, version: String, build_git_hash: String, @@ -583,6 +600,8 @@ async fn health(State(state): State) -> Json { } Json(HealthResponse { + ip: get_host(), + port: get_port(), status: status.to_string(), version: env!("BUILD_VERSION").to_string(), build_git_hash: env!("BUILD_GIT_HASH").to_string(), @@ -677,6 +696,8 @@ async fn health_detailed(State(state): State) -> Json anyhow::Result<()> { let _ = SERVER_START.set(Instant::now()); + // Resolve actual IP address for health identification + let resolved_ip = if host == "0.0.0.0" { + // Try to find a non-loopback IP + if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&"localhost:0") { + if let Some(addr) = addrs.filter_map(|a| match a { + std::net::SocketAddr::V4(v4) if !v4.ip().is_loopback() => Some(v4.ip().to_string()), + _ => None, + }).next() { + addr + } else { + // Fallback: try getting IP from UDP socket + std::net::UdpSocket::bind("0.0.0.0:0") + .and_then(|s| s.connect("8.8.8.8:53").map(|_| s)) + .and_then(|s| s.local_addr()) + .map(|a| a.ip().to_string()) + .unwrap_or_else(|_| "0.0.0.0".to_string()) + } + } else { + host.to_string() + } + } else { + host.to_string() + }; + let _ = SERVER_HOST.set(resolved_ip); + let _ = SERVER_PORT.set(port); let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text-v2-moe:latest".to_string())); let mongo_cache = MongoCache::init().await?; diff --git a/src/api/universal_search.rs b/src/api/universal_search.rs index 532460a..ef199ee 100644 --- a/src/api/universal_search.rs +++ b/src/api/universal_search.rs @@ -159,6 +159,18 @@ pub async fn universal_search( results.extend(person_results); } + // Deduplicate by chunk_id / frame_number / person_id + { + let mut seen_chunks = std::collections::HashSet::new(); + let mut seen_frames = std::collections::HashSet::new(); + let mut seen_persons = std::collections::HashSet::new(); + results.retain(|r| match r { + SearchResult::Chunk { chunk_id, .. } => seen_chunks.insert(chunk_id.clone()), + SearchResult::Frame { frame_number, .. } => seen_frames.insert(*frame_number), + SearchResult::Person { person_id, .. } => seen_persons.insert(person_id.clone()), + }); + } + // Sort by score descending results.sort_by(|a, b| { let score_a = match a { diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 73c0668..c0f4284 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -106,7 +106,7 @@ pub struct IdentityFaceRecord { pub id: i64, pub file_uuid: String, pub frame_number: i64, - pub timestamp_secs: f64, + pub timestamp_secs: Option, pub face_id: Option, pub x: f64, pub y: f64, @@ -795,7 +795,7 @@ impl PostgresDb { .await?; // Chunks - sqlx::query("CREATE TABLE IF NOT EXISTS chunk (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, chunk_id VARCHAR(64) NOT NULL, chunk_type VARCHAR(32) NOT NULL, start_time DOUBLE PRECISION NOT NULL, end_time DOUBLE PRECISION NOT NULL, fps DOUBLE PRECISION DEFAULT 24.0, start_frame BIGINT DEFAULT 0, end_frame BIGINT DEFAULT 0, content JSONB NOT NULL, metadata JSONB, vector_id VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, chunk_id))").execute(pool).await?; + sqlx::query("CREATE TABLE IF NOT EXISTS chunk (id SERIAL PRIMARY KEY, file_uuid VARCHAR(32) NOT NULL, chunk_id VARCHAR(32) NOT NULL, chunk_type VARCHAR(32) NOT NULL, start_time DOUBLE PRECISION NOT NULL, end_time DOUBLE PRECISION NOT NULL, fps DOUBLE PRECISION DEFAULT 24.0, start_frame BIGINT DEFAULT 0, end_frame BIGINT DEFAULT 0, content JSONB NOT NULL, metadata JSONB, vector_id VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_uuid, chunk_id))").execute(pool).await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_chunk_file ON chunk(file_uuid)") .execute(pool) .await?; @@ -814,7 +814,7 @@ impl PostgresDb { // Talents & Identity Bindings sqlx::query("CREATE TABLE IF NOT EXISTS talents (id BIGSERIAL PRIMARY KEY, real_name VARCHAR(255) NOT NULL UNIQUE, actor_name VARCHAR(255), voice_embedding TEXT, face_embedding TEXT, metadata JSONB DEFAULT '{}', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; - sqlx::query("CREATE TABLE IF NOT EXISTS identity_bindings (id BIGSERIAL PRIMARY KEY, identity_id BIGINT REFERENCES talents(id) ON DELETE CASCADE, identity_type VARCHAR(20) NOT NULL, identity_value VARCHAR(100) NOT NULL, metadata JSONB DEFAULT '{}', confidence DOUBLE PRECISION DEFAULT 1.0, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, UNIQUE(identity_id, identity_type, identity_value))").execute(pool).await?; + sqlx::query("CREATE TABLE IF NOT EXISTS identity_bindings (id BIGSERIAL PRIMARY KEY, identity_id BIGINT REFERENCES identities(id) ON DELETE CASCADE, identity_type VARCHAR(20) NOT NULL, identity_value VARCHAR(100) NOT NULL, metadata JSONB DEFAULT '{}', confidence DOUBLE PRECISION DEFAULT 1.0, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, UNIQUE(identity_id, identity_type, identity_value))").execute(pool).await?; // API Keys sqlx::query("CREATE TABLE IF NOT EXISTS api_keys (id SERIAL PRIMARY KEY, key_id VARCHAR(48) UNIQUE NOT NULL, key_hash VARCHAR(64) NOT NULL, key_prefix VARCHAR(8) NOT NULL, name VARCHAR(128) NOT NULL, key_type VARCHAR(20) NOT NULL DEFAULT 'user', user_id BIGINT, service_name VARCHAR(64), permissions JSONB DEFAULT '[\"read\", \"write\"]', expires_at TIMESTAMP, last_used_at TIMESTAMP, last_used_ip VARCHAR(45), usage_count BIGINT DEFAULT 0, status VARCHAR(20) NOT NULL DEFAULT 'active', rotation_required BOOLEAN DEFAULT FALSE, rotation_reason TEXT, grace_period_end TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)").execute(pool).await?; @@ -2497,8 +2497,8 @@ impl PostgresDb { offset: i64, ) -> Result> { let query = r#" - SELECT fd.id, fd.file_uuid, fd.frame_number, fd.timestamp_secs, - fd.face_id, fd.x, fd.y, fd.width, fd.height, fd.confidence + SELECT fd.id::int8, fd.file_uuid, fd.frame_number::int8, fd.timestamp_secs, + fd.face_id, fd.x::float8, fd.y::float8, fd.width::float8, fd.height::float8, fd.confidence::float8 FROM face_detections fd JOIN identities i ON fd.identity_id = i.id WHERE i.uuid = $1 diff --git a/src/core/processor/executor.rs b/src/core/processor/executor.rs index fa4df7c..15dbd94 100644 --- a/src/core/processor/executor.rs +++ b/src/core/processor/executor.rs @@ -82,14 +82,14 @@ fn load_checksums(scripts_dir: &PathBuf) -> HashMap { } pub fn validate_python_env() -> Result<()> { - let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let venv_python = manifest.join("venv").join("bin").join("python"); + let python_path = std::env::var("MOMENTRY_PYTHON_PATH") + .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); + let venv_python = PathBuf::from(&python_path); if !venv_python.exists() { anyhow::bail!( - "Python venv not found at {:?}\n\ - Run: /opt/homebrew/bin/python3.11 -m venv venv", - venv_python + "Python not found at {} (set MOMENTRY_PYTHON_PATH env var)", + python_path ); } @@ -109,9 +109,14 @@ pub fn validate_python_env() -> Result<()> { tracing::warn!("Expected Python 3.11, got: {}", version.trim()); } - let script_path = manifest.join("scripts"); + let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") + .unwrap_or_else(|_| { + let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + manifest.join("scripts").to_string_lossy().to_string() + }); + let script_path = PathBuf::from(&scripts_dir); if !script_path.exists() { - anyhow::bail!("Scripts directory not found at {:?}", script_path); + anyhow::bail!("Scripts directory not found at {}", scripts_dir); } tracing::info!("Python environment validated successfully"); @@ -126,27 +131,37 @@ pub struct PythonExecutor { impl PythonExecutor { pub fn new() -> Result { - let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let venv_python = manifest.join("venv").join("bin").join("python"); - let scripts_dir = manifest.join("scripts"); + let python_path = std::env::var("MOMENTRY_PYTHON_PATH") + .unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string()); + let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") + .unwrap_or_else(|_| { + let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + manifest.join("scripts").to_string_lossy().to_string() + }); + + let venv_python = PathBuf::from(&python_path); + let scripts_path = PathBuf::from(&scripts_dir); if !venv_python.exists() { anyhow::bail!( - "Python venv not found at {:?}. Run: /opt/homebrew/bin/python3.11 -m venv venv", - venv_python + "Python not found at {} (set MOMENTRY_PYTHON_PATH env var)", + python_path ); } - if !scripts_dir.exists() { - anyhow::bail!("Scripts directory not found at {:?}", scripts_dir); + if !scripts_path.exists() { + anyhow::bail!( + "Scripts directory not found at {} (set MOMENTRY_SCRIPTS_DIR env var)", + scripts_dir + ); } // Load SHA256 checksums manifest - let checksums = load_checksums(&scripts_dir); + let checksums = load_checksums(&scripts_path); Ok(Self { venv_python, - scripts_dir, + scripts_dir: scripts_path, checksums, }) } diff --git a/src/main.rs b/src/main.rs index 62f4189..82d1763 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,12 +9,20 @@ use clap::Parser; mod cli; mod processing; +fn init_tracing() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_target(true) + .init(); +} + use cli::*; use processing::handlers::*; /// Main entry point #[tokio::main] async fn main() -> Result<()> { + init_tracing(); let cli = Cli::parse(); match cli.command { @@ -207,12 +215,37 @@ async fn handle_worker( poll_interval: Option, batch_size: Option, ) -> Result<()> { + use momentry_core::core::db::{Database, PostgresDb, RedisClient}; + use momentry_core::worker::{JobWorker, WorkerConfig}; + println!("Starting job worker"); println!("Max concurrent: {:?}", max_concurrent); println!("Poll interval: {:?}", poll_interval); println!("Batch size: {:?}", batch_size); - // TODO: Implement worker logic + let config = WorkerConfig { + max_concurrent: max_concurrent.unwrap_or(2), + poll_interval_secs: poll_interval.unwrap_or(5), + enabled: true, + batch_size: batch_size.unwrap_or(10), + processor_timeout_secs: 3600, + }; + + let db = PostgresDb::init().await?; + let redis = RedisClient::new()?; + + let worker = JobWorker::new( + std::sync::Arc::new(db), + std::sync::Arc::new(redis), + config.clone(), + ); + + println!( + "Starting worker with max_concurrent={}, poll_interval={}s", + config.max_concurrent, config.poll_interval_secs + ); + + worker.run().await?; Ok(()) }