Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify meta in dask.array.map_blocks #5989

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions docs/src/whatsnew/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ This document explains the changes made to Iris for this release
🐛 Bugs Fixed
=============

#. N/A

#. `@rcomer`_ enabled partial collapse of multi-dimensional string coordinates,
fixing :issue:`3653`. (:pull:`5955`)

#. `@bouweandela`_ made further updates to the ``chunktype`` of Dask arrays,
so it corresponds better with the array content. (:pull:`5989`)

💣 Incompatible Changes
=======================
Expand Down
12 changes: 9 additions & 3 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,12 @@ def lazy_elementwise(lazy_array, elementwise_op):
# call may cast to float, or not, depending on unit equality : Thus, it's
# much safer to get udunits to decide that for us.
dtype = elementwise_op(np.zeros(1, lazy_array.dtype)).dtype
meta = da.utils.meta_from_array(lazy_array).astype(dtype)

return da.map_blocks(elementwise_op, lazy_array, dtype=dtype)
return da.map_blocks(elementwise_op, lazy_array, dtype=dtype, meta=meta)


def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
"""Apply a function to complete blocks.

Complete means that the data is not chunked along the chosen dimensions.
Expand All @@ -557,6 +558,8 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
Dimensions that cannot be chunked.
out_sizes : tuple of int
Output size of dimensions that cannot be chunked.
dtype :
Output dtype.
*args : tuple
Additional arguments to pass to `func`.
**kwargs : dict
Expand Down Expand Up @@ -596,8 +599,11 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
for dim, size in zip(dims, out_sizes):
out_chunks[dim] = size

# Assume operation preserves mask.
meta = da.utils.meta_from_array(data).astype(dtype)

result = data.map_blocks(
func, *args, chunks=out_chunks, dtype=src.dtype, **kwargs
func, *args, chunks=out_chunks, meta=meta, dtype=dtype, **kwargs
)

return result
7 changes: 4 additions & 3 deletions lib/iris/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1390,9 +1390,10 @@ def _percentile(data, percent, fast_percentile_method=False, **kwargs):

result = iris._lazy_data.map_complete_blocks(
data,
_calc_percentile,
(-1,),
percent.shape,
func=_calc_percentile,
dims=(-1,),
out_sizes=percent.shape,
dtype=np.float64,
percent=percent,
fast_percentile_method=fast_percentile_method,
**kwargs,
Expand Down
13 changes: 10 additions & 3 deletions lib/iris/analysis/_area_weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,18 @@ def _regrid_area_weighted_rectilinear_src_and_grid__perform(

tgt_shape = (len(grid_y.points), len(grid_x.points))

# Specify the output dtype
if np.issubdtype(src_cube.dtype, np.integer):
out_dtype = np.float64
else:
out_dtype = src_cube.dtype

new_data = map_complete_blocks(
src_cube,
_regrid_along_dims,
(src_y_dim, src_x_dim),
meshgrid_x.shape,
func=_regrid_along_dims,
dims=(src_y_dim, src_x_dim),
out_sizes=meshgrid_x.shape,
dtype=out_dtype,
x_dim=src_x_dim,
y_dim=src_y_dim,
weights=weights,
Expand Down
13 changes: 10 additions & 3 deletions lib/iris/analysis/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,11 +935,18 @@ def __call__(self, src):
x_dim = src.coord_dims(src_x_coord)[0]
y_dim = src.coord_dims(src_y_coord)[0]

# Specify the output dtype
if self._method == "linear" and np.issubdtype(src.dtype, np.integer):
out_dtype = np.float64
else:
out_dtype = src.dtype

data = map_complete_blocks(
src,
self._regrid,
(y_dim, x_dim),
sample_grid_x.shape,
func=self._regrid,
dims=(y_dim, x_dim),
out_sizes=sample_grid_x.shape,
dtype=out_dtype,
x_dim=x_dim,
y_dim=y_dim,
src_x_coord=src_x_coord,
Expand Down
5 changes: 4 additions & 1 deletion lib/iris/mesh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ def fill_region(target, regiondata, regioninds):
# Notes on resultant calculation properties:
# 1. map_blocks is chunk-mapped, so it is parallelisable and space-saving
# 2. However, fetching less than a whole chunk is not efficient
meta = np.ma.array(
np.empty((0,) * result_array.ndim, dtype=result_array.dtype), mask=True
)
for cube in submesh_cubes:
# Lazy data array from the region cube
sub_data = cube.lazy_data()
Expand All @@ -300,7 +303,7 @@ def fill_region(target, regiondata, regioninds):
sub_data,
indarr,
dtype=result_array.dtype,
meta=np.ndarray,
meta=meta,
)

# Construct the result cube
Expand Down
13 changes: 13 additions & 0 deletions lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,25 @@ def setUp(self):
self.args = ("linear", "mask")
self.regridder = Regridder(self.cube, self.cube, *self.args)
self.lazy_cube = self.cube.copy(da.asarray(self.cube.data))
self.lazy_masked_cube = self.lazy_cube.copy(da.ma.masked_array(self.cube.data))
self.lazy_regridder = Regridder(self.lazy_cube, self.lazy_cube, *self.args)

def test_lazy_regrid(self):
    """Regridding a lazy cube stays lazy and matches the eager result."""
    result = self.lazy_regridder(self.lazy_cube)
    self.assertTrue(result.has_lazy_data())
    # Unmasked input should produce a plain ndarray meta (exact class check).
    meta = da.utils.meta_from_array(result.core_data())
    # assertIs reports both operands on failure, unlike assertTrue(x is y).
    self.assertIs(meta.__class__, np.ndarray)
    expected = self.regridder(self.cube)
    self.assertEqual(result.dtype, expected.dtype)
    # assertEqual gives a diagnostic message on failure, unlike assertTrue(==).
    self.assertEqual(result, expected)

def test_lazy_masked_regrid(self):
    """Regridding a lazy masked cube stays lazy and keeps a masked meta."""
    result = self.lazy_regridder(self.lazy_masked_cube)
    self.assertTrue(result.has_lazy_data())
    # Masked input should propagate a MaskedArray meta to the result.
    meta = da.utils.meta_from_array(result.core_data())
    # assertIsInstance reports the actual type on failure.
    self.assertIsInstance(meta, np.ma.MaskedArray)
    expected = self.regridder(self.cube)
    self.assertEqual(result.dtype, expected.dtype)
    # assertEqual gives a diagnostic message on failure, unlike assertTrue(==).
    self.assertEqual(result, expected)


Expand Down
10 changes: 6 additions & 4 deletions lib/iris/tests/unit/analysis/test_PERCENTILE.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ def test_default_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent)
result = self.agg_method(data, axis=axis, percent=percent)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key in ["alphap", "betap"]:
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], 1)

Expand All @@ -170,10 +170,12 @@ def test_chosen_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent, alphap=0.6, betap=0.5)
result = self.agg_method(
data, axis=axis, percent=percent, alphap=0.6, betap=0.5
)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key, val in zip(["alphap", "betap"], [0.6, 0.5]):
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], val)

Expand Down
64 changes: 57 additions & 7 deletions lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,27 @@ def create_mock_cube(array):
class Test_map_complete_blocks(tests.IrisTest):
def setUp(self):
    """Prepare a small test array plus a block function that rejects sampling."""
    self.array = np.arange(8).reshape(2, 4)

    def _add_one(chunk):
        """Add one to a chunk, refusing zero-size 'sample' arrays.

        Dask may probe a function with size-0 arrays to infer the output
        meta.  Raising here guarantees the map_blocks call under test must
        work for any function, without relying on such sampling.
        """
        if chunk.size == 0:
            raise ValueError
        return chunk + 1

    self.func = _add_one
    self.func_result = self.array + 1

def test_non_lazy_input(self):
# Check that a non-lazy input doesn't trip up the functionality.
cube, cube_data = create_mock_cube(self.array)
result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
cube, self.func, dims=(1,), out_sizes=(4,), dtype=self.array.dtype
)
self.assertFalse(is_lazy_data(result))
self.assertArrayEqual(result, self.func_result)
# check correct data was accessed
Expand All @@ -48,7 +62,9 @@ def test_non_lazy_input(self):
def test_lazy_input(self):
lazy_array = da.asarray(self.array, chunks=((1, 1), (4,)))
cube, cube_data = create_mock_cube(lazy_array)
result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
cube, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), self.func_result)
# check correct data was accessed
Expand All @@ -57,14 +73,44 @@ def test_lazy_input(self):

def test_dask_array_input(self):
    """A bare dask array (no cube) is mapped lazily and correctly."""
    source = da.asarray(self.array, chunks=((1, 1), (4,)))
    result = map_complete_blocks(
        source, self.func, dims=(1,), out_sizes=(4,), dtype=source.dtype
    )
    self.assertTrue(is_lazy_data(result))
    self.assertArrayEqual(result.compute(), self.func_result)

def test_dask_masked_array_input(self):
    """A masked dask array keeps its masked meta through the mapping."""
    source = da.ma.masked_array(np.arange(2), mask=np.arange(2))
    result = map_complete_blocks(
        source, self.func, dims=(), out_sizes=(), dtype=source.dtype
    )
    self.assertTrue(is_lazy_data(result))
    meta = da.utils.meta_from_array(result)
    self.assertTrue(isinstance(meta, np.ma.MaskedArray))
    expected = np.ma.masked_array([1, 2], mask=[0, 1])
    self.assertArrayEqual(result.compute(), expected)

def test_dask_array_input_with_different_output_dtype(self):
    """The requested output dtype is honoured lazily and after computing."""
    lazy_array = da.ma.masked_array(self.array, chunks=((1, 1), (4,)))
    dtype = np.float32

    def func(chunk):
        # Raise on zero-size arrays so dask cannot infer meta by sampling.
        if chunk.size == 0:
            raise ValueError
        return (chunk + 1).astype(np.float32)

    result = map_complete_blocks(
        lazy_array, func, dims=(1,), out_sizes=(4,), dtype=dtype
    )
    self.assertIsInstance(da.utils.meta_from_array(result), np.ma.MaskedArray)
    self.assertEqual(result.dtype, dtype)
    # Compute once (the original computed twice) and check both the
    # realised dtype and the values against the expected result.
    computed = result.compute()
    self.assertEqual(computed.dtype, dtype)
    self.assertArrayEqual(computed, self.func_result)

def test_rechunk(self):
    """An array chunked along a mapped dimension still maps correctly."""
    source = da.asarray(self.array, chunks=((1, 1), (2, 2)))
    cube, _ = create_mock_cube(source)
    result = map_complete_blocks(
        cube, self.func, dims=(1,), out_sizes=(4,), dtype=source.dtype
    )
    self.assertTrue(is_lazy_data(result))
    self.assertArrayEqual(result.compute(), self.func_result)

Expand All @@ -76,15 +122,19 @@ def func(_):
return np.arange(2).reshape(1, 2)

func_result = [[0, 1], [0, 1]]
result = map_complete_blocks(cube, func, dims=(1,), out_sizes=(2,))
result = map_complete_blocks(
cube, func, dims=(1,), out_sizes=(2,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), func_result)

def test_multidimensional_input(self):
    """Mapping over two complete dimensions of a 3-D array works."""
    data = np.arange(24).reshape(2, 3, 4)
    source = da.asarray(data, chunks=((1, 1), (1, 2), (4,)))
    cube, _ = create_mock_cube(source)
    result = map_complete_blocks(
        cube, self.func, dims=(1, 2), out_sizes=(3, 4), dtype=source.dtype
    )
    self.assertTrue(is_lazy_data(result))
    self.assertArrayEqual(result.compute(), data + 1)

Expand Down
Loading