diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index 41ac806be7..ae7d4e4cc1 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -290,8 +290,7 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64: """Calculate a hash representation of the provided array. Calculates a 64-bit non-cryptographic hash of the provided array, using - the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest - string representation of the hash. + the fast ``xxhash`` hashing algorithm. Note that the computed hash depends on how the array is chunked. @@ -303,12 +302,13 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64: Returns ------- np.int64 - The string hexadecimal representation of the item's 64-bit hash. + The array's hash. """ def arrayhash(x): - value = xxh3_64(x.data.tobytes()) + value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes()) + value.update(x.data.tobytes()) if is_masked_data(x): value.update(x.mask.tobytes()) return np.frombuffer(value.digest(), dtype=np.int64) @@ -335,47 +335,34 @@ def __eq__(self, other: "_ArrayHash") -> bool: return self.value == other.value -def _compute_hashes( - cubes: Iterable[iris.cube.Cube], - check_aux_coords: bool, - check_derived_coords: bool, - check_cell_measures: bool, - check_ancils: bool, -) -> dict[str, _ArrayHash]: +def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]: """Compute hashes for the arrays that will be compared.""" - arrays = [] - for cube in cubes: - if check_aux_coords: - for coord in cube.aux_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_derived_coords: - for coord in cube.derived_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_cell_measures: - for var in cube.cell_measures(): - arrays.append(var.core_data()) - if check_ancils: - for var in cube.ancillary_variables(): - arrays.append(var.core_data()) - hashes = {} - def get_shape(a): - return a.shape + def is_numerical(dtype): + return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number) - arrays.sort(key=get_shape) - for _, group in itertools.groupby(arrays, key=get_shape): + def group_key(a): + if is_numerical(a.dtype): + dtype = "numerical" + else: + dtype = str(a.dtype) + return a.shape, dtype + + arrays = sorted(arrays, key=group_key) + for _, group in itertools.groupby(arrays, key=group_key): group = list(group) - # TODO: Unify dtype as the hash depends on the dtype + # Unify dtype for numerical arrays, as the hash depends on it + if is_numerical(group[0].dtype): + dtype = np.result_type(*group) + same_dtype_arrays = [a.astype(dtype) for a in group] + else: + same_dtype_arrays = group # Unify chunks as the hash depends on the chunks. indices = tuple(range(group[0].ndim))[::-1] - argpairs = [(a, indices) for a in group] - _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs)) - for array, rechunked in zip(group, rechunked_group): + argpairs = [(a, indices) for a in same_dtype_arrays] + rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1] + for array, rechunked in zip(group, rechunked_arrays): hashes[dask.base.tokenize(array)] = ( _hash_array(rechunked), rechunked.chunks, @@ -435,15 +422,29 @@ def concatenate( # which requires to be negotiated. axis = None - # Register each cube with its appropriate proto-cube. - hashes = _compute_hashes( - cubes, - check_aux_coords=check_aux_coords, - check_cell_measures=check_cell_measures, - check_ancils=check_ancils, - check_derived_coords=check_derived_coords, - ) + # Compute hashes for parallel array comparison. + arrays = [] + for cube in cubes: + if check_aux_coords: + for coord in cube.aux_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_derived_coords: + for coord in cube.derived_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_cell_measures: + for var in cube.cell_measures(): + arrays.append(var.core_data()) + if check_ancils: + for var in cube.ancillary_variables(): + arrays.append(var.core_data()) + hashes = _compute_hashes(arrays) + + # Register each cube with its appropriate proto-cube. for cube in cubes: registered = False diff --git a/lib/iris/tests/unit/concatenate/test_hashing.py b/lib/iris/tests/unit/concatenate/test_hashing.py new file mode 100644 index 0000000000..609661b5f7 --- /dev/null +++ b/lib/iris/tests/unit/concatenate/test_hashing.py @@ -0,0 +1,21 @@ +import dask.array as da +from dask.base import tokenize +import numpy as np +import pytest + +from iris import _concatenate + + +@pytest.mark.parametrize( + "a,b,eq", + [ + (np.arange(2), da.arange(2), True), + (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True), + (np.array([1]), np.array([[1]]), False), + (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True), + (da.arange(2, chunks=1), da.arange(2, chunks=2), True), + ], +) +def test_compute_hashes(a, b, eq): + hashes = _concatenate._compute_hashes([a, b]) + assert eq == (hashes[tokenize(a)].value == hashes[tokenize(b)].value)