diff --git a/docs/api.md b/docs/api.md index a8d29fe..964ea58 100644 --- a/docs/api.md +++ b/docs/api.md @@ -24,6 +24,9 @@ ChunksToZarr DatasetToZarr make_template + setup_zarr + validate_zarr_chunk + write_chunk_to_zarr ``` ## Aggregation diff --git a/xarray_beam/__init__.py b/xarray_beam/__init__.py index e107f1b..791fee1 100644 --- a/xarray_beam/__init__.py +++ b/xarray_beam/__init__.py @@ -44,6 +44,9 @@ from xarray_beam._src.zarr import ( open_zarr, make_template, + setup_zarr, + validate_zarr_chunk, + write_chunk_to_zarr, ChunksToZarr, DatasetToZarr, ) diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index d8b1f6c..e471a50 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -241,8 +241,24 @@ def _infer_zarr_chunks(dataset: xarray.Dataset) -> Dict[str, int]: } -def _setup_zarr(template, store, zarr_chunks): - """Setup a Zarr store.""" +def setup_zarr( + template: xarray.Dataset, + store: WritableStore, + zarr_chunks: Optional[Mapping[str, int]] = None, +) -> None: + """Set up a Zarr store. + + Creates a Zarr template at the specified store by writing template metadata. + + Args: + template: a lazy xarray.Dataset already chunked using Dask (e.g., as created + by `xarray_beam.make_template`). One or more variables are expected to be + "chunked" with Dask, and will only have their metadata written to Zarr + without array values. + store: a string corresponding to a Zarr path or an existing Zarr store. + zarr_chunks: chunking scheme to use for Zarr. If set, overrides the chunking + scheme on already chunked arrays in template. 
+ """ if zarr_chunks is not None: template = _override_chunks(template, zarr_chunks) _verify_template_is_lazy(template) @@ -256,8 +272,28 @@ def _setup_zarr(template, store, zarr_chunks): template2.to_zarr(store, compute=False, consolidated=True, mode='w') -def _validate_zarr_chunk(key, chunk, template, zarr_chunks): - """Check a chunk for consistency against the given template.""" +def validate_zarr_chunk( + key: core.Key, + chunk: xarray.Dataset, + template: xarray.Dataset, + zarr_chunks: Optional[Mapping[str, int]] = None, +) -> None: + """Check a chunk for consistency against the given template. + + Args: + key: the Key corresponding to the position of the chunk to write in the + template. + chunk: the chunk to write. Every index on the chunk must also exist in the + template (otherwise a ValueError is raised); expected index values are + taken from the region of the template selected by `key.offsets` and the + chunk's sizes. + template: a lazy xarray.Dataset already chunked using Dask (e.g., as created + by `xarray_beam.make_template`). One or more variables are expected to be + "chunked" with Dask, and will only have their metadata written to Zarr + without array values. + zarr_chunks: chunking scheme to use for Zarr. If set, overrides the chunking + scheme on already chunked arrays in template. + """ unexpected_indexes = [k for k in chunk.indexes if k not in template.indexes] if unexpected_indexes: raise ValueError( @@ -265,7 +301,8 @@ def _validate_zarr_chunk(key, chunk, template, zarr_chunks): f'{unexpected_indexes}' ) - region = core.offsets_to_slices(key.offsets, chunk.sizes) + # Immutable dicts are not considered a Mapping type, which the method expects. 
+ region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types for dim, full_index in template.indexes.items(): if dim in chunk.indexes: expected_index = full_index[region[dim]] @@ -303,9 +340,26 @@ def _validate_zarr_chunk(key, chunk, template, zarr_chunks): # Note that variable names, shapes & dtypes are verified in xarray's to_zarr() -def _write_chunk_to_zarr(key, chunk, store, template): - """Write a single Dataset chunk to Zarr.""" - region = core.offsets_to_slices(key.offsets, chunk.sizes) +def write_chunk_to_zarr( + key: core.Key, + chunk: xarray.Dataset, + store: WritableStore, + template: xarray.Dataset, +) -> None: + """Write a single Dataset chunk to Zarr. + + Args: + key: the Key corresponding to the position of the chunk to write in the + template. + chunk: the chunk to write. + store: a string corresponding to a Zarr path or an existing Zarr store. + template: a lazy xarray.Dataset already chunked using Dask (e.g., as created + by `xarray_beam.make_template`). One or more variables are expected to be + "chunked" with Dask, and will only have their metadata written to Zarr + without array values. + """ + # Immutable dicts are not considered a Mapping type, which the method expects. + region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types already_written = [ k for k in chunk.variables if k in _unchunked_vars(template) ] @@ -369,9 +423,9 @@ def __init__( # pyformat: enable if isinstance(template, xarray.Dataset): if setup_executor is not None: - setup_executor.submit(_setup_zarr, template, store, zarr_chunks) + setup_executor.submit(setup_zarr, template, store, zarr_chunks) else: - _setup_zarr(template, store, zarr_chunks) + setup_zarr(template, store, zarr_chunks) if zarr_chunks is None: zarr_chunks = _infer_zarr_chunks(template) template = _make_template_from_chunked(template) @@ -394,12 +448,12 @@ def _validate_zarr_chunk(self, key, chunk, template=None): # must have defaults for MapTuple". 
Beam should probably be happy with a # keyword-only argument, too, but it doesn't like that yet. assert template is not None - _validate_zarr_chunk(key, chunk, template, self.zarr_chunks) + validate_zarr_chunk(key, chunk, template, self.zarr_chunks) return key, chunk def _write_chunk_to_zarr(self, key, chunk, template=None): assert template is not None - return _write_chunk_to_zarr(key, chunk, self.store, template) + return write_chunk_to_zarr(key, chunk, self.store, template) def expand(self, pcoll): if isinstance(self.template, xarray.Dataset): @@ -415,7 +469,7 @@ def expand(self, pcoll): ) setup_result = beam.pvalue.AsSingleton( template.pvalue - | 'SetupZarr' >> beam.Map(_setup_zarr, self.store, self.zarr_chunks) + | 'SetupZarr' >> beam.Map(setup_zarr, self.store, self.zarr_chunks) ) return ( pcoll diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index d85cceb..c8e0dff 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -257,7 +257,7 @@ def test_validate_zarr_chunk_accepts_partial_key(self): coords={'x': np.arange(3), 'y': np.arange(2)}, ) # Should not raise an exception: - xbeam._src.zarr._validate_zarr_chunk( + xbeam._src.zarr.validate_zarr_chunk( key=xbeam.Key({'x': 0}), chunk=dataset, template=dataset.chunk(),