Initial IO docs (and minor example updates)
shoyer committed Aug 5, 2021
1 parent f614e5b commit 9d8d908
Showing 3 changed files with 279 additions and 9 deletions.
277 changes: 272 additions & 5 deletions docs/io.ipynb
@@ -10,18 +10,285 @@
},
{
"cell_type": "markdown",
"id": "0de145f4",
"id": "480ac360",
"metadata": {},
"source": [
"TODO(shoyer): write this page!\n",
"## Loading datasets into chunks"
]
},
{
"cell_type": "markdown",
"id": "f43f8a6b",
"metadata": {},
"source": [
"There are two main options for loading an `xarray.Dataset` into Xarray-Beam. You can either [create the dataset](data-model.ipynb) from scratch or use the {py:class}`~xarray_beam.DatasetToChunks` transform starting at the root of a Beam pipeline:"
]
},
{
"cell_type": "code",
"execution_count": 42,
"id": "3fec02e8",
"metadata": {},
"outputs": [],
"source": [
"# hidden imports & helper functions"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "7b431556",
"metadata": {
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"import textwrap\n",
"import apache_beam as beam\n",
"import xarray_beam as xbeam\n",
"import xarray\n",
"\n",
"def summarize_dataset(dataset):\n",
" return f'<xarray.Dataset data_vars={list(dataset.data_vars)} dims={dict(dataset.sizes)}>'\n",
"\n",
"def print_summary(key, chunk):\n",
" print(f'{key}\\n with {summarize_dataset(chunk)}')"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "2308b160",
"metadata": {},
"outputs": [],
"source": [
"ds = xarray.tutorial.load_dataset('air_temperature')"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "e94b9b00",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 1000, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 1000, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 920, 'lon': 53}>\n"
]
}
],
"source": [
"with beam.Pipeline() as p:\n",
" p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(print_summary)"
]
},
{
"cell_type": "markdown",
"id": "25c5f2a6",
"metadata": {},
"source": [
"Importantly, xarray datasets fed into `DatasetToChunks` **can be lazy**, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into `DatasetToChunks`, each individual chunk will be indexed and evaluated separately on Beam workers.\n",
"\n",
"This pattern allows for leveraging Xarray's builtin dataset loaders (e.g., `open_dataset()` and `open_zarr()`) for feeding arbitrarily large datasets into Xarray-Beam.\n",
"\n",
"For best performance, set `chunks=None` when opening datasets and then _explicitly_ provide chunks in `DatasetToChunks`:"
]
},
{
"cell_type": "code",
"execution_count": 47,
"id": "a2ce5049",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 1000, 'lat': 25, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 1000, 'lat': 25, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 920, 'lat': 25, 'lon': 53}>\n"
]
}
],
"source": [
"# write data into the distributed Zarr format\n",
"ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n",
"\n",
"# load it with zarr\n",
"on_disk = xarray.open_zarr('example-data.zarr', chunks=None)\n",
"\n",
"- Discuss options for lazy datasets: xarray's lazy indexing vs dask\n",
"- Discuss nuances of loading data, e.g., feeding in Dask datasets into DatasetToChunks\n",
"- Discuss nuances of ChunksToZarr (including `template`)\n"
"with beam.Pipeline() as p:\n",
" p | xbeam.DatasetToChunks(on_disk, chunks={'time': 1000}) | beam.MapTuple(print_summary)"
]
},
{
"cell_type": "markdown",
"id": "ea3ec245",
"metadata": {},
"source": [
"`chunks=None` tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. This is advantageous because datasets using Xarray's lazy indexing are serialized much more compactly (via [pickle](https://docs.python.org/3/library/pickle.html)) when passed into Beam transforms."
]
},
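{
"cell_type": "markdown",
"id": "b7e5a2d0",
"metadata": {},
"source": [
"As a rough, hedged illustration (not part of the original docs; exact byte counts will vary with the dataset and library versions), you can compare the pickled size of the same Zarr store opened with `chunks=None` versus with Dask chunks:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4d91f22",
"metadata": {},
"outputs": [],
"source": [
"import pickle\n",
"\n",
"# Same store, opened lazily in two different ways\n",
"lazy_indexed = xarray.open_zarr('example-data.zarr', chunks=None)\n",
"dask_backed = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n",
"\n",
"# The lazily indexed dataset typically pickles to far fewer bytes, because\n",
"# the Dask-backed version must also serialize its task graph.\n",
"print('lazy indexing:', len(pickle.dumps(lazy_indexed)))\n",
"print('dask-backed: ', len(pickle.dumps(dask_backed)))"
]
},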
{
"cell_type": "markdown",
"id": "1c8bc4bc",
"metadata": {},
"source": [
"Alternatively, you can pass in lazy datasets [using dask](http://xarray.pydata.org/en/stable/user-guide/dask.html). In this case, you don't need to explicitly supply `chunks` to `DatasetToChunks`:"
]
},
{
"cell_type": "code",
"execution_count": 49,
"id": "3c86c82e",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 1000, 'lat': 25, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 1000, 'lat': 25, 'lon': 53}>\n",
"Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n",
" with <xarray.Dataset data_vars=['air'] dims={'time': 920, 'lat': 25, 'lon': 53}>\n"
]
}
],
"source": [
"on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n",
"\n",
"with beam.Pipeline() as p:\n",
" p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(print_summary)"
]
},
{
"cell_type": "markdown",
"id": "db585d3a",
"metadata": {},
"source": [
"Dask's lazy evaluation system is much more general than Xarray's lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk this can be a very convenient way to setup computation for Xarray-Beam.\n",
"\n",
"Unfortunately, it doesn't scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (typically multiple TB) datasets with millions of chunks. However, a current major effort in Dask on [high level graphs](https://blog.dask.org/2021/07/07/high-level-graphs) should improve this in the near future."
]
},
{
"cell_type": "markdown",
"id": "cdf80b53",
"metadata": {},
"source": [
"```{note}\n",
"We are still figuring out the optimal APIs to facilitate opening data and building lazy datasets in Xarray-Beam. E.g., see [this issue](https://github.com/google/xarray-beam/issues/26) for discussion of a higher level `ZarrToChunks` transform embedding these best practices.\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "233809a4",
"metadata": {},
"source": [
"## Saving data to Zarr"
]
},
{
"cell_type": "markdown",
"id": "67b10192",
"metadata": {},
"source": [
"[Zarr](https://zarr.readthedocs.io/) is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.\n",
"\n",
"{py:class}`~xarray_beam.ChunksToZarr` is Xarray-Beam's API for saving chunks into a (new) Zarr store. \n",
"\n",
"You can get started just using it directly:"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "88fc081c",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n"
]
}
],
"source": [
"with beam.Pipeline() as p:\n",
" p | xbeam.DatasetToChunks(on_disk) | xbeam.ChunksToZarr('example-data-v2.zarr')"
]
},
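{
"cell_type": "markdown",
"id": "d2f3a7c1",
"metadata": {},
"source": [
"As a quick sanity check (an added sketch, not from the original notebook), you can reopen the store that was just written:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e8b1c9f0",
"metadata": {},
"outputs": [],
"source": [
"# Verify the write round-trips by reopening the new Zarr store\n",
"xarray.open_zarr('example-data-v2.zarr')"
]
},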
{
"cell_type": "markdown",
"id": "70da81a8",
"metadata": {},
"source": [
"By default, `ChunksToZarr` needs to evaluate and combine the entire distributed dataset in order to determine overall Zarr metadata (e.g., array names, shapes, dtypes and attributes). This is fine for relatively small datasets, but can entail significant additional communication and storage costs for large datasets.\n",
"\n",
"The optional `template` argument allows for prespecifying structure of the full on disk dataset in the form of another lazy `xarray.Dataset`. Like the lazy datasets fed into DatasetToChunks, lazy templates can built-up using either Xarray's lazy indexing or lazy operations with Dask, but the data _values_ in a `template` will never be written to disk -- only the metadata structure is used.\n",
"\n",
"One recommended pattern is to use a lazy Dask dataset consisting of a single value to build up the desired template, e.g.,"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "b8ea3f4a",
"metadata": {},
"outputs": [],
"source": [
"ds = xarray.open_zarr('example-data.zarr', chunks=None)\n",
"template = xarray.zeros_like(ds.chunk()) # a single virtual chunk of all zeros"
]
},
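{
"cell_type": "markdown",
"id": "f0a6d4e2",
"metadata": {},
"source": [
"The template can then be passed as the second argument to `ChunksToZarr`, as in the ERA5 climatology example below. This is a minimal sketch; the output path `example-data-v3.zarr` is just a placeholder:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a9c2e1b3",
"metadata": {},
"outputs": [],
"source": [
"with beam.Pipeline() as p:\n",
" p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | xbeam.ChunksToZarr('example-data-v3.zarr', template)"
]
},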
{
"cell_type": "markdown",
"id": "31748c31",
"metadata": {},
"source": [
"Xarray operations like indexing and expand dimensions (see {py:meth}`xarray.Dataset.expand_dims`) are entirely lazy on this dataset, which makes it relatively straightforward to build up a Dataset with the required variables and dimensions, e.g., as used in the [ERA5 climatology example](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py)."
]
}
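{
"cell_type": "markdown",
"id": "b5d7f8a9",
"metadata": {},
"source": [
"For instance, the following sketch mirrors the pattern from the ERA5 climatology example; the `month` and `hour` dimensions here are purely illustrative for this dataset:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c7e4a0d6",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# Build a template with the desired output dimensions without loading any data\n",
"climatology_template = (\n",
" ds\n",
" .isel(time=0, drop=True)\n",
" .pipe(xarray.zeros_like) # don't load even time=0 into memory\n",
" .expand_dims(month=np.arange(1, 13), hour=np.arange(24))\n",
" .chunk({'month': 1, 'hour': 1}) # make lazy with dask\n",
" .pipe(xarray.zeros_like) # compress the dask graph\n",
")"
]
}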
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
8 changes: 4 additions & 4 deletions examples/era5_climatology.py
@@ -60,12 +60,12 @@ def main(argv):
max_month = source_dataset.time.dt.month.max().item() # normally 12
template = (
source_dataset
.isel(time=0, drop=True)
.pipe(xarray.zeros_like) # don't load even time=0 into memory
.expand_dims(month=np.arange(1, max_month + 1), hour=np.arange(24))
.chunk({'hour': 1, 'month': 1}) # make lazy with dask
.pipe(xarray.zeros_like) # compress the dask graph
)
output_chunks = {'hour': 1, 'month': 1}

with beam.Pipeline(runner=RUNNER.value, argv=argv) as root:
(
@@ -74,7 +74,7 @@ def main(argv):
| xbeam.SplitChunks({'time': 1})
| beam.MapTuple(rekey_chunk_on_month_hour)
| xbeam.Mean.PerKey()
| xbeam.ChunksToZarr(OUTPUT_PATH.value, template, output_chunks)
)


3 changes: 3 additions & 0 deletions examples/era5_rechunk.py
@@ -39,6 +39,9 @@ def main(argv):
with beam.Pipeline(runner=RUNNER.value, argv=argv) as root:
(
root
# Note: splitting across the 19 variables in this dataset is a critical
# optimization step here, because it allows rechunking to make use of
# much larger intermediate chunks.
| xbeam.DatasetToChunks(source_dataset, source_chunks, split_vars=True)
| xbeam.Rechunk(
source_dataset.sizes,