diff --git a/doc/user_guide/supported_formats/digitalsurf.rst b/doc/user_guide/supported_formats/digitalsurf.rst
index 0f6610ccd..08f2705b9 100644
--- a/doc/user_guide/supported_formats/digitalsurf.rst
+++ b/doc/user_guide/supported_formats/digitalsurf.rst
@@ -3,16 +3,54 @@
 DigitalSurf format (SUR & PRO)
 ------------------------------
 
-The ``.sur`` and ``.pro`` files are a format developed by the digitalsurf company to handle various types of
-scientific measurements data such as profilometer, SEM, AFM, RGB(A) images, multilayer
-surfaces and profiles. Even though it is essentially a surfaces format, 1D signals
-are supported for spectra and spectral maps. Specifically, this file format is used
-by Attolight SA for its scanning electron microscope cathodoluminescence
-(SEM-CL) hyperspectral maps. Metadata parsing is supported, including user-specific
-metadata, as well as the loading of files containing multiple objects packed together.
-
-The plugin was developed based on the MountainsMap software documentation, which
-contains a description of the binary format.
+``.sur`` and ``.pro`` are file formats developed by digitalsurf to import/export data in the MountainsMap
+scientific analysis software. They primarily target (micro-)topography maps and profiles from imaging
+instruments such as SEM, AFM and profilometers, but RGB(A) images, multilayer surfaces and profiles are also
+supported. Even though it is essentially a surface format, 1D signals are supported for spectra and spectral
+maps. Specifically, this is the format used by Attolight for saving SEM-cathodoluminescence (SEM-CL)
+hyperspectral maps. This plugin was developed based on the MountainsMap software documentation.
+
+Support for loading ``.sur`` and ``.pro`` files is complete, including the parsing of custom metadata and the
+opening of files containing multiple objects. Some rare, deprecated object types (e.g. force curves) are not
+supported, because no example data is available. They can be added to the module upon request, provided example
+data and an explanation of the format are supplied. Unlike HyperSpy signals, ``.sur`` and ``.pro`` objects can
+represent heterogeneous data: for instance, float (topography) and int (RGB) data can coexist along the same
+navigation dimension. Such data are cast to a homogeneous floating-point representation upon loading.
+
+Support for saving data is partial, as ``.sur`` and ``.pro`` do not support all features of HyperSpy signals.
+Up to 3D data arrays with either a 1D (series of images) or 2D (spectral maps) navigation space can be saved.
+``.sur`` and ``.pro`` support neither non-uniform axes nor fitted models. Finally, MountainsMap maps
+intensities onto an axis with constant spacing by enforcing an integer representation of the data with a scale
+and an offset, so exporting float data is inherently lossy.
+
+Within these limitations, all features of the ``.sur`` and ``.pro`` file formats are supported. Data
+compression and custom metadata allow good interoperability between HyperSpy and MountainsMap. The file writer
+maps a signal onto the suitable digitalsurf object type, primarily by inspecting its dimensions and data type.
+If ambiguity remains, it inspects the names of the signal axes and ``metadata.Signal.quantity``. The criteria
+are listed in the table below:
+
++----------------+---------------+---------------------------------------------------------------------------+
+| Nav. dimension | Sig dimension | Extension and MountainsMap subclass                                       |
++================+===============+===========================================================================+
+| 0              | 1             | ``.pro``: Spectrum (based on axes name), Profile (default)                |
++----------------+---------------+---------------------------------------------------------------------------+
+| 0              | 2             | ``.sur``: BinaryImage (based on dtype), RGBImage (based on dtype),        |
+|                |               | Surface (default)                                                         |
++----------------+---------------+---------------------------------------------------------------------------+
+| 1              | 0             | ``.pro``: same as (0,1)                                                   |
++----------------+---------------+---------------------------------------------------------------------------+
+| 1              | 1             | ``.pro``: Spectrum Series (based on axes name), Profile Series (default)  |
++----------------+---------------+---------------------------------------------------------------------------+
+| 1              | 2             | ``.sur``: RGBImage Series (based on dtype), Surface Series (default)      |
++----------------+---------------+---------------------------------------------------------------------------+
+| 2              | 0             | ``.sur``: same as (0,2)                                                   |
++----------------+---------------+---------------------------------------------------------------------------+
+| 2              | 1             | ``.sur``: hyperspectralMap (default)                                      |
++----------------+---------------+---------------------------------------------------------------------------+
+
+Axes named one of ``Wavelength``, ``Energy``, ``Energy Loss`` or ``E`` are considered spectral. A quantity
+named one of ``Height``, ``Altitude``, ``Elevation``, ``Depth`` or ``Z`` is considered a surface. The
+distinction between Surface and IntensitySurface stems from the AFM / profilometry origin of MountainsMap:
+"Surface" keeps its physical meaning of a 2D subset of 3D space, whereas "IntensitySurface" is a mere 2D
+mapping of an arbitrary quantity.
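+
+A minimal usage sketch (the file name ``map.sur`` is hypothetical; the reader returns a list of signal
+dictionaries, and the writer applies the dispatch criteria from the table above):
+
+.. code-block:: python
+
+    from rsciio.digitalsurf import file_reader, file_writer
+
+    # Load all objects packed together in the file
+    signal_dicts = file_reader("map.sur")
+
+    # Write the first object back; the digitalsurf object type is chosen
+    # from its dimensions, dtype, axes names and metadata.Signal.quantity
+    file_writer("map_exported.sur", signal_dicts[0])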
 
 API functions
 ^^^^^^^^^^^^^
diff --git a/rsciio/digitalsurf/__init__.py b/rsciio/digitalsurf/__init__.py
index 40459e88b..4627e25ea 100644
--- a/rsciio/digitalsurf/__init__.py
+++ b/rsciio/digitalsurf/__init__.py
@@ -1,8 +1,6 @@
-from ._api import file_reader
+from ._api import file_reader, file_writer, parse_metadata
 
-__all__ = [
-    "file_reader",
-]
+__all__ = ["file_reader", "file_writer", "parse_metadata"]
 
 
 def __dir__():
diff --git a/rsciio/digitalsurf/_api.py b/rsciio/digitalsurf/_api.py
index e81695cb4..cdc78e718 100644
--- a/rsciio/digitalsurf/_api.py
+++ b/rsciio/digitalsurf/_api.py
@@ -23,17 +23,18 @@
 # comments can be systematically parsed into metadata and write a support for
 # original_metadata or other
 
+import ast
+import datetime
 import logging
 import os
+import re
 import struct
-import sys
 import warnings
 import zlib
+from copy import deepcopy
 
 # Commented for now because I don't know what purpose it serves
 # import traits.api as t
-from copy import deepcopy
-
 # Dateutil allows to parse date but I don't think it's useful here
 # import dateutil.parser
 import numpy as np
@@ -45,12 +46,85 @@
 # import rsciio.utils.tools
 # DictionaryTreeBrowser class handles the fancy metadata dictionnaries
 # from hyperspy.misc.utils import DictionaryTreeBrowser
-from rsciio._docstrings import FILENAME_DOC, LAZY_UNSUPPORTED_DOC, RETURNS_DOC
+from rsciio._docstrings import (
+    FILENAME_DOC,
+    LAZY_UNSUPPORTED_DOC,
+    RETURNS_DOC,
+    SIGNAL_DOC,
+)
+from rsciio.utils.date_time_tools import get_date_time_from_metadata
 from rsciio.utils.exceptions import MountainsMapFileError
+from rsciio.utils.rgb_tools import is_rgb, is_rgba
 
 _logger = logging.getLogger(__name__)
 
 
+def parse_metadata(cmt: str, prefix: str = "$", delimiter: str = "=") -> dict:
+    """
+    Parse metadata from the comment field of a digitalsurf file, or from any
+    other string with similar formatting. Return it as a hyperspy-compatible
+    nested dict.
+
+    Parameters
+    ----------
+    cmt : str
+        String containing the contents of a digitalsurf file "comment" field.
+    prefix : str
+        Prefix character that must be present at the start of each line;
+        lines without it are ignored. ``"$"`` for digitalsurf files,
+        typically an empty string (``""``) when parsing from text files.
+        Default is ``"$"``.
+    delimiter : str
+        Character that delimits key-value pairs in a digitalsurf comment.
+        Default is ``"="``.
+
+    Returns
+    -------
+    dict
+        Nested dictionary of the metadata.
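+
+    Examples
+    --------
+    The comment below is a hypothetical illustration (the section title and
+    keys are not taken from a real file); note that values may carry units:
+
+    >>> cmt = "$_MEASUREMENT\\n$Wavelength = 514 nm\\n$Grating = 600"
+    >>> parse_metadata(cmt)
+    {'MEASUREMENT': {'Wavelength': 514, 'Wavelength_units': 'nm', 'Grating': '600'}}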
+    """
+    # dict_md is created as an empty dictionary
+    dict_md = {}
+    # Title lines start with an underscore
+    titlestart = "{:s}_".format(prefix)
+
+    key_main = None
+
+    for line in cmt.splitlines():
+        # Here we ignore any empty line or line starting with @@
+        ignore = False
+        if not line.strip() or line.startswith("@@"):
+            ignore = True
+        # If the line must not be ignored
+        if not ignore:
+            if line.startswith(titlestart):
+                # Keys are stripped from whitespace at both ends
+                key_main = line[len(titlestart) :].strip()
+                dict_md[key_main] = {}
+            elif line.startswith(prefix):
+                if key_main is None:
+                    key_main = "UNTITLED"
+                    dict_md[key_main] = {}
+                key, *li_value = line.split(delimiter)
+                # The key is also stripped from whitespace at both ends
+                key = key[len(prefix) :].strip()
+                str_value = li_value[0] if len(li_value) > 0 else ""
+                # remove whitespace at the beginning of the value
+                str_value = str_value.strip()
+                li_value = str_value.split(" ")
+                try:
+                    if key == "Grating":
+                        dict_md[key_main][key] = li_value[
+                            0
+                        ]  # we don't want to eval this one
+                    else:
+                        dict_md[key_main][key] = ast.literal_eval(li_value[0])
+                except Exception:
+                    dict_md[key_main][key] = li_value[0]
+                if len(li_value) > 1:
+                    dict_md[key_main][key + "_units"] = li_value[1]
+    return dict_md
+
+
 class DigitalSurfHandler(object):
     """Class to read Digital Surf MountainsMap files.
@@ -81,26 +155,28 @@ class DigitalSurfHandler(object):
         6: "_MERIDIANDISC",
         7: "_MULTILAYERPROFILE",
         8: "_MULTILAYERSURFACE",
-        9: "_PARALLELDISC",
+        9: "_PARALLELDISC",  # not implemented
         10: "_INTENSITYIMAGE",
         11: "_INTENSITYSURFACE",
         12: "_RGBIMAGE",
-        13: "_RGBSURFACE",
-        14: "_FORCECURVE",
-        15: "_SERIEOFFORCECURVE",
-        16: "_RGBINTENSITYSURFACE",
+        13: "_RGBSURFACE",  # Deprecated
+        14: "_FORCECURVE",  # Deprecated
+        15: "_SERIEOFFORCECURVE",  # Deprecated
+        16: "_RGBINTENSITYSURFACE",  # Surface + Image
+        17: "_CONTOURPROFILE",
+        18: "_SERIESOFRGBIMAGES",
         20: "_SPECTRUM",
         21: "_HYPCARD",
     }
 
-    def __init__(self, filename=None):
+    def __init__(self, filename: str):
         # We do not need to check for file existence here because
         # io module implements it in the load function
         self.filename = filename
 
         # The signal_dict dictionnary has to be returned by the
-        # file_reader function. Apparently original_metadata needs
-        # to be set
+        # file_reader function. 
By default, we return the minimal + # mandatory fields self.signal_dict = { "data": np.empty((0, 0, 0)), "axes": [], @@ -115,8 +191,8 @@ def __init__(self, filename=None): # _work_dict['Field']['b_pack_fn'](f,v): pack value v in file f self._work_dict = { "_01_Signature": { - "value": "DSCOMPRESSED", - "b_unpack_fn": lambda f: self._get_str(f, 12, "DSCOMPRESSED"), + "value": "DSCOMPRESSED", # Uncompressed key is DIGITAL SURF + "b_unpack_fn": lambda f: self._get_str(f, 12), "b_pack_fn": lambda f, v: self._set_str(f, v, 12), }, "_02_Format": { @@ -126,8 +202,8 @@ def __init__(self, filename=None): }, "_03_Number_of_Objects": { "value": 1, - "b_unpack_fn": self._get_int16, - "b_pack_fn": self._set_int16, + "b_unpack_fn": self._get_uint16, + "b_pack_fn": self._set_uint16, }, "_04_Version": { "value": 1, @@ -141,12 +217,18 @@ def __init__(self, filename=None): }, "_06_Object_Name": { "value": "", - "b_unpack_fn": lambda f: self._get_str(f, 30, "DOSONLY"), + "b_unpack_fn": lambda f: self._get_str( + f, + 30, + ), "b_pack_fn": lambda f, v: self._set_str(f, v, 30), }, "_07_Operator_Name": { - "value": "", - "b_unpack_fn": lambda f: self._get_str(f, 30, ""), + "value": "ROSETTA", + "b_unpack_fn": lambda f: self._get_str( + f, + 30, + ), "b_pack_fn": lambda f, v: self._set_str(f, v, 30), }, "_08_P_Size": { @@ -181,12 +263,12 @@ def __init__(self, filename=None): }, "_14_W_Size": { "value": 0, - "b_unpack_fn": self._get_int32, - "b_pack_fn": self._set_int32, + "b_unpack_fn": self._get_uint32, + "b_pack_fn": self._set_uint32, }, "_15_Size_of_Points": { "value": 16, - "b_unpack_fn": lambda f: self._get_int16(f, 32), + "b_unpack_fn": self._get_int16, "b_pack_fn": self._set_int16, }, "_16_Zmin": { @@ -200,17 +282,17 @@ def __init__(self, filename=None): "b_pack_fn": self._set_int32, }, "_18_Number_of_Points": { - "value": 0, + "value": 1, "b_unpack_fn": self._get_int32, "b_pack_fn": self._set_int32, }, "_19_Number_of_Lines": { - "value": 0, + "value": 1, "b_unpack_fn": self._get_int32, "b_pack_fn": self._set_int32, }, "_20_Total_Nb_of_Pts": { - "value": 0, + "value": 1, "b_unpack_fn": self._get_int32, "b_pack_fn": self._set_int32, }, @@ -231,47 +313,47 @@ def __init__(self, filename=None): }, "_24_Name_of_X_Axis": { "value": "X", - "b_unpack_fn": lambda f: self._get_str(f, 16, "X"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_25_Name_of_Y_Axis": { "value": "Y", - "b_unpack_fn": lambda f: self._get_str(f, 16, "Y"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_26_Name_of_Z_Axis": { "value": "Z", - "b_unpack_fn": lambda f: self._get_str(f, 16, "Z"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_27_X_Step_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_28_Y_Step_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_29_Z_Step_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_30_X_Length_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: 
self._set_str(f, v, 16), }, "_31_Y_Length_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_32_Z_Length_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 16, "um"), + "b_unpack_fn": lambda f: self._get_str(f, 16), "b_pack_fn": lambda f, v: self._set_str(f, v, 16), }, "_33_X_Unit_Ratio": { @@ -305,7 +387,7 @@ def __init__(self, filename=None): "b_pack_fn": self._set_int16, }, "_39_Obsolete": { - "value": 0, + "value": b"", "b_unpack_fn": lambda f: self._get_bytes(f, 12), "b_pack_fn": lambda f, v: self._set_bytes(f, v, 12), }, @@ -355,7 +437,7 @@ def __init__(self, filename=None): "b_pack_fn": self._set_uint32, }, "_49_Obsolete": { - "value": 0, + "value": b"", "b_unpack_fn": lambda f: self._get_bytes(f, 6), "b_pack_fn": lambda f, v: self._set_bytes(f, v, 6), }, @@ -370,7 +452,7 @@ def __init__(self, filename=None): "b_pack_fn": self._set_int16, }, "_52_Client_zone": { - "value": 0, + "value": b"", "b_unpack_fn": lambda f: self._get_bytes(f, 128), "b_pack_fn": lambda f, v: self._set_bytes(f, v, 128), }, @@ -401,12 +483,12 @@ def __init__(self, filename=None): }, "_58_T_Axis_Name": { "value": "T", - "b_unpack_fn": lambda f: self._get_str(f, 13, "Wavelength"), + "b_unpack_fn": lambda f: self._get_str(f, 13), "b_pack_fn": lambda f, v: self._set_str(f, v, 13), }, "_59_T_Step_Unit": { "value": "um", - "b_unpack_fn": lambda f: self._get_str(f, 13, "nm"), + "b_unpack_fn": lambda f: self._get_str(f, 13), "b_pack_fn": lambda f, v: self._set_str(f, v, 13), }, "_60_Comment": { @@ -415,14 +497,14 @@ def __init__(self, filename=None): "b_pack_fn": self._pack_comment, }, "_61_Private_zone": { - "value": 0, + "value": b"", "b_unpack_fn": self._unpack_private, "b_pack_fn": self._pack_private, }, "_62_points": { "value": 0, "b_unpack_fn": self._unpack_data, - "b_pack_fn": lambda f, v: 0, # Not implemented + "b_pack_fn": self._pack_data, }, } @@ -439,9 +521,671 @@ def __init__(self, filename=None): self._Object_type = "_UNKNOWN" # Number of data objects in the file. - self._N_data_object = 1 + self._N_data_objects = 1 self._N_data_channels = 1 + # Attributes useful for save and export + + # Number of nav / sig axes + self._n_ax_nav: int = 0 + self._n_ax_sig: int = 0 + + # All as a rsciio-convention axis dict or empty + self.Xaxis: dict = {} + self.Yaxis: dict = {} + self.Zaxis: dict = {} + self.Taxis: dict = {} + + # These must be set in the split functions + self.data_split = [] + self.objtype_split = [] + + # File Writer Inner methods + + def _write_sur_file(self): + """Write self._list_sur_file_content to a file. This method is + start-and-forget. The brainwork is performed in the construction + of sur_file_content list of dictionaries.""" + + with open(self.filename, "wb") as f: + for dic in self._list_sur_file_content: + # Extremely important! self._work_dict must access + # other fields to properly encode and decode data, + # comments etc. etc. 
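+            # (e.g. packing _62_points via _pack_data reads _01_Signature
+            # and _48_Compressed_data_size from the same work dict)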
+            self._move_values_to_workdict(dic)
+            # Then inner consistency is trivial
+            for key in self._work_dict:
+                self._work_dict[key]["b_pack_fn"](f, self._work_dict[key]["value"])
+
+    def _build_sur_file_contents(
+        self,
+        set_comments: str = "auto",
+        is_special: bool = False,
+        compressed: bool = True,
+        comments: dict = {},
+        object_name: str = "",
+        operator_name: str = "",
+        absolute: int = 0,
+        private_zone: bytes = b"",
+        client_zone: bytes = b"",
+    ):
+        """Build the _sur_file_content list necessary to write a signal dictionary to
+        a ``.sur`` or ``.pro`` file. The signal dictionary's inner consistency is the
+        responsibility of hyperspy; this function's responsibility is to make a
+        consistent list of _sur_file_content."""
+
+        self._list_sur_file_content = []
+
+        # Compute number of navigation / signal axes
+        self._n_ax_nav, self._n_ax_sig = DigitalSurfHandler._get_n_axes(
+            self.signal_dict
+        )
+
+        # Choose object type based on number of navigation and signal axes
+        # Populate self._Object_type
+        # Populate self.Xaxis, self.Yaxis, self.Taxis (if not empty)
+        # Populate self.data_split and self.objtype_split (always)
+        self._split_signal_dict()
+
+        # Raise error if wrong extension
+        # self._validate_filename()
+
+        # Get a dictionary to be saved in the comment field of the exported file
+        comment_dict = self._get_comment_dict(
+            self.signal_dict["original_metadata"], method=set_comments, custom=comments
+        )
+        # Convert the dictionary to a string of suitable format.
+        comment_str = self._stringify_dict(comment_dict)
+
+        # A _work_dict is created for each of the data arrays and objects
+        # that were split from the main object. In most cases, only a
+        # single object is present in the split.
+        for data, objtype in zip(self.data_split, self.objtype_split):
+            self._build_workdict(
+                data,
+                objtype,
+                self.signal_dict["metadata"],
+                comment=comment_str,
+                is_special=is_special,
+                compressed=compressed,
+                object_name=object_name,
+                operator_name=operator_name,
+                absolute=absolute,
+                private_zone=private_zone,
+                client_zone=client_zone,
+            )
+            # If there are multiple objects, the comment is erased after the
+            # first object. This is not mandatory, but makes marginally smaller files.
+            if comment_str:
+                comment_str = ""
+
+            # Finally we push it all to the content list.
+            self._append_work_dict_to_content()
+
+    # Signal dictionary analysis methods
+    @staticmethod
+    def _get_n_axes(sig_dict: dict):
+        """Return the number of navigation and signal axes in the signal dict
+        (in that order). Could be moved out of the .sur API, as other functions
+        probably use this as well.
+
+        Args:
+            sig_dict (dict): signal dict, has to contain keys: 'data', 'axes', 'metadata'
+
+        Returns:
+            Tuple[int,int]: nax_nav, nax_sig. Number of navigation and signal axes
+        """
+        nax_nav = 0
+        nax_sig = 0
+        for ax in sig_dict["axes"]:
+            if ax["navigate"]:
+                nax_nav += 1
+            else:
+                nax_sig += 1
+        return nax_nav, nax_sig
+
+    def _is_spectrum(self) -> bool:
+        """Determine if a signal is a spectrum type based on axes naming,
+        for the export of sur files. Could be cross-checked with other
+        criteria such as the hyperspy subclass; for now we keep it
+        simple: if the signal has
If it has + an ax named like a spectral axis, then probably its a spectrum.""" + + spectrumlike_axnames = ["Wavelength", "Energy", "Energy Loss", "E"] + is_spec = False + + for ax in self.signal_dict["axes"]: + if ax["name"] in spectrumlike_axnames: + is_spec = True + + return is_spec + + def _is_binary(self) -> bool: + return self.signal_dict["data"].dtype == bool + + # Splitting /subclassing methods + def _split_signal_dict(self): + """Select the suitable _mountains_object_types""" + + n_nav = self._n_ax_nav + n_sig = self._n_ax_sig + + # Here, I manually unfold the nested conditions for legibility. + # Since there are a fixed number of dimensions supported by + # digitalsurf .sur/.pro files, I think this is the best way to + # proceed. + if (n_nav, n_sig) == (0, 1): + if self._is_spectrum(): + self._split_spectrum() + else: + self._split_profile() + elif (n_nav, n_sig) == (0, 2): + if self._is_binary(): + self._split_binary_img() + elif is_rgb(self.signal_dict["data"]): # "_RGBIMAGE" + self._split_rgb() + elif is_rgba(self.signal_dict["data"]): + warnings.warn( + "A channel discarded upon saving \ + RGBA signal in .sur format" + ) + self._split_rgb() + else: # _INTENSITYSURFACE + self._split_surface() + elif (n_nav, n_sig) == (1, 0): + warnings.warn( + f"Exporting surface signal dimension {n_sig} and navigation dimension \ + {n_nav} falls back on profile type but is not good practice. Consider \ + transposing before saving to avoid unexpected behaviour." + ) + self._split_profile() + elif (n_nav, n_sig) == (1, 1): + if self._is_spectrum(): + self._split_spectrum() + else: + self._split_profileserie() + elif (n_nav, n_sig) == (1, 2): + if is_rgb(self.signal_dict["data"]): + self._split_rgbserie() + elif is_rgba(self.signal_dict["data"]): + warnings.warn( + "Alpha channel discarded upon saving RGBA signal in .sur format" + ) + self._split_rgbserie() + else: + self._split_surfaceserie() + elif (n_nav, n_sig) == (2, 0): + warnings.warn( + f"Signal dimension {n_sig} and navigation dimension {n_nav} exported as surface type. Consider transposing signal object before exporting if this is intentional." + ) + if self._is_binary(): + self._split_binary_img() + elif is_rgb(self.signal_dict["data"]): # "_RGBIMAGE" + self._split_rgb() + elif is_rgba(self.signal_dict["data"]): + warnings.warn( + "A channel discarded upon saving \ + RGBA signal in .sur format" + ) + self._split_rgb() + else: + self._split_surface() + elif (n_nav, n_sig) == (2, 1): + self._split_hyperspectral() + else: + raise MountainsMapFileError( + msg=f"Object with signal dimension {n_sig} and navigation dimension {n_nav} not supported for .sur export" + ) + + def _split_spectrum( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + # When splitting spectrum, no series axis (T/W), + # X axis is the spectral dimension and Y the series dimension (if series). + obj_type = 20 + self._Object_type = self._mountains_object_types[obj_type] + + nax_nav = self._n_ax_nav + nax_sig = self._n_ax_sig + + # _split_signal_dict ensures that the correct dims are sent here. 
+ if (nax_nav, nax_sig) == (0, 1) or (nax_nav, nax_sig) == (1, 0): + self.Xaxis = self.signal_dict["axes"][0] + elif (nax_nav, nax_sig) == (1, 1): + self.Xaxis = next( + ax for ax in self.signal_dict["axes"] if not ax["navigate"] + ) + self.Yaxis = next(ax for ax in self.signal_dict["axes"] if ax["navigate"]) + + self.data_split = [self.signal_dict["data"]] + self.objtype_split = [obj_type] + self._N_data_objects = 1 + self._N_data_channels = 1 + + def _split_profile( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + + obj_type = 1 + self._Object_type = self._mountains_object_types[obj_type] + self.Xaxis = self.signal_dict["axes"][0] + self.data_split = [self.signal_dict["data"]] + self.objtype_split = [obj_type] + self._N_data_objects = 1 + self._N_data_channels = 1 + + def _split_profileserie( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 4 # '_PROFILESERIE' + self._Object_type = self._mountains_object_types[obj_type] + + self.Xaxis = next(ax for ax in self.signal_dict["axes"] if not ax["navigate"]) + self.Taxis = next(ax for ax in self.signal_dict["axes"] if ax["navigate"]) + + self.data_split = self._split_data_alongaxis(self.Taxis) + self.objtype_split = [obj_type] + [1] * (len(self.data_split) - 1) + self._N_data_objects = len(self.objtype_split) + self._N_data_channels = 1 + + def _split_binary_img( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 3 + self._Object_type = self._mountains_object_types[obj_type] + + self.Xaxis = self.signal_dict["axes"][1] + self.Yaxis = self.signal_dict["axes"][0] + + self.data_split = [self.signal_dict["data"]] + self.objtype_split = [obj_type] + self._N_data_objects = 1 + self._N_data_channels = 1 + + def _split_rgb( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 12 + self._Object_type = self._mountains_object_types[obj_type] + self.Xaxis = self.signal_dict["axes"][1] + self.Yaxis = self.signal_dict["axes"][0] + self.data_split = [ + np.int32(self.signal_dict["data"]["R"]), + np.int32(self.signal_dict["data"]["G"]), + np.int32(self.signal_dict["data"]["B"]), + ] + self.objtype_split = [obj_type] + [10, 10] + self._N_data_objects = 1 + self._N_data_channels = 3 + + def _split_surface( + self, + ): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 2 + self._Object_type = self._mountains_object_types[obj_type] + self.Xaxis = self.signal_dict["axes"][1] + self.Yaxis = self.signal_dict["axes"][0] + self.data_split = [self.signal_dict["data"]] + self.objtype_split = [obj_type] + self._N_data_objects = 1 + self._N_data_channels = 1 + + def _split_rgbserie(self): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 18 # "_SERIESOFRGBIMAGE" + self._Object_type = self._mountains_object_types[obj_type] + + sigaxes_iter = iter(ax for ax in self.signal_dict["axes"] if not ax["navigate"]) + self.Yaxis = next(sigaxes_iter) + self.Xaxis = next(sigaxes_iter) + self.Taxis = next(ax for ax in self.signal_dict["axes"] if ax["navigate"]) + tmp_data_split = self._split_data_alongaxis(self.Taxis) + + # self.data_split = [] + self.objtype_split = [] + for d in tmp_data_split: + self.data_split += [ + d["R"].astype(np.int16).copy(), + 
d["G"].astype(np.int16).copy(), + d["B"].astype(np.int16).copy(), + ] + # self.objtype_split += [12,10,10] + self.objtype_split = [12, 10, 10] * self.Taxis["size"] + self.objtype_split[0] = obj_type + # self.data_split = rgbx2regular_array(self.signal_dict['data']) + + self._N_data_objects = self.Taxis["size"] + self._N_data_channels = 3 + + def _split_surfaceserie(self): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 5 + self._Object_type = self._mountains_object_types[obj_type] + sigaxes_iter = iter(ax for ax in self.signal_dict["axes"] if not ax["navigate"]) + self.Yaxis = next(sigaxes_iter) + self.Xaxis = next(sigaxes_iter) + self.Taxis = next(ax for ax in self.signal_dict["axes"] if ax["navigate"]) + self.data_split = self._split_data_alongaxis(self.Taxis) + self.objtype_split = [2] * len(self.data_split) + self.objtype_split[0] = obj_type + self._N_data_objects = len(self.data_split) + self._N_data_channels = 1 + + def _split_hyperspectral(self): + """Set _Object_type, axes except Z, data_split, objtype_split _N_data_objects, _N_data_channels""" + obj_type = 21 + self._Object_type = self._mountains_object_types[obj_type] + sigaxes_iter = iter(ax for ax in self.signal_dict["axes"] if ax["navigate"]) + self.Yaxis = next(sigaxes_iter) + self.Xaxis = next(sigaxes_iter) + self.Taxis = next(ax for ax in self.signal_dict["axes"] if not ax["navigate"]) + self.data_split = [self.signal_dict["data"]] + self.objtype_split = [obj_type] + self._N_data_objects = 1 + self._N_data_channels = 1 + + def _split_data_alongaxis(self, axis: dict): + """Split the data in a series of lower-dim datasets that can be exported to + a surface / profile file""" + idx = self.signal_dict["axes"].index(axis) + # return idx + datasplit = [] + for dslice in np.rollaxis(self.signal_dict["data"], idx): + datasplit.append(dslice) + return datasplit + + def _norm_data(self, data: np.ndarray, is_special: bool): + """Normalize input data to 16-bits or 32-bits ints and initialize an axis on which the data is normalized. + + Args: + data (np.ndarray): dataset + is_special (bool): whether NaNs get sent to N.M points in the sur format and apply saturation + + Raises: + MountainsMapFileError: raised if input is of complex type + MountainsMapFileError: raised if input is of unsigned int type + MountainsMapFileError: raised if input is of int > 32 bits type + + Returns: + tuple[int,int,int,float,float,np.ndarray[int]]: pointsize, Zmin, Zmax, Zscale, Zoffset, data_int + """ + data_type = data.dtype + + if np.issubdtype(data_type, np.complexfloating): + raise MountainsMapFileError( + "digitalsurf file formats do not support export of complex data. Convert data to real-value representations before before export" + ) + elif np.issubdtype(data_type, bool): + pointsize = 16 + Zmin = 0 + Zmax = 1 + Zscale = 1 + Zoffset = 0 + data_int = data.astype(np.int16) + elif data_type == np.uint8: + warnings.warn("np.uint8 datatype exported as np.int16.") + pointsize = 16 + Zmin, Zmax, Zscale, Zoffset = self._norm_signed_int(data, is_special) + data_int = data.astype(np.int16) + elif data_type == np.uint16: + warnings.warn("np.uint16 datatype exported as np.int32") + pointsize = 32 # Pointsize has to be 16 or 32 in surf format + Zmin, Zmax, Zscale, Zoffset = self._norm_signed_int(data, is_special) + data_int = data.astype(np.int32) + elif np.issubdtype(data_type, np.unsignedinteger): + raise MountainsMapFileError( + "digitalsurf file formats do not support unsigned int >16bits. 
Convert data to signed integers before export."
+            )
+        elif data_type == np.int8:
+            pointsize = 16  # Pointsize has to be 16 or 32 in surf format
+            Zmin, Zmax, Zscale, Zoffset = self._norm_signed_int(data, is_special)
+            data_int = data.astype(np.int16)
+        elif data_type == np.int16:
+            pointsize = 16
+            Zmin, Zmax, Zscale, Zoffset = self._norm_signed_int(data, is_special)
+            data_int = data
+        elif data_type == np.int32:
+            pointsize = 32
+            data_int = data
+            Zmin, Zmax, Zscale, Zoffset = self._norm_signed_int(data, is_special)
+        elif np.issubdtype(data_type, np.integer):
+            raise MountainsMapFileError(
+                "digitalsurf file formats do not support export of integers larger than 32 bits. Convert data to a 32-bit representation before exporting"
+            )
+        elif np.issubdtype(data_type, np.floating):
+            pointsize = 32
+            Zmin, Zmax, Zscale, Zoffset, data_int = self._norm_float(data, is_special)
+
+        return pointsize, Zmin, Zmax, Zscale, Zoffset, data_int
+
+    def _norm_signed_int(self, data: np.ndarray, is_special: bool = False):
+        """Normalize data of integer type. No normalization per se, but the
+        Zmin and Zmax thresholds are set if saturation flagging is requested."""
+        # There are no NaN values for integers. Special points mean saturation
+        # of the integer scale.
+        data_int_min = np.iinfo(data.dtype).min
+        data_int_max = np.iinfo(data.dtype).max
+
+        is_satlo = (data == data_int_min).sum() >= 1 and is_special
+        is_sathi = (data == data_int_max).sum() >= 1 and is_special
+
+        Zmin = data_int_min + 1 if is_satlo else data.min()
+        Zmax = data_int_max - 1 if is_sathi else data.max()
+        Zscale = 1.0
+        Zoffset = Zmin
+
+        return Zmin, Zmax, Zscale, Zoffset
+
+    def _norm_float(
+        self,
+        data: np.ndarray,
+        is_special: bool = False,
+    ):
+        """Normalize float data on a 32-bit int scale. Inherently lossy,
+        but that's how things are with mountainsmap files."""
+
+        Zoffset_f = np.nanmin(data)
+        Zmax_f = np.nanmax(data)
+        is_nan = np.any(np.isnan(data))
+
+        if is_special and is_nan:
+            Zmin = -(2 ** (32 - 1)) + 2
+            Zmax = 2**32 + Zmin - 3
+        else:
+            Zmin = -(2 ** (32 - 1))
+            Zmax = 2**32 + Zmin - 1
+
+        Zscale = (Zmax_f - Zoffset_f) / (Zmax - Zmin)
+        data_int = (data - Zoffset_f) / Zscale + Zmin
+
+        if is_special and is_nan:
+            data_int[np.isnan(data)] = Zmin - 2
+
+        data_int = data_int.astype(np.int32)
+
+        return Zmin, Zmax, Zscale, Zoffset_f, data_int
+
+    def _get_Zname_Zunit(self, metadata: dict):
+        """Attempt to read the Z-axis name and unit from the
+        ``metadata.Signal.quantity`` field. Return empty strings if they do
+        not exist.
+
+        Returns:
+            tuple[str,str]: Zname, Zunit
+        """
+        quantitystr: str = metadata.get("Signal", {}).get("quantity", "")
+        quantitystr = quantitystr.strip()
+        quantity = quantitystr.split(" ")
+        if len(quantity) > 1:
+            Zunit = quantity.pop()
+            Zunit = Zunit.strip("()")
+            Zname = " ".join(quantity)
+        elif len(quantity) == 1:
+            Zname = quantity.pop()
+            Zunit = ""
+
+        return Zname, Zunit
+
+    def _build_workdict(
+        self,
+        data: np.ndarray,
+        obj_type: int,
+        metadata: dict = {},
+        comment: str = "",
+        is_special: bool = True,
+        compressed: bool = True,
+        object_name: str = "",
+        operator_name: str = "",
+        absolute: int = 0,
+        private_zone: bytes = b"",
+        client_zone: bytes = b"",
+    ):
+        """Populate the _work_dict with values from the signal data and metadata."""
+
+        if not compressed:
+            self._work_dict["_01_Signature"]["value"] = (
+                "DIGITAL SURF"  # DSCOMPRESSED by default
+            )
+        else:
+            self._work_dict["_01_Signature"]["value"] = (
+                "DSCOMPRESSED"  # DSCOMPRESSED by default
+            )
+
+        # self._work_dict['_02_Format']['value'] = 0  # Dft. 
other possible value is 257 for MacintoshII computers with Motorola CPUs. Obv not supported... + self._work_dict["_03_Number_of_Objects"]["value"] = self._N_data_objects + # self._work_dict['_04_Version']['value'] = 1 # Version number. Always default. + self._work_dict["_05_Object_Type"]["value"] = obj_type + self._work_dict["_06_Object_Name"]["value"] = ( + object_name # Obsolete, DOS-version only (Not supported) + ) + self._work_dict["_07_Operator_Name"]["value"] = ( + operator_name # Should be settable from kwargs + ) + self._work_dict["_08_P_Size"]["value"] = self._N_data_channels + + self._work_dict["_09_Acquisition_Type"]["value"] = ( + 0 # AFM data only, could be inferred + ) + self._work_dict["_10_Range_Type"]["value"] = ( + 0 # Only 1 for high-range (z-stage scanning), AFM data only, could be inferred + ) + + self._work_dict["_11_Special_Points"]["value"] = int(is_special) + + self._work_dict["_12_Absolute"]["value"] = ( + absolute # Probably irrelevant in most cases. Absolute vs rel heights (for profilometers), can be inferred + ) + self._work_dict["_13_Gauge_Resolution"]["value"] = ( + 0.0 # Probably irrelevant. Only for profilometers (maybe AFM), can be inferred + ) + + # T-axis acts as W-axis for spectrum / hyperspectrum surfaces. + if obj_type in [21]: + ws = self.Taxis.get("size", 0) + else: + ws = 0 + self._work_dict["_14_W_Size"]["value"] = ws + + bsize, Zmin, Zmax, Zscale, Zoffset, data_int = self._norm_data(data, is_special) + Zname, Zunit = self._get_Zname_Zunit(metadata) + + # Axes element set regardless of object size + self._work_dict["_15_Size_of_Points"]["value"] = bsize + self._work_dict["_16_Zmin"]["value"] = Zmin + self._work_dict["_17_Zmax"]["value"] = Zmax + self._work_dict["_18_Number_of_Points"]["value"] = self.Xaxis.get("size", 1) + self._work_dict["_19_Number_of_Lines"]["value"] = self.Yaxis.get("size", 1) + # This needs to be this way due to the way we export our hyp maps + self._work_dict["_20_Total_Nb_of_Pts"]["value"] = self.Xaxis.get( + "size", 1 + ) * self.Yaxis.get("size", 1) + + self._work_dict["_21_X_Spacing"]["value"] = self.Xaxis.get("scale", 0.0) + self._work_dict["_22_Y_Spacing"]["value"] = self.Yaxis.get("scale", 0.0) + self._work_dict["_23_Z_Spacing"]["value"] = Zscale + self._work_dict["_24_Name_of_X_Axis"]["value"] = self.Xaxis.get("name", "") + self._work_dict["_25_Name_of_Y_Axis"]["value"] = self.Yaxis.get("name", "") + self._work_dict["_26_Name_of_Z_Axis"]["value"] = Zname + self._work_dict["_27_X_Step_Unit"]["value"] = self.Xaxis.get("units", "") + self._work_dict["_28_Y_Step_Unit"]["value"] = self.Yaxis.get("units", "") + self._work_dict["_29_Z_Step_Unit"]["value"] = Zunit + self._work_dict["_30_X_Length_Unit"]["value"] = self.Xaxis.get("units", "") + self._work_dict["_31_Y_Length_Unit"]["value"] = self.Yaxis.get("units", "") + self._work_dict["_32_Z_Length_Unit"]["value"] = Zunit + self._work_dict["_33_X_Unit_Ratio"]["value"] = 1 + self._work_dict["_34_Y_Unit_Ratio"]["value"] = 1 + self._work_dict["_35_Z_Unit_Ratio"]["value"] = 1 + + # _36_Imprint -> Obsolete + # _37_Inverted -> Always No + # _38_Levelled -> Always No + # _39_Obsolete -> Obsolete + + dt: datetime.datetime = get_date_time_from_metadata( + metadata, formatting="datetime" + ) + if dt is not None: + self._work_dict["_40_Seconds"]["value"] = dt.second + self._work_dict["_41_Minutes"]["value"] = dt.minute + self._work_dict["_42_Hours"]["value"] = dt.hour + self._work_dict["_43_Day"]["value"] = dt.day + self._work_dict["_44_Month"]["value"] = dt.month + 
self._work_dict["_45_Year"]["value"] = dt.year + self._work_dict["_46_Day_of_week"]["value"] = dt.weekday() + + # _47_Measurement_duration -> Nonsaved and non-metadata, but float in seconds + + if compressed: + data_bin = self._compress_data( + data_int, nstreams=1 + ) # nstreams hard-set to 1. Could be unlocked in the future + compressed_size = len(data_bin) + else: + fmt = ( + "= 2**15: + warnings.warn("Comment exceeding max length of 32.0 kB and will be cropped") + comment_len = np.int16(2**15 - 1) + + self._work_dict["_50_Comment_size"]["value"] = comment_len + + privatesize = len(private_zone) + if privatesize >= 2**15: + warnings.warn( + "Private size exceeding max length of 32.0 kB and will be cropped" + ) + privatesize = np.uint16(2**15 - 1) + + self._work_dict["_51_Private_size"]["value"] = privatesize + + self._work_dict["_52_Client_zone"]["value"] = client_zone + + self._work_dict["_53_X_Offset"]["value"] = self.Xaxis.get("offset", 0.0) + self._work_dict["_54_Y_Offset"]["value"] = self.Yaxis.get("offset", 0.0) + self._work_dict["_55_Z_Offset"]["value"] = Zoffset + self._work_dict["_56_T_Spacing"]["value"] = self.Taxis.get("scale", 0.0) + self._work_dict["_57_T_Offset"]["value"] = self.Taxis.get("offset", 0.0) + self._work_dict["_58_T_Axis_Name"]["value"] = self.Taxis.get("name", "") + self._work_dict["_59_T_Step_Unit"]["value"] = self.Taxis.get("units", "") + + self._work_dict["_60_Comment"]["value"] = comment + + self._work_dict["_61_Private_zone"]["value"] = private_zone + self._work_dict["_62_points"]["value"] = data_bin + # Read methods def _read_sur_file(self): """Read the binary, possibly compressed, content of the surface @@ -455,18 +1199,16 @@ def _read_sur_file(self): # We append the first object to the content list self._append_work_dict_to_content() # Lookup how many objects are stored in the file and save - self._N_data_object = self._get_work_dict_key_value("_03_Number_of_Objects") + self._N_data_objects = self._get_work_dict_key_value( + "_03_Number_of_Objects" + ) self._N_data_channels = self._get_work_dict_key_value("_08_P_Size") - # Determine how many objects we need to read - if self._N_data_channels > 0 and self._N_data_object > 0: - n_objects_to_read = self._N_data_channels * self._N_data_object - elif self._N_data_channels > 0: - n_objects_to_read = self._N_data_channels - elif self._N_data_object > 0: - n_objects_to_read = self._N_data_object - else: - n_objects_to_read = 1 + # Determine how many objects we need to read, at least 1 object and 1 channel + # even if metadata is set to 0 (happens sometimes) + n_objects_to_read = max(self._N_data_channels, 1) * max( + self._N_data_objects, 1 + ) # Lookup what object type we are dealing with and save self._Object_type = DigitalSurfHandler._mountains_object_types[ @@ -485,12 +1227,17 @@ def _read_sur_file(self): def _read_single_sur_object(self, file): for key, val in self._work_dict.items(): self._work_dict[key]["value"] = val["b_unpack_fn"](file) + # print(f"{key}: {self._work_dict[key]['value']}") def _append_work_dict_to_content(self): """Save the values stored in the work dict in the surface file list""" datadict = deepcopy({key: val["value"] for key, val in self._work_dict.items()}) self._list_sur_file_content.append(datadict) + def _move_values_to_workdict(self, dic: dict): + for key in self._work_dict: + self._work_dict[key]["value"] = deepcopy(dic[key]) + def _get_work_dict_key_value(self, key): return self._work_dict[key]["value"] @@ -499,9 +1246,7 @@ def _build_sur_dict(self): """Create a signal dict 
with an unpacked object""" # If the signal is of the type spectrum or hypercard - if self._Object_type in [ - "_HYPCARD", - ]: + if self._Object_type in ["_HYPCARD"]: self._build_hyperspectral_map() elif self._Object_type in ["_SPECTRUM"]: self._build_spectrum() @@ -509,7 +1254,10 @@ def _build_sur_dict(self): self._build_general_1D_data() elif self._Object_type in ["_PROFILESERIE"]: self._build_1D_series() - elif self._Object_type in ["_SURFACE"]: + elif self._Object_type in ["_BINARYIMAGE"]: + self._build_surface() + self.signal_dict.update({"post_process": [self.post_process_binary]}) + elif self._Object_type in ["_SURFACE", "_INTENSITYIMAGE"]: self._build_surface() elif self._Object_type in ["_SURFACESERIE"]: self._build_surface_series() @@ -521,11 +1269,11 @@ def _build_sur_dict(self): self._build_RGB_image() elif self._Object_type in ["_RGBINTENSITYSURFACE"]: self._build_RGB_surface() - elif self._Object_type in ["_BINARYIMAGE"]: - self._build_surface() + elif self._Object_type in ["_SERIESOFRGBIMAGES"]: + self._build_RGB_image_series() else: raise MountainsMapFileError( - self._Object_type + "is not a supported mountain object." + f"{self._Object_type} is not a supported mountain object." ) return self.signal_dict @@ -817,6 +1565,55 @@ def _build_RGB_image( self.signal_dict.update({"post_process": [self.post_process_RGB]}) + def _build_RGB_image_series( + self, + ): + # First object dictionary + hypdic = self._list_sur_file_content[0] + + # Metadata are set from first dictionary + self._set_metadata_and_original_metadata(hypdic) + + # We build the series-axis + self.signal_dict["axes"].append( + self._build_Tax(hypdic, "_03_Number_of_Objects", ind=0, nav=False) + ) + + # All objects must share the same signal axes + self.signal_dict["axes"].append(self._build_Yax(hypdic, ind=1, nav=False)) + self.signal_dict["axes"].append(self._build_Xax(hypdic, ind=2, nav=False)) + + # shape of the surfaces in the series + shape = (hypdic["_19_Number_of_Lines"], hypdic["_18_Number_of_Points"]) + nimg = hypdic["_03_Number_of_Objects"] + nchan = hypdic["_08_P_Size"] + # We put all the data together + data = np.empty(shape=(nimg, *shape, nchan)) + i = 0 + for imgidx in range(nimg): + for chanidx in range(nchan): + obj = self._list_sur_file_content[i] + data[imgidx, ..., chanidx] = obj["_62_points"].reshape(shape) + i += 1 + + # for obj in self._list_sur_file_content: + # data.append(obj["_62_points"].reshape(shape)) + + # data = np.stack(data) + + # data = data.reshape(nimg,nchan,*shape) + # data = np.rollaxis(data,) + + # Pushing data into the dictionary + self.signal_dict["data"] = data + + # Add the color-axis to the signal dict so it can be consumed + self.signal_dict["axes"].append( + self._build_Tax(hypdic, "_08_P_Size", ind=3, nav=True) + ) + + self.signal_dict.update({"post_process": [self.post_process_RGB]}) + # Metadata utility methods @staticmethod @@ -900,9 +1697,9 @@ def _build_original_metadata( original_metadata_dict = {} # Iteration over Number of data objects - for i in range(self._N_data_object): + for i in range(self._N_data_objects): # Iteration over the Number of Data channels - for j in range(self._N_data_channels): + for j in range(max(self._N_data_channels, 1)): # Creating a dictionary key for each object k = (i + 1) * (j + 1) key = "Object_{:d}_Channel_{:d}".format(i, j) @@ -924,7 +1721,7 @@ def _build_original_metadata( # Check if it is the case and append it to original metadata if yes valid_comment = self._check_comments(a["_60_Comment"], "$", "=") if valid_comment: - 
parsedict = self._MS_parse(a["_60_Comment"], "$", "=") + parsedict = parse_metadata(a["_60_Comment"], "$", "=") parsedict = {k.lstrip("_"): m for k, m in parsedict.items()} original_metadata_dict[key].update({"Parsed": parsedict}) @@ -1118,71 +1915,121 @@ def _check_comments(commentsstr, prefix, delimiter): return valid @staticmethod - def _MS_parse(str_ms, prefix, delimiter): - """Parses a string containing metadata information. The string can be - read from the comment section of a .sur file, or, alternatively, a file - containing them with a similar formatting. + def _get_comment_dict( + original_metadata: dict, method: str = "auto", custom: dict = {} + ) -> dict: + """Return the dictionary used to set the dataset comments (akA custom parameters) while exporting a file. - Parameters - ---------- - str_ms: string containing metadata - prefix: string (or char) character assumed to start each line. - '$' if a .sur file. - delimiter: string that delimits the keyword from value. always '=' + By default (method='auto'), tries to identify if the object was originally imported by rosettasciio + from a digitalsurf .sur/.pro file with a comment field parsed as original_metadata (i.e. + Object_0_Channel_0.Parsed). In that case, digitalsurf ignores non-parsed original metadata + (ie .sur/.pro file headers). If the original metadata contains multiple objects with + non-empty parsed content (Object_0_Channel_0.Parsed, Object_0_Channel_1.Parsed etc...), only + the first non-empty X.Parsed sub-dictionary is returned. This falls back on returning the + raw 'original_metadata' - Returns - ------- - dict_ms: dictionnary in the correct hyperspy metadata format + Optionally the raw 'original_metadata' dictionary can be exported (method='raw'), + a custom dictionary provided by the user (method='custom'), or no comment at all (method='off') + Args: + method (str, optional): method to export. Defaults to 'auto'. + custom (dict, optional): custom dictionary. Ignored unless method is set to 'custom', Defaults to {}. + + Raises: + MountainsMapFileError: if an invalid key is entered + + Returns: + dict: dictionary to be exported as a .sur object """ - # dict_ms is created as an empty dictionnary - dict_ms = {} - # Title lines start with an underscore - titlestart = "{:s}_".format(prefix) + if method == "raw": + return original_metadata + elif method == "custom": + return custom + elif method == "off": + return {} + elif method == "auto": + pattern = re.compile(r"Object_\d*_Channel_\d*") + omd = original_metadata + # filter original metadata content of dict type and matching pattern. + validfields = [ + omd[key] + for key in omd + if pattern.match(key) and isinstance(omd[key], dict) + ] + # In case none match, give up filtering and return raw + if not validfields: + return omd + # In case some match, return first non-empty "Parsed" sub-dict + for field in validfields: + # Return none for non-existing "Parsed" key + candidate = field.get("Parsed") + # For non-none, non-empty dict-type candidate + if candidate and isinstance(candidate, dict): + return candidate + # dict casting for non-none but non-dict candidate + elif candidate is not None: + return {"Parsed": candidate} + # else none candidate, or empty dict -> do nothing + # Finally, if valid fields are present but no candidate + # did a non-empty return, it is safe to return empty + return {} + else: + raise MountainsMapFileError( + "Non-valid method for setting mountainsmap file comment. 
Choose one of: 'auto', 'raw', 'custom', 'off'"
+            )
 
-        for line in str_ms.splitlines():
-            # Here we ignore any empty line or line starting with @@
-            ignore = False
-            if not line.strip() or line.startswith("@@"):
-                ignore = True
-            # If the line must not be ignored
-            if not ignore:
-                if line.startswith(titlestart):
-                    # We strip keys from whitespace at the end and beginning
-                    key_main = line[len(titlestart) :].strip()
-                    dict_ms[key_main] = {}
-                elif line.startswith(prefix):
-                    key, *li_value = line.split(delimiter)
-                    # Key is also stripped from beginning or end whitespace
-                    key = key[len(prefix) :].strip()
-                    str_value = li_value[0] if len(li_value) > 0 else ""
-                    # remove whitespace at the beginning of value
-                    str_value = str_value.strip()
-                    li_value = str_value.split(" ")
-                    try:
-                        if key == "Grating":
-                            dict_ms[key_main][key] = li_value[
-                                0
-                            ]  # we don't want to eval this one
-                        else:
-                            dict_ms[key_main][key] = eval(li_value[0])
-                    except Exception:
-                        dict_ms[key_main][key] = li_value[0]
-                    if len(li_value) > 1:
-                        dict_ms[key_main][key + "_units"] = li_value[1]
-        return dict_ms
+    @staticmethod
+    def _stringify_dict(omd: dict):
+        """Pack nested dictionary metadata into a string. Pack dictionary-type
+        elements into the digitalsurf "Section title" metadata type (``$_``
+        preceding the section title). Pack other elements into equal-sign-separated
+        key-value pairs.
+
+        Supports the key-units logic {'key': value, 'key_units': 'un'} used in hyperspy.
+        """
+
+        # Separate the dict into a list of keys and a list of values to allow
+        # index-based pop/insert
+        keys_queue = list(omd.keys())
+        vals_queue = list(omd.values())
+        # comment string to be returned
+        cmtstr: str = ""
+        # Loop until the queues are empty
+        while keys_queue:
+            # pop the first object
+            k = keys_queue.pop(0)
+            v = vals_queue.pop(0)
+            # if the object is a section header
+            if isinstance(v, dict):
+                cmtstr += f"$_{k}\n"
+                keys_queue = list(v.keys()) + keys_queue
+                vals_queue = list(v.values()) + vals_queue
+            else:
+                try:
+                    ku_idx = keys_queue.index(k + "_units")
+                    has_units = True
+                except ValueError:
+                    ku_idx = None
+                    has_units = False
+
+                if has_units:
+                    _ = keys_queue.pop(ku_idx)
+                    vu = vals_queue.pop(ku_idx)
+                    cmtstr += f"${k} = {v.__str__()} {vu}\n"
+                else:
+                    cmtstr += f"${k} = {v.__str__()}\n"
+
+        return cmtstr
 
     # Post processing
     @staticmethod
     def post_process_RGB(signal):
         signal = signal.transpose()
-        max_data = np.nanmax(signal.data)
-        if max_data <= 256:
+        max_data = np.max(signal.data)
+        if max_data <= 255:
             signal.change_dtype("uint8")
             signal.change_dtype("rgb8")
-        elif max_data <= 65536:
-            signal.change_dtype("uint8")
-            signal.change_dtype("rgb8")
+        elif max_data <= 65535:
+            signal.change_dtype("uint16")
+            signal.change_dtype("rgb16")
         else:
             warnings.warn(
                 """RGB-announced data could not be converted to
@@ -1191,29 +2038,41 @@ def post_process_RGB(signal):
         return signal
 
+    @staticmethod
+    def post_process_binary(signal):
+        signal.change_dtype("bool")
+        return signal
+
     # pack/unpack binary quantities
+
     @staticmethod
-    def _get_int16(file, default=None):
-        """Read a 16-bits int with a user-definable default value if no
-        file is given"""
-        if file is None:
-            return default
+    def _get_uint16(file):
+        """Read a 16-bits unsigned int"""
         b = file.read(2)
-        if sys.byteorder == "big":
-            return struct.unpack(">h", b)[0]
-        else:
-            return struct.unpack("<h", b)[0]
+        return struct.unpack("<H", b)[0]
+
+    @staticmethod
+    def _set_uint16(file, val):
+        file.write(struct.pack("<H", val))
+
+    @staticmethod
+    def _get_int16(file):
+        """Read a 16-bits int"""
+        b = file.read(2)
+        return struct.unpack("<h", b)[0]
 
     @staticmethod
-    def _get_int32(file, default=None):
-        """Read a 32-bits int with a user-definable default value if no
-        file is given"""
-        if file is None:
-            return default
-        b = file.read(4)
-        if sys.byteorder == "big":
-            return struct.unpack(">i", b)[0]
-        else:
-            return struct.unpack("<i", b)[0]
+    def _get_int32(file):
+        """Read a 32-bits int"""
+        b = file.read(4)
+        return struct.unpack("<i", b)[0]
 
     @staticmethod
-    def _get_uint32(file, default=None):
-        """Read a 32-bits unsigned int with a user-definable default value
-        if no file is given"""
-        if file is None:
-            return default
-        b = file.read(4)
-        if sys.byteorder == "big":
-            return struct.unpack(">I", b)[0]
-        else:
-            return struct.unpack("<I", b)[0]
+    def _get_uint32(file):
+        """Read a 32-bits unsigned int"""
+        b = file.read(4)
+        return struct.unpack("<I", b)[0]
+
+    @staticmethod
+    def _set_uint32(file, val):
+        file.write(struct.pack("<I", val))
 
+    def _get_uncompressed_datasize(self) -> int:
+        """Return size of uncompressed data in bytes"""
+        psize = int(self._get_work_dict_key_value("_15_Size_of_Points") / 8)
+        # Datapoints in X and Y dimensions
+        Npts_tot = 
self._get_work_dict_key_value("_20_Total_Nb_of_Pts") + # Datasize in WL. max between value and 1 as often W_Size saved as 0 + Wsize = max(self._get_work_dict_key_value("_14_W_Size"), 1) + # Wsize = 1 + + datasize = Npts_tot * Wsize * psize + return datasize + + def _unpack_data(self, file, encoding="latin-1"): # Size of datapoints in bytes. Always int16 (==2) or 32 (==4) psize = int(self._get_work_dict_key_value("_15_Size_of_Points") / 8) dtype = np.int16 if psize == 2 else np.int32 @@ -1314,20 +2224,16 @@ def _unpack_data(self, file, encoding="latin-1"): # Datapoints in X and Y dimensions Npts_tot = self._get_work_dict_key_value("_20_Total_Nb_of_Pts") # Datasize in WL - Wsize = self._get_work_dict_key_value("_14_W_Size") + Wsize = max(self._get_work_dict_key_value("_14_W_Size"), 1) # We need to take into account the fact that Wsize is often # set to 0 instead of 1 in non-spectral data to compute the # space occupied by data in the file - readsize = Npts_tot * psize - if Wsize != 0: - readsize *= Wsize - # if Npts_channel is not 0: - # readsize*=Npts_channel + readsize = Npts_tot * psize * Wsize + buf = file.read(readsize) # Read the exact size of the data - _points = np.frombuffer(file.read(readsize), dtype=dtype) - # _points = np.fromstring(file.read(readsize),dtype=dtype) + _points = np.frombuffer(buf, dtype=dtype) else: # If the points are compressed do the uncompress magic. There @@ -1352,36 +2258,90 @@ def _unpack_data(self, file, encoding="latin-1"): # Finally numpy converts it to a numeric object _points = np.frombuffer(rawData, dtype=dtype) - # _points = np.fromstring(rawData, dtype=dtype) # rescale data # We set non measured points to nan according to .sur ways nm = [] if self._get_work_dict_key_value("_11_Special_Points") == 1: - # has unmeasured points + # has non-measured points nm = _points == self._get_work_dict_key_value("_16_Zmin") - 2 - # We set the point in the numeric scale - _points = _points.astype(float) * self._get_work_dict_key_value( + Zmin = self._get_work_dict_key_value("_16_Zmin") + scale = self._get_work_dict_key_value( "_23_Z_Spacing" - ) * self._get_work_dict_key_value( - "_35_Z_Unit_Ratio" - ) + self._get_work_dict_key_value("_55_Z_Offset") + ) / self._get_work_dict_key_value("_35_Z_Unit_Ratio") + offset = self._get_work_dict_key_value("_55_Z_Offset") + + # Packing data into ints or float, with or without scaling. 
+ if self._is_data_int(): + pass # Case left here for future modification + elif self._is_data_scaleint(): + _points = (_points.astype(float) - Zmin) * scale + offset + _points = np.round(_points).astype(int) + elif self._is_data_bin(): + pass + else: + _points = (_points.astype(float) - Zmin) * scale + offset + _points[nm] = np.nan # Ints have no nans - _points[nm] = np.nan # Return the points, rescaled return _points def _pack_data(self, file, val, encoding="latin-1"): - """This needs to be special because it writes until the end of - file.""" - datasize = self._get_work_dict_key_value("_62_points") - self._set_str(file, val, datasize) + """This needs to be special because it writes until the end of file.""" + # Also valid for uncompressed + if self._get_work_dict_key_value("_01_Signature") != "DSCOMPRESSED": + datasize = self._get_uncompressed_datasize() + else: + datasize = self._get_work_dict_key_value("_48_Compressed_data_size") + self._set_bytes(file, val, datasize) + + @staticmethod + def _compress_data(data_int, nstreams: int = 1) -> bytes: + """Pack the input data using the digitalsurf zip approach and return the result as a + binary string ready to be written onto a file.""" + + if nstreams <= 0 or nstreams > 8: + raise MountainsMapFileError( + "Number of compression streams must be >= 1, <= 8" + ) + + bstr = b"" + bstr += struct.pack("`: + +- add support for saving file - see :func:`~.digitalsurf.file_writer` +- add the :func:`~.digitalsurf.parse_metadata` function to parse metadata from ``sur`` file +- add series of RGB images / surfaces support. \ No newline at end of file