Source code for mantidimaging.core.utility.progress_reporting.progress

# Copyright (C) 2024 ISIS Rutherford Appleton Laboratory UKRI
# SPDX - License - Identifier: GPL-3.0-or-later
from __future__ import annotations

import threading
import time
from logging import getLogger
from typing import NamedTuple, SupportsInt

from mantidimaging.core.utility.memory_usage import get_memory_usage_linux_str

ProgressHistory = NamedTuple('ProgressHistory', [('time', float), ('step', int), ('msg', str)])


[docs] class ProgressHandler: def __init__(self): self.progress = None
[docs] def progress_update(self): raise NotImplementedError("Need to implement this method in the child class")
STEPS_TO_AVERAGE = 30
[docs] class Progress: """ Class used to perform basic progress monitoring and reporting. """
[docs] @staticmethod def ensure_instance(p: Progress | None = None, *args, num_steps: int | None = None, **kwargs) -> Progress: """ Helper function used to select either a non-None Progress instance as a parameter, or simply create and configure a new one. """ if p is None: p = Progress(*args, **kwargs) if num_steps: p.set_estimated_steps(num_steps) return p
def __init__(self, num_steps: int = 1, task_name: str = 'Task') -> None: self.task_name = task_name # Current step being executed (0 denoting not started) self.current_step = 0 # Estimated number of steps (used to calculated percentage complete) self.end_step = 0 self.set_estimated_steps(num_steps) # Flag indicating completion self.complete = False # List of tuples defining progress history # (timestamp, step, message) self.progress_history: list[ProgressHistory] = [] # Lock used to synchronise modifications to the progress state self.lock = threading.Lock() # Handlers that receive notifications when progress updates occur self.progress_handlers: list[ProgressHandler] = [] # Levels of nesting when used as a context manager self.context_nesting_level = 0 # Flag to indicate cancellation of the current task self.cancel_msg = None # Add initial step to history self.update(0, 'init') # Log initial memory usage getLogger(__name__).debug("Memory usage before execution: %s", get_memory_usage_linux_str()) def __str__(self): return 'Progress(\n{})'.format('\n'.join([str(ph) for ph in self.progress_history])) def __enter__(self): self.context_nesting_level += 1 return self def __exit__(self, exc_type, exc_val, exc_tb): self.context_nesting_level -= 1 # Only when we have left the context at all levels is the task complete if self.context_nesting_level == 0: self.mark_complete()
[docs] def is_started(self): """ Checks if the task has been started. A task starts when it reports it's first progress update. """ return self.current_step > 0
[docs] def is_completed(self): """ Checks if the task has been marked as completed. """ return self.complete
[docs] def completion(self): """ Gets the completion of the task in the range of 0.0 - 1.0 """ with self.lock: return round(self.current_step / self.end_step, 3)
[docs] def last_status_message(self): """ Gets the message from the last progress update. """ with self.lock: if len(self.progress_history) > 0: msg = self.progress_history[-1][2] return msg if len(msg) > 0 else None return None
[docs] def execution_time(self): """ Gets the total time this task has been executing. Total time is measured from the timestamp of the first progress message to the timestamp of the last progress message. """ if len(self.progress_history) > 2: start = self.progress_history[1][0] last = self.progress_history[-1][0] return last - start else: return 0.0
[docs] def set_estimated_steps(self, num_steps: int): """ Sets the number of steps this task is expected to take to complete. """ self.current_step = 0 self.end_step = num_steps
[docs] def add_estimated_steps(self, num_steps): """ Increments the number of steps this task is expected to take to complete. """ self.end_step += num_steps
[docs] def add_progress_handler(self, handler: ProgressHandler): """ Adds a handler to receiver progress updates. :param handler: Instance of a progress handler """ if not isinstance(handler, ProgressHandler): raise ValueError("Progress handlers must be of type ProgressHandler") self.progress_handlers.append(handler) handler.progress = self
@staticmethod def _format_time(t: SupportsInt) -> str: t = int(t) return f'{t // 3600:02}:{t % 3600 // 60:02}:{t % 60:02}'
[docs] def update(self, steps: int = 1, msg: str = "", force_continue: bool = False) -> None: """ Updates the progress of the task. :param steps: Number of steps that have been completed since last call to this function :param msg: Message describing current step :param force_continue: Prevent cancellation of the async progress """ # Acquire lock while manipulating progress state with self.lock: # Update current step self.current_step += steps if self.current_step > self.end_step: self.end_step = self.current_step + 1 mean_time = self.calculate_mean_time(self.progress_history) eta = mean_time * (self.end_step - self.current_step) msg = f"{f'{msg}' if len(msg) > 0 else ''} | {self.current_step}/{self.end_step} | " \ f"Time: {self._format_time(self.execution_time())}, ETA: {self._format_time(eta)}" step_details = ProgressHistory(time.perf_counter(), self.current_step, msg) self.progress_history.append(step_details) # process progress callbacks for cb in self.progress_handlers: cb.progress_update() # Force cancellation on progress update if self.should_cancel and not force_continue: raise RuntimeError('Task has been cancelled')
[docs] @staticmethod def calculate_mean_time(progress_history: list[ProgressHistory]) -> float: if len(progress_history) > 1: average_over_steps = min(STEPS_TO_AVERAGE, len(progress_history)) time_diff = progress_history[-1].time - progress_history[-average_over_steps].time return time_diff / (average_over_steps - 1) else: return 0
[docs] def cancel(self, msg='cancelled'): """ Mark the task tree that uses this progress instance for cancellation. Task should either periodically inspect should_cancel or have suitably many calls to update() to be cancellable. """ self.cancel_msg = msg
@property def should_cancel(self): """ Checks if the task should be cancelled. """ return self.cancel_msg is not None
[docs] def mark_complete(self, msg='complete'): """ Marks the task as completed. """ log = getLogger(__name__) self.update(force_continue=True, msg=self.cancel_msg if self.should_cancel else msg) if not self.should_cancel: self.complete = True self.end_step = self.current_step # Log elapsed time and final memory usage log.info("Elapsed time: %d sec.", self.execution_time()) log.debug("Memory usage after execution: %s", get_memory_usage_linux_str())