diff --git a/docs/io.ipynb b/docs/io.ipynb index 32f4fec..f13210f 100644 --- a/docs/io.ipynb +++ b/docs/io.ipynb @@ -10,18 +10,285 @@ }, { "cell_type": "markdown", - "id": "0de145f4", + "id": "480ac360", "metadata": {}, "source": [ - "TODO(shoyer): write this page!\n", + "## Loading datasets into chunks" + ] + }, + { + "cell_type": "markdown", + "id": "f43f8a6b", + "metadata": {}, + "source": [ + "There are two main options for loading an `xarray.Dataset` into Xarray-Beam. You can either [create the dataset](data-model.ipynb) from scratch or use the {py:class}`~xarray_beam.DatasetToChunks` transform starting at the root of a Beam pipeline:" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "3fec02e8", + "metadata": {}, + "outputs": [], + "source": [ + "# hidden imports & helper functions" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "7b431556", + "metadata": { + "tags": [ + "hide-input" + ] + }, + "outputs": [], + "source": [ + "import textwrap\n", + "import apache_beam as beam\n", + "import xarray_beam as xbeam\n", + "import xarray\n", + "\n", + "def summarize_dataset(dataset):\n", + " return f''\n", + "\n", + "def print_summary(key, chunk):\n", + " print(f'{key}\\n with {summarize_dataset(chunk)}')" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "2308b160", + "metadata": {}, + "outputs": [], + "source": [ + "ds = xarray.tutorial.load_dataset('air_temperature')" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "e94b9b00", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n", + " with \n" + ] + } + ], + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(print_summary)" + ] + }, + { + "cell_type": "markdown", + "id": "25c5f2a6", + "metadata": {}, + "source": [ + "Importantly, xarray datasets fed into `DatasetToChunks` **can be lazy**, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into `DatasetToChunks`, each individual chunk will be indexed and evaluated separately on Beam workers.\n", + "\n", + "This pattern allows for leveraging Xarray's builtin dataset loaders (e.g., `open_dataset()` and `open_zarr()`) for feeding arbitrarily large datasets into Xarray-Beam.\n", + "\n", + "For best performance, set `chunks=None` when opening datasets and then _explicitly_ provide chunks in `DatasetToChunks`:" + ] + }, + { + "cell_type": "code", + "execution_count": 47, + "id": "a2ce5049", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n", + " with \n" + ] + } + ], + "source": [ + "# write data into the distributed Zarr format\n", + "ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n", + "\n", + "# load it with zarr\n", + "on_disk = xarray.open_zarr('example-data.zarr', chunks=None)\n", "\n", - "- Discuss options for lazy datasets: xarray's lazy indexing vs dask\n", - "- Discuss nuances of loading data, e.g., feeding in Dask datasets into DatasetToChunks\n", - "- Discuss nuances of ChunksToZarr (including `template`)\n" + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk, chunks={'time': 1000}) | beam.MapTuple(print_summary)" + ] + }, + { + "cell_type": "markdown", + "id": "ea3ec245", + "metadata": {}, + "source": [ + "`chunks=None` tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. This is advantageous because datasets using Xarray's lazy indexing are serialized much more compactly (via [pickle](https://docs.python.org/3/library/pickle.html)) when passed into Beam transforms." + ] + }, + { + "cell_type": "markdown", + "id": "1c8bc4bc", + "metadata": {}, + "source": [ + "Alternatively, you can pass in lazy datasets [using dask](http://xarray.pydata.org/en/stable/user-guide/dask.html). In this case, you don't need to explicitly supply `chunks` to `DatasetToChunks`:" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "id": "3c86c82e", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)\n", + " with \n", + "Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)\n", + " with \n" + ] + } + ], + "source": [ + "on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n", + "\n", + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(print_summary)" + ] + }, + { + "cell_type": "markdown", + "id": "db585d3a", + "metadata": {}, + "source": [ + "Dask's lazy evaluation system is much more general than Xarray's lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk this can be a very convenient way to setup computation for Xarray-Beam.\n", + "\n", + "Unfortunately, it doesn't scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (typically multiple TB) datasets with millions of chunks. However, a current major effort in Dask on [high level graphs](https://blog.dask.org/2021/07/07/high-level-graphs) should improve this in the near future." + ] + }, + { + "cell_type": "markdown", + "id": "cdf80b53", + "metadata": {}, + "source": [ + "```{note}\n", + "We are still figuring out the optimal APIs to facilitate opening data and building lazy datasets in Xarray-Beam. E.g., see [this issue](https://github.com/google/xarray-beam/issues/26) for discussion of a higher level `ZarrToChunks` transform embedding these best practices.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "233809a4", + "metadata": {}, + "source": [ + "## Saving data to Zarr" + ] + }, + { + "cell_type": "markdown", + "id": "67b10192", + "metadata": {}, + "source": [ + "[Zarr](https://zarr.readthedocs.io/) is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.\n", + "\n", + "{py:class}`~xarray_beam.ChunksToZarr` is Xarray-Beam's API for saving chunks into a (new) Zarr store. \n", + "\n", + "You can get started just using it directly:" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "id": "88fc081c", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" + ] + } + ], + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk) | xbeam.ChunksToZarr('example-data-v2.zarr')" + ] + }, + { + "cell_type": "markdown", + "id": "70da81a8", + "metadata": {}, + "source": [ + "By default, `ChunksToZarr` needs to evaluate and combine the entire distributed dataset in order to determine overall Zarr metadata (e.g., array names, shapes, dtypes and attributes). This is fine for relatively small datasets, but can entail significant additional communication and storage costs for large datasets.\n", + "\n", + "The optional `template` argument allows for prespecifying structure of the full on disk dataset in the form of another lazy `xarray.Dataset`. Like the lazy datasets fed into DatasetToChunks, lazy templates can built-up using either Xarray's lazy indexing or lazy operations with Dask, but the data _values_ in a `template` will never be written to disk -- only the metadata structure is used.\n", + "\n", + "One recommended pattern is to use a lazy Dask dataset consisting of a single value to build up the desired template, e.g.," + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "b8ea3f4a", + "metadata": {}, + "outputs": [], + "source": [ + "ds = xarray.open_zarr('example-data.zarr', chunks=None)\n", + "template = xarray.zeros_like(ds.chunk()) # a single virtual chunk of all zeros" + ] + }, + { + "cell_type": "markdown", + "id": "31748c31", + "metadata": {}, + "source": [ + "Xarray operations like indexing and expand dimensions (see {py:meth}`xarray.Dataset.expand_dims`) are entirely lazy on this dataset, which makes it relatively straightforward to build up a Dataset with the required variables and dimensions, e.g., as used in the [ERA5 climatology example](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py)." ] } ], "metadata": { + "celltoolbar": "Tags", "kernelspec": { "display_name": "Python 3", "language": "python", diff --git a/examples/era5_climatology.py b/examples/era5_climatology.py index 1036a15..ec32cbf 100644 --- a/examples/era5_climatology.py +++ b/examples/era5_climatology.py @@ -60,12 +60,12 @@ def main(argv): max_month = source_dataset.time.dt.month.max().item() # normally 12 template = ( source_dataset + .chunk() + .pipe(xarray.zeros_like) .isel(time=0, drop=True) - .pipe(xarray.zeros_like) # don't load even time=0 into memory .expand_dims(month=np.arange(1, max_month + 1), hour=np.arange(24)) - .chunk({'hour': 1, 'month': 1}) # make lazy with dask - .pipe(xarray.zeros_like) # compress the dask graph ) + output_chunks = {'hour': 1, 'month': 1} with beam.Pipeline(runner=RUNNER.value, argv=argv) as root: ( @@ -74,7 +74,7 @@ def main(argv): | xbeam.SplitChunks({'time': 1}) | beam.MapTuple(rekey_chunk_on_month_hour) | xbeam.Mean.PerKey() - | xbeam.ChunksToZarr(OUTPUT_PATH.value, template) + | xbeam.ChunksToZarr(OUTPUT_PATH.value, template, output_chunks) ) diff --git a/examples/era5_rechunk.py b/examples/era5_rechunk.py index d1c41c3..1f47fc2 100644 --- a/examples/era5_rechunk.py +++ b/examples/era5_rechunk.py @@ -39,6 +39,9 @@ def main(argv): with beam.Pipeline(runner=RUNNER.value, argv=argv) as root: ( root + # Note: splitting across the 19 variables in this dataset is a critical + # optimization step here, because it allows rechunking to make use of + # much larger intermediate chunks. | xbeam.DatasetToChunks(source_dataset, source_chunks, split_vars=True) | xbeam.Rechunk( source_dataset.sizes,