497 lines
14 KiB
Python
Executable File
497 lines
14 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
YOLO Processor - Object Detection with Resume Support
|
|
Uses YOLOv8 via ultralytics (local model)
|
|
|
|
Resume Feature (integrated from video_yolo_player):
|
|
- Auto-detect existing results and resume from last frame
|
|
- Auto-save at configurable intervals (default: 30 seconds)
|
|
- Ctrl+C gracefully saves and exits
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Optional, Set
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
# Class labels indexed by the integer class id the detector reports
# (get_detections_list looks names up as YOLO_NAMES[class_id]).
# NOTE(review): some spellings here ("motorbike", "aeroplane", "sofa",
# "pottedplant", "diningtable", "tvmonitor") may differ from the names the
# ultralytics model itself carries -- confirm before swapping to model.names,
# since downstream consumers may match on these exact strings.
YOLO_NAMES = [
    # 0-8: people and vehicles
    "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train",
    "truck", "boat",
    # 9-13: street objects
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    # 14-23: animals
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear",
    "zebra", "giraffe",
    # 24-28: accessories
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    # 29-38: sports equipment
    "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
    "baseball glove", "skateboard", "surfboard", "tennis racket",
    # 39-45: kitchenware
    "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl",
    # 46-55: food
    "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog",
    "pizza", "donut", "cake",
    # 56-61: furniture
    "chair", "sofa", "pottedplant", "bed", "diningtable", "toilet",
    # 62-67: electronics
    "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone",
    # 68-72: appliances
    "microwave", "oven", "toaster", "sink", "refrigerator",
    # 73-79: household items
    "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush",
]
|
|
|
|
|
|
# Module-level state for signal_handler(): a signal handler only receives
# (signum, frame), so process_yolo() publishes the in-flight results and the
# output path here for it to save on Ctrl+C.
g_output_file: Optional[str] = None
g_detection_data: Optional[Dict] = None

# Auto-save thresholds mirrored from process_yolo()'s arguments:
# seconds between saves, and frames between saves (whichever trips first).
g_auto_save_interval: int = 30
g_auto_save_frames: int = 300
|
|
|
|
|
|
def format_time(seconds: float) -> str:
    """Render a duration given in seconds as a zero-padded ``HH:MM:SS`` string.

    Each field is cut down to an integer with ``int()``, so fractional seconds
    are dropped rather than rounded. Hours are not capped at 24.
    """
    fields = (
        seconds // 3600,          # whole hours
        (seconds % 3600) // 60,   # whole minutes within the hour
        seconds % 60,             # seconds within the minute
    )
    return ":".join(f"{int(value):02d}" for value in fields)
