Source code for mantidimaging.core.utility.imat_log_file_parser

# Copyright (C) 2022 ISIS Rutherford Appleton Laboratory UKRI
# SPDX - License - Identifier: GPL-3.0-or-later

import csv
import re
from enum import Enum, auto
from itertools import zip_longest
from typing import Dict, List, Optional

import numpy

from mantidimaging.core.utility.data_containers import Counts, ProjectionAngles


def _get_projection_number(s: str) -> int:
    return int(re.sub(r"\D", "", s.split(":")[1]))


def _get_angle(s: str) -> str:
    return s[s.rfind(":") + 1:].strip()


[docs] class IMATLogColumn(Enum): TIMESTAMP = auto() # currently these 2 are the same column because # the log file formatting is inconsistent IMAGE_TYPE_IMAGE_COUNTER = auto() PROJECTION_NUMBER = auto() PROJECTION_ANGLE = auto() COUNTS_BEFORE = auto() COUNTS_AFTER = auto()
[docs] class TextLogParser: EXPECTED_HEADER_FOR_IMAT_TEXT_LOG_FILE = \ ' TIME STAMP IMAGE TYPE IMAGE COUNTER COUNTS BM3 before image COUNTS BM3 after image\n' def __init__(self, data: List[str]) -> None: self.data = [line.strip().split(" ") for line in data]
[docs] def parse(self) -> Dict[IMATLogColumn, List]: parsed_log: Dict[IMATLogColumn, List] = { IMATLogColumn.TIMESTAMP: [], IMATLogColumn.PROJECTION_NUMBER: [], IMATLogColumn.PROJECTION_ANGLE: [], IMATLogColumn.COUNTS_BEFORE: [], IMATLogColumn.COUNTS_AFTER: [] } # ignores the headers (index 0) as they're not the same as the data anyway # and index 1 is an empty line for line in self.data[2:]: parsed_log[IMATLogColumn.TIMESTAMP].append(line[0]) parsed_log[IMATLogColumn.PROJECTION_NUMBER].append(_get_projection_number(line[1])) parsed_log[IMATLogColumn.PROJECTION_ANGLE].append(float(_get_angle(line[1]))) parsed_log[IMATLogColumn.COUNTS_BEFORE].append(int(_get_angle(line[2]))) parsed_log[IMATLogColumn.COUNTS_AFTER].append(int(_get_angle(line[3]))) return parsed_log
[docs] @staticmethod def validate(file_contents) -> bool: if TextLogParser.EXPECTED_HEADER_FOR_IMAT_TEXT_LOG_FILE != file_contents[0].rstrip() + "\n": return False return True
[docs] class CSVLogParser: EXPECTED_HEADER_FOR_IMAT_CSV_LOG_FILE = \ "TIME STAMP,IMAGE TYPE,IMAGE COUNTER,COUNTS BM3 before image,COUNTS BM3 after image\n" def __init__(self, data: List[str]) -> None: self.data = data
[docs] def parse(self) -> Dict[IMATLogColumn, List]: parsed_log: Dict[IMATLogColumn, List] = { IMATLogColumn.TIMESTAMP: [], IMATLogColumn.PROJECTION_NUMBER: [], IMATLogColumn.PROJECTION_ANGLE: [], IMATLogColumn.COUNTS_BEFORE: [], IMATLogColumn.COUNTS_AFTER: [] } reader = csv.reader(self.data) # skip headings next(reader) for row in reader: parsed_log[IMATLogColumn.TIMESTAMP].append(row[0]) parsed_log[IMATLogColumn.PROJECTION_NUMBER].append(int(row[2])) angle_raw = row[3] parsed_log[IMATLogColumn.PROJECTION_ANGLE].append(float(_get_angle(angle_raw))) counts_before_raw = row[4] parsed_log[IMATLogColumn.COUNTS_BEFORE].append(int(_get_angle(counts_before_raw))) counts_after_raw = row[5] parsed_log[IMATLogColumn.COUNTS_AFTER].append(int(_get_angle(counts_after_raw))) return parsed_log
[docs] @staticmethod def validate(file_contents) -> bool: if CSVLogParser.EXPECTED_HEADER_FOR_IMAT_CSV_LOG_FILE != file_contents[0]: return False return True
[docs] class IMATLogFile: def __init__(self, data: List[str], source_file: str): self._source_file = source_file self.parser = self.find_parser(data) self._data = self.parser.parse()
[docs] @staticmethod def find_parser(data: List[str]): if TextLogParser.validate(data): return TextLogParser(data) elif CSVLogParser.validate(data): return CSVLogParser(data) else: raise RuntimeError("The format of the log file is not recognised.")
@property def source_file(self) -> str: return self._source_file
[docs] def projection_numbers(self): proj_nums = numpy.zeros(len(self._data[IMATLogColumn.PROJECTION_NUMBER]), dtype=numpy.uint32) proj_nums[:] = self._data[IMATLogColumn.PROJECTION_NUMBER] return proj_nums
[docs] def projection_angles(self) -> ProjectionAngles: angles = numpy.zeros(len(self._data[IMATLogColumn.PROJECTION_ANGLE])) angles[:] = self._data[IMATLogColumn.PROJECTION_ANGLE] return ProjectionAngles(numpy.deg2rad(angles))
[docs] def counts(self) -> Counts: counts = numpy.zeros(len(self._data[IMATLogColumn.COUNTS_BEFORE])) for i, [before, after] in enumerate(zip(self._data[IMATLogColumn.COUNTS_BEFORE], self._data[IMATLogColumn.COUNTS_AFTER])): # clips the string before the count number counts[i] = float(after) - float(before) return Counts(counts)
[docs] def raise_if_angle_missing(self, image_filenames: Optional[List[str]]) -> None: if image_filenames is None: return proj_numbers = self.projection_numbers() image_numbers = [ifile[ifile.rfind("_") + 1:] for ifile in image_filenames] if len(proj_numbers) < len(image_numbers): msg = "Missing projection from log. " elif len(proj_numbers) > len(image_numbers): msg = "Missing image file from sample data. " else: msg = "" msg += f"Found {len(proj_numbers)} angles, but {len(image_numbers)} images" for projection_num, image_num in zip_longest(proj_numbers, image_numbers): # is None happens if exactly the last projection/angle is missing if projection_num is None or image_num is None or str(projection_num) not in image_num: raise RuntimeError(f"{msg}\n\nMismatching angle for projection {projection_num} " f"was going to be used for image file {image_num}")