#!/opt/homebrew/bin/python3.11
"""
ResumeFramework - Shared Resume Support for All Processors

This module provides a unified resume mechanism for all processors
(YOLO, OCR, Face, Pose, etc.).

Features:
- Auto-detect existing results and resume from last checkpoint
- Auto-save at configurable intervals (time-based or frame-based)
- Graceful Ctrl+C handling with progress save
- JSON Lines (.jsonl) support for incremental writes
- Progress tracking and ETA calculation

Usage:
    from resume_framework import ResumeFramework

    framework = ResumeFramework(
        output_path="output.json",
        processor_name="yolo",
        uuid="vid_001",
        auto_save_interval=30,
        auto_save_frames=300
    )

    # Load existing data (if resuming)
    existing_data, last_checkpoint = framework.load_existing_data()

    # Set data for signal handler
    framework.set_data(detection_data)

    # Save progress periodically
    framework.save_progress(frame_count, is_interrupted=False)

    # Finalize on completion
    framework.finalize(total_frames)
"""

import sys
import os
import json
import signal
import time
from datetime import datetime
from typing import Dict, Optional, Tuple, Callable


class ResumeFramework:
    """
    Resume Framework for Processors

    Attributes:
        output_path (str): Output JSON/JSONL file path
        processor_name (str): Processor name (yolo, ocr, face, pose, etc.)
        uuid (str): Video UUID
        auto_save_interval (int): Auto-save interval in seconds
        auto_save_frames (int): Auto-save interval in frames
        publisher (RedisPublisher): Redis publisher for progress updates
            (None when no uuid was given)
        data (Dict): Current processing data
        use_jsonl (bool): Use JSON Lines format (.jsonl)
    """

    def __init__(
        self,
        output_path: str,
        processor_name: str,
        uuid: str = "",
        auto_save_interval: int = 30,
        auto_save_frames: int = 300,
        use_jsonl: bool = False,
        force_restart: bool = False,
        progress_callback: Optional[Callable] = None,
    ):
        """
        Initialize Resume Framework

        Args:
            output_path: Output file path
            processor_name: Processor name
            uuid: Video UUID
            auto_save_interval: Auto-save interval in seconds (default: 30)
            auto_save_frames: Auto-save interval in frames (default: 300)
            use_jsonl: Use JSON Lines format (.jsonl) for incremental writes
            force_restart: Force restart (ignore existing data)
            progress_callback: Optional callback for progress updates
        """
        self.output_path = output_path
        self.processor_name = processor_name
        self.uuid = uuid
        self.auto_save_interval = auto_save_interval
        self.auto_save_frames = auto_save_frames
        self.use_jsonl = use_jsonl
        self.force_restart = force_restart
        self.progress_callback = progress_callback

        self.data: Optional[Dict] = None
        self.publisher = None
        self.last_save_time = 0.0
        self.last_save_frame = 0
        self.auto_save_count = 0

        # Import RedisPublisher if uuid provided
        if uuid:
            # redis_publisher is looked up next to this file.
            sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
            from redis_publisher import RedisPublisher

            self.publisher = RedisPublisher(uuid)

        # Register signal handler
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Register signal handlers for graceful pause"""
        try:
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)
        except ValueError as e:
            # signal.signal() may only be called from the main thread; when
            # the framework is built elsewhere, periodic saves still work, so
            # degrade to a warning instead of failing construction.
            print(f"Warning: Could not register signal handlers: {e}")

    def _signal_handler(self, signum, frame):
        """Handle Ctrl+C / SIGTERM to pause and save progress"""
        separator = "=" * 60
        print(f"\n\n{separator}")
        print(f"PAUSE - Saving progress for {self.processor_name}...")
        print(f"{separator}")

        if self.data:
            success, file_size = self.save_progress(
                checkpoint=self.last_save_frame,
                is_interrupted=True,
                silent=False,
            )
            if success:
                print(f"Progress saved to: {self.output_path}")
                print(f"Last checkpoint: frame {self.last_save_frame}")
                print(f"File size: {file_size} bytes")

        print("Run the same command again to resume")
        print(f"{separator}\n")
        sys.exit(0)

    def load_existing_data(self) -> Tuple[Optional[Dict], int]:
        """
        Load existing data from file

        Returns:
            Tuple of (existing_data, last_checkpoint)
            - existing_data: Loaded data dict or None
            - last_checkpoint: Last processed frame/segment index
              (0 when there is nothing to resume from)
        """
        if self.force_restart:
            return None, 0

        if not os.path.exists(self.output_path):
            return None, 0

        try:
            if self.use_jsonl:
                return self._load_jsonl()
            return self._load_json()
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            print(f"Warning: Could not load existing file: {e}")
            return None, 0

    def _load_json(self) -> Tuple[Optional[Dict], int]:
        """Load JSON format file"""
        with open(self.output_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # A file holding valid JSON of the wrong shape (list, scalar, ...)
        # is treated as "nothing to resume" instead of raising.
        if not isinstance(data, dict):
            return None, 0

        metadata = data.get("metadata", {})
        if not isinstance(metadata, dict):
            return None, 0

        last_checkpoint = metadata.get("last_saved_frame", 0)
        if isinstance(last_checkpoint, (int, float)) and last_checkpoint > 0:
            return data, last_checkpoint

        return None, 0

    def _load_jsonl(self) -> Tuple[Optional[Dict], int]:
        """
        Load JSON Lines format file

        Every line is parsed independently; unparseable or unexpected lines
        are skipped. When several metadata records are present the last one
        wins. The checkpoint is the highest "frame" value seen.
        """
        data = {"metadata": {}, "frames": {}}
        last_checkpoint = 0

        with open(self.output_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    entry = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue

                # Valid JSON that is not an object cannot be a record.
                if not isinstance(entry, dict):
                    continue

                if "metadata" in entry:
                    data["metadata"] = entry["metadata"]
                elif "frame" in entry:
                    frame_num = entry["frame"]
                    if not isinstance(frame_num, (int, float)):
                        continue
                    data["frames"][str(frame_num)] = entry
                    last_checkpoint = max(last_checkpoint, frame_num)

        if last_checkpoint > 0:
            return data, last_checkpoint

        return None, 0

    def set_data(self, data: Dict):
        """
        Set current processing data for signal handler

        Args:
            data: Current processing data dict
        """
        self.data = data

    def save_progress(
        self,
        checkpoint: int,
        is_interrupted: bool = False,
        silent: bool = False,
        extra_metadata: Optional[Dict] = None,
    ) -> Tuple[bool, int]:
        """
        Save progress to file

        Args:
            checkpoint: Current checkpoint (frame/segment index)
            is_interrupted: Is this an interrupted save
            silent: Suppress output
            extra_metadata: Extra metadata to add; applied last, so it can
                override the status/checkpoint fields set here

        Returns:
            Tuple of (success, file_size)
        """
        if not self.data:
            return False, 0

        try:
            metadata = self.data.get("metadata", {})
            metadata["last_saved_at"] = datetime.now().isoformat()
            metadata["status"] = "interrupted" if is_interrupted else "in_progress"
            metadata["last_saved_frame"] = checkpoint
            metadata["auto_save_count"] = self.auto_save_count

            if extra_metadata:
                metadata.update(extra_metadata)

            self.data["metadata"] = metadata

            if self.use_jsonl:
                file_size = self._save_jsonl(is_interrupted)
            else:
                file_size = self._save_json()

            # Bookkeeping is updated only after the write succeeded.
            self.last_save_frame = checkpoint
            self.last_save_time = time.time()
            self.auto_save_count += 1

            if not silent:
                self._print_save_info(checkpoint, file_size, is_interrupted)

            return True, file_size

        except Exception as e:
            # Best-effort boundary: a failed save is reported, not raised.
            print(f"Error saving progress: {e}")
            return False, 0

    def _save_json(self) -> int:
        """Save as JSON format (rewrites the whole file); returns file size"""
        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)

        return os.path.getsize(self.output_path)

    def _save_jsonl(self, is_interrupted: bool = False) -> int:
        """
        Save as JSON Lines format

        The first save of this instance rewrites the file with every frame
        currently held in memory; later saves append only the frames newer
        than the last saved checkpoint. A metadata record is written on every
        save so that status changes (interrupted / completed) reach the file;
        the loader uses the last metadata record it finds.

        Args:
            is_interrupted: Unused; kept for interface compatibility

        Returns:
            Size of the output file in bytes
        """
        fresh_file = self.last_save_frame <= 0
        mode = "w" if fresh_file else "a"

        with open(self.output_path, mode, encoding="utf-8") as f:
            metadata_entry = {"metadata": self.data["metadata"]}
            f.write(json.dumps(metadata_entry, ensure_ascii=False) + "\n")

            for frame_key, frame_data in self.data.get("frames", {}).items():
                # A fresh file gets every frame (including frame 0);
                # appends only add what is not on disk yet.
                if fresh_file or int(frame_key) > self.last_save_frame:
                    f.write(json.dumps(frame_data, ensure_ascii=False) + "\n")

        return os.path.getsize(self.output_path)

    def _print_save_info(self, checkpoint: int, file_size: int, is_interrupted: bool):
        """Print save info"""
        status = "INTERRUPTED" if is_interrupted else "AUTO-SAVE"
        print(
            f"\n[{status}] Saved progress: frame {checkpoint}, "
            f"file size: {file_size} bytes, auto_save #{self.auto_save_count}\n"
        )

    def should_auto_save(self, current_checkpoint: int) -> bool:
        """
        Check if should auto-save

        NOTE: last_save_time starts at 0.0, so until the first successful
        save the time criterion is always satisfied.

        Args:
            current_checkpoint: Current checkpoint

        Returns:
            True if should auto-save
        """
        current_time = time.time()
        time_elapsed = current_time - self.last_save_time >= self.auto_save_interval
        frames_elapsed = current_checkpoint - self.last_save_frame >= self.auto_save_frames

        return time_elapsed or frames_elapsed

    def init_metadata(
        self,
        video_path: str,
        fps: float,
        width: int,
        height: int,
        total_frames: int,
        total_duration: float,
        extra: Optional[Dict] = None,
    ) -> Dict:
        """
        Initialize metadata for new processing

        Args:
            video_path: Video file path
            fps: Frame rate
            width: Video width
            height: Video height
            total_frames: Total frames
            total_duration: Total duration in seconds
            extra: Extra metadata

        Returns:
            Metadata dict
        """
        now = datetime.now().isoformat()
        metadata = {
            "video_path": os.path.abspath(video_path),
            "fps": fps,
            "width": width,
            "height": height,
            "total_frames": total_frames,
            "total_duration": total_duration,
            "processor": self.processor_name,
            "processed_at": now,
            "auto_save_interval": self.auto_save_interval,
            "auto_save_frames": self.auto_save_frames,
            "status": "in_progress",
            "last_saved_at": now,
            "last_saved_frame": 0,
            "auto_save_count": 0,
        }

        if extra:
            metadata.update(extra)

        return metadata

    def finalize(
        self,
        total_processed: int,
        extra_metadata: Optional[Dict] = None,
    ):
        """
        Finalize processing (mark as completed)

        Args:
            total_processed: Total processed frames/segments
            extra_metadata: Extra metadata to add
        """
        if not self.data:
            return

        metadata = self.data.get("metadata", {})
        metadata["status"] = "completed"
        metadata["completed_at"] = datetime.now().isoformat()
        metadata["total_processed"] = total_processed
        metadata["last_saved_frame"] = total_processed

        if extra_metadata:
            metadata.update(extra_metadata)

        self.data["metadata"] = metadata

        # save_progress() unconditionally resets "status" to "in_progress" /
        # "interrupted"; the completed status therefore has to travel through
        # extra_metadata, which save_progress() applies last. Caller-supplied
        # extra metadata keeps priority over the default.
        final_overrides = {"status": "completed"}
        if extra_metadata:
            final_overrides.update(extra_metadata)

        # Final save
        self.save_progress(
            checkpoint=total_processed,
            is_interrupted=False,
            silent=True,
            extra_metadata=final_overrides,
        )

        print(f"\n[COMPLETED] {self.processor_name} processed {total_processed} items")
        print(f"Output saved to: {self.output_path}")

        if self.publisher:
            self.publisher.complete(
                self.processor_name, f"{total_processed} items"
            )

    def publish_progress(self, current: int, total: int, message: str = ""):
        """
        Publish progress to Redis

        Also forwards the update to progress_callback when one was given.

        Args:
            current: Current progress
            total: Total count
            message: Progress message
        """
        if self.publisher:
            self.publisher.progress(self.processor_name, current, total, message)

        if self.progress_callback:
            self.progress_callback(current, total, message)

    def publish_info(self, message: str):
        """
        Publish info message to Redis

        Args:
            message: Info message
        """
        if self.publisher:
            self.publisher.info(self.processor_name, message)

    def publish_error(self, message: str):
        """
        Publish error message to Redis

        Args:
            message: Error message
        """
        if self.publisher:
            self.publisher.error(self.processor_name, message)


def format_time(seconds: float) -> str:
    """
    Format seconds to HH:MM:SS

    Args:
        seconds: Time in seconds

    Returns:
        Formatted time string
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def calculate_eta(elapsed: float, current: int, total: int) -> float:
    """
    Calculate ETA

    Args:
        elapsed: Elapsed time in seconds
        current: Current progress
        total: Total count

    Returns:
        ETA in seconds (0.0 when no progress has been made yet)
    """
    if current <= 0:
        return 0.0
    return (elapsed / current) * (total - current)


def print_progress(
    current: int,
    total: int,
    elapsed: float,
    extra_info: str = "",
):
    """
    Print progress indicator

    Args:
        current: Current progress
        total: Total count
        elapsed: Elapsed time in seconds
        extra_info: Extra info to display
    """
    progress_pct = (current / total) * 100 if total > 0 else 0
    eta = calculate_eta(elapsed, current, total)
    print(
        f" Progress: {current}/{total} ({progress_pct:.1f}%) - "
        f"ETA: {eta:.0f}s - {extra_info}"
    )