diff --git a/scripts/clothing_color_search.py b/scripts/clothing_color_search.py new file mode 100644 index 0000000..511a7d5 --- /dev/null +++ b/scripts/clothing_color_search.py @@ -0,0 +1,216 @@ +#!/opt/homebrew/bin/python3.11 +""" +Clothing Color Search - Find people wearing specific colors + +Usage: + python3 clothing_color_search.py --file-uuid UUID --color red --output output.json + +Color matching uses HSV hue ranges: + red: 0-15, 165-180 + orange: 15-35 + yellow: 35-50 + green: 50-85 + cyan: 85-105 + blue: 105-140 + purple: 140-165 +""" + +import sys +import os +import json +import argparse +import cv2 +import numpy as np + +COLOR_RANGES = { + "red": [(0, 40), (165, 180)], + "orange": [(15, 35)], + "yellow": [(35, 50)], + "green": [(50, 85)], + "cyan": [(85, 105)], + "blue": [(105, 140)], + "purple": [(140, 165)], + "white": [(0, 180, 0, 40, 200, 255)], # (h_min, h_max, s_min, s_max, v_min, v_max) + "black": [(0, 180, 0, 255, 0, 50)], +} + +def hsv_to_color_name(h, s, v): + """Convert HSV to color name""" + if v < 50: + return "black" + if s < 40 and v > 200: + return "white" + if 0 <= h <= 15 or 165 <= h <= 180: + return "red" + if 15 < h <= 35: + return "orange" + if 35 < h <= 50: + return "yellow" + if 50 < h <= 85: + return "green" + if 85 < h <= 105: + return "cyan" + if 105 < h <= 140: + return "blue" + if 140 < h <= 165: + return "purple" + return "unknown" + +def check_color_match(dominant_colors, target_color): + """Check if dominant colors match target color""" + if not dominant_colors: + return False, 0.0 + + target_lower = target_color.lower() + match_count = 0 + total = len(dominant_colors) + + for color_hsv in dominant_colors: + h, s, v = color_hsv[0], color_hsv[1], color_hsv[2] + color_name = hsv_to_color_name(h, s, v) + if color_name == target_lower: + match_count += 1 + + ratio = match_count / total if total > 0 else 0.0 + return ratio > 0.3, ratio # Match if >30% of dominant colors match + +def search_by_color(appearance_path, video_path, target_color, output_path, max_frames=500): + """Search for people wearing target color""" + if not os.path.exists(appearance_path): + print(json.dumps({"error": f"appearance.json not found: {appearance_path}"})) + return + + with open(appearance_path) as f: + appearance = json.load(f) + + frames = appearance.get("frames", []) + fps = appearance.get("fps", 30) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(json.dumps({"error": f"Cannot open video: {video_path}"})) + return + + results = [] + frame_count = 0 + + for frame_data in frames[:max_frames]: + frame_num = frame_data.get("frame", 0) + persons = frame_data.get("persons", []) + timestamp = frame_data.get("timestamp", 0) + + if not persons: + frame_count += 1 + continue + + cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) + ret, frame = cap.read() + if not ret: + frame_count += 1 + continue + + frame_h, frame_w = frame.shape[:2] + + for person in persons: + bbox = person.get("bbox", {}) + if not bbox: + continue + + x, y = bbox.get("x", 0), bbox.get("y", 0) + w, h = bbox.get("width", 0), bbox.get("height", 0) + + # Extract upper body region (clothing area) + upper_h = int(h * 0.6) # Upper 60% of person + roi_x = max(0, int(x)) + roi_y = max(0, int(y)) + roi_w = min(w, frame_w - roi_x) + roi_h = min(upper_h, frame_h - roi_y) + + if roi_w < 10 or roi_h < 10: + continue + + roi = frame[roi_y:roi_y+roi_h, roi_x:roi_x+roi_w] + + # Get dominant colors + hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) + pixels = hsv.reshape(-1, 3).astype(np.float32) + + if len(pixels) < 10: + continue + + criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0) + _, labels, centers = cv2.kmeans(pixels, 5, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS) + counts = np.bincount(labels.flatten()) + dominant = centers[np.argsort(-counts)[:5]].tolist() + + match, confidence = check_color_match(dominant, target_color) + + if match: + results.append({ + "frame": frame_num, + "timestamp": round(timestamp, 2), + "bbox": bbox, + "confidence": round(confidence, 3), + "dominant_colors": [[round(c, 1) for c in dc] for dc in dominant[:3]] + }) + + frame_count += 1 + + cap.release() + + # Summary + color_names = set() + for r in results: + for dc in r.get("dominant_colors", []): + if len(dc) >= 3: + color_names.add(hsv_to_color_name(dc[0], dc[1], dc[2])) + + output = { + "file_uuid": os.path.basename(appearance_path).split(".")[0], + "target_color": target_color, + "total_matches": len(results), + "matched_frames": list(set(r["frame"] for r in results)), + "results": results[:50], # Limit to 50 results + "color_names_found": list(color_names) + } + + os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else ".", exist_ok=True) + with open(output_path, "w") as f: + json.dump(output, f, indent=2) + + print(json.dumps({"success": True, **output})) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Search for people by clothing color") + parser.add_argument("--file-uuid", required=True) + parser.add_argument("--color", required=True, choices=list(COLOR_RANGES.keys())) + parser.add_argument("--video-path", default="") + parser.add_argument("--appearance-path", default="") + parser.add_argument("--output", default="") + args = parser.parse_args() + + output_dir = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output") + appearance_path = args.appearance_path or f"{output_dir}/{args.file_uuid}.appearance.json" + video_path = args.video_path + output_path = args.output or f"{output_dir}/{args.file_uuid}.color_search_{args.color}.json" + + if not video_path: + # Try to find video in common locations + for ext in ["mp4", "mov", "avi"]: + candidate = f"/Users/accusys/momentry/var/sftpgo/data/demo/{args.file_uuid}.{ext}" + if os.path.exists(candidate): + video_path = candidate + break + + if not video_path: + # Search in output directory for video + import glob + matches = glob.glob(f"/Users/accusys/momentry/var/sftpgo/**/*{args.file_uuid}*", recursive=True) + if matches: + video_path = matches[0] + + if not video_path: + print(json.dumps({"error": "video_path not found, please provide --video-path"})) + sys.exit(1) + + search_by_color(appearance_path, video_path, args.color, output_path) diff --git a/src/api/agent_search.rs b/src/api/agent_search.rs index 7634dee..fdd63f2 100644 --- a/src/api/agent_search.rs +++ b/src/api/agent_search.rs @@ -276,6 +276,15 @@ fn make_tools(pool: &sqlx::PgPool) -> Vec { }), vec!["file_uuid", "node_id"], ), + function_calling::make_tool( + "search_by_appearance", + "根據衣服顏色搜尋影片中的人物。支援顏色:red, orange, yellow, green, cyan, blue, purple, white, black。", + serde_json::json!({ + "file_uuid": {"type": "string", "description": "影片 UUID"}, + "color": {"type": "string", "description": "目標顏色: red, orange, yellow, green, cyan, blue, purple, white, black"} + }), + vec!["file_uuid", "color"], + ), ] } @@ -310,6 +319,7 @@ async fn execute_tool(pool: &sqlx::PgPool, tool_call: &ToolCall) -> (String, Str "get_file_info" => tools::exec_get_file_info(pool, &args).await, "get_representative_frame" => tools::exec_get_representative_frame(pool, &args).await, "analyze_frame" => tools::exec_analyze_frame(pool, &args).await, + "search_by_appearance" => tools::exec_search_by_appearance(pool, &args).await, _ => Err(format!("Unknown tool: {}", name)), }; let content = match result { diff --git a/src/core/agent/tools.rs b/src/core/agent/tools.rs index 3ab520a..d9cfffd 100644 --- a/src/core/agent/tools.rs +++ b/src/core/agent/tools.rs @@ -1092,3 +1092,68 @@ pub async fn exec_tkg_node_detail( None => Err("Node not found".to_string()), } } + +/// Search for people by clothing color using appearance data +pub async fn exec_search_by_appearance(pool: &sqlx::PgPool, args: &serde_json::Value) -> Result { + let file_uuid = args.get("file_uuid") + .and_then(|v| v.as_str()) + .ok_or("file_uuid is required".to_string())?; + let color = args.get("color") + .and_then(|v| v.as_str()) + .ok_or("color is required (red, blue, green, yellow, orange, cyan, purple, white, black)".to_string())?; + + let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry/output".to_string()); + let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR") + .unwrap_or_else(|_| "/Users/accusys/momentry_core/scripts".to_string()); + let script_path = format!("{}/clothing_color_search.py", scripts_dir); + let appearance_path = format!("{}/{}.appearance.json", output_dir, file_uuid); + let output_path = format!("{}/{}.color_search_{}.json", output_dir, file_uuid, color); + + if !std::path::Path::new(&appearance_path).exists() { + return Err(format!("appearance.json not found for file {}", file_uuid)); + } + + // Get video path from videos table + let videos_table = schema::table_name("videos"); + let video_path: Option = sqlx::query_scalar(&format!( + "SELECT file_path FROM {} WHERE file_uuid = $1", videos_table + )) + .bind(file_uuid) + .fetch_optional(pool) + .await + .map_err(|e| e.to_string())?; + + let video_path = video_path.unwrap_or_default(); + if video_path.is_empty() { + return Err("Video path not found".to_string()); + } + + let executor = crate::core::processor::PythonExecutor::new() + .map_err(|e| e.to_string())?; + + executor.run( + &script_path, + &[ + "--file-uuid", file_uuid, + "--color", color, + "--video-path", &video_path, + "--appearance-path", &appearance_path, + "--output", &output_path, + ], + None, + "CLOTHING_COLOR_SEARCH", + Some(std::time::Duration::from_secs(300)), + ) + .await + .map_err(|e| e.to_string())?; + + // Read results + if std::path::Path::new(&output_path).exists() { + let content = std::fs::read_to_string(&output_path) + .map_err(|e| e.to_string())?; + Ok(content) + } else { + Err("Color search output not found".to_string()) + } +}