Skip to content

Commit

Permalink
No public description
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 686909542
  • Loading branch information
Xarray-Beam authors committed Oct 17, 2024
1 parent 86c8f5b commit f5a40db
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 14 deletions.
3 changes: 3 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
ChunksToZarr
DatasetToZarr
make_template
setup_zarr
validate_zarr_chunk
write_chunk_to_zarr
```

## Aggregation
Expand Down
3 changes: 3 additions & 0 deletions xarray_beam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
from xarray_beam._src.zarr import (
open_zarr,
make_template,
setup_zarr,
validate_zarr_chunk,
write_chunk_to_zarr,
ChunksToZarr,
DatasetToZarr,
)
Expand Down
80 changes: 67 additions & 13 deletions xarray_beam/_src/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,24 @@ def _infer_zarr_chunks(dataset: xarray.Dataset) -> Dict[str, int]:
}


def _setup_zarr(template, store, zarr_chunks):
"""Setup a Zarr store."""
def setup_zarr(
template: xarray.Dataset,
store: WritableStore,
zarr_chunks: Optional[Mapping[str, int]] = None,
) -> None:
"""Setup a Zarr store.
Creates a zarr template at the specified store by writing template metadata.
Args:
template: a lazy xarray.Dataset already chunked using Dask (e.g., as created
by `xarray_beam.make_template`). One or more variables are expected to be
"chunked" with Dask, and will only have their metadata written to Zarr
without array values.
store: a string corresponding to a Zarr path or an existing Zarr store.
zarr_chunks: chunking scheme to use for Zarr. If set, overrides the chunking
scheme on already chunked arrays in template.
"""
if zarr_chunks is not None:
template = _override_chunks(template, zarr_chunks)
_verify_template_is_lazy(template)
Expand All @@ -256,16 +272,37 @@ def _setup_zarr(template, store, zarr_chunks):
template2.to_zarr(store, compute=False, consolidated=True, mode='w')


def _validate_zarr_chunk(key, chunk, template, zarr_chunks):
"""Check a chunk for consistency against the given template."""
def validate_zarr_chunk(
key: core.Key,
chunk: xarray.Dataset,
template: xarray.Dataset,
zarr_chunks: Optional[Mapping[str, int]] = None,
) -> None:
"""Check a chunk for consistency against the given template.
Args:
key: the Key corresponding to the position of the chunk to write in the
template.
chunk: the chunk to write.
by `xarray_beam.make_template`). One or more variables are expected to be
"chunked" with Dask, and will only have their metadata written to Zarr
without array values.
template: a lazy xarray.Dataset already chunked using Dask (e.g., as created
by `xarray_beam.make_template`). One or more variables are expected to be
"chunked" with Dask, and will only have their metadata written to Zarr
without array values.
zarr_chunks: chunking scheme to use for Zarr. If set, overrides the chunking
scheme on already chunked arrays in template.
"""
unexpected_indexes = [k for k in chunk.indexes if k not in template.indexes]
if unexpected_indexes:
raise ValueError(
'unexpected new indexes found in chunk but not template: '
f'{unexpected_indexes}'
)

region = core.offsets_to_slices(key.offsets, chunk.sizes)
# Immutable dicts not considered a Mapping type which method expects.
region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types
for dim, full_index in template.indexes.items():
if dim in chunk.indexes:
expected_index = full_index[region[dim]]
Expand Down Expand Up @@ -303,9 +340,26 @@ def _validate_zarr_chunk(key, chunk, template, zarr_chunks):
# Note that variable names, shapes & dtypes are verified in xarray's to_zarr()


def _write_chunk_to_zarr(key, chunk, store, template):
"""Write a single Dataset chunk to Zarr."""
region = core.offsets_to_slices(key.offsets, chunk.sizes)
def write_chunk_to_zarr(
key: core.Key,
chunk: xarray.Dataset,
store: WritableStore,
template: xarray.Dataset,
) -> None:
"""Write a single Dataset chunk to Zarr.
Args:
key: the Key corresponding to the position of the chunk to write in the
template.
chunk: the chunk to write.
store: a string corresponding to a Zarr path or an existing Zarr store.
template: a lazy xarray.Dataset already chunked using Dask (e.g., as created
by `xarray_beam.make_template`). One or more variables are expected to be
"chunked" with Dask, and will only have their metadata written to Zarr
without array values.
"""
# Immutable dicts not considered a Mapping type which method expects.
region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types
already_written = [
k for k in chunk.variables if k in _unchunked_vars(template)
]
Expand Down Expand Up @@ -369,9 +423,9 @@ def __init__(
# pyformat: enable
if isinstance(template, xarray.Dataset):
if setup_executor is not None:
setup_executor.submit(_setup_zarr, template, store, zarr_chunks)
setup_executor.submit(setup_zarr, template, store, zarr_chunks)
else:
_setup_zarr(template, store, zarr_chunks)
setup_zarr(template, store, zarr_chunks)
if zarr_chunks is None:
zarr_chunks = _infer_zarr_chunks(template)
template = _make_template_from_chunked(template)
Expand All @@ -394,12 +448,12 @@ def _validate_zarr_chunk(self, key, chunk, template=None):
# must have defaults for MapTuple". Beam should probably be happy with a
# keyword-only argument, too, but it doesn't like that yet.
assert template is not None
_validate_zarr_chunk(key, chunk, template, self.zarr_chunks)
validate_zarr_chunk(key, chunk, template, self.zarr_chunks)
return key, chunk

def _write_chunk_to_zarr(self, key, chunk, template=None):
assert template is not None
return _write_chunk_to_zarr(key, chunk, self.store, template)
return write_chunk_to_zarr(key, chunk, self.store, template)

def expand(self, pcoll):
if isinstance(self.template, xarray.Dataset):
Expand All @@ -415,7 +469,7 @@ def expand(self, pcoll):
)
setup_result = beam.pvalue.AsSingleton(
template.pvalue
| 'SetupZarr' >> beam.Map(_setup_zarr, self.store, self.zarr_chunks)
| 'SetupZarr' >> beam.Map(setup_zarr, self.store, self.zarr_chunks)
)
return (
pcoll
Expand Down
2 changes: 1 addition & 1 deletion xarray_beam/_src/zarr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_validate_zarr_chunk_accepts_partial_key(self):
coords={'x': np.arange(3), 'y': np.arange(2)},
)
# Should not raise an exception:
xbeam._src.zarr._validate_zarr_chunk(
xbeam._src.zarr.validate_zarr_chunk(
key=xbeam.Key({'x': 0}),
chunk=dataset,
template=dataset.chunk(),
Expand Down

0 comments on commit f5a40db

Please sign in to comment.