#!/usr/bin/env python3
"""
Visual chunk processor (Phase 2.2).

Builds visual chunks from YOLO detection results. Planned chunking strategies:
1. Fixed number of frames per chunk
2. Object (class-set) similarity between consecutive frames
3. Scene-change detection

Only strategies 1 ("fixed") and 2 ("similarity") are accepted by
``VisualChunkProcessor.process`` in this module.
"""

import json
import sys
import os
import argparse
from pathlib import Path
from typing import Dict, List, Any, Optional
import numpy as np
from datetime import datetime

# Add the parent directory to sys.path so sibling project modules can be imported.
sys.path.insert(0, str(Path(__file__).parent.parent))

# The YOLO backend is only needed when no precomputed result file is supplied,
# so a failed import is remembered and re-raised lazily instead of making the
# whole module unusable.
try:
    from scripts.yolo_processor_contract_v1 import YOLOProcessor
except ImportError as _exc:
    YOLOProcessor = None
    _YOLO_IMPORT_ERROR: Optional[ImportError] = _exc
else:
    _YOLO_IMPORT_ERROR = None


class VisualChunkProcessor:
    """Group per-frame YOLO detections of one video into visual chunks.

    The YOLO result is expected to be a dict with a ``"frames"`` mapping whose
    keys are frame numbers (convertible with ``int``) and whose values may
    carry ``"time_seconds"`` and a ``"detections"`` list. Detection fields
    read by this class are ``class_name``, ``class_id``, ``confidence``,
    ``x1``, ``y1``, ``width`` and ``height``.
    """

    def __init__(self, video_path: str, yolo_result_path: Optional[str] = None):
        """Store the input paths; nothing is loaded until it is needed.

        Args:
            video_path: Path of the video the detections belong to.
            yolo_result_path: Optional path of a precomputed YOLO result JSON.
        """
        self.video_path = video_path
        self.yolo_result_path = yolo_result_path
        # None means "not loaded yet"; an empty dict is a valid loaded result.
        self.yolo_result = None

    def load_yolo_result(self):
        """Load the YOLO result from file, or run YOLO detection as a fallback.

        Raises:
            ImportError: If detection is required but the YOLO backend module
                could not be imported.
        """
        if self.yolo_result_path and os.path.exists(self.yolo_result_path):
            with open(self.yolo_result_path, "r", encoding="utf-8") as f:
                self.yolo_result = json.load(f)
            return

        if self.yolo_result_path:
            # A path was given but does not exist: say so before falling back,
            # because detection can be far more expensive than reading a file.
            print(
                f"[VisualChunk] YOLO result not found: {self.yolo_result_path}; "
                "falling back to YOLO detection",
                file=sys.stderr,
            )

        if YOLOProcessor is None:
            raise ImportError(
                "YOLO backend (scripts.yolo_processor_contract_v1) is unavailable "
                "and no existing YOLO result file was provided"
            ) from _YOLO_IMPORT_ERROR

        # No usable result file, so run YOLO detection on the video itself.
        print(f"[VisualChunk] Running YOLO detection for: {self.video_path}")
        yolo_processor = YOLOProcessor(self.video_path)
        yolo_processor.process()
        self.yolo_result = yolo_processor.to_json_dict()

    def _ensure_yolo_result(self) -> None:
        """Load the YOLO result once; an empty (falsy) result is not reloaded."""
        if self.yolo_result is None:
            self.load_yolo_result()

    def _sorted_frame_list(self) -> List[Dict[str, Any]]:
        """Return the frames as a list of dicts sorted by frame number."""
        self._ensure_yolo_result()

        frames = self.yolo_result.get("frames", {})
        if not frames:
            return []

        frame_list = [
            {
                "frame_number": int(frame_key),
                "timestamp": frame_data.get("time_seconds", 0),
                "objects": frame_data.get("detections", []),
            }
            for frame_key, frame_data in frames.items()
        ]
        # Keys are strings in JSON, so sort on the converted integer value.
        frame_list.sort(key=lambda item: item["frame_number"])
        return frame_list

    def create_fixed_frame_chunks(
        self, frames_per_chunk: int = 30
    ) -> List[Dict[str, Any]]:
        """Create chunks that each hold a fixed number of frames.

        The last chunk may be shorter. Chunks produced here do not include
        the ``object_relationships`` / ``scene_description`` keys that
        similarity-based chunks carry.

        Args:
            frames_per_chunk: Number of frames per chunk; must be >= 1.

        Returns:
            List of visual chunk dicts (empty when there are no frames).

        Raises:
            ValueError: If ``frames_per_chunk`` is smaller than 1.
        """
        if frames_per_chunk < 1:
            raise ValueError(
                f"frames_per_chunk must be >= 1, got {frames_per_chunk}"
            )

        frame_list = self._sorted_frame_list()

        chunks = []
        for start_idx in range(0, len(frame_list), frames_per_chunk):
            chunk_frames = frame_list[start_idx:start_idx + frames_per_chunk]

            # Aggregate statistics for this chunk.
            chunk_stats = self._calculate_chunk_stats(chunk_frames)

            chunks.append(
                {
                    "start_frame": chunk_frames[0]["frame_number"],
                    "end_frame": chunk_frames[-1]["frame_number"] + 1,  # exclusive
                    "frame_count": len(chunk_frames),
                    "keyframe_objects": self._extract_keyframe_objects(chunk_frames),
                    "dominant_objects": chunk_stats["dominant_objects"],
                    "metadata": {
                        "object_count": chunk_stats["total_objects"],
                        "unique_classes": chunk_stats["unique_classes"],
                        "max_confidence": chunk_stats["max_confidence"],
                        "avg_confidence": chunk_stats["avg_confidence"],
                        "spatial_density": chunk_stats["spatial_density"],
                    },
                }
            )

        return chunks

    def create_similarity_based_chunks(
        self, similarity_threshold: float = 0.5, min_frames_per_chunk: int = 10
    ) -> List[Dict[str, Any]]:
        """Create chunks from runs of consecutive frames with similar objects.

        A frame joins the current run when its similarity to the run's last
        frame is at least ``similarity_threshold``; otherwise the run is
        closed and a new one starts at that frame.

        NOTE: runs shorter than ``min_frames_per_chunk`` are discarded, so
        the returned chunks do not necessarily cover every frame.

        Args:
            similarity_threshold: Similarity threshold (0-1).
            min_frames_per_chunk: Minimum number of frames a run needs to
                be emitted as a chunk.

        Returns:
            List of visual chunk dicts (empty when there are no frames).
        """
        frame_list = self._sorted_frame_list()
        if not frame_list:
            return []

        chunks = []

        def close_run(run: List[Dict[str, Any]]) -> None:
            # Emit the run as a chunk only when it is long enough.
            if len(run) >= min_frames_per_chunk:
                chunks.append(
                    self._create_chunk_from_frames(
                        run,
                        run[0]["frame_number"],
                        run[-1]["frame_number"] + 1,
                    )
                )

        current_run = [frame_list[0]]
        for frame in frame_list[1:]:
            similarity = self._calculate_frame_similarity(current_run[-1], frame)
            if similarity >= similarity_threshold:
                # Similar enough: extend the current run.
                current_run.append(frame)
            else:
                # Dissimilar: close the current run and start a new one here.
                close_run(current_run)
                current_run = [frame]

        # Handle the final run.
        close_run(current_run)

        return chunks

    def _calculate_frame_similarity(self, frame1: Dict, frame2: Dict) -> float:
        """Return the Jaccard similarity of the two frames' object class sets.

        Two frames without objects count as identical (1.0); exactly one empty
        frame yields 0.0, as does the case where no detection has a class name.
        """
        objects1 = frame1.get("objects", [])
        objects2 = frame2.get("objects", [])

        if not objects1 and not objects2:
            return 1.0

        if not objects1 or not objects2:
            return 0.0

        # Collect the non-empty class names of each frame.
        classes1 = {obj.get("class_name", "") for obj in objects1 if obj.get("class_name")}
        classes2 = {obj.get("class_name", "") for obj in objects2 if obj.get("class_name")}

        union = classes1 | classes2
        if not union:
            return 0.0

        return len(classes1 & classes2) / len(union)

    def _calculate_chunk_stats(self, frames: List[Dict]) -> Dict[str, Any]:
        """Compute aggregate statistics over the frames of one chunk.

        Returns:
            Dict with ``total_objects``, ``unique_classes`` (sorted),
            ``max_confidence``, ``avg_confidence``, ``spatial_density``
            (average objects per frame) and ``dominant_objects`` (sorted
            classes present in more than half of the frames).
        """
        all_objects = []
        for frame in frames:
            all_objects.extend(frame.get("objects", []))

        total_objects = len(all_objects)

        # Sorted so the serialized output is stable between runs; plain set
        # iteration order depends on string hash randomisation.
        unique_classes = sorted(
            {obj.get("class_name", "") for obj in all_objects if obj.get("class_name")}
        )

        # Confidence statistics.
        confidences = [obj.get("confidence", 0) for obj in all_objects]
        max_confidence = max(confidences) if confidences else 0
        avg_confidence = np.mean(confidences) if confidences else 0

        # Spatial density: average number of objects per frame.
        spatial_density = total_objects / len(frames) if frames else 0

        # Count, per class, the number of frames in which it appears at least once.
        object_counts: Dict[str, int] = {}
        for frame in frames:
            frame_classes = {
                obj.get("class_name", "")
                for obj in frame.get("objects", [])
                if obj.get("class_name")
            }
            for class_name in frame_classes:
                object_counts[class_name] = object_counts.get(class_name, 0) + 1

        # Dominant objects: classes present in more than half of the frames.
        dominant_objects = sorted(
            class_name
            for class_name, count in object_counts.items()
            if count / len(frames) > 0.5
        )

        return {
            "total_objects": total_objects,
            "unique_classes": unique_classes,
            "max_confidence": float(max_confidence),
            "avg_confidence": float(avg_confidence),
            "spatial_density": float(spatial_density),
            "dominant_objects": dominant_objects,
        }

    def _extract_keyframe_objects(
        self, frames: List[Dict], keyframe_interval: int = 5
    ) -> List[Dict[str, Any]]:
        """Sample keyframes from a chunk and normalise their objects.

        Simplified approach: every ``keyframe_interval``-th frame, starting
        with the first, is treated as a keyframe.

        Args:
            frames: Frames of one chunk, in order.
            keyframe_interval: Sampling stride (default 5).

        Returns:
            One dict per keyframe with ``timestamp``, ``frame_number`` and
            ``objects``. An object's ``bbox`` is None when the detection has
            no ``x1`` key.
        """
        keyframe_objects = []

        for frame in frames[::keyframe_interval]:
            objects = [
                {
                    "class_name": obj.get("class_name", ""),
                    "class_id": obj.get("class_id", 0),
                    "confidence": float(obj.get("confidence", 0)),
                    "bbox": {
                        "x": obj.get("x1", 0),
                        "y": obj.get("y1", 0),
                        "width": obj.get("width", 0),
                        "height": obj.get("height", 0),
                    }
                    if "x1" in obj
                    else None,
                    "occurrence": 1,
                }
                for obj in frame.get("objects", [])
            ]

            keyframe_objects.append(
                {
                    "timestamp": float(frame.get("timestamp", 0)),
                    "frame_number": frame.get("frame_number", 0),
                    "objects": objects,
                }
            )

        return keyframe_objects

    def _create_chunk_from_frames(
        self, frames: List[Dict], start_frame: int, end_frame: int
    ) -> Dict[str, Any]:
        """Build a chunk dict from a frame list and explicit frame bounds.

        Args:
            frames: Frames belonging to the chunk.
            start_frame: First frame number of the chunk.
            end_frame: Frame number one past the chunk's last frame (exclusive).
        """
        chunk_stats = self._calculate_chunk_stats(frames)

        return {
            "start_frame": start_frame,
            "end_frame": end_frame,  # exclusive
            "frame_count": len(frames),
            "keyframe_objects": self._extract_keyframe_objects(frames),
            "dominant_objects": chunk_stats["dominant_objects"],
            "object_relationships": [],  # optional: relationship detection may be added later
            "scene_description": None,  # optional: LLM-generated scene description may be added later
            "metadata": {
                "object_count": chunk_stats["total_objects"],
                "unique_classes": chunk_stats["unique_classes"],
                "max_confidence": chunk_stats["max_confidence"],
                "avg_confidence": chunk_stats["avg_confidence"],
                "spatial_density": chunk_stats["spatial_density"],
            },
        }

    def process(self, strategy: str = "fixed", **kwargs) -> Dict[str, Any]:
        """Generate visual chunks with the chosen strategy and summarise them.

        Args:
            strategy: Chunking strategy, "fixed" or "similarity".
            **kwargs: Strategy parameters. "fixed" reads ``frames_per_chunk``
                (default 30); "similarity" reads ``similarity_threshold``
                (default 0.5) and ``min_frames_per_chunk`` (default 10).
                All kwargs are echoed under ``metadata.parameters``.

        Returns:
            Result dict with ``metadata``, ``chunk_count``, ``total_frames``,
            ``total_objects``, ``unique_classes`` (a count) and ``chunks``.

        Raises:
            ValueError: If ``strategy`` is not a known strategy.
        """
        self._ensure_yolo_result()

        # Timing starts after loading, so it covers chunk generation only.
        start_time = datetime.now()

        if strategy == "fixed":
            frames_per_chunk = kwargs.get("frames_per_chunk", 30)
            chunks = self.create_fixed_frame_chunks(frames_per_chunk)
        elif strategy == "similarity":
            similarity_threshold = kwargs.get("similarity_threshold", 0.5)
            min_frames = kwargs.get("min_frames_per_chunk", 10)
            chunks = self.create_similarity_based_chunks(
                similarity_threshold, min_frames
            )
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

        # Totals across all chunks.
        total_frames = sum(chunk["frame_count"] for chunk in chunks)
        total_objects = sum(chunk["metadata"]["object_count"] for chunk in chunks)

        # Union of the class names seen in any chunk.
        all_unique_classes = set()
        for chunk in chunks:
            all_unique_classes.update(chunk["metadata"]["unique_classes"])

        processing_time = (datetime.now() - start_time).total_seconds()

        result = {
            "metadata": {
                "video_path": self.video_path,
                "processing_time": processing_time,
                "strategy": strategy,
                "parameters": kwargs,
                "processed_at": datetime.now().isoformat(),
            },
            "chunk_count": len(chunks),
            "total_frames": total_frames,
            "total_objects": total_objects,
            "unique_classes": len(all_unique_classes),
            "chunks": chunks,
        }

        return result


