diff --git a/doc/source/advanced/index.rst b/doc/source/advanced/index.rst new file mode 100644 index 0000000000..876037eb5d --- /dev/null +++ b/doc/source/advanced/index.rst @@ -0,0 +1,94 @@ +Advanced topics +=============== + +This section of the documentation contains advanced topics that are not relevant +for most users. + +.. _preload: + +Preloading segments for improved timeliness +------------------------------------------- + +Normally, data need to exist before they can be read. This requirement +impacts data processing timeliness. For data arriving in segments, +Satpy can process each segment immediately as it comes in. +This feature currently only works with the :mod:`~satpy.readers.fci_l1c_nc` reader. +This feature is experimental, likely to be unstable, and might change. + +Consider a near real time data reception situation where FCI segments are +delivered one by one. Classically, to produce full disc imagery, users +would wait for all needed segments to arrive, before they start processing +any data by passing all segments to the :class:`~satpy.scene.Scene`. +For a more timely imagery production, users can create the Scene, load the +data, resample, and even call :meth:`~satpy.scene.Scene.save_datasets` +before the data are complete (:meth:`~satpy.scene.Scene.save_datasets` will wait until the data +are available, unless ``compute=False``). +Upon computation, much of the overhead in Satpy +internals has already been completed, and Satpy will process each segment +as it comes in. + +To do so, Satpy caches a selection of data and metadata between segments +and between repeat cycles. 
Caching between segments happens in-memory +and needs no preparation from the user, but where data are cached +between repeat cycles, the user needs to create this cache first from +a repeat cycle that is available completely:: + + >>> from satpy.readers import create_preloadable_cache + >>> create_preloadable_cache("fci_l1c_nc", fci_files) + +This needs to be done only once as long as data or metadata cached +between repeat cycles does not change (for example, the rows at which +each repeat cycle starts and ends). To make use of eager processing, set +the configuration variable ``readers.preload.enable``. When creating +the scene, pass only the path to the first segment:: + + >>> satpy.config.set({"readers.preload.enable": True}) + >>> sc = Scene( + ... filenames=[path_to_first_segment], + ... reader="fci_l1c_nc") + +Satpy will figure out the names of the remaining segments and find them as +they come in. If the data are already available, processing is similar to +the regular case. If the data are not yet available, Satpy will wait during +the computation of the dask graphs until data become available. + +For additional configuration parameters, see the :ref:`configuration documentation <preload_settings>`. + +Known limitations as of Satpy 0.51: + +- Mixing different file types for the same reader is not yet supported. + For FCI, that means it is not yet possible to mix FDHSI and HRFI data. +- When segments are missing, processing times out and no image will be produced. + There is currently no way to produce an incomplete image with the missing + segment left out. +- Dask may not order the processing of the chunks optimally. That means some + dask workers may be waiting for chunks 33–40 as chunks 1–32 are coming in + and are not being processed. Possible workarounds: + - Use as many workers as there are chunks (for example, 40). + - Use the dask distributed scheduler using + ``from dask.distributed import Client; Client()``. 
This has only + limited support in Satpy and is highly experimental. It should be possible + to read FCI L1C data, resample it + using the gradient search resampler, and write the resulting data using the + ``simple_image`` writer. The nearest neighbour resampler and the GeoTIFF + writer do not currently work (see https://github.com/pytroll/satpy/issues/1762). + If you use this scheduler, set the configuration variable + ``readers.preload.assume_distributed`` to True. + This is not currently recommended in a production environment. Any feedback + is highly welcome. +- Currently, Satpy merely checks the existence of a file and not whether it + has been completely written. This may lead to incomplete files being read, + which might lead to failures. + +For more technical background reading including hints +on how this could be extended to other readers, see the API documentation for +:class:`~satpy.readers.netcdf_utils.PreloadableSegments` and +:class:`~satpy.readers.yaml_reader.GEOSegmentYAMLReader`. + +.. versionadded:: 0.52 + +.. toctree:: + :hidden: + :maxdepth: 1 + + preloaded_reading diff --git a/doc/source/config.rst b/doc/source/config.rst index 9babc1abbf..5fa52c7fdc 100644 --- a/doc/source/config.rst +++ b/doc/source/config.rst @@ -274,6 +274,47 @@ Clipping of negative radiances is currently implemented for the following reader * ``abi_l1b``, ``ami_l1b`` +.. _preload_settings: + +Preloading segments +^^^^^^^^^^^^^^^^^^^ + +.. note:: + + Preloading is an advanced topic and experimental. See :ref:`preload` + for details. + +* **Environment variable**: ``SATPY_READERS__PRELOAD__ENABLE`` +* **YAML-Config Key**: ``readers.preload.enable`` +* **Default**: False + +Preload segments for those readers where it is supported. + +* **Environment variable**: ``SATPY_READERS__PRELOAD__STEP`` +* **YAML-Config Key**: ``readers.preload.step`` +* **Default**: 2 + +When preloading, interval in seconds between checks for whether a file has become +available. 
+ +* **Environment variable**: ``SATPY_READERS__PRELOAD__ATTEMPTS`` +* **YAML-Config Key**: ``readers.preload.attempts`` +* **Default**: 300 + +When preloading, how many times to check if a file has become available +before giving up. + +* **Environment variable**: ``SATPY_READERS__PRELOAD__ASSUME_DISTRIBUTED`` +* **YAML-Config Key**: ``readers.preload.assume_distributed`` +* **Default**: False + +When preloading, assume we are working in a dask distributed environment. +When active, Satpy workers will secede and rejoin while waiting for files. +This might partially avoid the problem that tasks waiting for later files +are blocking workers, while tasks working on earlier files are needlessly +waiting in the queue. However, Satpy has limited compatibility with +dask distributed. Please refer to the note at :ref:`preload` before +considering this option. Temporary Directory ^^^^^^^^^^^^^^^^^^^ diff --git a/doc/source/index.rst b/doc/source/index.rst index 66a069fcda..919ff56e10 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -70,6 +70,7 @@ Documentation enhancements writing multiscene + advanced/index dev_guide/index .. 
toctree:: diff --git a/satpy/_config.py b/satpy/_config.py index fbfcb0c0d5..c5f8635c50 100644 --- a/satpy/_config.py +++ b/satpy/_config.py @@ -53,6 +53,12 @@ "sensor_angles_position_preference": "actual", "readers": { "clip_negative_radiances": False, + "preload": { + "enable": False, + "step": 2, + "attempts": 300, + "assume_distributed": False, + }, }, } diff --git a/satpy/etc/readers/fci_l1c_nc.yaml b/satpy/etc/readers/fci_l1c_nc.yaml index fb461a517a..1ca6b449d5 100644 --- a/satpy/etc/readers/fci_l1c_nc.yaml +++ b/satpy/etc/readers/fci_l1c_nc.yaml @@ -21,33 +21,67 @@ file_types: - "{pflag}_{location_indicator},{data_designator},MTI{spacecraft_id:1d}+{data_source}-1C-RRAD-FDHSI-{coverage}-{subsetting}-{component1}-BODY-{component3}-{purpose}-{format}_{oflag}_{originator}_{processing_time:%Y%m%d%H%M%S}_{facility_or_tool}_{environment}_{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}_{processing_mode}_{special_compression}_{disposition_mode}_{repeat_cycle_in_day:>04d}_{count_in_repeat_cycle:>04d}.nc" expected_segments: 40 required_netcdf_variables: &required-variables - - attr/platform - - data/{channel_name}/measured/start_position_row - - data/{channel_name}/measured/end_position_row - - data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_wavenumber - - data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_a - - data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_b - - data/{channel_name}/measured/radiance_to_bt_conversion_constant_c1 - - data/{channel_name}/measured/radiance_to_bt_conversion_constant_c2 - - data/{channel_name}/measured/radiance_unit_conversion_coefficient - - data/{channel_name}/measured/channel_effective_solar_irradiance - - data/{channel_name}/measured/effective_radiance - - data/{channel_name}/measured/x - - data/{channel_name}/measured/y - - data/{channel_name}/measured/pixel_quality - - data/{channel_name}/measured/index_map - - data/mtg_geos_projection - - data/swath_direction - - 
data/swath_number - - index - - state/celestial/earth_sun_distance - - state/celestial/subsolar_latitude - - state/celestial/subsolar_longitude - - state/celestial/sun_satellite_distance - - state/platform/platform_altitude - - state/platform/subsatellite_latitude - - state/platform/subsatellite_longitude - - time + # key/value; keys are names, value is a list of string on how this may be + # cached between segments or between repeat cycles or neither + attr/platform: + - segment + - rc + data/{channel_name}/measured/start_position_row: + - rc + data/{channel_name}/measured/end_position_row: + - rc + data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_wavenumber: + - segment + - rc + data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_a: + - segment + - rc + data/{channel_name}/measured/radiance_to_bt_conversion_coefficient_b: + - segment + - rc + data/{channel_name}/measured/radiance_to_bt_conversion_constant_c1: + - segment + - rc + data/{channel_name}/measured/radiance_to_bt_conversion_constant_c2: + - segment + - rc + data/{channel_name}/measured/radiance_unit_conversion_coefficient: + - segment + - rc + data/{channel_name}/measured/channel_effective_solar_irradiance: + - segment + - rc + data/{channel_name}/measured/effective_radiance: [] + data/{channel_name}/measured/x: + - segment + - rc + data/{channel_name}/measured/y: + - rc + data/{channel_name}/measured/pixel_quality: [] + data/{channel_name}/measured/index_map: [] + data/mtg_geos_projection: + - segment + - rc + data/swath_direction: + - segment + data/swath_number: + - rc + index: [] + state/celestial/earth_sun_distance: + - segment + state/celestial/subsolar_latitude: + - segment + state/celestial/subsolar_longitude: + - segment + state/celestial/sun_satellite_distance: + - segment + state/platform/platform_altitude: + - segment + state/platform/subsatellite_latitude: + - segment + state/platform/subsatellite_longitude: + - segment + time: [] variable_name_replacements: 
channel_name: - vis_04 @@ -66,12 +100,24 @@ file_types: - ir_105 - ir_123 - ir_133 + # information needed for preloading + segment_tag: count_in_repeat_cycle + time_tags: &tt + - processing_time + - start_time + - end_time + variable_tags: # other tags where RC caching doesn't care about variation + - repeat_cycle_in_day fci_l1c_hrfi: file_reader: !!python/name:satpy.readers.fci_l1c_nc.FCIL1cNCFileHandler file_patterns: - "{pflag}_{location_indicator},{data_designator},MTI{spacecraft_id:1d}+{data_source}-1C-RRAD-HRFI-{coverage}-{subsetting}-{component1}-BODY-{component3}-{purpose}-{format}_{oflag}_{originator}_{processing_time:%Y%m%d%H%M%S}_{facility_or_tool}_{environment}_{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}_{processing_mode}_{special_compression}_{disposition_mode}_{repeat_cycle_in_day:>04d}_{count_in_repeat_cycle:>04d}.nc" expected_segments: 40 required_netcdf_variables: *required-variables + segment_tag: count_in_repeat_cycle + time_tags: *tt + variable_tags: + - repeat_cycle_in_day variable_name_replacements: channel_name: - vis_06_hr diff --git a/satpy/readers/__init__.py b/satpy/readers/__init__.py index 64055c7232..f01b3ddfed 100644 --- a/satpy/readers/__init__.py +++ b/satpy/readers/__init__.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2015-2018 Satpy developers +# Copyright (c) 2015-2024 Satpy developers # # This file is part of satpy. # @@ -808,3 +806,31 @@ def open_file_or_filename(unknown_file_thing, mode=None): except AttributeError: f_obj = unknown_file_thing return f_obj + + +def create_preloadable_cache(reader_name, filenames): + """Create on-disk cache for preloadables. + + Some readers allow on-disk caching of metadata. This can be used to + preload data, creating file handlers and a scene object before data files + are available. This utility function creates the associated cache for + multiple filenames. + + Args: + reader_name (str): + Reader for which to create the cache. 
+ filenames (List[str]): + Files for which to create the cache. Typically, this would be + the same set of files to create a single scene. + """ + reader_instances = load_readers(filenames, reader_name) + for (nm, reader_inst) in reader_instances.items(): + for (tp, handlers) in reader_inst.file_handlers.items(): + for handler in handlers: + filename = reader_inst._get_cache_filename( + handler.filename, + handler.filename_info, + handler) + p = pathlib.Path(filename) + p.parent.mkdir(exist_ok=True, parents=True) + handler.store_cache(filename) diff --git a/satpy/readers/fci_l1c_nc.py b/satpy/readers/fci_l1c_nc.py index fc40916699..a1de51d3c8 100644 --- a/satpy/readers/fci_l1c_nc.py +++ b/satpy/readers/fci_l1c_nc.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2017-2019 Satpy developers +# Copyright (c) 2017-2024 Satpy developers # # This file is part of satpy. # @@ -20,12 +18,10 @@ This module defines the :class:`FCIL1cNCFileHandler` file handler, to be used for reading Meteosat Third Generation (MTG) Flexible Combined -Imager (FCI) Level-1c data. FCI flies -on the MTG Imager (MTG-I) series of satellites, with the first satellite (MTG-I1) -launched on the 13th of December 2022. -For more information about FCI, see `EUMETSAT`_. +Imager (FCI) Level-1c data. FCI flies on the MTG Imager (MTG-I) series +of satellites, with the first satellite (MTG-I1) launched on the 13th +of December 2022. For more information about FCI, see `EUMETSAT`_. -For simulated test data to be used with this reader, see `test data releases`_. For the Product User Guide (PUG) of the FCI L1c data, see `PUG`_. .. 
note:: @@ -130,7 +126,7 @@ from satpy.readers._geos_area import get_geos_area_naming from satpy.readers.eum_base import get_service_mode -from .netcdf_utils import NetCDF4FsspecFileHandler +from .netcdf_utils import NetCDF4FsspecFileHandler, PreloadableSegments logger = logging.getLogger(__name__) @@ -183,7 +179,7 @@ def _get_channel_name_from_dsname(dsname): return channel_name -class FCIL1cNCFileHandler(NetCDF4FsspecFileHandler): +class FCIL1cNCFileHandler(PreloadableSegments, NetCDF4FsspecFileHandler): """Class implementing the MTG FCI L1c Filehandler. This class implements the Meteosat Third Generation (MTG) Flexible @@ -208,12 +204,15 @@ class using the :mod:`~satpy.Scene.load` method with the reader "MTI3": "MTG-I3", "MTI4": "MTG-I4"} - def __init__(self, filename, filename_info, filetype_info): + def __init__(self, filename, filename_info, filetype_info, + *args, **kwargs): """Initialize file handler.""" super().__init__(filename, filename_info, filetype_info, + *args, cache_var_size=0, - cache_handle=True) + cache_handle=True, + **kwargs) logger.debug("Reading: {}".format(self.filename)) logger.debug("Start: {}".format(self.start_time)) logger.debug("End: {}".format(self.end_time)) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index c8b8a3f85f..01aeafbf1b 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2016-2020 Satpy developers +# Copyright (c) 2016-2024 Satpy developers # # This file is part of satpy. # @@ -17,16 +15,24 @@ # satpy. If not, see . 
"""Helpers for reading netcdf-based files.""" +import functools +import glob import logging +import os +import pickle # nosec +import time +import warnings import dask.array as da +import dask.distributed import netCDF4 import numpy as np import xarray as xr +import satpy from satpy.readers import open_file_or_filename from satpy.readers.file_handlers import BaseFileHandler -from satpy.readers.utils import np2str +from satpy.readers.utils import get_serializable_dask_array, np2str from satpy.utils import get_legacy_chunk_size LOG = logging.getLogger(__name__) @@ -85,10 +91,12 @@ class NetCDF4FileHandler(BaseFileHandler): xarray_kwargs (dict): Addition arguments to `xarray.open_dataset` cache_var_size (int): Cache variables smaller than this size. cache_handle (bool): Keep files open for lifetime of filehandler. + Uses xarray.backends.CachingFileManager, which uses a least + recently used cache. """ - file_handle = None + manager = None def __init__(self, filename, filename_info, filetype_info, auto_maskandscale=False, xarray_kwargs=None, @@ -99,32 +107,51 @@ def __init__(self, filename, filename_info, filetype_info, self.file_content = {} self.cached_file_content = {} self._use_h5netcdf = False - try: - file_handle = self._get_file_handle() - except IOError: - LOG.exception( - "Failed reading file %s. Possibly corrupted file", self.filename) - raise + self._auto_maskandscale = auto_maskandscale + if cache_handle: + file_handle = self._get_cached_file_handle(auto_maskandscale) + else: + try: + file_handle = self._get_file_handle() + except IOError: + LOG.exception( + "Failed reading file %s. 
Possibly corrupted file", self.filename) + raise - self._set_file_handle_auto_maskandscale(file_handle, auto_maskandscale) + self._set_file_handle_auto_maskandscale(file_handle, auto_maskandscale) self._set_xarray_kwargs(xarray_kwargs, auto_maskandscale) listed_variables = filetype_info.get("required_netcdf_variables") - if listed_variables: + if listed_variables is not None: self._collect_listed_variables(file_handle, listed_variables) else: self.collect_metadata("", file_handle) self.collect_dimensions("", file_handle) self.collect_cache_vars(cache_var_size) - if cache_handle: - self.file_handle = file_handle - else: + if not cache_handle: file_handle.close() def _get_file_handle(self): return netCDF4.Dataset(self.filename, "r") + def _get_cached_file_handle(self, auto_maskandscale): + self.manager = xr.backends.CachingFileManager( + functools.partial(_nc_dataset_wrapper, + auto_maskandscale=auto_maskandscale), + self.filename, mode="r") + return self.manager.acquire() + + @property + def file_handle(self): + """Backward-compatible way for file handle caching.""" + warnings.warn( + "attribute .file_handle is deprecated, use .manager instead", + DeprecationWarning) + if self.manager is None: + return None + return self.manager.acquire() + @staticmethod def _set_file_handle_auto_maskandscale(file_handle, auto_maskandscale): if hasattr(file_handle, "set_auto_maskandscale"): @@ -196,11 +223,8 @@ def _get_required_variable_names(listed_variables, variable_name_replacements): def __del__(self): """Delete the file handler.""" - if self.file_handle is not None: - try: - self.file_handle.close() - except RuntimeError: # presumably closed already - pass + if self.manager is not None: + self.manager.close() def _collect_global_attrs(self, obj): """Collect all the global attributes for the provided file object.""" @@ -289,8 +313,8 @@ def _get_variable(self, key, val): group, key = parts else: group = None - if self.file_handle is not None: - val = 
self._get_var_from_filehandle(group, key) + if self.manager is not None: + val = self._get_var_from_manager(group, key) else: val = self._get_var_from_xr(group, key) return val @@ -319,18 +343,27 @@ def _get_var_from_xr(self, group, key): val.load() return val - def _get_var_from_filehandle(self, group, key): + def _get_var_from_manager(self, group, key): # Not getting coordinates as this is more work, therefore more # overhead, and those are not used downstream. + + with self.manager.acquire_context() as ds: + if group is not None: + v = ds[group][key] + else: + v = ds[key] if group is None: - g = self.file_handle + dv = get_serializable_dask_array( + self.manager, key, + chunks=v.shape, dtype=v.dtype) else: - g = self.file_handle[group] - v = g[key] + dv = get_serializable_dask_array( + self.manager, "/".join([group, key]), + chunks=v.shape, dtype=v.dtype) attrs = self._get_object_attrs(v) x = xr.DataArray( - da.from_array(v), dims=v.dimensions, attrs=attrs, - name=v.name) + dv, + dims=v.dimensions, attrs=attrs, name=v.name) return x def __contains__(self, item): @@ -443,3 +476,221 @@ def _get_attr(self, obj, key): if self._use_h5netcdf: return obj.attrs[key] return super()._get_attr(obj, key) + +class PreloadableSegments: + """Mixin class for pre-loading segments. + + This is a mixin class designed to use with file handlers that support + preloading segments. Subclasses deriving from this mixin are expected + to be geo-segmentable file handlers, and can be created based on + a glob pattern for a non-existing file (as well as a path or glob + pattern to an existing file). The first segment of a repeat cycle + is always processed only after the file exists. For the remaining + segments, metadata are collected as much as possible before the file + exists, from two possible sources: + + - From the first segment. For some file formats, many pieces of metadata + can be expected to be identical between segments. 
+ - From the same segment number for a previous repeat cycle. In this case, + caching is done on disk. + + To implement a filehandler using this, make sure it derives from both + :class:`NetCDF4FileHandler` and this class, and make sure to pass keyword + arguments in ``__init__`` to the superclass. In the YAML file, + ``required_netcdf_variables`` must be defined as a dictionary. The keys + are variable names, and values are a list of strings describing what + assumptions can be made about caching, where ``"rc"`` means the value is + expected to be constant between repeat cycles, and ``"segment"`` means it is + expected to be constant between the segments within a repeat cycle. For + variables that are actually variable, define the empty list. + + Attributes (variable, global, and group), shapes, dtypes, and dimension names + are assumed to be always shareable between repeat cycles. + + To use preloading, set the satpy configuration variable + ``readers.preload_segments`` to True. The initialisation parameter + for this filehandler might still be False, because the first segment (the + reference segment) of a repeat cycle is loaded normally. + + This feature is experimental. + + .. 
versionadded:: 0.52 + """ + + def __init__(self, *args, preload=False, ref_fh=None, rc_cache=None, **kwargs): + """Store attributes needed for preloading to work.""" + self.preload = preload + self.preload_attempts = satpy.config.get("readers.preload.attempts") + self.preload_step = satpy.config.get("readers.preload.step") + self.use_distributed = satpy.config.get("readers.preload.assume_distributed") + if preload: + if not isinstance(ref_fh, BaseFileHandler): + raise TypeError( + "Expect reference filehandler when preloading, got " + f"{type(ref_fh)!s}") + self.ref_fh = ref_fh + if not isinstance(rc_cache, (str, bytes, os.PathLike)): + raise TypeError( + "Expected cache file when preloading, got " + f"{type(rc_cache)!s}") + self.rc_cache = rc_cache + super().__init__(*args, **kwargs) + if "required_netcdf_variables" not in self.filetype_info: + raise ValueError("For preloadable filehandlers, " + "required_netcdf_variables is mandatory.") + + def _collect_listed_variables(self, file_handle, listed_variables): + """Collect listed variables, either preloaded or regular.""" + if self.preload: + self._preload_listed_variables(listed_variables) + else: + super()._collect_listed_variables(file_handle, listed_variables) + + def _preload_listed_variables(self, listed_variables): + """Preload listed variables, either from RC cache or segment cache.""" + variable_name_replacements = self.filetype_info.get("variable_name_replacements") + with open(self.rc_cache, "rb") as fp: + self.file_content.update(pickle.load(fp)) # nosec + for raw_name in listed_variables: + for subst_name in self._get_required_variable_names( + [raw_name], variable_name_replacements): + if self._can_get_from_other_segment(raw_name): + self.file_content[subst_name] = self.ref_fh.file_content[subst_name] + elif not self._can_get_from_other_rc(raw_name): + self._collect_variable_delayed(subst_name) + + def _can_get_from_other_segment(self, itm): + """Return true if variable is segment-cachable.""" + return 
"segment" in self.filetype_info["required_netcdf_variables"][itm] + + def _can_get_from_other_rc(self, itm): + """Return true if variable is rc-cachable.""" + # it's always safe to store variable attributes, shapes, dtype, dimensions + # between repeat cycles + if self._is_safely_shareable_metadata(itm): + return True + # need an inverted mapping so I can tell which variables I can store + invmap = self._get_inv_name_map() + if "rc" in self.filetype_info["required_netcdf_variables"][invmap.get(itm, itm)]: + return True + return False + + def _is_safely_shareable_metadata(self, itm): + """Check if item refers to safely shareable metadata.""" + for meta in ("/attr/", "/shape", "/dtype", "/dimension/", "/dimensions"): + if meta in itm: + return True + + def _get_inv_name_map(self): + """Get inverted mapping for variable name replacements.""" + listed_variables = self.filetype_info.get("required_netcdf_variables") + variable_name_replacements = self.filetype_info.get("variable_name_replacements") + invmap = {} + for raw_name in listed_variables: + for subst_name in self._get_required_variable_names([raw_name], + variable_name_replacements): + invmap[subst_name] = raw_name + return invmap + + def _collect_variable_delayed(self, subst_name): + md = self.ref_fh[subst_name] # some metadata from reference segment + fn_matched = _wait_for_file(self.filename, self.preload_attempts, + self.preload_step, + self.use_distributed) + dade = _get_delayed_value_from_nc(fn_matched, subst_name) + array = da.from_delayed( + dade, + shape=self[subst_name + "/shape"], # not safe from reference segment! + dtype=md.dtype) + + var_obj = xr.DataArray( + array, + dims=md.dims, # dimension /names/ should be safe + attrs=md.attrs, # FIXME: is this safe? 
More involved to gather otherwise + name=md.name) + self.file_content[subst_name] = var_obj + + def _get_file_handle(self): + if self.preload: + return open(os.devnull, "r") + return super()._get_file_handle() + + def _get_cached_file_handle(self, auto_maskandscale): + if self.preload: + return open(os.devnull, "r") + return super()._get_cached_file_handle(auto_maskandscale) + + def store_cache(self, filename=None): + """Store RC-cachable data to cache.""" + if self.preload: + raise ValueError("Cannot store cache with pre-loaded handler") + to_store = {} + for key in self.file_content.keys(): + if self._can_get_from_other_rc(key): + try: + to_store[key] = self[key].compute() + except AttributeError: # not a dask array + to_store[key] = self[key] + + filename = filename or self.rc_cache + LOG.info(f"Storing cache to {filename!s}") + with open(filename, "wb") as fp: + # FIXME: potential security risk? Acceptable? + pickle.dump(to_store, fp) + + +@dask.delayed +def _wait_for_file(pat, max_tries=300, wait=2, use_distributed=False): + """Wait for file to appear.""" + for i in range(max_tries): + if (match := _check_for_matching_file(i, pat, wait, use_distributed)): + LOG.debug(f"Found {match!s} matching {pat!s}!") + return match + else: + raise TimeoutError(f"File matching {pat!s} failed to materialise") + + +def _check_for_matching_file(i, pat, wait, use_distributed=False): + fns = glob.glob(pat) + if len(fns) == 0: + _log_and_wait(i, pat, wait, use_distributed) + return + if len(fns) > 1: + raise ValueError(f"Expected one matching file, found {len(fns):d}") + return fns[0] + + +def _log_and_wait(i, pat, wait, use_distributed=False): + """Maybe log that we're waiting for pat, then wait.""" + if i == 0: + LOG.debug(f"Waiting for {pat!s} to appear.") + if i % 60 == 30: + LOG.debug(f"Still waiting for {pat!s}") + if use_distributed: + dask.distributed.secede() + time.sleep(wait) + if use_distributed: + dask.distributed.rejoin() + + +@dask.delayed +def 
_get_delayed_value_from_nc(fn, var, auto_maskandscale=False): + if "/" in var: + (grp, var) = var.rsplit("/", maxsplit=1) + else: + (grp, var) = (None, var) + with xr.open_dataset(fn, group=grp, mask_and_scale=auto_maskandscale, engine="h5netcdf") as nc: + return nc[var][:] + + +def _nc_dataset_wrapper(*args, auto_maskandscale, **kwargs): + """Wrap netcdf4.Dataset setting auto_maskandscale globally. + + Helper function that wraps netcdf4.Dataset while setting extra parameters. + By encapsulating this in a helper function, we can + pass it to CachingFileManager directly. Currently sets + auto_maskandscale globally (for all variables). + """ + nc = netCDF4.Dataset(*args, **kwargs) + nc.set_auto_maskandscale(auto_maskandscale) + return nc diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 983225acd5..3a6f3dbe0b 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -29,6 +29,7 @@ from shutil import which from subprocess import PIPE, Popen # nosec +import dask.array as da import numpy as np import pyproj import xarray as xr @@ -497,6 +498,48 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance +def get_serializable_dask_array(manager, varname, chunks, dtype): + """Construct a serializable dask array from a variable. + + When we construct a dask array using da.array from a file, and use + that to create an xarray dataarray, the result is not serializable + and dask graphs using this dataarray cannot be computed when the dask + distributed scheduler is in use. To circumvent this problem, xarray + provides the CachingFileManager. See GH#2815 for more information. + + Should have at least one dimension. 
+ + Example:: + + >>> import netCDF4 + >>> from xarray.backends import CachingFileManager + >>> cfm = CachingFileManager(netCDF4.Dataset, filename, mode="r") + >>> arr = get_serializable_dask_array(cfm, "my_var", 1024, "f4") + + Args: + manager (xarray.backends.CachingFileManager): + Instance of :class:`~xarray.backends.CachingFileManager` encapsulating the + dataset to be read. + varname (str): + Name of the variable (possibly including a group path). + chunks (tuple): + Chunks to use when creating the dask array. + dtype (dtype): + What dtype to use. + """ + def get_chunk(block_info=None): + arrloc = block_info[None]["array-location"] + with manager.acquire_context() as nc: + var = nc[varname] + return var[tuple(slice(*x) for x in arrloc)] + + return da.map_blocks( + get_chunk, + chunks=chunks, + dtype=dtype, + meta=np.array([], dtype=dtype)) + + class _CalibrationCoefficientParser: """Parse user-defined calibration coefficients.""" diff --git a/satpy/readers/yaml_reader.py b/satpy/readers/yaml_reader.py index 5bbaba4a6c..5620d27f83 100644 --- a/satpy/readers/yaml_reader.py +++ b/satpy/readers/yaml_reader.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2016-2022 Satpy developers +# Copyright (c) 2016-2024 Satpy developers # # This file is part of satpy. # @@ -17,10 +15,12 @@ # satpy. If not, see . 
"""Base classes and utilities for all readers configured by YAML files.""" +import datetime import glob import itertools import logging import os +import pathlib import warnings from abc import ABCMeta, abstractmethod from collections import OrderedDict, deque @@ -28,12 +28,15 @@ from fnmatch import fnmatch from weakref import WeakValueDictionary +import appdirs import numpy as np import xarray as xr import yaml from pyresample.boundary import AreaDefBoundary, Boundary from pyresample.geometry import AreaDefinition, StackedAreaDefinition, SwathDefinition -from trollsift.parser import globify, parse +from trollsift.parser import Parser, globify, parse + +import satpy try: from yaml import CLoader as Loader @@ -1156,8 +1159,8 @@ class GEOSegmentYAMLReader(GEOFlippableFileYAMLReader): This reader pads the data to full geostationary disk if necessary. This reader uses an optional ``pad_data`` keyword argument that can be - passed to :meth:`Scene.load` to control if padding is done (True by - default). Passing `pad_data=False` will return data unpadded. + passed to :meth:`~satpy.scene.Scene.load` to control if padding is done (True by + default). Passing ``pad_data=False`` will return data unpadded. When using this class in a reader's YAML configuration, segmented file types (files that may have multiple segments) should specify an extra @@ -1167,8 +1170,25 @@ class GEOSegmentYAMLReader(GEOFlippableFileYAMLReader): field which will be used if ``expected_segments`` is not defined. This will default to 1 segment. + This reader uses the ``readers.preload_segments`` configuration setting. + This argument is intended for near real time processing. When only + one segment has arrived, the user can create a scene using this one + segment and ``preload=True``, and Satpy will populate a scene based on + all expected segments with dask delayed objects. 
When computing the + dask graphs, Satpy will process each segment as soon as it comes in, + strongly improving the timeliness of processing a full disc image in + near real time. Checking for new files can be controlled with the + arguments ``preload_step`` (time in seconds) and ``preload_tries`` + (how many tries before giving up). + + This feature is experimental. Use at your own risk. """ + def __init__(self, *args, **kwargs): + """Initialise object.""" + self.preload = satpy.config.get("readers.preload.enable") + super().__init__(*args, **kwargs) + def create_filehandlers(self, filenames, fh_kwargs=None): """Create file handler objects and determine expected segments for each. @@ -1330,6 +1350,129 @@ def _get_new_areadef_heights(self, previous_area, previous_seg_size, **kwargs): return new_height_proj_coord, new_height_px + def _new_filehandler_instances(self, filetype_info, filename_items, fh_kwargs=None): + """Get new filehandler instances. + + Gets new filehandler instances, either just for files that exist, or, + if self.preload is True, also for predicted files that don't exist, + as a glob pattern. + """ + if fh_kwargs is None: + fh_kwargs = {} + fh_kwargs_without_preload = fh_kwargs.copy() + fh_kwargs_without_preload.pop("preload", None) + fh_it = super()._new_filehandler_instances(filetype_info, + filename_items, + fh_kwargs=fh_kwargs_without_preload) + if not self.preload: + yield from fh_it + return + if "requires" in filetype_info: + raise ValueError("Unable to preload with required types") + try: + fh_first = next(fh_it) + except StopIteration: + return + yield fh_first + yield from fh_it + yield from self._new_preloaded_filehandler_instances( + filetype_info, fh_first) + + def _new_preloaded_filehandler_instances(self, filetype_info, fh): + """Get filehandler instances for non-existing files. + + Based on the filehandler for a single existing file, yield filehandler + instances for all files that we expect for a full disc cycle.
Those + filehandler instances are usually based on glob patterns rather than on + the explicit filename, which may not be reliably predictable. + """ + if filetype_info.get("requires"): + raise NotImplementedError("Pre-loading not implemented for " + "handlers with required segments.") + filetype_cls = filetype_info["file_reader"] + for (filename, filename_info) in self._predict_filenames(filetype_info, fh): + # FIXME: handle fh_kwargs + yield filetype_cls(filename, filename_info, filetype_info, + preload=True, ref_fh=fh, + rc_cache=self._get_cache_filename( + filename, filename_info, fh)) + + def _predict_filenames(self, filetype_info, fh): + """Predict what filenames or glob patterns we should expect. + + Based on the filehandler for a single existing file, yield strings + for filenames or glob patterns for all files that we expect to make up + a scene in the end. + """ + for i in range(fh.filename_info[filetype_info["segment_tag"]]+1, + fh.filetype_info["expected_segments"]+1): + yield self._predict_filename(fh, i) + + def _select_pattern(self, filename): + """Choose the matching file pattern. + + Given a filename for a yamlreader with multiple file patterns, + return the matching file pattern. + """ + for pattern in self.file_patterns: + if _match_filenames([filename], pattern): + return pattern + else: + raise ValueError("Cannot predict filenames, because the " + "initial file doesn't appear to meet any defined " + "pattern.") + + def _predict_filename(self, fh, i): + """Predict filename or glob pattern. + + Given a filehandler for an extant file, predict what the filename or + glob pattern will be for segment i. + + Returns filename pattern and filename info dict. 
+ """ + pat = self._select_pattern(fh.filename) + p = Parser(pat) + new_info = _predict_filename_info(fh, i) + new_filename = p.compose(new_info) + basedir = pathlib.Path(fh.filename).parent + new_filename = new_filename.replace("235959", "??????") + new_filename = os.fspath(basedir / new_filename) + return (new_filename, new_info) + + def _get_cache_filename(self, filename, filename_info, fh): + """Get filename for inter-rc caching.""" + dirname = self._get_cache_dir(fh) + pat = self._select_pattern(filename) + p = Parser(pat) + new_info = filename_info.copy() + for tm in fh.filetype_info.get("time_tags", []): + new_info[tm] = datetime.datetime.max + for ct in fh.filetype_info.get("variable_tags", []): + new_info[ct] = 0 + name = p.compose(new_info) + pt = pathlib.Path(name) + pt = pt.with_suffix(".pkl") + return os.path.join(dirname, os.fspath(pt)) + + def _get_cache_dir(self, fh): + return os.path.join(appdirs.user_cache_dir(), "satpy", "preloadable", + type(fh).__name__) + + +def _predict_filename_info(fh, i): + """Predict filename info for file that doesn't exist yet. + + Taking an existing filehandler that refers to a real file with a segmented + reader, return the filename info dict predicted for segment number + i.
+ """ + new_info = fh.filename_info.copy() + new_info[fh.filetype_info["segment_tag"]] = i + for tm in fh.filetype_info["time_tags"]: + new_info[tm] = new_info[tm].replace( + hour=23, minute=59, second=59) + return new_info + def _stack_area_defs(area_def_dict): """Stack given dict of area definitions and return a StackedAreaDefinition.""" diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index 60d8be48de..beeed8f5df 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2017-2020 Satpy developers +# Copyright (c) 2017-2024 Satpy developers # # This file is part of satpy. # @@ -17,17 +15,26 @@ # satpy. If not, see . """Module for testing the satpy.readers.netcdf_utils module.""" +import concurrent.futures import os -import unittest +import pickle +import time +import dask.array as da import numpy as np import pytest +import xarray as xr try: - from satpy.readers.netcdf_utils import NetCDF4FileHandler + from satpy.readers.netcdf_utils import NetCDF4FileHandler, PreloadableSegments except ImportError: # fake the import so we can at least run the tests in this file NetCDF4FileHandler = object # type: ignore + # setting Preloadable to object leads to an MRO error when defining + # FakePreloadableHandler, therefore define own class + class PreloadableSegments: # type: ignore + """Backup preloadable class if import fails.""" + pass class FakeNetCDF4FileHandler(NetCDF4FileHandler): @@ -71,13 +78,15 @@ def get_test_content(self, filename, filename_info, filetype_info): raise NotImplementedError("Fake File Handler subclass must implement 'get_test_content'") -class TestNetCDF4FileHandler(unittest.TestCase): +class TestNetCDF4FileHandler: """Test NetCDF4 File Handler Utility class.""" - def setUp(self): + @pytest.fixture() + def dummy_nc_file(self, tmp_path): """Create a test NetCDF4 
file.""" from netCDF4 import Dataset - with Dataset("test.nc", "w") as nc: + fn = tmp_path / "test.nc" + with Dataset(fn, "w") as nc: # Create dimensions nc.createDimension("rows", 10) nc.createDimension("cols", 100) @@ -116,17 +125,14 @@ def setUp(self): d.test_attr_str = "test_string" d.test_attr_int = 0 d.test_attr_float = 1.2 + return fn - def tearDown(self): - """Remove the previously created test file.""" - os.remove("test.nc") - - def test_all_basic(self): + def test_all_basic(self, dummy_nc_file): """Test everything about the NetCDF4 class.""" import xarray as xr from satpy.readers.netcdf_utils import NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}) assert file_handler["/dimension/rows"] == 10 assert file_handler["/dimension/cols"] == 100 @@ -165,7 +171,7 @@ def test_all_basic(self): assert file_handler.file_handle is None assert file_handler["ds2_sc"] == 42 - def test_listed_variables(self): + def test_listed_variables(self, dummy_nc_file): """Test that only listed variables/attributes area collected.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler @@ -175,12 +181,12 @@ def test_listed_variables(self): "attr/test_attr_str", ] } - file_handler = NetCDF4FileHandler("test.nc", {}, filetype_info) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, filetype_info) assert len(file_handler.file_content) == 2 assert "test_group/attr/test_attr_str" in file_handler.file_content assert "attr/test_attr_str" in file_handler.file_content - def test_listed_variables_with_composing(self): + def test_listed_variables_with_composing(self, dummy_nc_file): """Test that composing for listed variables is performed.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler @@ -199,7 +205,7 @@ def test_listed_variables_with_composing(self): ], } } - file_handler = NetCDF4FileHandler("test.nc", {}, filetype_info) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, 
filetype_info) assert len(file_handler.file_content) == 3 assert "test_group/ds1_f/attr/test_attr_str" in file_handler.file_content assert "test_group/ds1_i/attr/test_attr_str" in file_handler.file_content @@ -208,10 +214,10 @@ def test_listed_variables_with_composing(self): assert not any("another_parameter" in var for var in file_handler.file_content) assert "test_group/attr/test_attr_str" in file_handler.file_content - def test_caching(self): + def test_caching(self, dummy_nc_file): """Test that caching works as intended.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler - h = NetCDF4FileHandler("test.nc", {}, {}, cache_var_size=1000, + h = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_var_size=1000, cache_handle=True) assert h.file_handle is not None assert h.file_handle.isopen() @@ -226,8 +232,6 @@ def test_caching(self): np.testing.assert_array_equal( h["ds2_f"], np.arange(10. * 100).reshape((10, 100))) - h.__del__() - assert not h.file_handle.isopen() def test_filenotfound(self): """Test that error is raised when file not found.""" @@ -237,21 +241,21 @@ def test_filenotfound(self): with pytest.raises(IOError, match=".*(No such file or directory|Unknown file format).*"): NetCDF4FileHandler("/thisfiledoesnotexist.nc", {}, {}) - def test_get_and_cache_npxr_is_xr(self): + def test_get_and_cache_npxr_is_xr(self, dummy_nc_file): """Test that get_and_cache_npxr() returns xr.DataArray.""" import xarray as xr from satpy.readers.netcdf_utils import NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}, cache_handle=True) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_handle=True) data = file_handler.get_and_cache_npxr("test_group/ds1_f") assert isinstance(data, xr.DataArray) - def test_get_and_cache_npxr_data_is_cached(self): + def test_get_and_cache_npxr_data_is_cached(self, dummy_nc_file): """Test that the data are cached when get_and_cache_npxr() is called.""" from satpy.readers.netcdf_utils import 
NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}, cache_handle=True) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_handle=True) data = file_handler.get_and_cache_npxr("test_group/ds1_f") # Delete the dataset from the file content dict, it should be available from the cache @@ -265,7 +269,6 @@ class TestNetCDF4FsspecFileHandler: def test_default_to_netcdf4_lib(self): """Test that the NetCDF4 backend is used by default.""" - import os import tempfile import h5py @@ -296,6 +299,239 @@ def test_use_h5netcdf_for_file_not_accessible_locally(self): assert fh._use_h5netcdf +class FakePreloadableHandler(PreloadableSegments, FakeNetCDF4FileHandler): + """Fake preloadable handler.""" + + def __init__(self, filename, filename_info, filetype_info, **kwargs): + """Initialise the fake preloadable handler.""" + super().__init__(filename, filename_info, filetype_info, **kwargs) + if self.preload: + self._collect_listed_variables(None, + filetype_info.get("required_netcdf_variables")) + + def get_test_content(self, filename, filename_info, filetype_info): + """Get fake test content.""" + return {} + + +class TestPreloadableHandler: + """Test functionality related to preloading.""" + + @staticmethod + def test_wait_for_file_already_exists(tmp_path): + """Test case where file already exists.""" + from satpy.readers.netcdf_utils import _wait_for_file + fn = tmp_path / "file1" + fn.parent.mkdir(exist_ok=True, parents=True) + fn.touch() + waiter = _wait_for_file(os.fspath(fn), max_tries=1, wait=0) + assert waiter.compute() == os.fspath(fn) + + @staticmethod + def test_wait_for_file_appears(tmp_path): + """Test case where file appears after a bit.""" + from satpy.readers.netcdf_utils import _wait_for_file + def _wait_and_create(path): + time.sleep(0.08) + path.touch() + fn = tmp_path / "file2" + waiter = _wait_for_file(os.fspath(fn), max_tries=8, wait=0.07) + with concurrent.futures.ThreadPoolExecutor() as executor: + 
executor.submit(_wait_and_create, fn) + t1 = time.time() + res = waiter.compute() + t2 = time.time() + assert res == os.fspath(fn) + assert t2 - t1 > 0.05 + + @staticmethod + def test_wait_for_file_not_appears(tmp_path): + """Test case where file fails to appear.""" + from satpy.readers.netcdf_utils import _wait_for_file + fn = tmp_path / "file3" + waiter = _wait_for_file(os.fspath(fn), max_tries=1, wait=0) + with pytest.raises(TimeoutError): + waiter.compute() + + @staticmethod + def test_wait_for_file_multiple_appear(tmp_path): + """Test case where no file matching a glob pattern appears.""" + from satpy.readers.netcdf_utils import _wait_for_file + fn = tmp_path / "file?" + waiter = _wait_for_file(os.fspath(fn), max_tries=1, wait=0) + with pytest.raises(TimeoutError): + waiter.compute() + + @staticmethod + def test_wait_for_file_multiple_appears(tmp_path): + """Test case where multiple files match the glob pattern.""" + from satpy.readers.netcdf_utils import _wait_for_file + def _wait_and_create(path1, path2): + path1.touch() + path2.touch() + fn4 = tmp_path / "file4" + fn5 = tmp_path / "file5" + pat = os.fspath(tmp_path / "file?") + waiter = _wait_for_file(pat, max_tries=2, wait=0.1) + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.submit(_wait_and_create, fn4, fn5) + with pytest.raises(ValueError, + match="Expected one matching file, found 2"): + waiter.compute() + + def test_get_delayed_from_file(self, tmp_path): + """Test getting a value from a file (delayed).""" + from satpy.readers.netcdf_utils import _get_delayed_value_from_nc + ncname = tmp_path / "croatia.nc" + ds = xr.Dataset({"rijeka": (("y", "x"), np.zeros((3, 3)))}) + ds.to_netcdf(ncname) + var_del = _get_delayed_value_from_nc(ncname, "rijeka") + np.testing.assert_array_equal(var_del.compute(), np.zeros((3, 3))) + + ncname = tmp_path / "liechtenstein.nc" + ds = xr.Dataset({"vaduz": (("y", "x"), np.ones((2, 2)))}) + ds.to_netcdf(ncname, group="/earth/europe") + var_del = _get_delayed_value_from_nc(ncname,
"earth/europe/vaduz") + np.testing.assert_array_equal(var_del.compute(), np.ones((2, 2))) + + @pytest.fixture() + def fake_config(self): + """Get a fake required net vars configuration.""" + return {"required_netcdf_variables": { + "/iceland/reykjavík": ["rc"], + "/iceland/bolungarvík": ["segment"], + "/iceland/{volcano}/lava": ["rc"], + "/iceland/{volcano}/crater": ["segment"], + "/iceland/{volcano}/magma": [], + }, + "variable_name_replacements": { + "volcano": ["eyjafjallajökull", "sýlingarfell"] + }} + + @pytest.fixture() + def preloadable_false(self, fake_config, tmp_path): + """Return a preloadable filehandler with preload=False.""" + ds = xr.Dataset() + ds.to_netcdf(tmp_path / "grimsvötn.nc") + handler = FakePreloadableHandler( + os.fspath(tmp_path / "grimsvötn.nc"), + {}, + fake_config, + preload=False) + handler.file_content["/iceland/reykjavík"] = xr.DataArray( + da.from_array([[0, 1, 2]])) + handler.file_content["/iceland/bolungarvík"] = "012" + handler.file_content["/iceland/eyjafjallajökull/lava"] = xr.DataArray( + da.from_array([[1, 2, 3]])) + handler.file_content["/iceland/eyjafjallajökull/crater"] = xr.DataArray( + da.from_array([[2, 2, 3]])) + handler.file_content["/iceland/eyjafjallajökull/magma"] = xr.DataArray( + da.from_array([[3, 2, 3]])) + handler.file_content["/iceland/eyjafjallajökull/magma/shape"] = (1, 3) + handler.file_content["/iceland/sýlingarfell/lava"] = xr.DataArray( + da.from_array([[4, 2, 3]])) + handler.file_content["/iceland/sýlingarfell/crater"] = xr.DataArray( + da.from_array([[5, 2, 3]])) + handler.file_content["/iceland/sýlingarfell/magma"] = xr.DataArray( + da.from_array([[6, 2, 3]])) + handler.file_content["/iceland/sýlingarfell/magma/shape"] = (1, 3) + return handler + + @pytest.fixture() + def cache_file(self, preloadable_false, tmp_path): + """Define a cache file.""" + cf = os.fspath(tmp_path / "test.pkl") + preloadable_false.store_cache(cf) + return cf + + @pytest.fixture() + def preloadable_true(self, 
preloadable_false, tmp_path, cache_file, + fake_config): + """Return a preloadable filehandler with preload=True.""" + handler = FakePreloadableHandler( + "dummy", + {}, + fake_config, + preload=True, + ref_fh=preloadable_false, + rc_cache=cache_file) + return handler + + def test_store_and_load_cache(self, tmp_path, preloadable_false, + cache_file, fake_config): + """Test that cache is stored as expected.""" + dt = pickle.load(open(cache_file, mode="rb")) + assert isinstance(dt["/iceland/reykjavík"].data, np.ndarray) # calculated + np.testing.assert_array_equal(dt["/iceland/reykjavík"], + np.array([[0, 1, 2]])) + # segment-only should not be stored + assert "/iceland/bolungarvík" not in dt + # and test loading + handler = FakePreloadableHandler( + "dummy", + {}, + fake_config, + preload=True, + ref_fh=preloadable_false, + rc_cache=cache_file) + np.testing.assert_array_equal(handler["/iceland/reykjavík"], + np.array([[0, 1, 2]])) + + def test_can_get_from_other_segment(self, preloadable_true): + """Test if segment-cachable check works correctly.""" + assert preloadable_true._can_get_from_other_segment("/iceland/bolungarvík") + assert not preloadable_true._can_get_from_other_segment("/iceland/reykjavík") + + def test_can_get_from_other_rc(self, preloadable_true): + """Test if rc-cachable check works correctly.""" + assert not preloadable_true._can_get_from_other_rc("/iceland/bolungarvík") + assert preloadable_true._can_get_from_other_rc("/iceland/reykjavík") + assert preloadable_true._can_get_from_other_rc("/iceland/bolungarvík/shape") + assert not preloadable_true._can_get_from_other_rc("/iceland/eyjafjallajökull/magma") + assert preloadable_true._can_get_from_other_rc("/iceland/eyjafjallajökull/lava") + assert preloadable_true._can_get_from_other_rc("/iceland/sýlingarfell/lava") + assert preloadable_true._can_get_from_other_rc("/iceland/eyjafjallajökull/magma/dtype") + + def test_get_from_other_segment(self, preloadable_true): + """Test that loading from a cached 
segment works.""" + assert preloadable_true["/iceland/bolungarvík"] == "012" + + def test_get_from_other_rc(self, preloadable_true): + """Test that loading from a cached repeat cycle works.""" + assert preloadable_true["/iceland/bolungarvík"] == "012" + + def test_incorrect_usage(self, fake_config, preloadable_false, cache_file, + tmp_path): + """Test exceptions raised when usage is incorrect.""" + with pytest.raises(TypeError, match="Expect reference filehandler"): + FakePreloadableHandler("dummy", {}, fake_config, preload=True, + rc_cache=cache_file) + with pytest.raises(TypeError, match="Expected cache file"): + FakePreloadableHandler("dummy", {}, fake_config, preload=True, + ref_fh=preloadable_false) + fph = FakePreloadableHandler("dummy", {}, fake_config, preload=True, + rc_cache=cache_file, ref_fh=preloadable_false) + with pytest.raises(ValueError, match="Cannot store cache with pre-loaded handler"): + fph.store_cache(tmp_path / "nowhere-special.pkl") + fc2 = fake_config.copy() + del fc2["required_netcdf_variables"] + with pytest.raises(ValueError, + match="For preloadable filehandlers, " + "required_netcdf_variables is mandatory"): + fph = FakePreloadableHandler("dummy", {}, fc2, preload=False, + rc_cache=cache_file) + + def test_get_file_handle(self, preloadable_true, preloadable_false, + tmp_path): + """Test getting the file handle.""" + preloadable_true._get_file_handle() + preloadable_false._get_file_handle() + + def test_collect_vars(self, tmp_path, fake_config, preloadable_false): + """Test collecting variables is delegated.""" + preloadable_false._collect_listed_variables(None, {}) + + NC_ATTRS = { "standard_name": "test_data", "scale_factor": 0.01, @@ -393,3 +629,40 @@ def test_get_data_as_xarray_scalar_h5netcdf(tmp_path): res = get_data_as_xarray(fid["test_data"]) np.testing.assert_equal(res.data, np.array(data)) assert res.attrs == NC_ATTRS + + +@pytest.fixture() +def dummy_nc(tmp_path): + """Fixture to create a dummy NetCDF file and return its 
path.""" + import xarray as xr + + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset(data_vars={"kaitum": (["x"], np.arange(10))}) + ds.to_netcdf(fn) + return fn + + +def test_caching_distributed(dummy_nc): + """Test that the distributed scheduler works with file handle caching. + + This is a test for GitHub issue 2815. + """ + from dask.distributed import Client + + from satpy.readers.netcdf_utils import NetCDF4FileHandler + + fh = NetCDF4FileHandler(dummy_nc, {}, {}, cache_handle=True) + + def doubler(x): + return x * 2 + + # As documented in GH issue 2815, using dask distributed with the file + # handle cacher might fail in non-trivial ways, such as giving incorrect + # results. Testing map_blocks is one way to reproduce the problem + # reliably, even though the problem also manifests itself (in different + # ways) without map_blocks. + + + with Client(): + dask_doubler = fh["kaitum"].map_blocks(doubler) + dask_doubler.compute() diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index b36a2b1d60..40a872db29 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -514,6 +514,64 @@ def test_generic_open_binary(tmp_path, data, filename, mode): assert read_binary_data == dummy_data +class TestDistributed: + """Distributed-related tests. + + Distributed-related tests are grouped so that they can share a class-scoped + fixture setting up the distributed client, as this setup is relatively + slow. 
+ """ + + @pytest.fixture(scope="class") + def dask_dist_client(self): + """Set up and close a dask distributed client.""" + from dask.distributed import Client + cl = Client() + yield cl + cl.close() + + + @pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) + @pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) + @pytest.mark.parametrize("grp", ["/", "/in/a/group"]) + def test_get_serializable_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): + """Test getting a dask distributed friendly serialisable dask array.""" + import netCDF4 + from xarray.backends import CachingFileManager + + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset( + data_vars={ + "kaitum": (["x", "y", "z"][:len(shape)], + np.arange(np.prod(shape), + dtype=dtype).reshape(shape))}) + ds.to_netcdf(fn, group=grp) + + cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") + arr = hf.get_serializable_dask_array(cfm, "/".join([grp, "kaitum"]), + chunks=shape, dtype=dtype) + + # As documented in GH issue 2815, using dask distributed with the file + # handle cacher might fail in non-trivial ways, such as giving incorrect + # results. Testing map_blocks is one way to reproduce the problem + # reliably, even though the problem also manifests itself (in different + # ways) without map_blocks. 
+ + def doubler(x): + # with a workaround for https://github.com/numpy/numpy/issues/27029 + return x * x.dtype.type(2) + + dask_doubler = arr.map_blocks(doubler, dtype=arr.dtype) + res = dask_doubler.compute() + # test before and after computation, as to confirm we have the correct + # shape and dtype and that computing doesn't change them + assert shape == dask_doubler.shape + assert shape == res.shape + assert dtype == dask_doubler.dtype + assert dtype == res.dtype + np.testing.assert_array_equal(res, np.arange(np.prod(shape)).reshape(shape)*2) + + class TestCalibrationCoefficientPicker: """Unit tests for calibration coefficient selection.""" diff --git a/satpy/tests/test_readers.py b/satpy/tests/test_readers.py index 6971efca41..9663b643e8 100644 --- a/satpy/tests/test_readers.py +++ b/satpy/tests/test_readers.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) 2019, 2022, 2023 Satpy developers +# Copyright (c) 2019, 2022-2024 Satpy developers # # This file is part of satpy. 
# @@ -22,6 +20,7 @@ import contextlib import datetime as dt import os +import pickle import sys import unittest import warnings @@ -29,11 +28,13 @@ from typing import Iterator from unittest import mock +import dask.array as da import numpy as np import pytest import xarray as xr from pytest_lazy_fixtures import lf as lazy_fixture +import satpy from satpy.dataset.data_dict import get_key from satpy.dataset.dataid import DataID, ModifierTuple, WavelengthRange from satpy.readers import FSFile, find_files_and_readers, open_file_or_filename @@ -1115,6 +1116,40 @@ def test_fs_property_is_read_only(self, local_filename): fsf.fs = "foo" +def test_create_preloadable_cache(tmp_path): + """Test utility function creating a cache for preloading.""" + from satpy.readers import create_preloadable_cache + from satpy.readers.yaml_reader import GEOSegmentYAMLReader + from satpy.tests.reader_tests.test_netcdf_utils import FakePreloadableHandler + fake_config = {"reader": { + "name": "tartupaluk"}, + "file_types": { + "m9g": { + "file_reader": FakePreloadableHandler, + "file_patterns": ["a-{segment:d}.nc"], + "expected_segments": 3, + "required_netcdf_variables": {"/iceland/reykjavík": ["rc"]}}}} + dph = FakePreloadableHandler( + os.fspath(tmp_path / "a-0.nc"), + {"segment": 0}, fake_config["file_types"]["m9g"], + rc_cache=tmp_path / "test.pkl", + preload=False) + dph.file_content["/iceland/reykjavík"] = xr.DataArray(da.from_array([[0, 1, 2]])) + with satpy.config.set({"readers.preload.enable": True}): + gsyr = GEOSegmentYAMLReader(fake_config) + gsyr.file_handlers["handler"] = [dph] + + with unittest.mock.patch("satpy.readers.load_readers") as srl: + with unittest.mock.patch("appdirs.user_cache_dir") as au: + au.return_value = os.fspath(tmp_path / "cache") + srl.return_value = {"tartupaluk": gsyr} + create_preloadable_cache("tartupaluk", [tmp_path / "a-0.nc"]) + with (tmp_path / "cache" / "satpy" / "preloadable" / + "FakePreloadableHandler" / "a-0.pkl").open(mode="rb") as fp: + data =
pickle.load(fp) + assert data.keys() + + def test_open_file_or_filename_uses_mode(tmp_path): """Test that open_file_or_filename uses provided mode.""" filename = tmp_path / "hej" diff --git a/satpy/tests/test_yaml_reader.py b/satpy/tests/test_yaml_reader.py index 699f6619b6..5ebed47b1b 100644 --- a/satpy/tests/test_yaml_reader.py +++ b/satpy/tests/test_yaml_reader.py @@ -29,6 +29,7 @@ import pytest import xarray as xr +import satpy import satpy.readers.yaml_reader as yr from satpy._compat import cache from satpy.dataset import DataQuery @@ -1529,3 +1530,262 @@ def test_get_empty_segment_with_height(self): new_height = 140 new_empty_segment = geswh(empty_segment, new_height, dim) assert new_empty_segment is empty_segment + + +_fake_filetype_info = { + "file_reader": DummyReader, + "file_patterns": [ + "{platform}-a-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>04d}.nc", + "{platform}-b-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>04d}.nc"], + "expected_segments": 5, + "time_tags": ["start_time", "end_time"], + "segment_tag": "segment"} + + +def _get_fake_handler(parent, filename): + fake_filename = os.fspath(parent / filename) + fake_filename_info = { + "platform": "M9G", + "start_time": dt.datetime(2100, 1, 1, 10, 0, 0), + "end_time": dt.datetime(2100, 1, 1, 10, 1, 0), + "segment": 1} + fakehandler = BaseFileHandler( + fake_filename, + fake_filename_info, + _fake_filetype_info) + return fakehandler + + +def test_predict_filename_info(tmp_path): + """Test prediction of filename info.""" + from satpy.readers.yaml_reader import _predict_filename_info + + fh = _get_fake_handler( + tmp_path, + "M9G-a-21000101100000-21000101100100-0001.nc") + info = _predict_filename_info(fh, 3) + assert info == { + "platform": "M9G", + "start_time": dt.datetime(2100, 1, 1, 23, 59, 59), + "end_time": dt.datetime(2100, 1, 1, 23, 59, 59), + "segment": 3} + + +@pytest.fixture() +def fake_gsyreader(): + """Create a fake GeoSegmentYAMLReader.""" + from 
satpy.readers.yaml_reader import GEOSegmentYAMLReader + with satpy.config.set({"readers.preload.enable": True}): + return GEOSegmentYAMLReader( + {"reader": { + "name": "alicudi"}, + "file_types": { + "m9g": _fake_filetype_info}}) + + +def test_predict_filename(tmp_path, fake_gsyreader): + """Test predicting a filename.""" + fh = _get_fake_handler( + tmp_path, + "M9G-a-21000101100000-21000101100100-0001.nc") + newname = fake_gsyreader._predict_filename(fh, 4) + assert newname[0] == os.fspath(tmp_path / "M9G-a-21000101??????-21000101??????-0004.nc") + fh = _get_fake_handler( + tmp_path, + "M9G-b-21000101100000-21000101100100-0001.nc") + newname = fake_gsyreader._predict_filename(fh, 4) + assert newname[0] == os.fspath(tmp_path / "M9G-b-21000101??????-21000101??????-0004.nc") + + st = dt.datetime(2023, 12, 20, 15, 8, 49) + et = st + dt.timedelta(minutes=5) + fn = f"M9G-b-{st:%Y%m%d%H%M%S}-{et:%Y%m%d%H%M%S}-0001.nc" + pt = "M9G-b-20231220??????-20231220??????-0004.nc" + fake_filename_info = {"platform": "M9G", "start_time": st, "end_time": et, "segment": 1} + fakehandler = BaseFileHandler( + os.fspath(tmp_path / fn), + fake_filename_info, + _fake_filetype_info) + newname = fake_gsyreader._predict_filename(fakehandler, 4) + assert newname[0] == os.fspath(tmp_path / pt) + + +def test_select_pattern(fake_gsyreader): + """Test selecting the appropriate pattern.""" + assert fake_gsyreader._select_pattern( + "M9G-a-21000101100000-21000101100100-0001.nc") == ( + "{platform}-a-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>04d}.nc") + assert fake_gsyreader._select_pattern( + "M9G-b-21000101100000-21000101100100-0001.nc") == ( + "{platform}-b-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>04d}.nc") + with pytest.raises(ValueError, match="Cannot predict filenames"): + fake_gsyreader._select_pattern( + "M9G-c-21000101100000-21000101100100-0001.nc") + + +@pytest.fixture() +def fake_simple_nc_file(tmp_path): + """Create a small dummy NetCDF file for 
testing preloaded instances. + + Returns the filename. + """ + nm = tmp_path / "M9G-a-21000101053000-21000101053100-01.nc" + nm.parent.mkdir(exist_ok=True, parents=True) + ds = xr.Dataset() + ds["panarea"] = xr.DataArray(np.array([[0, 1, 2]]), dims=["y", "x"]) + ds["strómboli"] = xr.DataArray(np.array([[1, 1, 2]]), dims=["y", "x"]) + ds["salina"] = xr.DataArray(np.array([[2, 1, 2]]), dims=["y", "x"]) + ds.to_netcdf(nm, group="/grp") + + return nm + + +@pytest.fixture() +def dummy_preloadable_handler(): + """Return a dummy preloadable netcdf4-based filehandler.""" + from satpy.readers.netcdf_utils import NetCDF4FileHandler, PreloadableSegments + class DummyPreloadableHandler(PreloadableSegments, NetCDF4FileHandler): + pass + return DummyPreloadableHandler + + +@pytest.fixture() +def fake_filetype_info(dummy_preloadable_handler): + """Return a fake filetype info dict.""" + ft_info = { + "file_reader": dummy_preloadable_handler, + "file_patterns": [ + "{platform}-a-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>02d}.nc", + "{platform}-b-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>02d}.nc"], + "expected_segments": 3, + "time_tags": ["start_time", "end_time"], + "segment_tag": "segment", + "required_netcdf_variables": { + "grp/panarea": ["segment"], # put in group to avoid https://github.com/pytroll/satpy/issues/2704 + "grp/strómboli": ["rc"], + "grp/salina": []}} + + return ft_info + + +def test_preloaded_instances_works( + tmp_path, fake_gsyreader, fake_simple_nc_file, + dummy_preloadable_handler, fake_filetype_info): + """Test that preloaded instances are generated.""" + from satpy.readers.yaml_reader import GEOSegmentYAMLReader + + ft_info_2 = {**fake_filetype_info, + "file_patterns": [ + "{platform}-c-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>02d}.nc", + "{platform}-d-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}-{segment:>02d}.nc"]} + + with satpy.config.set({"readers.preload.enable": True}): + gsyr =
GEOSegmentYAMLReader( + {"reader": { + "name": "island-reader"}, + "file_types": { + "m9g": fake_filetype_info, + "mag": ft_info_2}}) + + + # filename info belonging to fake_simple_nc_file + + fn_info = {"platform": "M9a", "start_time": dt.datetime(2100, 1, 1, 5, 30), + "end_time": dt.datetime(2100, 1, 1, 5, 31), "segment": 1} + + with unittest.mock.patch("appdirs.user_cache_dir") as au: + au.return_value = os.fspath(tmp_path / "cache") + # prepare cache files + dph = dummy_preloadable_handler(os.fspath(fake_simple_nc_file), + fn_info, fake_filetype_info) + for i in range(2, 4): # disk cache except for nr. 1 + fn = (tmp_path / "cache" / "satpy" / "preloadable" / + "DummyPreloadableHandler" / + f"M9G-a-99991231235959-99991231235959-{i:>02d}.pkl") + fn.parent.mkdir(exist_ok=True, parents=True) + dph.store_cache(os.fspath(fn)) + + fhs = gsyr.create_filehandlers([os.fspath(fake_simple_nc_file)]) + assert len(fhs["m9g"]) == 3 + + +def test_preloaded_instances_requirement( + tmp_path, fake_gsyreader, fake_simple_nc_file, + dummy_preloadable_handler, fake_filetype_info): + """Test that pre-loading instances fails if there is a required tag.""" + from satpy.readers.yaml_reader import GEOSegmentYAMLReader + + ft_info = {**fake_filetype_info, + "requires": ["pergola"]} + + with satpy.config.set({"readers.preload.enable": True}): + gsyr = GEOSegmentYAMLReader( + {"reader": { + "name": "alicudi"}, + "file_types": { + "m9g": ft_info}}) + g = gsyr._new_filehandler_instances( + ft_info, + [(os.fspath(fake_simple_nc_file), + {"platform": "M9G", + "start_time": dt.datetime(2100, 1, 1, 5, 30, ), + "end_time": dt.datetime(2100, 1, 1, 5, 31, ), + "segment": 1})]) + with pytest.raises(ValueError, match="Unable to preload"): + list(g) + + +def test_preloaded_instances_not_implemented(tmp_path, fake_gsyreader, + fake_filetype_info): + """Test that pre-loading instances fails if it is not implemented.""" + ft_info = {**fake_filetype_info, "requires": ["pergola"]} + + # Second argument 
irrelevant, as this part of the method should not be + # reached + g = fake_gsyreader._new_preloaded_filehandler_instances(ft_info, None) + with pytest.raises(NotImplementedError, match="Pre-loading not implemented"): + list(g) + + +@pytest.mark.parametrize("include", [(), ("rc",), ("rc", "time")]) +def test_get_cache_filename(tmp_path, include): + """Test getting the pre-loading cache filename.""" + from satpy.readers.yaml_reader import GEOSegmentYAMLReader + + fn = fp = "a-" + fn_info = {"segment": 1} + ft_info = { + "file_reader": BaseFileHandler, + "segment_tag": "segment", + "expected_segments": 5} + if "time" in include: + fn += "20421015234500-234600-" + fp += "{start_time:%Y%m%d%H%M%S}-{end_time:%H%M%S}-" + fn_info["start_time"] = dt.datetime(2042, 10, 15, 23, 45) + fn_info["end_time"] = dt.datetime(2042, 10, 15, 23, 46) + ft_info["time_tags"] = ["start_time", "end_time"] + if "rc" in include: + fn += "04-" + fp += "{rc:>02d}-" + fn_info["rc"] = 4 + fn += "01.nc" + fp += "{segment:>02d}.nc" + + ft_info["file_patterns"] = [fp] + if "time" in include: + ref_fn = "a-99991231235959-235959-04-01.pkl" + else: + ref_fn = fn[:-2] + "pkl" + + with satpy.config.set({"readers.preload.enable": True}): + gsyr = GEOSegmentYAMLReader( + {"reader": { + "name": "filicudi"}, + "file_types": { + "m9g": ft_info}}) + fh = BaseFileHandler(fn, fn_info, ft_info) + + with unittest.mock.patch("appdirs.user_cache_dir") as au: + au.return_value = os.fspath(tmp_path / "cache") + cf = gsyr._get_cache_filename(os.fspath(fn), fn_info, fh) + assert cf == os.fspath(tmp_path / "cache" / "satpy" / "preloadable" / + "BaseFileHandler" / ref_fn)