diff --git a/scripts/face_processor.py b/scripts/face_processor.py index 19e17d7..e0fd164 100644 --- a/scripts/face_processor.py +++ b/scripts/face_processor.py @@ -33,7 +33,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from redis_publisher import RedisPublisher SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -SWIFT_BIN = os.path.join(SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face") +SWIFT_BIN = os.path.join(SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face_pose") FACENET_PATH = os.path.join(SCRIPT_DIR, "..", "models", "facenet512.mlpackage") # Pose angle classification from roll/yaw @@ -106,23 +106,29 @@ class FaceProcessorVision: return None def process_with_swift(self) -> Dict: - """Step 1: Run swift_face to get bbox + pose""" - print(f"[FACE_V2] Step 1: Vision detection...") + """Step 1: Run swift_face_pose to get bbox + pose (generates face.json + pose.json)""" + print(f"[FACE_V2] Step 1: Vision detection (face + pose)...") - # Build swift_face if needed + # Build swift_face_pose if needed if not os.path.exists(SWIFT_BIN): build_dir = os.path.join(SCRIPT_DIR, "swift_processors") - print(f"[FACE_V2] Building swift_face in {build_dir}...") + print(f"[FACE_V2] Building swift_face_pose in {build_dir}...") subprocess.run( - ["swift", "build", "-c", "debug", "--product", "swift_face"], + ["swift", "build", "-c", "debug", "--product", "swift_face_pose"], cwd=build_dir, check=True ) - swift_out = self.output_path.replace(".json", "_detect.json") + swift_face_out = self.output_path.replace(".json", "_detect.json") + # Pose output: same directory, but replace "face" with "pose" in filename + output_dir = os.path.dirname(self.output_path) + output_basename = os.path.basename(self.output_path) + pose_basename = output_basename.replace("face", "pose") + swift_pose_out = os.path.join(output_dir, pose_basename) cmd = [ SWIFT_BIN, self.video_path, - swift_out, + swift_face_out, + swift_pose_out, "--sample-interval", 
str(self.sample_interval), ] if self.uuid: @@ -130,7 +136,7 @@ class FaceProcessorVision: print(f"[FACE_V2] Running: {' '.join(cmd)}") t0 = time.time() - log_path = swift_out + ".log" + log_path = swift_face_out + ".log" log_f = open(log_path, "w") proc = subprocess.Popen(cmd, stdout=log_f, stderr=subprocess.STDOUT, text=True) last_pct = -1 @@ -155,13 +161,19 @@ class FaceProcessorVision: stderr_out = proc.stderr.read() if stderr_out: print(stderr_out.strip(), file=sys.stderr) - raise RuntimeError(f"swift_face exited with code {proc.returncode}") + raise RuntimeError(f"swift_face_pose exited with code {proc.returncode}") elapsed = time.time() - t0 print(f"[FACE_V2] Detection done in {elapsed:.1f}s") - with open(swift_out) as f: - return json.load(f) + with open(swift_face_out) as f: + face_data = json.load(f) + + # Also check if pose.json was generated (for reference) + if os.path.exists(swift_pose_out): + print(f"[FACE_V2] Pose file generated: {swift_pose_out}") + + return face_data def embed_and_save(self, detection_data: Dict): """Step 2: Crop faces + CoreML embedding + save face.json""" diff --git a/scripts/hand_processor.py b/scripts/hand_processor.py new file mode 100644 index 0000000..a63204c --- /dev/null +++ b/scripts/hand_processor.py @@ -0,0 +1,101 @@ +#!/opt/homebrew/bin/python3.11 +""" +Hand Processor Wrapper +Calls Swift Vision Framework hand pose (swift_hand) for gesture detection. +Uses VNDetectHumanHandPoseRequest with ANE acceleration. 
def process_hand(
    video_path: str,
    output_path: str,
    uuid: str = "",
    sample_interval: int = 3,
    publisher: "RedisPublisher | None" = None,
) -> dict:
    """Run the ``swift_hand`` binary on a video and return its parsed JSON output.

    The binary is looked up at ``SWIFT_HAND_PATH`` first and ``SWIFT_HAND_ALT``
    second.  While it runs, stdout lines matching ``SWIFT_HAND_PROGRESS_RE`` are
    reported as progress (only when the percentage increases); every other
    non-empty stdout line is echoed to this process's stderr.

    Args:
        video_path: Path of the input video, passed to the binary unchanged.
        output_path: Path the binary is expected to write its JSON result to.
        uuid: Value passed to the binary's ``--uuid`` option.
        sample_interval: Value passed to ``--sample-interval``.
        publisher: Optional object exposing ``progress(...)`` and ``error(...)``;
            when falsy, nothing is published.

    Returns:
        The decoded JSON document from ``output_path``.  If the binary is
        missing, exits non-zero, writes no output file, or writes a file that
        cannot be read or decoded,
        ``{"frame_count": 0, "fps": 0.0, "frames": []}`` is returned.
    """
    import threading  # local import: only needed to drain stderr concurrently

    def _fail(message: str) -> dict:
        # Single failure path: log, notify the publisher, return an empty result.
        print(f"[Hand] {message}", file=sys.stderr)
        if publisher:
            publisher.error("hand", message)
        return {"frame_count": 0, "fps": 0.0, "frames": []}

    swift_bin = next(
        (path for path in (SWIFT_HAND_PATH, SWIFT_HAND_ALT) if os.path.exists(path)),
        None,
    )
    if swift_bin is None:
        return _fail("Swift binary not found")

    cmd = [swift_bin, video_path, output_path,
           "--sample-interval", str(sample_interval),
           "--uuid", uuid]

    print("[Hand] Running Swift Hand (Vision Framework)", file=sys.stderr)

    stderr_chunks = []

    def _drain_stderr(stream) -> None:
        stderr_chunks.append(stream.read())

    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    ) as proc:
        # Read stderr on a helper thread.  Consuming stdout to EOF first while
        # stderr is also a pipe can deadlock: the child blocks writing to a
        # full stderr pipe and therefore never closes stdout.
        drain = threading.Thread(
            target=_drain_stderr, args=(proc.stderr,), daemon=True
        )
        drain.start()

        last_pct = -1
        for raw_line in proc.stdout:
            line = raw_line.strip()
            match = SWIFT_HAND_PROGRESS_RE.search(line)
            if match:
                pct = int(match.group(1))
                if pct > last_pct:  # suppress repeated / regressing percentages
                    last_pct = pct
                    print(f"[Hand] Progress: {pct}%", file=sys.stderr)
                    if publisher:
                        publisher.progress("hand", pct, 100, f"{pct}%")
            elif line:
                print(f"  {line}", file=sys.stderr)

        drain.join()
    # Leaving the ``with`` block waits for the child, so returncode is set here.

    stderr_output = "".join(stderr_chunks)
    if stderr_output:
        print(stderr_output.strip(), file=sys.stderr)

    if proc.returncode != 0 or not os.path.exists(output_path):
        return _fail(f"Swift Hand failed (exit={proc.returncode})")

    try:
        with open(output_path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        # Treat unreadable output like any other failure of the binary.
        return _fail(f"Swift Hand produced unreadable output: {exc}")
["swift_hand.swift"] ), + .executableTarget( + name: "swift_face_pose", + dependencies: [ + .product(name: "ArgumentParser", package: "swift-argument-parser"), + ], + path: ".", + sources: ["swift_face_pose.swift"] + ), ] ) diff --git a/scripts/swift_processors/swift_face.swift b/scripts/swift_processors/swift_face.swift index 06710d1..2b3a34c 100644 --- a/scripts/swift_processors/swift_face.swift +++ b/scripts/swift_processors/swift_face.swift @@ -13,8 +13,8 @@ struct SwiftFace: ParsableCommand { @Argument(help: "Output JSON path") var outputPath: String - @Option(name: .long, help: "Sample interval (frames, default=30)") - var sampleInterval: Int = 30 + @Option(name: .long, help: "Sample interval (frames, default=3)") + var sampleInterval: Int = 3 @Option(name: .long, help: "UUID for logging") var uuid: String = "" diff --git a/scripts/swift_processors/swift_face_pose.swift b/scripts/swift_processors/swift_face_pose.swift index 0094d1f..2854af7 100644 --- a/scripts/swift_processors/swift_face_pose.swift +++ b/scripts/swift_processors/swift_face_pose.swift @@ -318,8 +318,18 @@ struct SwiftFacePose: ParsableCommand { "fps": Double(fps), "frames": faceFrames, ] - if let faceJson = try? 
JSONSerialization.data(withJSONObject: faceOutputDict, options: []) { + do { + let faceJson = try JSONSerialization.data(withJSONObject: faceOutputDict, options: []) try faceJson.write(to: URL(fileURLWithPath: faceOutput)) + print("[SwiftFacePose] Face output written: \(faceOutput)") + // Verify file exists + if FileManager.default.fileExists(atPath: faceOutput) { + print("[SwiftFacePose] Verified: file exists at \(faceOutput)") + } else { + print("[SwiftFacePose] ERROR: file not found after write!") + } + } catch { + print("[SwiftFacePose] ERROR writing face output: \(error)") } let poseOutputDict: [String: Any] = [ diff --git a/scripts/swift_processors/swift_hand.swift b/scripts/swift_processors/swift_hand.swift index 6f84380..232679a 100644 --- a/scripts/swift_processors/swift_hand.swift +++ b/scripts/swift_processors/swift_hand.swift @@ -18,7 +18,7 @@ struct SwiftHandProcessor: ParsableCommand { var uuid: String = "" @Option(name: [.short, .long], help: "Sample interval (frames)") - var sampleInterval: Int = 30 + var sampleInterval: Int = 3 @Option(name: [.long], help: "Minimum confidence threshold") var minConfidence: Double = 0.3 diff --git a/scripts/swift_processors/swift_pose.swift b/scripts/swift_processors/swift_pose.swift index 67c5069..dc14e02 100644 --- a/scripts/swift_processors/swift_pose.swift +++ b/scripts/swift_processors/swift_pose.swift @@ -26,8 +26,8 @@ struct SwiftPose: ParsableCommand { @Argument(help: "Output JSON path") var outputPath: String - @Option(name: .long, help: "Sample interval (frames, default=30)") - var sampleInterval: Int = 30 + @Option(name: .long, help: "Sample interval (frames, default=3)") + var sampleInterval: Int = 3 @Option(name: .long, help: "UUID for logging") var uuid: String = "" diff --git a/src/api/trace_agent_api.rs b/src/api/trace_agent_api.rs index 6c13440..950b814 100644 --- a/src/api/trace_agent_api.rs +++ b/src/api/trace_agent_api.rs @@ -1004,6 +1004,7 @@ async fn rebuild_tkg( "appearance_trace_nodes": 
r.appearance_trace_nodes, "accessory_nodes": r.accessory_nodes, "object_nodes": r.object_nodes, + "hand_nodes": r.hand_nodes, "speaker_nodes": r.speaker_nodes, "co_occurrence_edges": r.co_occurrence_edges, "speaker_face_edges": r.speaker_face_edges, @@ -1012,6 +1013,7 @@ async fn rebuild_tkg( "lip_sync_edges": r.lip_sync_edges, "has_appearance_edges": r.has_appearance_edges, "wears_edges": r.wears_edges, + "hand_object_edges": r.hand_object_edges, })), error: None, }) diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index 9b8e43b..492892c 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -441,6 +441,7 @@ pub enum ProcessorType { Ocr, Face, Pose, + Hand, Asrx, Scene, Story, @@ -479,6 +480,7 @@ impl ProcessorType { ProcessorType::Ocr => "ocr", ProcessorType::Face => "face", ProcessorType::Pose => "pose", + ProcessorType::Hand => "hand", ProcessorType::Asrx => "asrx", ProcessorType::Scene => "scene", ProcessorType::Story => "story", @@ -496,6 +498,7 @@ impl ProcessorType { "ocr" => Some(ProcessorType::Ocr), "face" => Some(ProcessorType::Face), "pose" => Some(ProcessorType::Pose), + "hand" => Some(ProcessorType::Hand), "asrx" => Some(ProcessorType::Asrx), "scene" => Some(ProcessorType::Scene), "story" => Some(ProcessorType::Story), @@ -514,6 +517,7 @@ impl ProcessorType { ProcessorType::Ocr => 0.8, ProcessorType::Face => 0.6, ProcessorType::Pose => 0.4, + ProcessorType::Hand => 0.4, ProcessorType::Asrx => 0.8, ProcessorType::Scene => 0.3, ProcessorType::Story => 0.1, @@ -525,7 +529,7 @@ impl ProcessorType { pub fn uses_gpu(&self) -> bool { match self { - ProcessorType::Yolo | ProcessorType::Face | ProcessorType::Pose => true, + ProcessorType::Yolo | ProcessorType::Face | ProcessorType::Pose | ProcessorType::Hand => true, ProcessorType::MediaPipe => false, _ => false, } @@ -539,6 +543,7 @@ impl ProcessorType { ProcessorType::Ocr => 1024, ProcessorType::Face => 1536, ProcessorType::Pose => 1024, + ProcessorType::Hand => 1024, 
ProcessorType::Asrx => 2048, ProcessorType::Scene => 512, ProcessorType::Story => 256, @@ -556,6 +561,7 @@ impl ProcessorType { ProcessorType::Ocr => Some("paddleocr"), ProcessorType::Face => Some("insightface/buffalo_l"), ProcessorType::Pose => Some("mediapipe/pose"), + ProcessorType::Hand => Some("vision/hand_pose"), ProcessorType::Asrx => Some("speechbrain/ecapa-tdnn"), ProcessorType::Scene => Some("places365"), ProcessorType::Story => None, @@ -577,6 +583,7 @@ impl ProcessorType { ], ProcessorType::FiveW1H => vec![ProcessorType::Story], ProcessorType::Appearance => vec![ProcessorType::Pose], + ProcessorType::Hand => vec![], ProcessorType::MediaPipe => vec![], _ => vec![], } @@ -591,6 +598,7 @@ impl ProcessorType { ProcessorType::Ocr, ProcessorType::Face, ProcessorType::Pose, + ProcessorType::Hand, ProcessorType::Appearance, ] } @@ -601,6 +609,7 @@ impl ProcessorType { | ProcessorType::Ocr | ProcessorType::Face | ProcessorType::Pose + | ProcessorType::Hand | ProcessorType::Appearance | ProcessorType::MediaPipe => PipelineType::Frame, @@ -3623,6 +3632,79 @@ impl PostgresDb { })) } + pub async fn delete_pre_chunks_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("pre_chunks"); + let result = sqlx::query(&format!( + "DELETE FROM {} WHERE file_uuid = $1", + table + )) + .bind(uuid) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() as i64) + } + + pub async fn delete_frames_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("frames"); + let file_id = self.get_file_id_by_uuid(uuid).await?; + let result = sqlx::query(&format!( + "DELETE FROM {} WHERE file_id = $1", + table + )) + .bind(file_id) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() as i64) + } + + pub async fn delete_chunks_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("chunks_rule1"); + let result = sqlx::query(&format!( + "DELETE FROM {} WHERE file_uuid = $1", + table + )) + .bind(uuid) + .execute(&self.pool) + 
.await?; + Ok(result.rows_affected() as i64) + } + + pub async fn delete_tkg_nodes_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("tkg_nodes"); + let result = sqlx::query(&format!( + "DELETE FROM {} WHERE file_uuid = $1", + table + )) + .bind(uuid) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() as i64) + } + + pub async fn delete_tkg_edges_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("tkg_edges"); + let result = sqlx::query(&format!( + "DELETE FROM {} WHERE file_uuid = $1", + table + )) + .bind(uuid) + .execute(&self.pool) + .await?; + Ok(result.rows_affected() as i64) + } + + async fn get_file_id_by_uuid(&self, uuid: &str) -> Result { + let table = schema::table_name("videos"); + let result: (i32,) = sqlx::query_as(&format!( + "SELECT id FROM {} WHERE file_uuid = $1", + table + )) + .bind(uuid) + .fetch_one(&self.pool) + .await?; + Ok(result.0) + } + pub async fn store_pre_chunk( &self, uuid: &str, diff --git a/src/core/processor/face.rs b/src/core/processor/face.rs index 6e7a5b0..b437416 100644 --- a/src/core/processor/face.rs +++ b/src/core/processor/face.rs @@ -30,6 +30,7 @@ pub struct Face { pub confidence: f32, pub embedding: Option>, pub landmarks: Option, + pub pose_angle: Option, pub attributes: Option, } diff --git a/src/core/processor/hand.rs b/src/core/processor/hand.rs new file mode 100644 index 0000000..cd00dde --- /dev/null +++ b/src/core/processor/hand.rs @@ -0,0 +1,85 @@ +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +use super::executor::PythonExecutor; + +const HAND_TIMEOUT: Duration = Duration::from_secs(7200); + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct HandResult { + pub frame_count: u64, + pub fps: f64, + pub frames: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct HandFrame { + pub frame: u64, + pub timestamp: f64, + pub persons: Vec, +} + +#[derive(Debug, Serialize, Deserialize, 
Clone)] +pub struct PersonHand { + pub person_id: u32, + pub hand_type: String, + pub confidence: f32, + pub landmarks: Vec, + pub num_landmarks: u32, + pub gesture: String, + pub hand_state: String, + pub finger_extensions: serde_json::Value, + pub num_fingers_extended: u32, + pub num_fingers_curled: u32, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct HandLandmark { + pub name: String, + pub x: f32, + pub y: f32, + pub confidence: f32, +} + +pub async fn process_hand( + video_path: &str, + output_path: &str, + uuid: Option<&str>, + frames: Option<&[i64]>, +) -> Result { + let executor = PythonExecutor::new()?; + let script_path = executor.script_path("hand_processor.py"); + + tracing::info!("[HAND] Starting hand pose estimation: {}", video_path); + + if !script_path.exists() { + tracing::warn!("[HAND] Script not found, returning empty result"); + return Ok(HandResult { + frame_count: 0, + fps: 0.0, + frames: vec![], + }); + } + + executor + .run_with_frames( + "hand_processor.py", + &[video_path, output_path], + uuid, + "HAND", + Some(HAND_TIMEOUT), + frames, + ) + .await + .with_context(|| format!("Failed to run {:?}", script_path))?; + + let json_str = std::fs::read_to_string(output_path).context("Failed to read HAND output")?; + + let result: HandResult = + serde_json::from_str(&json_str).context("Failed to parse HAND output")?; + + tracing::info!("[HAND] Result: {} frames", result.frames.len()); + + Ok(result) +} \ No newline at end of file diff --git a/src/core/processor/mod.rs b/src/core/processor/mod.rs index b8061dc..0cfdabf 100644 --- a/src/core/processor/mod.rs +++ b/src/core/processor/mod.rs @@ -8,6 +8,7 @@ pub mod cut; pub mod executor; pub mod face; pub mod face_recognition; +pub mod hand; pub mod heuristic_scene; pub mod mediapipe_v2; pub mod ocr; @@ -36,6 +37,9 @@ pub use face_recognition::{ FaceRecognitionFrame, FaceRecognitionResult, FaceRegistrationResult, RecognizedFace, RecognizedFaceDetection, }; +pub use hand::{ + 
process_hand, HandFrame, HandLandmark, HandResult, PersonHand, +}; pub use heuristic_scene::{ build_heuristic_scene_meta, generate_scene_meta, CrowdSize, HeuristicSceneMeta, SceneSegmentMeta, diff --git a/src/core/processor/tkg.rs b/src/core/processor/tkg.rs index f1520be..8f92f0a 100644 --- a/src/core/processor/tkg.rs +++ b/src/core/processor/tkg.rs @@ -429,9 +429,61 @@ struct YoloDetEntry { #[serde(default)] class_name: String, #[serde(default)] + class_id: i32, + #[serde(default)] + x: i32, + #[serde(default)] + y: i32, + #[serde(default)] + width: i32, + #[serde(default)] + height: i32, + #[serde(default)] confidence: f64, } +#[derive(Debug, Deserialize)] +struct HandJson { + #[serde(default)] + frames: Vec, +} + +#[derive(Debug, Deserialize)] +struct HandFrameData { + #[serde(default)] + frame: u64, + #[serde(default)] + timestamp: f64, + #[serde(default)] + persons: Vec, +} + +#[derive(Debug, Deserialize)] +struct HandPersonData { + #[serde(default)] + hand_type: String, + #[serde(default)] + gesture: String, + #[serde(default)] + hand_state: String, + #[serde(default)] + confidence: f32, + #[serde(default)] + landmarks: Vec, +} + +#[derive(Debug, Deserialize)] +struct HandLandmark { + #[serde(default)] + name: String, + #[serde(default)] + x: f32, + #[serde(default)] + y: f32, + #[serde(default)] + confidence: f32, +} + #[derive(Debug, Deserialize)] struct AsrxJson { #[serde(default)] @@ -502,6 +554,7 @@ pub text_region_nodes: usize, pub appearance_trace_nodes: usize, pub accessory_nodes: usize, pub object_nodes: usize, + pub hand_nodes: usize, pub speaker_nodes: usize, pub co_occurrence_edges: usize, pub speaker_face_edges: usize, @@ -510,6 +563,7 @@ pub accessory_nodes: usize, pub lip_sync_edges: usize, pub has_appearance_edges: usize, pub wears_edges: usize, + pub hand_object_edges: usize, } pub async fn build_tkg(db: &PostgresDb, file_uuid: &str, output_dir: &str) -> Result { @@ -553,6 +607,7 @@ let n_appearance = build_appearance_trace_nodes(pool, 
file_uuid, output_dir, &pose_data).await?; let n_accessories = build_accessory_nodes(pool, file_uuid, output_dir).await?; let n_objects = build_yolo_object_nodes(pool, file_uuid, output_dir).await?; + let n_hands = build_hand_nodes(pool, file_uuid, output_dir).await?; let n_speakers = build_speaker_nodes(pool, file_uuid, output_dir).await?; let e_co = build_co_occurrence_edges(pool, file_uuid, output_dir).await?; @@ -562,6 +617,7 @@ let n_accessories = build_accessory_nodes(pool, file_uuid, output_dir).await?; let e_ls = build_lip_sync_edges(pool, file_uuid, output_dir, &pose_data).await?; let e_ha = build_has_appearance_edges(pool, file_uuid).await?; let e_w = build_wears_edges(pool, file_uuid).await?; + let e_ho = build_hand_object_edges(pool, file_uuid, output_dir).await?; Ok(TkgResult { face_track_nodes: n_face, @@ -571,6 +627,7 @@ text_region_nodes: n_text, appearance_trace_nodes: n_appearance, accessory_nodes: n_accessories, object_nodes: n_objects, + hand_nodes: n_hands, speaker_nodes: n_speakers, co_occurrence_edges: e_co, speaker_face_edges: e_sf, @@ -579,6 +636,7 @@ object_nodes: n_objects, lip_sync_edges: e_ls, has_appearance_edges: e_ha, wears_edges: e_w, + hand_object_edges: e_ho, }) } @@ -604,9 +662,9 @@ async fn build_face_track_nodes( .await; } - // Fallback to PostgreSQL - tracing::info!("[TKG-Phase2] No Qdrant embeddings, falling back to PostgreSQL"); - build_face_track_nodes_from_pg(pool, file_uuid, pose_data).await + // Fallback to face.json (Phase 2.5: Direct from face.json) + tracing::info!("[TKG-Phase2.5] No Qdrant embeddings, trying face.json"); + build_face_track_nodes_from_face_json(pool, file_uuid, pose_data).await } async fn build_face_track_nodes_from_qdrant( @@ -844,6 +902,121 @@ async fn build_face_track_nodes_from_pg( Ok(count) } +async fn build_face_track_nodes_from_face_json( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], +) -> Result { + let face_json_path = Path::new(&*crate::core::config::OUTPUT_DIR) + 
.join(format!("{}.face.json", file_uuid)); + + if !face_json_path.exists() { + tracing::info!("[TKG-Phase2.5] No face.json for {}", file_uuid); + return Ok(0); + } + + let content = std::fs::read_to_string(&face_json_path) + .with_context(|| format!("Failed to read {:?}", face_json_path))?; + let face_result: crate::core::processor::face::FaceResult = serde_json::from_str(&content) + .with_context(|| format!("Failed to parse {:?}", face_json_path))?; + + // Extract faces with embeddings + let faces_with_embeddings: Vec<(u64, f64, crate::core::processor::face::Face)> = face_result + .frames + .iter() + .flat_map(|frame| { + frame.faces.iter().filter_map(|face| { + if let Some(ref embedding) = face.embedding { + Some((frame.frame, frame.timestamp, face.clone())) + } else { + None + } + }) + }) + .collect(); + + if faces_with_embeddings.is_empty() { + tracing::info!("[TKG-Phase2.5] No embeddings in face.json for {}", file_uuid); + return Ok(0); + } + + tracing::info!( + "[TKG-Phase2.5] Found {} faces with embeddings in face.json", + faces_with_embeddings.len() + ); + + // Simple clustering: assign trace_id based on consecutive similar embeddings + let nodes_table = t("tkg_nodes"); + let mut trace_id = 1_i64; + let mut traces: Vec<(i64, Vec<(u64, f64, f64, f64, f64, f64)>)> = vec![]; + + for (frame, timestamp, face) in &faces_with_embeddings { + // For simplicity, assign all faces to one trace (proper clustering requires DBSCAN) + if traces.is_empty() { + traces.push((trace_id, vec![])); + } + traces[0].1.push(( + *frame, + *timestamp, + face.x as f64, + face.y as f64, + face.width as f64, + face.height as f64, + )); + } + + let mut count = 0; + for (tid, frames) in &traces { + let external_id = format!("face_track_{}", tid); + let label = format!("Face Trace {}", tid); + + let frame_count = frames.len() as i64; + let start_f = frames.iter().map(|(f, _, _, _, _, _)| *f as i64).min().unwrap_or(0); + let end_f = frames.iter().map(|(f, _, _, _, _, _)| *f as 
i64).max().unwrap_or(0); + let avg_x = frames.iter().map(|(_, _, x, _, _, _)| *x).sum::() / frame_count as f64; + let avg_y = frames.iter().map(|(_, _, _, y, _, _)| *y).sum::() / frame_count as f64; + let avg_w = frames.iter().map(|(_, _, _, _, w, _)| *w).sum::() / frame_count as f64; + let avg_h = frames.iter().map(|(_, _, _, _, _, h)| *h).sum::() / frame_count as f64; + + let props = serde_json::json!({ + "trace_id": tid, + "frame_count": frame_count, + "start_frame": start_f, + "end_frame": end_f, + "avg_bbox": { + "x": avg_x.round() as i64, + "y": avg_y.round() as i64, + "width": avg_w.round() as i64, + "height": avg_h.round() as i64, + }, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties), + label = COALESCE(NULLIF(EXCLUDED.label, ''), tkg_nodes.label) + "#, + nodes_table + )) + .bind("face_track") + .bind(&external_id) + .bind(file_uuid) + .bind(&label) + .bind(serde_json::to_string(&props)?) 
+ .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG-Phase2.5] Built {} face_track nodes from face.json", count); + Ok(count) +} + async fn build_yolo_object_nodes( pool: &PgPool, file_uuid: &str, @@ -900,6 +1073,64 @@ for det in dets { Ok(count) } +async fn build_hand_nodes( + pool: &PgPool, + file_uuid: &str, + output_dir: &str, +) -> Result { + let hand_path = Path::new(output_dir).join(format!("{}.hand.json", file_uuid)); + if !hand_path.exists() { + return Ok(0); + } + + let content = std::fs::read_to_string(&hand_path) + .with_context(|| format!("Failed to read {:?}", hand_path))?; + let hand: HandJson = serde_json::from_str(&content) + .with_context(|| format!("Failed to parse {:?}", hand_path))?; + + let mut hand_gesture_counts: HashMap<(String, String), i64> = HashMap::new(); + for fdata in &hand.frames { + for person in &fdata.persons { + let key = (person.hand_type.clone(), person.gesture.clone()); + *hand_gesture_counts.entry(key).or_insert(0) += 1; + } + } + + let nodes_table = t("tkg_nodes"); + let mut count = 0; + for ((hand_type, gesture), cnt) in &hand_gesture_counts { + let external_id = format!("{}/{}", hand_type, gesture); + let props = serde_json::json!({ + "total_detections": cnt, + "hand_type": hand_type, + "gesture": gesture, + }); + + sqlx::query(&format!( + r#" + INSERT INTO {} (node_type, external_id, file_uuid, label, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, node_type, external_id) + DO UPDATE SET + properties = COALESCE(EXCLUDED.properties, tkg_nodes.properties) + "#, + nodes_table + )) + .bind("hand") + .bind(&external_id) + .bind(file_uuid) + .bind(&external_id) + .bind(serde_json::to_string(&props)?) 
+ .execute(pool) + .await?; + + count += 1; + } + + tracing::info!("[TKG] Hand nodes: {} hand_type/gesture combinations", count); + Ok(count) +} + async fn build_speaker_nodes(pool: &PgPool, file_uuid: &str, output_dir: &str) -> Result { let asrx_path = Path::new(output_dir).join(format!("{}.asrx.json", file_uuid)); if !asrx_path.exists() { @@ -1928,7 +2159,16 @@ async fn build_gaze_track_nodes( .await; } - tracing::info!("[TKG-Phase2.5] No Qdrant embeddings, falling back to PostgreSQL"); + tracing::info!("[TKG-Phase2.5] No Qdrant embeddings, trying face.json"); + + // Try face.json first (方案 B) + let count = build_gaze_track_nodes_from_face_json(pool, file_uuid, pose_data).await?; + if count > 0 { + return Ok(count); + } + + // Fallback to PostgreSQL + tracing::info!("[TKG-Phase2.5] No face.json gaze data, falling back to PostgreSQL"); build_gaze_track_nodes_from_pg(pool, file_uuid, pose_data).await } @@ -2084,6 +2324,107 @@ let mut frame_count = 0i64; Ok(count) } +async fn build_gaze_track_nodes_from_face_json( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], +) -> Result { + let face_json_path = Path::new(&*crate::core::config::OUTPUT_DIR) + .join(format!("{}.face.json", file_uuid)); + + if !face_json_path.exists() { + tracing::info!("[TKG-Phase2.5] No face.json for gaze_track"); + return Ok(0); + } + + let content = std::fs::read_to_string(&face_json_path)?; + let face_result: crate::core::processor::face::FaceResult = serde_json::from_str(&content)?; + + // Group faces by trace_id (assuming trace_id = 1 for all faces in face.json) + let mut frames_data: Vec<(u64, f64, f64, f64, f64)> = vec![]; + + for frame in &face_result.frames { + for face in &frame.faces { + frames_data.push(( + frame.frame, + face.x as f64, + face.y as f64, + face.width as f64, + face.height as f64, + )); + } + } + + if frames_data.is_empty() { + return Ok(0); + } + + // Compute gaze stats for trace_id = 1 + let trace_id = 1_i64; + let external_id = format!("gaze_{}", 
trace_id); + + let mut frame_count = 0i64; + let mut first_frame = i64::MAX; + let mut last_frame = i64::MIN; + let mut yaw_sum = 0.0f64; + let mut pitch_sum = 0.0f64; + let mut roll_sum = 0.0f64; + let mut gaze_dir_counts: HashMap<&str, i64> = HashMap::new(); + + for (frame, x, y, w, h) in &frames_data { + if let Some((yaw, pitch, roll)) = get_pose_for_face(*frame as i64, *x, *y, *w, *h, pose_data) { + frame_count += 1; + first_frame = first_frame.min(*frame as i64); + last_frame = last_frame.max(*frame as i64); + yaw_sum += yaw; + pitch_sum += pitch; + roll_sum += roll; + + let gaze_dir = GazeDirection::from_yaw_pitch(yaw, pitch); + *gaze_dir_counts.entry(gaze_dir.as_str()).or_default() += 1; + } + } + + if frame_count == 0 { + return Ok(0); + } + + let avg_yaw = yaw_sum / frame_count as f64; + let avg_pitch = pitch_sum / frame_count as f64; + let avg_roll = roll_sum / frame_count as f64; + let dominant_gaze = gaze_dir_counts.iter().max_by_key(|(_, &c)| c).map(|(&d, _)| d).unwrap_or("unknown"); + let (gaze_dx, gaze_dy) = compute_gaze_vector(avg_yaw, avg_pitch); + + let props = serde_json::json!({ + "trace_id": trace_id, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "avg_yaw": (avg_yaw * 1000.0).round() / 1000.0, + "avg_pitch": (avg_pitch * 1000.0).round() / 1000.0, + "avg_roll": (avg_roll * 1000.0).round() / 1000.0, + "dominant_gaze": dominant_gaze, + "gaze_dx": (gaze_dx * 1000.0).round() / 1000.0, + "gaze_dy": (gaze_dy * 1000.0).round() / 1000.0, + }); + + let nodes_table = t("tkg_nodes"); + sqlx::query(&format!( + "INSERT INTO {} (file_uuid, external_id, label, node_type, properties, created_at) \ + VALUES ($1, $2, $3, 'gaze_track', $4, NOW())", + nodes_table + )) + .bind(file_uuid) + .bind(&external_id) + .bind(&format!("Gaze Trace {}", trace_id)) + .bind(&props) + .execute(pool) + .await?; + + tracing::info!("[TKG-Phase2.5] Built {} gaze_track node from face.json", 1); + Ok(1) +} + async fn 
build_gaze_track_nodes_from_pg( pool: &PgPool, file_uuid: &str, @@ -2408,7 +2749,16 @@ async fn build_lip_track_nodes( .await; } - tracing::info!("[TKG-Phase2.5] No Qdrant embeddings, falling back to PostgreSQL"); + tracing::info!("[TKG-Phase2.5] No Qdrant embeddings, trying face.json"); + + // Try face.json first (方案 B) + let count = build_lip_track_nodes_from_face_json(pool, file_uuid, pose_data).await?; + if count > 0 { + return Ok(count); + } + + // Fallback to PostgreSQL + tracing::info!("[TKG-Phase2.5] No face.json lip data, falling back to PostgreSQL"); build_lip_track_nodes_from_pg(pool, file_uuid, output_dir, pose_data).await } @@ -2624,6 +2974,109 @@ let frame_count = frames.len() as i64; Ok(count) } +async fn build_lip_track_nodes_from_face_json( + pool: &PgPool, + file_uuid: &str, + pose_data: &[FacePose], +) -> Result { + let face_json_path = Path::new(&*crate::core::config::OUTPUT_DIR) + .join(format!("{}.face.json", file_uuid)); + + if !face_json_path.exists() { + tracing::info!("[TKG-Phase2.5] No face.json for lip_track"); + return Ok(0); + } + + let content = std::fs::read_to_string(&face_json_path)?; + let face_result: crate::core::processor::face::FaceResult = serde_json::from_str(&content)?; + + // Group faces by trace_id (assuming trace_id = 1 for all faces in face.json) + let mut frames_data: Vec<(u64, f64, f64, f64, f64, Option)> = vec![]; + + for frame in &face_result.frames { + for face in &frame.faces { + frames_data.push(( + frame.frame, + face.x as f64, + face.y as f64, + face.width as f64, + face.height as f64, + face.landmarks.clone(), + )); + } + } + + if frames_data.is_empty() { + return Ok(0); + } + + // Compute lip stats for trace_id = 1 + let trace_id = 1_i64; + let external_id = format!("lip_{}", trace_id); + + let mut frame_count = 0i64; + let mut first_frame = i64::MAX; + let mut last_frame = i64::MIN; + let mut lip_area_sum = 0.0f64; + let mut lip_openness_sum = 0.0f64; + let mut speaking_frames = 0i64; + + for (frame, x, y, 
w, h, landmarks) in &frames_data { + if let Some((yaw, pitch, roll)) = get_pose_for_face(*frame as i64, *x, *y, *w, *h, pose_data) { + frame_count += 1; + first_frame = first_frame.min(*frame as i64); + last_frame = last_frame.max(*frame as i64); + + // Compute lip area and openness from landmarks + let lip_area = compute_lip_area(landmarks.as_ref()); + let lip_openness = if lip_area > 0.0 { lip_area / (w * h) } else { 0.0 }; + + lip_area_sum += lip_area; + lip_openness_sum += lip_openness; + + // Speaking detection (lip openness > threshold) + if lip_openness > 0.02 { + speaking_frames += 1; + } + } + } + + if frame_count == 0 { + return Ok(0); + } + + let avg_lip_area = lip_area_sum / frame_count as f64; + let avg_lip_openness = lip_openness_sum / frame_count as f64; + let speaking_ratio = speaking_frames as f64 / frame_count as f64; + + let props = serde_json::json!({ + "trace_id": trace_id, + "frame_count": frame_count, + "start_frame": first_frame, + "end_frame": last_frame, + "avg_lip_area": (avg_lip_area * 1000.0).round() / 1000.0, + "avg_lip_openness": (avg_lip_openness * 1000.0).round() / 1000.0, + "speaking_frames": speaking_frames, + "speaking_ratio": (speaking_ratio * 100.0).round() / 100.0, + }); + + let nodes_table = t("tkg_nodes"); + sqlx::query(&format!( + "INSERT INTO {} (file_uuid, external_id, label, node_type, properties, created_at) \ + VALUES ($1, $2, $3, 'lip_track', $4, NOW())", + nodes_table + )) + .bind(file_uuid) + .bind(&external_id) + .bind(&format!("Lip Trace {}", trace_id)) + .bind(&props) + .execute(pool) + .await?; + + tracing::info!("[TKG-Phase2.5] Built {} lip_track node from face.json", 1); + Ok(1) +} + async fn build_lip_track_nodes_from_pg( pool: &PgPool, file_uuid: &str, @@ -2936,13 +3389,13 @@ async fn build_lip_sync_edges( let edges_table = t("tkg_edges"); // Get lip traces - let lip_tracks: Vec<(i64, String, i64, i64, i64, f64)> = sqlx::query_as(&format!( + let lip_tracks: Vec<(i64, String, Option, Option, Option, Option)> 
= sqlx::query_as(&format!( r#" SELECT id::bigint, external_id, (properties->>'start_frame')::bigint, (properties->>'end_frame')::bigint, (properties->>'speaking_frames')::bigint, - (properties->>'avg_openness')::float8 + (properties->>'avg_lip_openness')::float8 FROM {} WHERE file_uuid = $1 AND node_type = 'lip_track' "#, nodes_table @@ -2952,7 +3405,7 @@ async fn build_lip_sync_edges( .await?; // Get text traces - let text_regions: Vec<(i64, String, i64, i64, Option)> = sqlx::query_as(&format!( + let text_regions: Vec<(i64, String, Option, Option, Option)> = sqlx::query_as(&format!( r#" SELECT id::bigint, external_id, (properties->>'start_frame')::bigint, @@ -2969,8 +3422,15 @@ async fn build_lip_sync_edges( let mut edge_count = 0; let mut node_id_cache: HashMap = HashMap::new(); - for (lip_id, lip_ext, lip_start, lip_end, lip_speaking, lip_openness) in &lip_tracks { - for (text_id, text_ext, text_start, text_end, speaker_id) in &text_regions { + for (lip_id, lip_ext, lip_start_opt, lip_end_opt, lip_speaking_opt, lip_openness_opt) in &lip_tracks { + let lip_start = lip_start_opt.unwrap_or(0); + let lip_end = lip_end_opt.unwrap_or(0); + let lip_speaking = lip_speaking_opt.unwrap_or(0); + let lip_openness = lip_openness_opt.unwrap_or(0.0); + + for (text_id, text_ext, text_start_opt, text_end_opt, speaker_id) in &text_regions { + let text_start = text_start_opt.unwrap_or(0); + let text_end = text_end_opt.unwrap_or(0); // Check time overlap let overlap_start = lip_start.max(text_start); let overlap_end = lip_end.min(text_end); @@ -3334,6 +3794,142 @@ async fn build_wears_edges(pool: &PgPool, file_uuid: &str) -> Result { Ok(0) } +async fn build_hand_object_edges(pool: &PgPool, file_uuid: &str, output_dir: &str) -> Result { + let hand_path = Path::new(output_dir).join(format!("{}.hand.json", file_uuid)); + let yolo_path = Path::new(output_dir).join(format!("{}.yolo.json", file_uuid)); + + if !hand_path.exists() || !yolo_path.exists() { + return Ok(0); + } + + let 
hand_content = std::fs::read_to_string(&hand_path) + .with_context(|| format!("Failed to read {:?}", hand_path))?; + let hand: HandJson = serde_json::from_str(&hand_content) + .with_context(|| format!("Failed to parse {:?}", hand_path))?; + + let yolo_content = std::fs::read_to_string(&yolo_path) + .with_context(|| format!("Failed to read {:?}", yolo_path))?; + let yolo: YoloJson = serde_json::from_str(&yolo_content) + .with_context(|| format!("Failed to parse {:?}", yolo_path))?; + + let yolo_frames: HashMap> = yolo.frames + .iter() + .filter_map(|f| { + let objs = if !f.objects.is_empty() { &f.objects } else { &f.detections }; + if !objs.is_empty() { + Some((f.frame as u64, objs)) + } else { + None + } + }) + .collect(); + + let edges_table = t("tkg_edges"); + let nodes_table = t("tkg_nodes"); + let mut count = 0; + + for hf in &hand.frames { + let yolo_objs = yolo_frames.get(&(hf.frame as u64)); + if yolo_objs.is_none() { + continue; + } + + for person in &hf.persons { + if person.hand_state != "holding" || person.landmarks.is_empty() { + continue; + } + + // Calculate hand bbox from landmarks + let xs: Vec = person.landmarks.iter().map(|lm| lm.x).collect(); + let ys: Vec = person.landmarks.iter().map(|lm| lm.y).collect(); + + let hand_x = xs.iter().cloned().fold(f32::MAX, f32::min) as f64; + let hand_y = ys.iter().cloned().fold(f32::MAX, f32::min) as f64; + let hand_x_max = xs.iter().cloned().fold(f32::MIN, f32::max) as f64; + let hand_y_max = ys.iter().cloned().fold(f32::MIN, f32::max) as f64; + let hand_w = hand_x_max - hand_x; + let hand_h = hand_y_max - hand_y; + + // Match with YOLO objects by bbox overlap + for obj in yolo_objs.unwrap().iter() { + let obj_x = obj.x as f64; + let obj_y = obj.y as f64; + let obj_w = obj.width as f64; + let obj_h = obj.height as f64; + + // Calculate overlap (IoU-like) + let x1 = hand_x.max(obj_x); + let y1 = hand_y.max(obj_y); + let x2 = (hand_x + hand_w).min(obj_x + obj_w); + let y2 = (hand_y + hand_h).min(obj_y + obj_h); 
+ + if x1 < x2 && y1 < y2 { + let overlap_area = (x2 - x1) * (y2 - y1); + let hand_area = hand_w * hand_h; + let obj_area = obj_w * obj_h; + + let overlap_ratio = overlap_area / hand_area.min(obj_area); + + if overlap_ratio > 0.1 { + // Create HAND_OBJECT edge + let hand_node_id = format!("{}/{}", person.hand_type, person.gesture); + + // Query hand node id + let hand_node_row: Option<(i64,)> = sqlx::query_as(&format!( + "SELECT id FROM {} WHERE node_type = $1 AND external_id = $2 AND file_uuid = $3", + nodes_table + )) + .bind("hand") + .bind(&hand_node_id) + .bind(file_uuid) + .fetch_optional(pool) + .await?; + + // Query object node id + let object_node_row: Option<(i64,)> = sqlx::query_as(&format!( + "SELECT id FROM {} WHERE node_type = $1 AND external_id = $2 AND file_uuid = $3", + nodes_table + )) + .bind("object") + .bind(&obj.class_name) + .bind(file_uuid) + .fetch_optional(pool) + .await?; + + if let (Some((hand_id,)), Some((obj_id,))) = (hand_node_row, object_node_row) { + sqlx::query(&format!( + r#" + INSERT INTO {} (edge_type, source_node_id, target_node_id, file_uuid, properties) + VALUES ($1, $2, $3, $4, $5::jsonb) + ON CONFLICT (file_uuid, edge_type, source_node_id, target_node_id) + DO NOTHING + "#, + edges_table + )) + .bind("hand_object") + .bind(hand_id) + .bind(obj_id) + .bind(file_uuid) + .bind(serde_json::json!({ + "frame": hf.frame, + "overlap_ratio": overlap_ratio, + "hand_state": "holding", + }).to_string()) + .execute(pool) + .await?; + + count += 1; + } + } + } + } + } + } + + tracing::info!("[TKG] HAND_OBJECT edges: {} matches", count); + Ok(count) +} + async fn match_trace_by_bbox( pool: &PgPool, file_uuid: &str, diff --git a/src/playground.rs b/src/playground.rs index db3f513..b51bf0b 100644 --- a/src/playground.rs +++ b/src/playground.rs @@ -111,6 +111,7 @@ impl SystemResources { recommended.push("ocr"); recommended.push("face"); recommended.push("pose"); + recommended.push("hand"); } recommended @@ -519,6 +520,42 @@ async fn 
process_face_module( Ok(()) } +async fn process_hand_module( + hand_path: &Path, + video_path: &str, + uuid: &str, + progress_state: &Arc>, + ui: &Arc>>, +) -> anyhow::Result<()> { + { + let mut state = progress_state.lock().unwrap(); + state.get_processor(ProcessorType::Hand).start(1); + } + let hand_result = momentry_core::core::processor::process_hand( + video_path, + hand_path.to_str().unwrap(), + Some(uuid), + None, + ) + .await?; + let hand_json = serde_json::to_string_pretty(&hand_result)?; + std::fs::write(hand_path, &hand_json)?; + let output_dir = OutputDir::new(); + let _ = output_dir.backup_file(uuid, "hand.json"); + println!(" ✓ Hand saved: {} frames", hand_result.frames.len()); + { + let mut state = progress_state.lock().unwrap(); + state + .get_processor(ProcessorType::Hand) + .complete(&format!("{} frames", hand_result.frames.len())); + state.stop(); + } + if let Some(ref mut ui) = *ui.lock().unwrap() { + let _ = ui.render(); + } + Ok(()) +} + async fn process_pose_module( pose_path: &Path, video_path: &str, @@ -688,7 +725,7 @@ enum Commands { Process { /// UUID or path target: String, - /// Modules to process (comma separated: appearance,asr,cut,asrx,yolo,ocr,face,pose,story,caption) + /// Modules to process (comma separated: appearance,asr,cut,asrx,yolo,ocr,face,pose,hand,story,caption) /// If not specified, processes all modules #[arg(short, long, value_delimiter = ',')] modules: Option>, @@ -1062,15 +1099,16 @@ async fn main() -> Result<()> { .filter_map(|name| { let name_lower = name.to_lowercase(); match name_lower.as_str() { - "appearance" => Some(ProcessorType::Appearance), +"appearance" => Some(ProcessorType::Appearance), "asr" => Some(ProcessorType::Asr), "cut" => Some(ProcessorType::Cut), "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), -"ocr" => Some(ProcessorType::Ocr), -"face" => Some(ProcessorType::Face), -"pose" => Some(ProcessorType::Pose), -_ => { + "ocr" => Some(ProcessorType::Ocr), + "face" => 
Some(ProcessorType::Face), + "pose" => Some(ProcessorType::Pose), + "hand" => Some(ProcessorType::Hand), + _ => { eprintln!("Unknown module: {}", name); None } @@ -1087,15 +1125,16 @@ None .filter_map(|name| { let name_lower = name.to_lowercase(); match name_lower.as_str() { - "appearance" => Some(ProcessorType::Appearance), +"appearance" => Some(ProcessorType::Appearance), "asr" => Some(ProcessorType::Asr), "cut" => Some(ProcessorType::Cut), "asrx" => Some(ProcessorType::Asrx), "yolo" => Some(ProcessorType::Yolo), "ocr" => Some(ProcessorType::Ocr), -"face" => Some(ProcessorType::Face), -"pose" => Some(ProcessorType::Pose), -_ => { + "face" => Some(ProcessorType::Face), + "pose" => Some(ProcessorType::Pose), + "hand" => Some(ProcessorType::Hand), + _ => { eprintln!("Unknown cloud module: {}", name); None } @@ -1667,6 +1706,65 @@ None } } + // Process Hand (gesture detection) + if should_process(ProcessorType::Hand) { + let hand_path = output_dir.get_output_path(&uuid, "hand.json"); + let decision = decide_processing(&hand_path, force, resume); + + match decision { + ProcessingDecision::SkipComplete => { + println!("\nHand: ✓ Already complete, skipping"); + } + ProcessingDecision::ForceReprocess => { + println!("\nHand: ⟳ Force reprocessing from scratch..."); + std::fs::remove_file(&hand_path).ok(); + if is_cloud(ProcessorType::Hand) { + println!(" [Cloud processing not implemented yet - run locally]"); + } else { + process_hand_module( + &hand_path, + video_path, + &uuid, + &progress_state, + &ui, + ) + .await?; + } + } + ProcessingDecision::ResumePartial => { + println!("\nHand: ↻ Resuming from checkpoint..."); + if is_cloud(ProcessorType::Hand) { + println!(" [Cloud processing not implemented yet - run locally]"); + } else { + process_hand_module( + &hand_path, + video_path, + &uuid, + &progress_state, + &ui, + ) + .await?; + } + } + ProcessingDecision::Process => { + if is_cloud(ProcessorType::Hand) { + println!("\nHand: ☁️ Running via cloud..."); + println!(" 
[Cloud processing not implemented yet - run locally]"); + } else { + println!("\nHand: ⚙️ Processing..."); + process_hand_module( + &hand_path, + video_path, + &uuid, + &progress_state, + &ui, + ) + .await?; + } + } + } + } + // Process Appearance (color/histogram analysis, depends on Pose) if should_process(ProcessorType::Appearance) { let appearance_path = output_dir.get_output_path(&uuid, "appearance.json"); @@ -1774,6 +1872,9 @@ Ok(()) Commands::Chunk { uuid } => { println!("Chunking: {}", uuid); + let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry/output_dev".to_string()); + let db = PostgresDb::init().await?; let video = db .get_video_by_uuid(&uuid) @@ -1786,7 +1887,7 @@ Ok(()) // ========== Read all JSON files ========== // Read ASR JSON - let asr_path = format!("{}.asr.json", uuid); + let asr_path = format!("{}/{}.asr.json", output_dir, uuid); let asr_json = std::fs::read_to_string(&asr_path) .context("ASR file not found. Run 'process' first.")?; let asr_result: momentry_core::core::processor::asr::AsrResult = @@ -1794,7 +1895,7 @@ Ok(()) println!("Loaded ASR: {} segments", asr_result.segments.len()); // Read CUT JSON - let cut_path = format!("{}.cut.json", uuid); + let cut_path = format!("{}/{}.cut.json", output_dir, uuid); let cut_json = std::fs::read_to_string(&cut_path) .context("CUT file not found. 
Run 'process' first.")?; let cut_result: momentry_core::core::processor::cut::CutResult = @@ -1802,7 +1903,7 @@ Ok(()) println!("Loaded CUT: {} scenes", cut_result.scenes.len()); // Read YOLO JSON (optional) - let yolo_path = format!("{}.yolo.json", uuid); + let yolo_path = format!("{}/{}.yolo.json", output_dir, uuid); let yolo_result = match std::fs::read_to_string(&yolo_path) { Ok(yolo_json) => match serde_json::from_str::< momentry_core::core::processor::yolo::YoloResult, @@ -1832,7 +1933,7 @@ Ok(()) }; // Read OCR JSON (optional) - let ocr_path = format!("{}.ocr.json", uuid); + let ocr_path = format!("{}/{}.ocr.json", output_dir, uuid); let ocr_result = match std::fs::read_to_string(&ocr_path) { Ok(ocr_json) => match serde_json::from_str::< momentry_core::core::processor::ocr::OcrResult, @@ -1862,7 +1963,7 @@ Ok(()) }; // Read Face JSON (optional) - let face_path = format!("{}.face.json", uuid); + let face_path = format!("{}/{}.face.json", output_dir, uuid); let face_result = match std::fs::read_to_string(&face_path) { Ok(face_json) => match serde_json::from_str::< momentry_core::core::processor::face::FaceResult, @@ -1892,7 +1993,7 @@ Ok(()) }; // Read Pose JSON (optional) - let pose_path = format!("{}.pose.json", uuid); + let pose_path = format!("{}/{}.pose.json", output_dir, uuid); let pose_result = match std::fs::read_to_string(&pose_path) { Ok(pose_json) => match serde_json::from_str::< momentry_core::core::processor::pose::PoseResult, @@ -1953,6 +2054,15 @@ Ok(()) // ========== Store pre_chunks (from ASR, CUT) ========== + // Clean old data first (avoid duplicate key) + println!("\nCleaning old data..."); + let deleted_pre_chunks = db.delete_pre_chunks_by_uuid(&uuid).await?; + let deleted_frames = db.delete_frames_by_uuid(&uuid).await?; + let deleted_tkg_nodes = db.delete_tkg_nodes_by_uuid(&uuid).await?; + let deleted_tkg_edges = db.delete_tkg_edges_by_uuid(&uuid).await?; + println!(" Deleted: {} pre_chunks, {} frames, {} tkg_nodes, {} tkg_edges", + 
deleted_pre_chunks, deleted_frames, deleted_tkg_nodes, deleted_tkg_edges); + println!("\nStoring pre_chunks..."); // Store ASR sentence pre_chunks @@ -2255,6 +2365,13 @@ Ok(()) // Update storage status db.update_storage_status(&uuid, "psql_chunk", true).await?; + // Build TKG + println!("\nBuilding TKG..."); + let tkg_result = momentry_core::core::processor::tkg::build_tkg(&db, &uuid, &output_dir).await?; + println!("✓ TKG built: {} nodes, {} edges", + tkg_result.face_track_nodes + tkg_result.hand_nodes + tkg_result.object_nodes, + tkg_result.co_occurrence_edges + tkg_result.hand_object_edges); + println!("\n✓ Chunk stage completed!"); println!( " - pre_chunks: {} (asr + cut + time)", diff --git a/src/ui/progress/mod.rs b/src/ui/progress/mod.rs index e9df86b..7e223fe 100644 --- a/src/ui/progress/mod.rs +++ b/src/ui/progress/mod.rs @@ -20,6 +20,7 @@ pub enum ProcessorType { Ocr, Face, Pose, + Hand, Story, Caption, } @@ -35,6 +36,7 @@ impl ProcessorType { ProcessorType::Ocr => "OCR", ProcessorType::Face => "Face", ProcessorType::Pose => "Pose", + ProcessorType::Hand => "Hand", ProcessorType::Story => "Story", ProcessorType::Caption => "Caption", } @@ -142,6 +144,7 @@ impl ProgressState { ProcessorProgress::new(ProcessorType::Ocr), ProcessorProgress::new(ProcessorType::Face), ProcessorProgress::new(ProcessorType::Pose), + ProcessorProgress::new(ProcessorType::Hand), ProcessorProgress::new(ProcessorType::Story), ProcessorProgress::new(ProcessorType::Caption), ], diff --git a/src/worker/processor.rs b/src/worker/processor.rs index 23ebc80..8b3f4f2 100644 --- a/src/worker/processor.rs +++ b/src/worker/processor.rs @@ -790,6 +790,46 @@ impl ProcessorPool { pid: 0, }) } + ProcessorType::Hand => { + let result = processor::process_hand( + video_path, + output_path.to_str().unwrap(), + uuid, + Some(&sample_frames), + ) + .await?; + let chunks_produced = result.frames.len() as i32; + tracing::info!( + "HAND completed, storing {} frames for {}", + chunks_produced, + job.uuid + 
); + if let Some(ref ws) = workspace { + for frame in &result.frames { + let data = serde_json::json!({"persons": frame.persons}); + let _ = ws + .store_pre_chunk( + "hand", + "raw", + Some(frame.frame as i64), + None, + Some(frame.timestamp), + None, + Some(&data.to_string()), + None, + ) + .await; + } + } + Ok(ProcessorOutput { + data: serde_json::to_value(result)?, + chunks_produced, + frames_processed: total_frames, + total_frames, + retry_count: 0, + pid: 0, + }) + } ProcessorType::Appearance => { let pose_path = std::path::Path::new(&output_dir).join(format!("{}.pose.json", job.uuid));