def main():
    """CLI entry point: chunk one video and write the result as JSON.

    Best-effort by design: on any error the message goes to stderr and an
    empty result is written to the output path instead of propagating.
    """
    parser = argparse.ArgumentParser(description="視覺分片處理器")
    parser.add_argument("video_path", help="視頻文件路徑")
    parser.add_argument("output_path", help="輸出文件路徑")
    parser.add_argument("--yolo-result", help="YOLO 結果文件路徑(可選)")
    # Accepted for the calling executor; not read anywhere in this function.
    parser.add_argument("--uuid", help="檔案 UUID(由 executor 傳入)")
    parser.add_argument(
        "--strategy", choices=["fixed", "similarity"], default="fixed", help="分片策略"
    )
    parser.add_argument(
        "--frames-per-chunk", type=int, default=30, help="固定幀數策略:每個分片的幀數"
    )
    parser.add_argument(
        "--similarity-threshold", type=float, default=0.5, help="相似度策略:相似度閾值"
    )
    parser.add_argument(
        "--min-frames-per-chunk", type=int, default=10, help="相似度策略:最小幀數"
    )

    args = parser.parse_args()

    print(f"[VisualChunk] Starting processing: {args.video_path}")
    print(f"[VisualChunk] Strategy: {args.strategy}")

    try:
        processor = VisualChunkProcessor(args.video_path, args.yolo_result)

        if args.strategy == "fixed":
            result = processor.process(
                strategy="fixed", frames_per_chunk=args.frames_per_chunk
            )
        else:
            result = processor.process(
                strategy="similarity",
                similarity_threshold=args.similarity_threshold,
                min_frames_per_chunk=args.min_frames_per_chunk,
            )

        # Save the result.
        with open(args.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        print("[VisualChunk] Processing completed")
        print(f"[VisualChunk] Generated {result['chunk_count']} visual chunks")
        print(f"[VisualChunk] Total frames: {result['total_frames']}")
        print(f"[VisualChunk] Total objects: {result['total_objects']}")
        print(f"[VisualChunk] Unique classes: {result['unique_classes']}")
        print(f"[VisualChunk] Result saved to: {args.output_path}")

    except Exception as e:
        # Deliberate top-level boundary: report the error, then still leave a
        # well-formed (empty) result file at the output path.
        print(f"[VisualChunk] Error: {e}", file=sys.stderr)
        result = {
            "chunk_count": 0,
            "total_frames": 0,
            "total_objects": 0,
            "unique_classes": 0,
            "chunks": [],
        }
        with open(args.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f"[VisualChunk] Fallback: empty result saved to {args.output_path}")


if __name__ == "__main__":
    main()