Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Level creation now supports aggregation method mode #1078

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
* Bug fix in `resampling_in_space` when projecting from geographic to non-geographic
projection. (#1073)

### Enhancements

* Level creation now supports aggregation method `mode` to aggregate to the value which
is most frequent. (#913)

## Changes in 1.7.0

### Enhancements
Expand Down
1 change: 1 addition & 0 deletions docs/source/mldatasets.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The values are aggregation methods. Valid values are
| `max` | Maximum value of a window of N x N pixels. |
| `mean` | Mean value of a window of N x N pixels. |
| `median` | Median value of a window of N x N pixels. |
| `mode` | Modal (most common) value of a window of N x N pixels. |

The following is an example of the `.zlevels` file for a dataset with the
data variables `CHL` (chlorophyll) of type `float32` and a variable
Expand Down
71 changes: 71 additions & 0 deletions test/core/test_subsampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,77 @@ def test_subsample_dataset_max(self):
np.array([2.0, 4.0, 6.0]),
)

def test_subsample_dataset_mode_numpy(self):
subsampled_dataset = subsample_dataset(self.dataset, step=2, agg_methods="mode")
self.assert_subsampling_ok(
subsampled_dataset,
np.array(
[
[[2, 4, 6], [4, 6, 8], [1, 3, 5]],
[[12, 14, 16], [14, 16, 18], [11, 13, 15]],
],
dtype=np.int16,
),
np.array(
[
[[0.2, 0.4, 0.6], [0.4, 0.6, 0.8], [0.1, 0.3, 0.5]],
[[1.2, 1.4, 1.6], [1.4, 1.6, 1.8], [1.1, 1.3, 1.5]],
],
dtype=np.float64,
),
np.array([1.0, 3.0, 5.0]),
np.array([2.0, 4.0, 6.0]),
)

def test_subsample_dataset_mode_dask(self):
import dask.array as da
test_data_1 = da.array(
[
[1, 2, 3, 4, 5, 6],
[2, 3, 4, 5, 6, 7],
[3, 4, 5, 6, 7, 8],
[4, 5, 6, 7, 8, 9],
[1, 2, 3, 4, 5, 6],
],
dtype=np.int16,
)
test_data_1 = da.stack([test_data_1, test_data_1 + 10])
test_data_2 = 0.1 * test_data_1
dask_dataset = new_cube(
width=6, # even
height=5, # odd
x_name="x",
y_name="y",
x_start=0.5,
y_start=1.5,
x_res=1.0,
y_res=1.0,
time_periods=2,
crs=CRS84,
crs_name="spatial_ref",
variables=dict(var_1=test_data_1, var_2=test_data_2),
)
subsampled_dataset = subsample_dataset(dask_dataset, step=2, agg_methods="mode")
self.assert_subsampling_ok(
subsampled_dataset,
np.array(
[
[[2, 4, 6], [4, 6, 8], [1, 3, 5]],
[[12, 14, 16], [14, 16, 18], [11, 13, 15]],
],
dtype=np.int16,
),
np.array(
[
[[0.2, 0.4, 0.6], [0.4, 0.6, 0.8], [0.1, 0.3, 0.5]],
[[1.2, 1.4, 1.6], [1.4, 1.6, 1.8], [1.1, 1.3, 1.5]],
],
dtype=np.float64,
),
np.array([1.0, 3.0, 5.0]),
np.array([2.0, 4.0, 6.0]),
)

def assert_subsampling_ok(
self,
subsampled_dataset: xr.Dataset,
Expand Down
4 changes: 2 additions & 2 deletions xcube/cli/level.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
default=DEFAULT_AGG_METHOD,
help=f"Aggregation method(s) to be used for data variables."
f' Either one of "first", "min", "max", "mean", "median",'
f' "auto" or list of assignments to individual variables'
f" using the notation"
f' "mode", "auto" or list of assignments to individual'
f' variables using the notation'
f' "<var1>=<method1>,<var2>=<method2>,..."'
f' Defaults to "{DEFAULT_AGG_METHOD}".',
)
Expand Down
6 changes: 3 additions & 3 deletions xcube/core/mldataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class BaseMultiLevelDataset(LazyMultiLevelDataset):
agg_methods: Optional aggregation methods. May be given as
string or as mapping from variable name pattern to
aggregation method. Valid aggregation methods are None,
"first", "min", "max", "mean", "median". If None, the
default, "first" is used for integer variables and "mean"
for floating point variables.
"first", "min", "max", "mean", "median", and "mode".
If None, the default, "first" is used for integer
variables and "mean" for floating point variables.
"""

def __init__(
Expand Down
79 changes: 69 additions & 10 deletions xcube/core/subsampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
# https://opensource.org/licenses/MIT.

import collections.abc
import dask.array as da
import fnmatch
from typing import Tuple, Optional, Union
from typing import Optional, Union, List
from collections.abc import Hashable, Mapping

import numpy as np
from scipy import stats
import xarray as xr

from xcube.util.assertions import assert_instance, assert_in, assert_true

AGG_METHODS = "auto", "first", "min", "max", "mean", "median"
AGG_METHODS = "auto", "first", "min", "max", "mean", "median", "mode"
DEFAULT_INT_AGG_METHOD = "first"
DEFAULT_FLOAT_AGG_METHOD = "mean"

Expand Down Expand Up @@ -40,7 +42,7 @@ def subsample_dataset(
agg_methods: Optional aggregation methods.
May be given as string or as mapping from variable name pattern
to aggregation method. Valid aggregation methods are
"auto", "first", "min", "max", "mean", "median".
"auto", "first", "min", "max", "mean", "median", and "mode".
If "auto", the default, "first" is used for integer variables
and "mean" for floating point variables.
Returns:
Expand Down Expand Up @@ -69,13 +71,12 @@ def subsample_dataset(
assert slices is not None
new_var = var[slices]
else:
dim = dict()
if x_name in var.dims:
dim[x_name] = step
if y_name in var.dims:
dim[y_name] = step
var_coarsen = var.coarsen(dim=dim, boundary="pad", coord_func="min")
new_var: xr.DataArray = getattr(var_coarsen, agg_method)()
if agg_method == "mode":
new_var: xr.DataArray = _agg_mode(var, x_name, y_name, step)
else:
new_var: xr.DataArray = _agg_builtin(
var, x_name, y_name, step, agg_method
)
if new_var.dtype != var.dtype:
# We don't want, e.g. "mean", to turn data
# from dtype unit16 into float64
Expand Down Expand Up @@ -109,6 +110,64 @@ def subsample_dataset(
return xr.Dataset(data_vars=new_data_vars, attrs=dataset.attrs)


def _agg_mode(var: xr.DataArray, x_name: str, y_name: str, step: int) -> xr.DataArray:
    """Aggregate ``var`` by taking the modal (most frequent) value of each
    ``step`` x ``step`` window along the spatial dimensions.

    Args:
        var: The data array to subsample.
        x_name: Name of the x dimension.
        y_name: Name of the y dimension.
        step: Window size along each spatial dimension.

    Returns:
        The spatially subsampled data array.
    """
    var_coarsen = _coarsen(var, x_name, y_name, step)
    # Indices of the coarsening "window" axes that the reduction collapses;
    # dask needs these to predict each block's shape after map_blocks.
    drop_axis = _get_drop_axis(var, x_name, y_name, step)

    def _scipy_mode(x, axis, **kwargs):
        # NaNs are ignored when determining the most frequent value.
        return stats.mode(x, axis, nan_policy="omit", **kwargs).mode

    def _mode_dask(x, axis, **kwargs):
        # Apply the mode block-wise; assumes windows do not straddle
        # chunk boundaries within a block — TODO confirm for odd chunkings.
        return x.map_blocks(
            _scipy_mode, axis=axis, dtype=x.dtype, drop_axis=drop_axis, **kwargs
        )

    if isinstance(var.data, da.Array):
        return var_coarsen.reduce(_mode_dask)
    else:
        return var_coarsen.reduce(_scipy_mode)


def _agg_builtin(
    var: "xr.DataArray", x_name: str, y_name: str, step: int, agg_method: str
) -> "xr.DataArray":
    """Aggregate ``var`` using one of xarray's built-in coarsen reductions.

    Args:
        var: The data array to subsample.
        x_name: Name of the x dimension.
        y_name: Name of the y dimension.
        step: Window size along each spatial dimension.
        agg_method: Name of the reduction, e.g. ``"mean"`` or ``"median"``.

    Returns:
        The spatially subsampled data array.
    """
    coarsened = _coarsen(var, x_name, y_name, step)
    reduction = getattr(coarsened, agg_method)
    return reduction()


def _coarsen(var: xr.DataArray, x_name: str, y_name: str, step: int):
dim = dict()
if x_name in var.dims:
dim[x_name] = step
if y_name in var.dims:
dim[y_name] = step
return var.coarsen(dim=dim, boundary="pad", coord_func="min")


def _get_drop_axis(var: xr.DataArray, x_name: str, y_name: str, step: int) -> List[int]:
"""
This function serves to determine the indexes of the dimensions of a coarsened
array that will not be included in the array after reduction. These dimensions
are the indexes following the indexes of the spatial dimensions from the original
array, so first these indexes are determined, then they are shifted appropriately.
"""
drop_axis = []
if x_name in var.dims:
drop_axis.append(var.dims.index(x_name))
if y_name in var.dims:
drop_axis.append(var.dims.index(y_name))
if len(drop_axis) == 1:
drop_axis[0] += 1
elif len(drop_axis) == 2:
# The latter index must be shifted by two positions
if drop_axis[0] > drop_axis[1]:
drop_axis[0] += 2
drop_axis[1] += 1
else:
drop_axis[0] += 1
drop_axis[1] += 2
return drop_axis

def get_dataset_agg_methods(
dataset: xr.Dataset,
xy_dim_names: Optional[tuple[str, str]] = None,
Expand Down
Loading