Files
momentry_core/scripts/visual_chunk_processor.py

446 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
視覺分片處理器 (Phase 2.2)
從 YOLO 結果生成視覺分片,支持多種分片策略:
1. 固定幀數分片
2. 基於物件相似度分片
3. 基於場景變化分片
"""
import json
import sys
import os
import argparse
from pathlib import Path
from typing import Dict, List, Any, Optional
import numpy as np
from datetime import datetime
# 添加父目錄到路徑以導入其他模組
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.yolo_processor_contract_v1 import YOLOProcessor
class VisualChunkProcessor:
"""視覺分片處理器"""
def __init__(self, video_path: str, yolo_result_path: Optional[str] = None):
self.video_path = video_path
self.yolo_result_path = yolo_result_path
self.yolo_result = None
def load_yolo_result(self):
"""加載 YOLO 結果"""
if self.yolo_result_path and os.path.exists(self.yolo_result_path):
with open(self.yolo_result_path, "r", encoding="utf-8") as f:
self.yolo_result = json.load(f)
else:
# 如果沒有提供 YOLO 結果路徑,則運行 YOLO 檢測
print(f"[VisualChunk] Running YOLO detection for: {self.video_path}")
yolo_processor = YOLOProcessor(self.video_path)
yolo_result = yolo_processor.process()
self.yolo_result = yolo_processor.to_json_dict()
def create_fixed_frame_chunks(
self, frames_per_chunk: int = 30
) -> List[Dict[str, Any]]:
"""創建固定幀數分片
Args:
frames_per_chunk: 每個分片的幀數
Returns:
視覺分片列表
"""
if not self.yolo_result:
self.load_yolo_result()
frames = self.yolo_result.get("frames", {})
if not frames:
return []
# 將幀字典轉換為排序後的列表
frame_list = []
for frame_key, frame_data in frames.items():
frame_list.append(
{
"frame_number": int(frame_key),
"timestamp": frame_data.get("time_seconds", 0),
"objects": frame_data.get("detections", []),
}
)
# 按幀號排序
frame_list.sort(key=lambda x: x["frame_number"])
chunks = []
total_frames = len(frame_list)
for start_idx in range(0, total_frames, frames_per_chunk):
end_idx = min(start_idx + frames_per_chunk, total_frames)
chunk_frames = frame_list[start_idx:end_idx]
if not chunk_frames:
continue
# 計算分片統計
chunk_stats = self._calculate_chunk_stats(chunk_frames)
chunk = {
"start_frame": chunk_frames[0]["frame_number"],
"end_frame": chunk_frames[-1]["frame_number"] + 1, # exclusive
"frame_count": len(chunk_frames),
"keyframe_objects": self._extract_keyframe_objects(chunk_frames),
"dominant_objects": chunk_stats["dominant_objects"],
"metadata": {
"object_count": chunk_stats["total_objects"],
"unique_classes": chunk_stats["unique_classes"],
"max_confidence": chunk_stats["max_confidence"],
"avg_confidence": chunk_stats["avg_confidence"],
"spatial_density": chunk_stats["spatial_density"],
},
}
chunks.append(chunk)
return chunks
def create_similarity_based_chunks(
self, similarity_threshold: float = 0.5, min_frames_per_chunk: int = 10
) -> List[Dict[str, Any]]:
"""基於物件相似度創建分片
Args:
similarity_threshold: 相似度閾值 (0-1)
min_frames_per_chunk: 最小幀數
Returns:
視覺分片列表
"""
if not self.yolo_result:
self.load_yolo_result()
frames = self.yolo_result.get("frames", {})
if not frames:
return []
# 將幀字典轉換為排序後的列表
frame_list = []
for frame_key, frame_data in frames.items():
frame_list.append(
{
"frame_number": int(frame_key),
"timestamp": frame_data.get("time_seconds", 0),
"objects": frame_data.get("detections", []),
}
)
# 按幀號排序
frame_list.sort(key=lambda x: x["frame_number"])
chunks = []
current_chunk_frames = []
current_start_frame = 0
for i, frame in enumerate(frame_list):
if not current_chunk_frames:
current_chunk_frames.append(frame)
current_start_frame = frame["frame_number"]
continue
# 計算相似度
last_frame = current_chunk_frames[-1]
similarity = self._calculate_frame_similarity(last_frame, frame)
if similarity >= similarity_threshold:
# 相似度高,加入當前分片
current_chunk_frames.append(frame)
else:
# 相似度低,創建新分片
if len(current_chunk_frames) >= min_frames_per_chunk:
chunk = self._create_chunk_from_frames(
current_chunk_frames,
current_start_frame,
frame_list[i - 1]["frame_number"] + 1,
)
chunks.append(chunk)
# 開始新的分片
current_chunk_frames = [frame]
current_start_frame = frame["frame_number"]
# 處理最後一個分片
if len(current_chunk_frames) >= min_frames_per_chunk:
chunk = self._create_chunk_from_frames(
current_chunk_frames,
current_start_frame,
current_chunk_frames[-1]["frame_number"] + 1,
)
chunks.append(chunk)
return chunks
def _calculate_frame_similarity(self, frame1: Dict, frame2: Dict) -> float:
"""計算兩個幀之間的相似度(基於物件類別)"""
objects1 = frame1.get("objects", [])
objects2 = frame2.get("objects", [])
if not objects1 and not objects2:
return 1.0
if not objects1 or not objects2:
return 0.0
# 提取物件類別
classes1 = set(
obj.get("class_name", "") for obj in objects1 if obj.get("class_name")
)
classes2 = set(
obj.get("class_name", "") for obj in objects2 if obj.get("class_name")
)
# 計算 Jaccard 相似度
intersection = classes1.intersection(classes2)
union = classes1.union(classes2)
if not union:
return 0.0
return len(intersection) / len(union)
def _calculate_chunk_stats(self, frames: List[Dict]) -> Dict[str, Any]:
"""計算分片統計信息"""
all_objects = []
for frame in frames:
all_objects.extend(frame.get("objects", []))
# 總物件數
total_objects = len(all_objects)
# 唯一類別
unique_classes = list(
set(
obj.get("class_name", "")
for obj in all_objects
if obj.get("class_name")
)
)
# 信心值統計
confidences = [obj.get("confidence", 0) for obj in all_objects]
max_confidence = max(confidences) if confidences else 0
avg_confidence = np.mean(confidences) if confidences else 0
# 空間密度(每幀平均物件數)
spatial_density = total_objects / len(frames) if frames else 0
# 主要物件(出現在大多數幀中的物件)
object_counts = {}
for frame in frames:
frame_classes = set(
obj.get("class_name", "")
for obj in frame.get("objects", [])
if obj.get("class_name")
)
for class_name in frame_classes:
object_counts[class_name] = object_counts.get(class_name, 0) + 1
dominant_objects = [
class_name
for class_name, count in object_counts.items()
if count / len(frames) > 0.5
]
dominant_objects.sort()
return {
"total_objects": total_objects,
"unique_classes": unique_classes,
"max_confidence": float(max_confidence),
"avg_confidence": float(avg_confidence),
"spatial_density": float(spatial_density),
"dominant_objects": dominant_objects,
}
def _extract_keyframe_objects(self, frames: List[Dict]) -> List[Dict[str, Any]]:
"""提取關鍵幀物件"""
keyframe_objects = []
# 簡化每5幀取一個關鍵幀
for i in range(0, len(frames), 5):
if i < len(frames):
frame = frames[i]
objects = []
for obj in frame.get("objects", []):
objects.append(
{
"class_name": obj.get("class_name", ""),
"class_id": obj.get("class_id", 0),
"confidence": float(obj.get("confidence", 0)),
"bbox": {
"x": obj.get("x1", 0),
"y": obj.get("y1", 0),
"width": obj.get("width", 0),
"height": obj.get("height", 0),
}
if "x1" in obj
else None,
"occurrence": 1,
}
)
keyframe_objects.append(
{
"timestamp": float(frame.get("timestamp", 0)),
"frame_number": frame.get("frame_number", 0),
"objects": objects,
}
)
return keyframe_objects
def _create_chunk_from_frames(
self, frames: List[Dict], start_frame: int, end_frame: int
) -> Dict[str, Any]:
"""從幀列表創建分片"""
chunk_stats = self._calculate_chunk_stats(frames)
return {
"start_frame": start_frame,
"end_frame": end_frame, # exclusive
"frame_count": len(frames),
"keyframe_objects": self._extract_keyframe_objects(frames),
"dominant_objects": chunk_stats["dominant_objects"],
"object_relationships": [], # 可選:後期添加關係檢測
"scene_description": None, # 可選:後期添加 LLM 生成的場景描述
"metadata": {
"object_count": chunk_stats["total_objects"],
"unique_classes": chunk_stats["unique_classes"],
"max_confidence": chunk_stats["max_confidence"],
"avg_confidence": chunk_stats["avg_confidence"],
"spatial_density": chunk_stats["spatial_density"],
},
}
def process(self, strategy: str = "fixed", **kwargs) -> Dict[str, Any]:
"""處理視覺分片生成
Args:
strategy: 分片策略 ("fixed""similarity")
**kwargs: 策略參數
Returns:
處理結果
"""
if not self.yolo_result:
self.load_yolo_result()
start_time = datetime.now()
if strategy == "fixed":
frames_per_chunk = kwargs.get("frames_per_chunk", 30)
chunks = self.create_fixed_frame_chunks(frames_per_chunk)
elif strategy == "similarity":
similarity_threshold = kwargs.get("similarity_threshold", 0.5)
min_frames = kwargs.get("min_frames_per_chunk", 10)
chunks = self.create_similarity_based_chunks(
similarity_threshold, min_frames
)
else:
raise ValueError(f"Unknown strategy: {strategy}")
# 計算總統計
total_frames = sum(chunk["frame_count"] for chunk in chunks)
total_objects = sum(chunk["metadata"]["object_count"] for chunk in chunks)
# 收集所有唯一類別
all_unique_classes = set()
for chunk in chunks:
all_unique_classes.update(chunk["metadata"]["unique_classes"])
processing_time = (datetime.now() - start_time).total_seconds()
result = {
"metadata": {
"video_path": self.video_path,
"processing_time": processing_time,
"strategy": strategy,
"parameters": kwargs,
"processed_at": datetime.now().isoformat(),
},
"chunk_count": len(chunks),
"total_frames": total_frames,
"total_objects": total_objects,
"unique_classes": len(all_unique_classes),
"chunks": chunks,
}
return result
def main():
parser = argparse.ArgumentParser(description="視覺分片處理器")
parser.add_argument("video_path", help="視頻文件路徑")
parser.add_argument("output_path", help="輸出文件路徑")
parser.add_argument("--yolo-result", help="YOLO 結果文件路徑(可選)")
parser.add_argument("--uuid", help="檔案 UUID由 executor 傳入)")
parser.add_argument(
"--strategy", choices=["fixed", "similarity"], default="fixed", help="分片策略"
)
parser.add_argument(
"--frames-per-chunk", type=int, default=30, help="固定幀數策略:每個分片的幀數"
)
parser.add_argument(
"--similarity-threshold", type=float, default=0.5, help="相似度策略:相似度閾值"
)
parser.add_argument(
"--min-frames-per-chunk", type=int, default=10, help="相似度策略:最小幀數"
)
args = parser.parse_args()
print(f"[VisualChunk] Starting processing: {args.video_path}")
print(f"[VisualChunk] Strategy: {args.strategy}")
try:
processor = VisualChunkProcessor(args.video_path, args.yolo_result)
if args.strategy == "fixed":
result = processor.process(
strategy="fixed", frames_per_chunk=args.frames_per_chunk
)
else:
result = processor.process(
strategy="similarity",
similarity_threshold=args.similarity_threshold,
min_frames_per_chunk=args.min_frames_per_chunk,
)
# 保存結果
with open(args.output_path, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print("[VisualChunk] Processing completed")
print(f"[VisualChunk] Generated {result['chunk_count']} visual chunks")
print(f"[VisualChunk] Total frames: {result['total_frames']}")
print(f"[VisualChunk] Total objects: {result['total_objects']}")
print(f"[VisualChunk] Unique classes: {result['unique_classes']}")
print(f"[VisualChunk] Result saved to: {args.output_path}")
except Exception as e:
print(f"[VisualChunk] Error: {e}", file=sys.stderr)
result = {
"chunk_count": 0,
"total_frames": 0,
"total_objects": 0,
"unique_classes": 0,
"chunks": [],
}
with open(args.output_path, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"[VisualChunk] Fallback: empty result saved to {args.output_path}")
if __name__ == "__main__":
main()