Source code for mantidimaging.core.io.instrument_log
# Copyright (C) 2024 ISIS Rutherford Appleton Laboratory UKRI
# SPDX - License - Identifier: GPL-3.0-or-later
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum, auto
from pathlib import Path
from typing import ClassVar
import numpy as np
from mantidimaging.core.utility.data_containers import ProjectionAngles, Counts
[docs]
class LogColumn(Enum):
TIMESTAMP = auto()
IMAGE_TYPE_IMAGE_COUNTER = auto()
PROJECTION_NUMBER = auto()
PROJECTION_ANGLE = auto()
COUNTS_BEFORE = auto()
COUNTS_AFTER = auto()
TIME_OF_FLIGHT = auto() # in seconds
SPECTRUM_COUNTS = auto()
LogDataType = dict[LogColumn, list[float | int]]
[docs]
class NoParserFound(RuntimeError):
pass
[docs]
class InvalidLog(RuntimeError):
pass
[docs]
class InstrumentLogParser(ABC):
"""
Base class for parsers
"""
def __init__(self, lines: list[str]):
self.lines = lines
def __init_subclass__(subcls) -> None:
"""Automatically register subclasses"""
InstrumentLog.register_parser(subcls)
[docs]
@classmethod
@abstractmethod
def match(cls, lines: list[str], filename: str) -> bool:
"""Check if the name and content of the file is likely to be readable by this parser."""
...
[docs]
@abstractmethod
def parse(self) -> LogDataType:
"""Parse the log file"""
...
[docs]
def cleaned_lines(self) -> list[str]:
return [line for line in self.lines if line.strip() != ""]
[docs]
class InstrumentLog:
"""Multiformat instrument log reader
New parsers can be implemented by subclassing InstrumentLogParser
"""
parsers: ClassVar[list[type[InstrumentLogParser]]] = []
parser: type[InstrumentLogParser]
data: LogDataType
length: int
def __init__(self, lines: list[str], source_file: Path):
self.lines = lines
self.source_file = source_file
self._find_parser()
self.parse()
def _find_parser(self) -> None:
for parser in self.parsers:
if parser.match(self.lines, self.source_file.name):
self.parser = parser
return
raise NoParserFound
[docs]
def parse(self) -> None:
self.data = self.parser(self.lines).parse()
lengths = [len(val) for val in self.data.values()]
if len(set(lengths)) != 1:
raise InvalidLog(f"Mismatch in column lengths: {lengths}")
self.length = lengths[0]
[docs]
@classmethod
def register_parser(cls, parser: type[InstrumentLogParser]) -> None:
cls.parsers.append(parser)
[docs]
def get_column(self, key: LogColumn) -> list[float]:
return self.data[key]
[docs]
def projection_numbers(self) -> np.ndarray:
return np.array(self.get_column(LogColumn.PROJECTION_NUMBER), dtype=np.uint32)
[docs]
def has_projection_angles(self) -> bool:
return LogColumn.PROJECTION_ANGLE in self.data
[docs]
def projection_angles(self) -> ProjectionAngles:
angles = np.array(self.get_column(LogColumn.PROJECTION_ANGLE), dtype=np.float64)
return ProjectionAngles(np.deg2rad(angles))
[docs]
def raise_if_angle_missing(self, image_filenames: list[str]) -> None:
image_numbers = [ifile[ifile.rfind("_") + 1:] for ifile in image_filenames]
if self.length != len(image_numbers):
RuntimeError(f"Log size mismatch. Found {self.length} log entries,"
f"but {len(image_numbers)} images")
if LogColumn.PROJECTION_NUMBER in self.data:
for projection_num, image_num in zip(self.projection_numbers(), image_numbers, strict=True):
if str(projection_num) not in image_num:
raise RuntimeError(f"Mismatching angle for projection {projection_num} "
f"was going to be used for image file {image_num}")
[docs]
def counts(self) -> Counts:
if not (LogColumn.COUNTS_BEFORE in self.data and LogColumn.COUNTS_AFTER in self.data):
raise ValueError("Log does not have counts")
counts_before = np.array(self.get_column(LogColumn.COUNTS_BEFORE))
counts_after = np.array(self.get_column(LogColumn.COUNTS_AFTER))
return Counts(counts_after - counts_before)