|
|
|
|
|
|
def load_existing_data(output_file: str) -> tuple[Optional[Dict], int]:
    """Load a previous run's detection JSON so processing can resume.

    Args:
        output_file: Path of the results JSON written by save_detection_data().

    Returns:
        ``(data, last_processed_frame)``. ``last_processed_frame`` is the
        highest integer key under ``data["frames"]``. ``(None, 0)`` is returned
        when the file does not exist, cannot be read, is not valid JSON, is not
        shaped like ``{"frames": {...}}``, or has no frames recorded yet.
        Load problems are reported with a printed warning, never raised.
    """
    if not os.path.exists(output_file):
        return None, 0

    try:
        with open(output_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        # A truncated or hand-edited file can parse to something other than an
        # object; treat it as unusable instead of crashing on .get()/.keys().
        if not isinstance(data, dict):
            raise ValueError("top-level JSON value is not an object")
        frames = data.get("frames") or {}
        if not isinstance(frames, dict):
            raise ValueError('"frames" is not an object')

        if frames:
            # Keys are stringified frame numbers; non-numeric keys raise
            # ValueError and are reported below.
            last_frame = max(int(k) for k in frames)
            return data, last_frame
    except (OSError, json.JSONDecodeError, KeyError, ValueError) as e:
        # OSError: e.g. permission denied, or the path is a directory.
        print(f"Warning: Could not load existing file: {e}")

    return None, 0
|
|
|
|
|
|
def save_detection_data(
    output_file: str,
    detection_data: Dict,
    is_interrupted: bool = False,
    silent: bool = False,
    last_saved_frame: int = 0,
) -> tuple[bool, int]:
    """Stamp save metadata onto *detection_data* and write it to *output_file*.

    The JSON goes to a uniquely named temp file next to *output_file*. That
    file is then moved into place with os.replace(). An interruption
    mid-write therefore never leaves a truncated results file that would
    break resuming. This includes SIGINT arriving during an auto-save, when
    the signal handler calls this function re-entrantly.

    Args:
        output_file: Destination JSON path.
        detection_data: Results dict. Its ``"metadata"`` entry is created if
            missing and is updated in place.
        is_interrupted: Mark the run ``"interrupted"``. Otherwise the status
            becomes ``"in_progress"``, unless it is already ``"completed"``.
            The final save in process_yolo() sets ``"completed"`` first, and
            this keeps it.
        silent: Skip the file-size lookup and report a size of 0.
        last_saved_frame: Frame number recorded as ``last_saved_frame``. When
            left at 0, the highest numeric key of ``detection_data["frames"]``
            is recorded instead.

    Returns:
        ``(success, size_in_bytes)``. The size is 0 when *silent* or on
        failure. Errors are printed rather than raised, so a failed auto-save
        does not abort processing.
    """
    # Unique per call, so a re-entrant save never shares a temp file (or its
    # open handle) with the save it interrupted.
    tmp_path = f"{output_file}.{os.getpid()}.{os.urandom(4).hex()}.tmp"
    try:
        metadata = detection_data.get("metadata", {})
        metadata["last_saved_at"] = datetime.now().isoformat()
        if is_interrupted:
            metadata["status"] = "interrupted"
        elif metadata.get("status") != "completed":
            metadata["status"] = "in_progress"

        if not last_saved_frame:
            frames = detection_data.get("frames")
            if isinstance(frames, dict):
                last_saved_frame = max(
                    (int(k) for k in frames if str(k).isdecimal()), default=0
                )
        metadata["last_saved_frame"] = last_saved_frame
        detection_data["metadata"] = metadata

        # "x": fail rather than overwrite if the temp name somehow exists.
        with open(tmp_path, "x", encoding="utf-8") as f:
            json.dump(detection_data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, output_file)

        if not silent:
            return True, os.path.getsize(output_file)
        return True, 0

    except Exception as e:
        print(f"Error saving data: {e}")
        return False, 0

    finally:
        # Remove an orphaned temp file left by a failed or interrupted write
        # (also on SystemExit raised by the SIGINT handler).
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best-effort cleanup; the real results file is intact
|
|
|
|
|
|
def signal_handler(signum, frame):
    """SIGINT (Ctrl+C) handler: save current progress as interrupted, then exit.

    It reads the module-level ``g_detection_data`` / ``g_output_file`` that
    process_yolo() sets. If either is unset or empty, nothing is saved.
    ``signum`` and ``frame`` are the standard handler arguments and are unused.
    The process always exits with status 0.
    """
    rule = "=" * 60

    print(f"\n\n{rule}")
    print("PAUSE - Saving progress...")
    print(rule)

    has_progress = bool(g_detection_data) and bool(g_output_file)
    if has_progress:
        saved, _size = save_detection_data(
            g_output_file, g_detection_data, is_interrupted=True
        )
        if saved:
            print(f"Progress saved to: {g_output_file}")
            print("Run the same command again to resume")

    print(f"{rule}\n")
    sys.exit(0)
|
|
|
|
|
|
def get_detections_list(result, model) -> list:
    """Flatten one detection result into a list of plain, JSON-ready dicts.

    Boxes, confidences and class ids are read from ``result.boxes``
    (``xyxy``, ``conf``, ``cls``), moved to CPU and converted to numpy.
    Each detection becomes a dict with class id and name, confidence, corner
    coordinates (floats), and width/height (truncated to int). Class ids
    beyond YOLO_NAMES map to ``"unknown"``. ``model`` is accepted but unused.
    Returns an empty list when ``result.boxes`` is None.
    """
    box_set = result.boxes
    if box_set is None:
        return []

    corners = box_set.xyxy.cpu().numpy()
    scores = box_set.conf.cpu().numpy()
    labels = box_set.cls.cpu().numpy().astype(int)

    def _name_for(label_id):
        return YOLO_NAMES[label_id] if label_id < len(YOLO_NAMES) else "unknown"

    return [
        {
            "class_id": int(label_id),
            "class_name": _name_for(label_id),
            "confidence": float(score),
            "x1": float(left),
            "y1": float(top),
            "x2": float(right),
            "y2": float(bottom),
            "width": int(right - left),
            "height": int(bottom - top),
        }
        for (left, top, right, bottom), score, label_id in zip(corners, scores, labels)
    ]
|
|
|
|
|
|
def process_yolo(
    video_path: str,
    output_path: str,
    uuid: str = "",
    auto_save_interval: int = 30,
    force_restart: bool = False,
    auto_save_frames: int = 300,
):
    """Run YOLOv8 object detection over every frame of a video, with resume.

    Results are collected as ``{"metadata": {...}, "frames": {"<n>": {...}}}``
    with 1-based frame numbers. They are saved to *output_path* at least every
    *auto_save_interval* seconds or every *auto_save_frames* frames. If
    *output_path* already holds frames from an earlier run and
    *force_restart* is False, processing resumes after the highest recorded
    frame.

    Args:
        video_path: Video file read with OpenCV.
        output_path: Results JSON (also the resume source).
        uuid: When non-empty, progress is published via RedisPublisher.
        auto_save_interval: Maximum seconds between saves.
        force_restart: Ignore existing results and start from frame 1.
        auto_save_frames: Maximum frames between saves.

    Returns:
        The detection dict. On setup failure (ultralytics missing, video
        unreadable) this is a dict whose ``metadata.status`` is ``"error"``.

    Side effects:
        Installs signal_handler for SIGINT and sets the module-level ``g_*``
        globals it reads.
    """
    global g_detection_data, g_output_file, g_auto_save_interval, g_auto_save_frames
    g_auto_save_interval = auto_save_interval
    g_auto_save_frames = auto_save_frames

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("yolo", "YOLO_START")

    # Set up signal handler for graceful pause (Ctrl+C saves and exits)
    signal.signal(signal.SIGINT, signal_handler)

    # Check for existing results (resume support)
    existing_data, last_processed_frame = load_existing_data(output_path)
    resume_mode = (
        existing_data is not None and last_processed_frame > 0 and not force_restart
    )

    if resume_mode:
        print(f"\nFound existing data: {output_path}")
        print(f"Last processed frame: {last_processed_frame}")
        print(f"Will resume from frame {last_processed_frame + 1}")

    try:
        from ultralytics import YOLO
    except ImportError:
        if publisher:
            publisher.error("yolo", "ultralytics not installed")
        result = {
            "metadata": {"status": "error", "error": "ultralytics not installed"},
            "frames": {},
        }
        with open(output_path, "w") as f:
            json.dump(result, f, indent=2)
        if publisher:
            publisher.complete("yolo", "0 frames")
        return result

    if publisher:
        publisher.info("yolo", "YOLO_LOADING_MODEL")

    # Load YOLO model (prefer CoreML for ANE acceleration, fallback to PyTorch)
    model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
    model_path_mlpackage = os.path.join(model_dir, "yolov8s.mlpackage")
    model_path_pt = os.path.join(model_dir, "yolov8s.pt")
    if os.path.exists(model_path_mlpackage):
        model = YOLO(model_path_mlpackage)
        print("YOLO: CoreML model loaded (YOLOv8s, ANE accelerated)")
    elif os.path.exists(model_path_pt):
        model = YOLO(model_path_pt)
        print("YOLO: PyTorch model loaded (YOLOv8s)")
    else:
        model = YOLO("yolov8s.pt")  # will auto-download

    # Get video info
    import cv2

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video: {video_path}")
        # Notify the progress listener too, mirroring the ultralytics-missing path.
        if publisher:
            publisher.error("yolo", f"Cannot open video: {video_path}")
            publisher.complete("yolo", "0 frames")
        return {"metadata": {"status": "error"}, "frames": {}}

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps if fps > 0 else 0
    cap.release()

    if publisher:
        publisher.info("yolo", f"fps={fps}, total={total_frames}")
        publisher.progress("yolo", 0, total_frames, "Starting")

    # Initialize or load detection data
    processed_frames: Set[int]
    if resume_mode and existing_data:
        detection_data = existing_data
        frame_count = last_processed_frame
        processed_frames = {int(k) for k in existing_data.get("frames", {})}

        # The final summary writes into metadata, so make sure it exists. Also
        # mark the run as active again (the file may say interrupted/completed).
        if not isinstance(detection_data.get("metadata"), dict):
            detection_data["metadata"] = {}
        detection_data["metadata"]["status"] = "in_progress"

        # Seek to resume position
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
    else:
        # Initialize new detection data
        detection_data = {
            "metadata": {
                "video_path": os.path.abspath(video_path),
                "fps": fps,
                "width": width,
                "height": height,
                "total_frames": total_frames,
                "total_duration": total_duration,
                "processed_at": datetime.now().isoformat(),
                "auto_save_interval": auto_save_interval,
                "auto_save_frames": auto_save_frames,
                "status": "in_progress",
                "last_saved_at": datetime.now().isoformat(),
                "last_saved_frame": 0,
            },
            "frames": {},
        }
        frame_count = 0
        processed_frames = set()

        cap = cv2.VideoCapture(video_path)

    # Set global for signal handler
    g_detection_data = detection_data
    g_output_file = output_path

    start_time = time.time()
    last_save_time = start_time
    last_save_frame_count = frame_count  # Track which frame we last saved at
    auto_save_count = 0
    # Frames actually run through the model in this session. ETA must use this,
    # not frame_count, which also counts frames done before a resume.
    frames_run_this_session = 0

    print(f"\nProcessing video: {total_frames} frames @ {fps:.2f} fps")
    print(
        f"Auto-save every {auto_save_interval}s or {auto_save_frames} frames (whichever comes first)"
    )
    print(f"Resume from frame {frame_count + 1 if resume_mode else 1}")
    print()

    # Process frames. try/finally releases the capture even when the SIGINT
    # handler exits via SystemExit mid-loop.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            current_time = (frame_count - 1) / fps if fps > 0 else 0

            # Skip already processed frames in resume mode
            if frame_count in processed_frames:
                continue

            # Run YOLO detection
            results = model(frame, verbose=False)
            detections = get_detections_list(results[0], model)

            # Store detection data
            detection_data["frames"][str(frame_count)] = {
                "frame_number": frame_count,
                "time_seconds": round(current_time, 3),
                "time_formatted": format_time(current_time),
                "detections": detections,
            }

            processed_frames.add(frame_count)
            frames_run_this_session += 1

            # Progress indicator every 500 frames
            if frame_count % 500 == 0:
                elapsed = time.time() - start_time
                # Some containers report a frame count of 0; avoid dividing by it.
                if total_frames > 0:
                    progress = (frame_count / total_frames) * 100
                    remaining = max(total_frames - frame_count, 0)
                else:
                    progress = 0.0
                    remaining = 0
                # frames_run_this_session >= 1 here (incremented just above).
                eta = (elapsed / frames_run_this_session) * remaining
                print(
                    f"  Progress: {frame_count}/{total_frames} ({progress:.1f}%) - "
                    f"ETA: {eta:.0f}s - {len(detections)} objects"
                )

                if publisher:
                    publisher.progress(
                        "yolo", frame_count, total_frames, f"frame {frame_count}"
                    )

            # Auto-save check (time-based OR frame-based)
            current_time_val = time.time()
            time_elapsed = current_time_val - last_save_time >= auto_save_interval
            frames_since_save = frame_count - last_save_frame_count >= auto_save_frames

            if time_elapsed or frames_since_save:
                success, file_size = save_detection_data(
                    output_path,
                    detection_data,
                    is_interrupted=False,
                    silent=True,
                    last_saved_frame=frame_count,
                )
                if success:
                    auto_save_count += 1
                    reason = "time" if time_elapsed else "frames"
                    print(
                        f"  Auto-saved (#{auto_save_count}, {reason}): frame {last_save_frame_count}-{frame_count}"
                    )
                    last_save_time = current_time_val
                    last_save_frame_count = frame_count
    finally:
        cap.release()

    processing_time = time.time() - start_time

    # Update final metadata
    frames = detection_data.get("frames", {})
    total_detections = sum(len(fr.get("detections", [])) for fr in frames.values())
    # Every processed frame gets an entry (possibly with an empty list), so only
    # count frames that actually contain objects.
    frames_with_detections = sum(1 for fr in frames.values() if fr.get("detections"))

    metadata = detection_data["metadata"]
    metadata["status"] = "completed"
    metadata["completed_at"] = datetime.now().isoformat()
    metadata["processing_time"] = processing_time
    metadata["total_detections"] = total_detections
    metadata["auto_save_count"] = auto_save_count

    # Save final data
    save_detection_data(output_path, detection_data, is_interrupted=False)

    # Print summary
    print(f"\n{'=' * 60}")
    print("YOLO Detection complete!")
    print(f"  Total frames processed: {frame_count}")
    print(f"  Frames with detections: {frames_with_detections}")
    print(f"  Total objects detected: {total_detections}")
    print(f"  Processing time: {processing_time:.1f}s")
    print(f"  Auto-saves: {auto_save_count}")
    print(f"  Output: {output_path}")
    print(f"{'=' * 60}")

    if publisher:
        publisher.complete("yolo", f"{frames_with_detections} frames with objects")

    return detection_data
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: parse arguments, run detection, and exit non-zero when
    # setup fails so calling scripts/pipelines can detect the failure.
    parser = argparse.ArgumentParser(
        description="YOLO Object Detection with Resume Support"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--auto-save",
        type=int,
        default=30,
        help="Auto-save interval in seconds (default: 30)",
    )
    parser.add_argument(
        "--auto-save-frames",
        type=int,
        default=300,
        help="Auto-save after N frames (default: 300)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force restart from beginning (ignore existing data)",
    )

    args = parser.parse_args()

    # Keywords instead of positionals: the signature interleaves two
    # auto-save knobs around force_restart, which is easy to misorder.
    result = process_yolo(
        args.video_path,
        args.output_path,
        uuid=args.uuid,
        auto_save_interval=args.auto_save,
        force_restart=args.force,
        auto_save_frames=args.auto_save_frames,
    )

    # process_yolo reports setup failures (ultralytics missing, unreadable
    # video) via metadata.status instead of raising; surface them as exit 1.
    if (result or {}).get("metadata", {}).get("status") == "error":
        sys.exit(1)
|