Source code for mantidimaging.core.parallel.manager

# Copyright (C) 2024 ISIS Rutherford Appleton Laboratory UKRI
# SPDX - License - Identifier: GPL-3.0-or-later
from __future__ import annotations

import time
from multiprocessing import get_context
import os
import uuid
from logging import getLogger
from typing import TYPE_CHECKING

import psutil
from psutil import NoSuchProcess, AccessDenied

from mantidimaging.core.operations.loader import load_filter_packages

if TYPE_CHECKING:
    from multiprocessing.pool import Pool

MEM_PREFIX = 'MI'
MEM_DIR_LINUX = '/dev/shm'
CURRENT_PID = psutil.Process().pid

LOG = getLogger(__name__)
perf_logger = getLogger("perf." + __name__)

cores: int = 1
pool: Pool | None = None


[docs] def create_and_start_pool(process_count: int) -> None: t0 = time.monotonic() context = get_context('spawn') global cores if process_count == 0: cores = context.cpu_count() else: cores = process_count global pool LOG.info(f'Creating process pool with {cores} processes') pool = context.Pool(cores, initializer=worker_setup) if perf_logger.isEnabledFor(1): perf_logger.info(f"Process pool started in {time.monotonic() - t0}")
[docs] def worker_setup(): # Required to import modules for running operations load_filter_packages()
[docs] def end_pool(): if pool: pool.close() pool.terminate()
[docs] def generate_mi_shared_mem_name() -> str: return f'{MEM_PREFIX}_{CURRENT_PID}_{uuid.uuid4()}'
[docs] def clear_memory_from_current_process_linux() -> None: for mem_name in _get_shared_mem_names_linux(): if _is_mi_memory_from_current_process(mem_name): free_shared_memory_linux([mem_name])
[docs] def find_memory_from_previous_process_linux() -> list[str]: old_memory = [] for mem_name in _get_shared_mem_names_linux(): if _is_safe_to_remove(mem_name): old_memory.append(mem_name) return old_memory
[docs] def free_shared_memory_linux(mem_names: list[str]) -> None: for mem_name in mem_names: os.remove(f'{MEM_DIR_LINUX}/{mem_name}')
def _is_safe_to_remove(mem_name: str) -> bool: process_start = psutil.Process().create_time() if _is_mi_shared_mem(mem_name) and os.path.getmtime(f'{MEM_DIR_LINUX}/{mem_name}') < process_start: try: pid = int(mem_name.split('_')[1]) _lookup_process(pid) except NoSuchProcess: # The process that owns the memory has ended return True except AccessDenied: # The process that owns the memory still exists return False return False def _get_shared_mem_names_linux() -> list[str]: return os.listdir(MEM_DIR_LINUX) def _lookup_process(pid) -> None: psutil.Process(pid) def _is_mi_shared_mem(mem_name: str) -> bool: split_name = mem_name.split('_') try: int(split_name[1]) except (IndexError, ValueError): return False return len(split_name) == 3 and split_name[0] == MEM_PREFIX def _is_mi_memory_from_current_process(mem_name: str) -> bool: return _is_mi_shared_mem(mem_name) and int(mem_name.split('_')[1]) == CURRENT_PID