From eb0b19fa52b4fab20a36675938b3b41d4f555ccd Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Tue, 10 Dec 2024 21:32:31 +0100 Subject: [PATCH] Cache Dask arrays to speed up loading files with multiple variables --- lib/iris/_lazy_data.py | 55 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index a3dfa1edb4..cde589b6ea 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -10,7 +10,7 @@ from functools import lru_cache, wraps from types import ModuleType -from typing import Sequence +from typing import Any, Sequence import dask import dask.array as da @@ -202,6 +202,7 @@ def _optimum_chunksize_internals( dim = working[0] working = working[1:] result.append(dim) + result = tuple(result) return result @@ -227,6 +228,33 @@ def _optimum_chunksize( ) +class LRUCache: + def __init__(self, maxsize: int): + self._cache: dict[str, Any] = {} + self.maxsize = maxsize + + def __getitem__(self, key): + value = self._cache.pop(key) + self._cache[key] = value + return value + + def __setitem__(self, key, value): + self._cache[key] = value + if len(self._cache) > self.maxsize: + self._cache.pop(next(iter(self._cache))) + + def __contains__(self, key): + return key in self._cache + + def __repr__(self): + return ( + f"<{self.__class__.__name__} maxsize={self.maxsize} cache={self._cache!r} >" + ) + + +CACHE = LRUCache(100) + + def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None): """Convert the input array `data` to a :class:`dask.array.Array`. @@ -264,6 +292,8 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None): but reduced by a factor if that exceeds the dask default chunksize. """ + from iris.fileformats.netcdf._thread_safe_nc import NetCDFDataProxy + if isinstance(data, ma.core.MaskedConstant): data = ma.masked_array(data.data, mask=data.mask) @@ -277,7 +307,7 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None): if chunks is None: # No existing chunks : Make a chunk the shape of the entire input array # (but we will subdivide it if too big). - chunks = list(data.shape) + chunks = tuple(data.shape) # Adjust chunk size for better dask performance, # NOTE: but only if no shape dimension is zero, so that we can handle the @@ -291,9 +321,24 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None): dims_fixed=dims_fixed, ) - if not is_lazy_data(data): - data = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta) - return data + # Define a cache key for caching arrays created from NetCDFDataProxy objects. + # Creating new Dask arrays is relatively slow, therefore caching is beneficial + # if many cubes in the same file share coordinate arrays. + if isinstance(data, NetCDFDataProxy): + key = (repr(data), chunks, asarray, meta.dtype, type(meta)) + else: + key = None + + if is_lazy_data(data): + result = data + elif key in CACHE: + result = CACHE[key].copy() + else: + result = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta) + if key is not None: + CACHE[key] = result.copy() + + return result def _co_realise_lazy_arrays(arrays):