446 lines
16 KiB
Python
446 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
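Visual chunk processor (Phase 2.2).

Generates visual chunks from YOLO results, supporting several chunking strategies:
1. Fixed frame-count chunking
2. Object-similarity-based chunking
3. Scene-change-based chunking (listed here; not implemented by the code in this file)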
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import os
|
||
import argparse
|
||
from pathlib import Path
|
||
from typing import Dict, List, Any, Optional
|
||
import numpy as np
|
||
from datetime import datetime
|
||
|
||
# 添加父目錄到路徑以導入其他模組
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
from scripts.yolo_processor_contract_v1 import YOLOProcessor
|
||
|
||
|
||
class VisualChunkProcessor:
    """Group per-frame YOLO detections into "visual chunks".

    A chunk summarises a run of consecutive analysed frames: its frame range,
    sampled keyframes, dominant object classes and aggregate statistics.
    Two strategies are implemented: fixed-size chunks and chunks that are
    split wherever the set of detected classes changes too much.

    The YOLO result is read as a dict with a ``"frames"`` mapping whose keys
    are frame numbers (convertible with ``int``) and whose values may carry
    ``"time_seconds"`` and ``"detections"``. Each detection may provide
    ``class_name``, ``class_id``, ``confidence``, ``x1``, ``y1``, ``width``
    and ``height``; missing fields fall back to defaults.
    """

    # Strategy names accepted by process().
    _STRATEGIES = ("fixed", "similarity")

    def __init__(self, video_path: str, yolo_result_path: Optional[str] = None):
        """Remember the inputs; nothing is loaded until it is needed.

        Args:
            video_path: Path of the video the detections belong to.
            yolo_result_path: Optional path of a JSON file holding a
                previously computed YOLO result.
        """
        self.video_path = video_path
        self.yolo_result_path = yolo_result_path
        self.yolo_result = None

    def load_yolo_result(self):
        """Populate ``self.yolo_result``.

        The JSON file at ``yolo_result_path`` is used when it exists.
        Otherwise YOLO detection is run on the video through
        ``YOLOProcessor``; if a path was given but is missing, a warning is
        written to stderr first so the fallback is not silent.
        """
        path = self.yolo_result_path
        if path and os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                self.yolo_result = json.load(f)
            return

        if path:
            print(
                f"[VisualChunk] YOLO result not found: {path}",
                file=sys.stderr,
            )

        print(f"[VisualChunk] Running YOLO detection for: {self.video_path}")
        yolo_processor = YOLOProcessor(self.video_path)
        # The return value is not needed; to_json_dict() is what gets stored.
        yolo_processor.process()
        self.yolo_result = yolo_processor.to_json_dict()

    def _sorted_frames(self) -> List[Dict[str, Any]]:
        """Return the loaded frames as a list ordered by frame number.

        Loads the YOLO result on first use. Frame keys are converted with
        ``int`` so that "10" sorts after "2".
        """
        # `is None` rather than truthiness: an empty result must not trigger
        # another load (and possibly another detection run).
        if self.yolo_result is None:
            self.load_yolo_result()

        frames = (self.yolo_result or {}).get("frames") or {}
        frame_list = [
            {
                "frame_number": int(frame_key),
                "timestamp": frame_data.get("time_seconds", 0),
                "objects": frame_data.get("detections", []),
            }
            for frame_key, frame_data in frames.items()
        ]
        frame_list.sort(key=lambda item: item["frame_number"])
        return frame_list

    def create_fixed_frame_chunks(
        self, frames_per_chunk: int = 30
    ) -> List[Dict[str, Any]]:
        """Split the frames into chunks of a fixed number of frames.

        Args:
            frames_per_chunk: Number of frames per chunk; the last chunk may
                be shorter.

        Returns:
            List of chunk dicts (see ``_create_chunk_from_frames``); empty
            when there are no frames.

        Raises:
            ValueError: If ``frames_per_chunk`` is not positive.
        """
        if frames_per_chunk <= 0:
            raise ValueError(
                f"frames_per_chunk must be positive, got {frames_per_chunk}"
            )

        frame_list = self._sorted_frames()
        chunks = []
        for start_idx in range(0, len(frame_list), frames_per_chunk):
            chunk_frames = frame_list[start_idx:start_idx + frames_per_chunk]
            chunks.append(
                self._create_chunk_from_frames(
                    chunk_frames,
                    chunk_frames[0]["frame_number"],
                    chunk_frames[-1]["frame_number"] + 1,
                )
            )
        return chunks

    def create_similarity_based_chunks(
        self, similarity_threshold: float = 0.5, min_frames_per_chunk: int = 10
    ) -> List[Dict[str, Any]]:
        """Split the frames wherever consecutive frames stop looking alike.

        Each frame is compared with the last frame of the current run using
        ``_calculate_frame_similarity``; a similarity below the threshold
        closes the run and starts a new one.

        Args:
            similarity_threshold: Minimum similarity (0-1) for a frame to
                join the current run.
            min_frames_per_chunk: Runs shorter than this are discarded and
                do not appear in the output.

        Returns:
            List of chunk dicts (see ``_create_chunk_from_frames``); empty
            when there are no frames.
        """
        chunks = []

        def close_run(run):
            # Emit the run as a chunk only if it is long enough.
            if run and len(run) >= min_frames_per_chunk:
                chunks.append(
                    self._create_chunk_from_frames(
                        run,
                        run[0]["frame_number"],
                        run[-1]["frame_number"] + 1,
                    )
                )

        current_run = []
        for frame in self._sorted_frames():
            if current_run:
                similarity = self._calculate_frame_similarity(
                    current_run[-1], frame
                )
                if similarity >= similarity_threshold:
                    current_run.append(frame)
                    continue
                close_run(current_run)
            current_run = [frame]

        close_run(current_run)
        return chunks

    def _calculate_frame_similarity(self, frame1: Dict, frame2: Dict) -> float:
        """Return the Jaccard similarity of the class names in two frames.

        Two frames without any objects count as identical (1.0); if only one
        of them has objects the similarity is 0.0. When both have objects
        but none carries a class name, the result is 0.0.
        """
        objects1 = frame1.get("objects", [])
        objects2 = frame2.get("objects", [])

        if not objects1 and not objects2:
            return 1.0
        if not objects1 or not objects2:
            return 0.0

        classes1 = {
            obj["class_name"] for obj in objects1 if obj.get("class_name")
        }
        classes2 = {
            obj["class_name"] for obj in objects2 if obj.get("class_name")
        }

        union = classes1 | classes2
        if not union:
            return 0.0
        return len(classes1 & classes2) / len(union)

    def _calculate_chunk_stats(self, frames: List[Dict]) -> Dict[str, Any]:
        """Aggregate detection statistics over the frames of one chunk.

        Returns:
            Dict with ``total_objects``, ``unique_classes`` (sorted),
            ``max_confidence``, ``avg_confidence``, ``spatial_density``
            (average detections per frame) and ``dominant_objects`` (sorted
            class names present in more than half of the frames).
        """
        all_objects = [
            obj for frame in frames for obj in frame.get("objects", [])
        ]
        total_objects = len(all_objects)

        # Sorted so the serialized output is identical from run to run.
        unique_classes = sorted(
            {obj["class_name"] for obj in all_objects if obj.get("class_name")}
        )

        confidences = [obj.get("confidence", 0) for obj in all_objects]
        max_confidence = max(confidences) if confidences else 0
        avg_confidence = np.mean(confidences) if confidences else 0

        spatial_density = total_objects / len(frames) if frames else 0

        # Count, per class, the number of frames it appears in (a class
        # detected several times in one frame still counts once).
        frames_with_class = {}
        for frame in frames:
            frame_classes = {
                obj["class_name"]
                for obj in frame.get("objects", [])
                if obj.get("class_name")
            }
            for class_name in frame_classes:
                frames_with_class[class_name] = (
                    frames_with_class.get(class_name, 0) + 1
                )

        dominant_objects = sorted(
            class_name
            for class_name, count in frames_with_class.items()
            if count / len(frames) > 0.5
        )

        return {
            "total_objects": total_objects,
            "unique_classes": unique_classes,
            "max_confidence": float(max_confidence),
            "avg_confidence": float(avg_confidence),
            "spatial_density": float(spatial_density),
            "dominant_objects": dominant_objects,
        }

    def _extract_keyframe_objects(
        self, frames: List[Dict], keyframe_interval: int = 5
    ) -> List[Dict[str, Any]]:
        """Sample keyframes from a chunk and list their detections.

        Args:
            frames: Frames of the chunk, in order.
            keyframe_interval: Every n-th frame (starting with the first)
                is taken as a keyframe.

        Returns:
            One dict per keyframe with ``timestamp``, ``frame_number`` and
            ``objects``. An object's ``bbox`` is ``None`` when the detection
            has no ``x1`` field.

        Raises:
            ValueError: If ``keyframe_interval`` is not positive.
        """
        if keyframe_interval <= 0:
            raise ValueError(
                f"keyframe_interval must be positive, got {keyframe_interval}"
            )

        keyframe_objects = []
        for frame in frames[::keyframe_interval]:
            objects = []
            for obj in frame.get("objects", []):
                if "x1" in obj:
                    bbox = {
                        "x": obj.get("x1", 0),
                        "y": obj.get("y1", 0),
                        "width": obj.get("width", 0),
                        "height": obj.get("height", 0),
                    }
                else:
                    bbox = None
                objects.append(
                    {
                        "class_name": obj.get("class_name", ""),
                        "class_id": obj.get("class_id", 0),
                        "confidence": float(obj.get("confidence", 0)),
                        "bbox": bbox,
                        "occurrence": 1,
                    }
                )

            keyframe_objects.append(
                {
                    "timestamp": float(frame.get("timestamp", 0)),
                    "frame_number": frame.get("frame_number", 0),
                    "objects": objects,
                }
            )

        return keyframe_objects

    def _create_chunk_from_frames(
        self, frames: List[Dict], start_frame: int, end_frame: int
    ) -> Dict[str, Any]:
        """Build the chunk dict for a run of frames.

        Args:
            frames: Frames belonging to the chunk.
            start_frame: First frame number of the chunk.
            end_frame: Frame number one past the last frame (exclusive).
        """
        chunk_stats = self._calculate_chunk_stats(frames)

        return {
            "start_frame": start_frame,
            "end_frame": end_frame,  # exclusive
            "frame_count": len(frames),
            "keyframe_objects": self._extract_keyframe_objects(frames),
            "dominant_objects": chunk_stats["dominant_objects"],
            "object_relationships": [],  # optional: relationship detection may be added later
            "scene_description": None,  # optional: LLM-generated description may be added later
            "metadata": {
                "object_count": chunk_stats["total_objects"],
                "unique_classes": chunk_stats["unique_classes"],
                "max_confidence": chunk_stats["max_confidence"],
                "avg_confidence": chunk_stats["avg_confidence"],
                "spatial_density": chunk_stats["spatial_density"],
            },
        }

    def process(self, strategy: str = "fixed", **kwargs) -> Dict[str, Any]:
        """Generate chunks with the chosen strategy and summarise them.

        Args:
            strategy: ``"fixed"`` or ``"similarity"``.
            **kwargs: Strategy parameters: ``frames_per_chunk`` for
                ``"fixed"``; ``similarity_threshold`` and
                ``min_frames_per_chunk`` for ``"similarity"``.

        Returns:
            Dict with run ``metadata``, ``chunk_count``, ``total_frames``,
            ``total_objects``, ``unique_classes`` (a count) and ``chunks``.

        Raises:
            ValueError: If ``strategy`` is not a known strategy.
        """
        # Reject a bad strategy before the potentially expensive load.
        if strategy not in self._STRATEGIES:
            raise ValueError(f"Unknown strategy: {strategy}")

        if self.yolo_result is None:
            self.load_yolo_result()

        start_time = datetime.now()

        if strategy == "fixed":
            chunks = self.create_fixed_frame_chunks(
                kwargs.get("frames_per_chunk", 30)
            )
        else:
            chunks = self.create_similarity_based_chunks(
                kwargs.get("similarity_threshold", 0.5),
                kwargs.get("min_frames_per_chunk", 10),
            )

        total_frames = sum(chunk["frame_count"] for chunk in chunks)
        total_objects = sum(chunk["metadata"]["object_count"] for chunk in chunks)

        all_unique_classes = set()
        for chunk in chunks:
            all_unique_classes.update(chunk["metadata"]["unique_classes"])

        processing_time = (datetime.now() - start_time).total_seconds()

        return {
            "metadata": {
                "video_path": self.video_path,
                "processing_time": processing_time,
                "strategy": strategy,
                "parameters": kwargs,
                "processed_at": datetime.now().isoformat(),
            },
            "chunk_count": len(chunks),
            "total_frames": total_frames,
            "total_objects": total_objects,
            "unique_classes": len(all_unique_classes),
            "chunks": chunks,
        }
|
||
|
||
|
||
def main():
    """Command-line entry point: chunk a video's YOLO result and save it as JSON.

    Parses the arguments, runs ``VisualChunkProcessor.process`` with the
    selected strategy and writes the result to ``output_path``. If anything
    fails, the error is reported on stderr and an empty result is written
    instead, so a result file is still left at ``output_path``.
    """
    parser = argparse.ArgumentParser(description="視覺分片處理器")
    parser.add_argument("video_path", help="視頻文件路徑")
    parser.add_argument("output_path", help="輸出文件路徑")
    parser.add_argument("--yolo-result", help="YOLO 結果文件路徑(可選)")
    parser.add_argument("--uuid", help="檔案 UUID(由 executor 傳入)")
    parser.add_argument(
        "--strategy", choices=["fixed", "similarity"], default="fixed", help="分片策略"
    )
    parser.add_argument(
        "--frames-per-chunk", type=int, default=30, help="固定幀數策略:每個分片的幀數"
    )
    parser.add_argument(
        "--similarity-threshold", type=float, default=0.5, help="相似度策略:相似度閾值"
    )
    parser.add_argument(
        "--min-frames-per-chunk", type=int, default=10, help="相似度策略:最小幀數"
    )

    args = parser.parse_args()

    def write_json(payload):
        # Create the output directory first; otherwise both the normal and
        # the fallback write fail when it does not exist yet.
        out_path = Path(args.output_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)

    # Only the parameters of the selected strategy are forwarded.
    if args.strategy == "fixed":
        strategy_params = {"frames_per_chunk": args.frames_per_chunk}
    else:
        strategy_params = {
            "similarity_threshold": args.similarity_threshold,
            "min_frames_per_chunk": args.min_frames_per_chunk,
        }

    print(f"[VisualChunk] Starting processing: {args.video_path}")
    print(f"[VisualChunk] Strategy: {args.strategy}")

    try:
        processor = VisualChunkProcessor(args.video_path, args.yolo_result)
        result = processor.process(strategy=args.strategy, **strategy_params)

        write_json(result)

        print("[VisualChunk] Processing completed")
        print(f"[VisualChunk] Generated {result['chunk_count']} visual chunks")
        print(f"[VisualChunk] Total frames: {result['total_frames']}")
        print(f"[VisualChunk] Total objects: {result['total_objects']}")
        print(f"[VisualChunk] Unique classes: {result['unique_classes']}")
        print(f"[VisualChunk] Result saved to: {args.output_path}")
    except Exception as e:
        # Top-level boundary: report the failure and fall back to an empty
        # result rather than leaving no output file.
        print(f"[VisualChunk] Error: {e}", file=sys.stderr)
        write_json(
            {
                "chunk_count": 0,
                "total_frames": 0,
                "total_objects": 0,
                "unique_classes": 0,
                "chunks": [],
            }
        )
        print(f"[VisualChunk] Fallback: empty result saved to {args.output_path}")
|
||
|
||
|
||
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|