diff --git a/.gitignore b/.gitignore index 2a0c74a..45aaaab 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.egg-info +.DS_Store build dist docs/.ipynb_checkpoints diff --git a/README.md b/README.md index f3517c6..17d16f4 100644 --- a/README.md +++ b/README.md @@ -17,65 +17,23 @@ multi-dimensional labeled arrays, such as: - Calculating statistics (e.g., "climatology") across distributed datasets with arbitrary groups. -Xarray-Beam is implemented as a _thin layer_ on top of existing libraries for -working with large-scale Xarray datasets. For example, it leverages -[Dask](https://dask.org/) for describing lazy arrays and for executing -multi-threaded computation on a single machine. +For more about our approach and how to get started, +**[read the documentation](https://xarray-beam.readthedocs.io/)**! **🚨 Warning: Xarray-Beam is new and unpolished 🚨** Expect sharp edges 🔪 and performance cliffs 🧗, particularly related to the management of lazy data with Dask and reading/writing data with Zarr. We have -used it to efficiently process 5 TB datasets. We _expect_ it to scale to PB size -datasets but that's easier said than done. We welcome feedback and contributions -from early adopters, and hope to have it ready for wider audience soon. +used it to efficiently process ~25 TB datasets. We _expect_ it to scale to PB +size datasets but that's easier said than done. We welcome feedback and +contributions from early adopters, and hope to have it ready for wider audience +soon. -## How does Xarray-Beam compare to Dask? - -We love Dask! Xarray-Beam explores a different part of the design space for -distributed data pipelines than Xarray's built-in Dask integration: - -- Xarray-Beam is built around explicit manipulation of `(xarray_beam.Key, - xarray.Dataset)` pairs to perform operations on distributed datasets, where - `Key` is an immutable dict keeping track of the offsets from the origin for - a small contiguous "chunk" of a larger distributed dataset. 
This requires - more boilerplate but is also more robust than generating distributed - computation graphs in Dask using Xarray's built-in API. The user is expected - to have a mental model for how their data pipeline is distributed across - many machines. -- Xarray-Beam distributes datasets by splitting them into many - `xarray.Dataset` chunks, rather than the chunks of NumPy arrays typically - used by Xarray with Dask (unless using - [xarray.map_blocks](http://xarray.pydata.org/en/stable/user-guide/dask.html#automatic-parallelization-with-apply-ufunc-and-map-blocks)). - Chunks of datasets is a more convenient data-model for writing ad-hoc whole - dataset transformations, but is potentially a bit less efficient. -- Beam ([like Spark](https://docs.dask.org/en/latest/spark.html)) was designed - around a higher-level model for distributed computation than Dask (although - Dask has been making - [progress in this direction](https://coiled.io/blog/dask-under-the-hood-scheduler-refactor/)). - Roughly speaking, this trade-off favors scalability over flexibility. -- Beam allows for executing distributed computation using multiple runners, - notably including Google Cloud Dataflow and Apache Spark. These runners are - more mature than Dask, and in many cases are supported as a service by major - commercial cloud providers. - -![Xarray-Beam datamodel vs Xarray-Dask](./static/xarray-beam-vs-xarray-dask.png) - -These design choices are not set in stone. In particular, in the future we -_could_ imagine writing a high-level `xarray_beam.Dataset` that emulates the -`xarray.Dataset` API, similar to the popular high-level DataFrame APIs in Beam, -Spark and Dask. This could be built on top of the lower-level transformations -currently in Xarray-Beam, or alternatively could use a "chunks of NumPy arrays" -representation similar to that used by dask.array. - -## Getting started +## Installation Xarray-Beam requires recent versions of immutabledict, xarray, dask, rechunker -and zarr. 
It needs the latest release of Apache Beam (2.31.0 or later). For good -performance when writing Zarr files, we strongly recommend patching Xarray with -[this pull request](https://github.com/pydata/xarray/pull/5252). - -TODO(shoyer): write a tutorial here! For now, see the test suite for examples. +and zarr, and the *latest* release of Apache Beam (2.31.0 or later). For best +performance when writing Zarr files, use Xarray 0.19.0 or later. ## Disclaimer @@ -93,3 +51,4 @@ Contributors: - Stephan Hoyer - Jason Hickey - Cenk Gazen +- Alex Merose diff --git a/static/README.md b/docs/_static/README.md similarity index 100% rename from static/README.md rename to docs/_static/README.md diff --git a/static/xarray-beam-logo.png b/docs/_static/xarray-beam-logo.png similarity index 100% rename from static/xarray-beam-logo.png rename to docs/_static/xarray-beam-logo.png diff --git a/static/xarray-beam-vs-xarray-dask.png b/docs/_static/xarray-beam-vs-xarray-dask.png similarity index 100% rename from static/xarray-beam-vs-xarray-dask.png rename to docs/_static/xarray-beam-vs-xarray-dask.png diff --git a/docs/data-model.ipynb b/docs/data-model.ipynb index 868eccc..d173e0d 100644 --- a/docs/data-model.ipynb +++ b/docs/data-model.ipynb @@ -2,39 +2,38 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "# Core data model" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ - "Xarray-Beam tries to make it _straightforward_ to write distributed pipelines with Xarray objects, but unlikely libraries like [Xarray with Dask](http://xarray.pydata.org/en/stable/user-guide/dask.html) or Dask/Spark DataFrames, it doesn't hide the distributed magic inside high-level objects.\n", + "Xarray-Beam tries to make it _straightforward_ to write distributed pipelines with Xarray objects, but unlike libraries like [Xarray with Dask](http://xarray.pydata.org/en/stable/user-guide/dask.html) or Dask/Spark DataFrames, it doesn't hide the distributed 
magic inside high-level objects.\n", "\n", "Xarray-Beam is a lower-level tool. You will be manipulating large datasets piece-by-piece yourself, and you as the developer will be responsible for maintaining Xarray-Beam's internal invariants. This means that to successfully use Xarray-Beam, **you will need to understand how how it represents distributed datasets**.\n", "\n", "This responsibility requires a bit more coding and understanding, but offers benefits in performance and flexibility. This brief tutorial will show you how.\n", "\n", "We'll start off with some standard imports:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 16, - "metadata": {}, - "outputs": [], "source": [ "import apache_beam as beam\n", "import numpy as np\n", "import xarray_beam as xbeam\n", "import xarray" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Keys in Xarray-Beam\n", "\n", @@ -47,111 +46,110 @@ "\n", "1. `offsets`: integer offests for chunks from the origin in an `immutabledict`\n", "2. `vars`: The subset of variables included in each chunk, either as a `frozenset`, or as `None` to indicate \"all variables\"." 
- ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Making a {py:class}`~xarray_beam.Key` from scratch is simple:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 2, - "metadata": {}, + "source": [ + "key = xbeam.Key({'x': 0, 'y': 10}, vars=None)\n", + "key" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "Key(offsets={'x': 0, 'y': 10}, vars=None)" ] }, - "execution_count": 2, "metadata": {}, - "output_type": "execute_result" + "execution_count": 2 } ], - "source": [ - "key = xbeam.Key({'x': 0, 'y': 10}, vars=None)\n", - "key" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Or given an existing {py:class}`~xarray_beam.Key`, you can easily modify it with `replace()` or `with_offsets()`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 3, - "metadata": {}, + "source": [ + "key.replace(vars={'foo', 'bar'})" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "Key(offsets={'x': 0, 'y': 10}, vars={'bar', 'foo'})" ] }, - "execution_count": 3, "metadata": {}, - "output_type": "execute_result" + "execution_count": 3 } ], - "source": [ - "key.replace(vars={'foo', 'bar'})" - ] + "metadata": {} }, { "cell_type": "code", "execution_count": 4, - "metadata": {}, + "source": [ + "key.with_offsets(x=None, z=1)" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "Key(offsets={'y': 10, 'z': 1}, vars=None)" ] }, - "execution_count": 4, "metadata": {}, - "output_type": "execute_result" + "execution_count": 4 } ], - "source": [ - "key.with_offsets(x=None, z=1)" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "{py:class}`~xarray_beam.Key` objects don't do very much. They are just simple structs with two attributes, along with various special methods required to use them as `dict` keys or as keys in Beam pipelines. 
You can find a more examples of manipulating keys {py:class}`in its docstring `." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Creating PCollections" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "The standard inputs & outputs for Xarray-Beam are PCollections of tuples of `(xbeam.Key, xarray.Dataset)` pairs. Xarray-Beam provides a bunch of PCollections for typical tasks, but many pipelines will still involve some manual manipulation of `Key` and `Dataset` objects, e.g., with builtin Beam transforms like `beam.Map`.\n", "\n", "To start off, let's write a helper functions for creating our first collection from scratch:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 67, - "metadata": {}, - "outputs": [], "source": [ "def create_records():\n", " for offset in [0, 4]:\n", @@ -162,30 +160,35 @@ " 'bar': (('x', 'y'), 100 + data),\n", " })\n", " yield key, chunk" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Let's take a look the entries, which are lazily constructed with the generator:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 68, - "metadata": {}, - "outputs": [], "source": [ "inputs = list(create_records())" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": 69, - "metadata": {}, + "source": [ + "inputs" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "[(Key(offsets={'x': 0, 'y': 0}, vars=None),\n", @@ -204,18 +207,14 @@ " bar (x, y) int64 108 109 110 111 112 113 114 115)]" ] }, - "execution_count": 69, "metadata": {}, - "output_type": "execute_result" + "execution_count": 69 } ], - "source": [ - "inputs" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "```{note}\n", "There are multiple valid ways to represent a chunk of a larger dataset with a `Key`.\n", @@ -223,21 +222,25 
@@ "- **Offsets for unchunked dimensions are optional**. Because all chunks have the same offset along the `y` axis, including `y` in `offsets` is not required as long as we don't need to create multiple chunks along that dimension.\n", "- **Indicating variables is optional, if all chunks have the same variables**. We could have set `vars={'foo', 'bar'}` on each of these `Key` objects instead of `vars=None`. This would be an equally valid representation of the same records, since all of our datasets have the same variables.\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "We now have the inputs we need to use Xarray-Beam's helper functions and PTransforms. For example, we can fully consolidate chunks & variables to see what single `xarray.Dataset` these values would correspond to:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 70, - "metadata": {}, + "source": [ + "xbeam.consolidate_fully(inputs)" + ], "outputs": [ { + "output_type": "execute_result", "data": { "text/plain": [ "(Key(offsets={'x': 0, 'y': 0}, vars={'bar', 'foo'}),\n", @@ -249,37 +252,37 @@ " bar (x, y) int64 100 101 102 103 104 105 ... 
110 111 112 113 114 115)" ] }, - "execution_count": 70, "metadata": {}, - "output_type": "execute_result" + "execution_count": 70 } ], - "source": [ - "xbeam.consolidate_fully(inputs)" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "To execute with Beam, of course, we need to turn Python lists/generators into Beam PCollections, e.g., with `beam.Create()`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 71, - "metadata": {}, + "source": [ + "with beam.Pipeline() as p:\n", + " p | beam.Create(create_records()) | beam.Map(print)" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { - "name": "stdout", "output_type": "stream", + "name": "stdout", "text": [ "(Key(offsets={'x': 0, 'y': 0}, vars=None), \n", "Dimensions: (x: 4, y: 2)\n", @@ -296,76 +299,76 @@ ] } ], - "source": [ - "with beam.Pipeline() as p:\n", - " p | beam.Create(create_records()) | beam.Map(print)" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "## Writing pipelines" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Transforms in Xarray-Beam typically act on (key, value) pairs of `(xbeam.Key, xarray.Dataset)`. 
For example, we can dump our dataset on disk in the scalable [Zarr](https://zarr.readthedocs.io/) format using {py:class}`~xarray_beam.ChunksToZarr`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 66, - "metadata": {}, + "source": [ + "inputs | xbeam.ChunksToZarr('my-data.zarr')" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/Users/shoyer/miniconda3/envs/xarray-beam/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/Users/shoyer/Library/Jupyter/runtime/kernel-f0f63827-5e6c-42f6-9eda-1f445e1ba489.json']\n", "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { + "output_type": "execute_result", "data": { "text/plain": [ "[None, None]" ] }, - "execution_count": 66, "metadata": {}, - "output_type": "execute_result" + "execution_count": 66 } ], - "source": [ - "inputs | xbeam.ChunksToZarr('my-data.zarr')" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Xarray-Beam doesn't try to provide transformations for everything. In particular, it omits most [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel) operations that can be performed independently on each chunk of a larger dataset. You can write these yourself using [`beam.Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map/).\n", "\n", "For example, consider elementwise arithmetic. 
We can write a `lambda` function that acts on each key-value pair updating the xarray.Dataset objects appropriately, and put it into an Xarray-Beam pipeline using `beam.MapTuple`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 80, - "metadata": {}, + "source": [ + "inputs | beam.MapTuple(lambda k, v: (k, v + 1))" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/Users/shoyer/miniconda3/envs/xarray-beam/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/Users/shoyer/Library/Jupyter/runtime/kernel-f0f63827-5e6c-42f6-9eda-1f445e1ba489.json']\n", "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { + "output_type": "execute_result", "data": { "text/plain": [ "[(Key(offsets={'x': 0, 'y': 0}, vars=None),\n", @@ -384,36 +387,36 @@ " bar (x, y) int64 109 110 111 112 113 114 115 116)]" ] }, - "execution_count": 80, "metadata": {}, - "output_type": "execute_result" + "execution_count": 80 } ], - "source": [ - "inputs | beam.MapTuple(lambda k, v: (k, v + 1))" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "For operations that add or remove (unchunked) dimensions, you may need to update `Key` objects as well to maintain the Xarray-Beam invariants, e.g., if we want to remove the `y` dimension entirely:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 82, - "metadata": {}, + "source": [ + "inputs | beam.MapTuple(lambda k, v: (k.with_offsets(y=None), v.mean('y')))" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/Users/shoyer/miniconda3/envs/xarray-beam/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', 
'/Users/shoyer/Library/Jupyter/runtime/kernel-f0f63827-5e6c-42f6-9eda-1f445e1ba489.json']\n", "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { + "output_type": "execute_result", "data": { "text/plain": [ "[(Key(offsets={'x': 0}, vars=None),\n", @@ -432,23 +435,20 @@ " bar (x) float64 108.5 110.5 112.5 114.5)]" ] }, - "execution_count": 82, "metadata": {}, - "output_type": "execute_result" + "execution_count": 82 } ], - "source": [ - "inputs | beam.MapTuple(lambda k, v: (k.with_offsets(y=None), v.mean('y')))" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "```{note}\n", "Missing transformations in Xarray-Beam is partially an intentional design decision to reduce scope, and partially just a reflection of what we've gotten around to implementing. If after reading through the rest of docs you notice missing transformations or are wondering how to compute something in Xarray-Beam, please [open an issue](https://github.com/google/xarray-beam/issues) to discuss.\n", "```" - ] + ], + "metadata": {} } ], "metadata": { @@ -475,4 +475,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 837231e..bc25f06 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,6 +10,7 @@ We recommend reading both, as well as a few [end to end examples](https://github ```{toctree} :maxdepth: 1 +why-xarray-beam.md data-model.ipynb read-write.ipynb aggregation.ipynb diff --git a/docs/read-write.ipynb b/docs/read-write.ipynb index 8a7ab65..de7aa83 100644 --- a/docs/read-write.ipynb +++ b/docs/read-write.ipynb @@ -2,48 +2,37 @@ "cells": [ { "cell_type": "markdown", - "id": "8e4f05ea", - "metadata": {}, "source": [ "# Reading and writing data" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "480ac360", - "metadata": {}, "source": [ "## Read datasets into chunks" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": 
"f43f8a6b", - "metadata": {}, "source": [ "There are two main options for loading an `xarray.Dataset` into Xarray-Beam. You can either [create the dataset](data-model.ipynb) from scratch or use the {py:class}`~xarray_beam.DatasetToChunks` transform starting at the root of a Beam pipeline:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 42, - "id": "93d8abec", - "metadata": {}, - "outputs": [], "source": [ "# hidden imports & helper functions" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": 39, - "id": "bb9f5306", - "metadata": { - "tags": [ - "hide-input" - ] - }, - "outputs": [], "source": [ "import textwrap\n", "import apache_beam as beam\n", @@ -55,34 +44,41 @@ "\n", "def print_summary(key, chunk):\n", " print(f'{key}\\n with {summarize_dataset(chunk)}')" - ] + ], + "outputs": [], + "metadata": { + "tags": [ + "hide-input" + ] + } }, { "cell_type": "code", "execution_count": 40, - "id": "2308b160", - "metadata": {}, - "outputs": [], "source": [ "ds = xarray.tutorial.load_dataset('air_temperature')" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": 41, - "id": "e94b9b00", - "metadata": {}, + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(print_summary)" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { - "name": "stdout", "output_type": "stream", + "name": "stdout", "text": [ "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", " with \n", @@ -93,39 +89,43 @@ ] } ], - "source": [ - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(print_summary)" - ] + "metadata": {} }, { "cell_type": "markdown", - "id": "32acc6f5", - "metadata": {}, "source": [ "Importantly, xarray datasets fed into 
`DatasetToChunks` **can be lazy**, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into `DatasetToChunks`, each individual chunk will be indexed and evaluated separately on Beam workers.\n", "\n", "This pattern allows for leveraging Xarray's builtin dataset loaders (e.g., `open_dataset()` and `open_zarr()`) for feeding arbitrarily large datasets into Xarray-Beam.\n", "\n", "For best performance, set `chunks=None` when opening datasets and then _explicitly_ provide chunks in `DatasetToChunks`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 47, - "id": "e9d9df0e", - "metadata": {}, + "source": [ + "# write data into the distributed Zarr format\n", + "ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n", + "\n", + "# load it with zarr\n", + "on_disk = xarray.open_zarr('example-data.zarr', chunks=None)\n", + "\n", + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk, chunks={'time': 1000}) | beam.MapTuple(print_summary)" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { - "name": "stdout", "output_type": "stream", + "name": "stdout", "text": [ "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", " with \n", @@ -136,49 +136,42 @@ ] } ], - "source": [ - "# write data into the distributed Zarr format\n", - "ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')\n", - "\n", - "# load it with zarr\n", - "on_disk = xarray.open_zarr('example-data.zarr', chunks=None)\n", - "\n", - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(on_disk, chunks={'time': 1000}) | beam.MapTuple(print_summary)" - ] + "metadata": {} }, { "cell_type": "markdown", - "id": "dee1e6e1", - "metadata": {}, "source": [ "`chunks=None` tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. 
This is advantageous because datasets using Xarray's lazy indexing are serialized much more compactly (via [pickle](https://docs.python.org/3/library/pickle.html)) when passed into Beam transforms." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "6006a96e", - "metadata": {}, "source": [ "Alternatively, you can pass in lazy datasets [using dask](http://xarray.pydata.org/en/stable/user-guide/dask.html). In this case, you don't need to explicitly supply `chunks` to `DatasetToChunks`:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 49, - "id": "62563df5", - "metadata": {}, + "source": [ + "on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n", + "\n", + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(print_summary)" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { - "name": "stdout", "output_type": "stream", + "name": "stdout", "text": [ "Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)\n", " with \n", @@ -189,102 +182,100 @@ ] } ], - "source": [ - "on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})\n", - "\n", - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(print_summary)" - ] + "metadata": {} }, { "cell_type": "markdown", - "id": "fe8869a6", - "metadata": {}, "source": [ "Dask's lazy evaluation system is much more general than Xarray's lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk this can be a very convenient way to setup computation for Xarray-Beam.\n", "\n", "Unfortunately, it doesn't scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (typically multiple TB) datasets with millions of chunks. 
However, a current major effort in Dask on [high level graphs](https://blog.dask.org/2021/07/07/high-level-graphs) should improve this in the near future." - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "2dd6e083", - "metadata": {}, "source": [ "```{note}\n", "We are still figuring out the optimal APIs to facilitate opening data and building lazy datasets in Xarray-Beam. E.g., see [this issue](https://github.com/google/xarray-beam/issues/26) for discussion of a higher level `ZarrToChunks` transform embedding these best practices.\n", "```" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "233809a4", - "metadata": {}, "source": [ "## Writing data to Zarr" - ] + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "672aaea7", - "metadata": {}, "source": [ "[Zarr](https://zarr.readthedocs.io/) is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.\n", "\n", "{py:class}`~xarray_beam.ChunksToZarr` is Xarray-Beam's API for saving chunks into a Zarr store. 
\n", "\n", "You can get started just using it directly:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 50, - "id": "8fd3fede", - "metadata": {}, + "source": [ + "with beam.Pipeline() as p:\n", + " p | xbeam.DatasetToChunks(on_disk) | xbeam.ChunksToZarr('example-data-v2.zarr')" + ], "outputs": [ { - "name": "stderr", "output_type": "stream", + "name": "stderr", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] } ], - "source": [ - "with beam.Pipeline() as p:\n", - " p | xbeam.DatasetToChunks(on_disk) | xbeam.ChunksToZarr('example-data-v2.zarr')" - ] + "metadata": {} }, { "cell_type": "markdown", - "id": "c035aee7", - "metadata": {}, "source": [ "By default, `ChunksToZarr` needs to evaluate and combine the entire distributed dataset in order to determine overall Zarr metadata (e.g., array names, shapes, dtypes and attributes). This is fine for relatively small datasets, but can entail significant additional communication and storage costs for large datasets.\n", "\n", "The optional `template` argument allows for prespecifying structure of the full on disk dataset in the form of another lazy `xarray.Dataset`. 
Like the lazy datasets fed into DatasetToChunks, lazy templates can built-up using either Xarray's lazy indexing or lazy operations with Dask, but the data _values_ in a `template` will never be written to disk -- only the metadata structure is used.\n", "\n", "One recommended pattern is to use a lazy Dask dataset consisting of a single value to build up the desired template, e.g.," - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": 55, - "id": "63bc75e1", - "metadata": {}, - "outputs": [], "source": [ "ds = xarray.open_zarr('example-data.zarr', chunks=None)\n", "template = xarray.zeros_like(ds.chunk()) # a single virtual chunk of all zeros" - ] + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Xarray operations like indexing and expand dimensions (see {py:meth}`xarray.Dataset.expand_dims`) are entirely lazy on this dataset, which makes it relatively straightforward to build up a Dataset with the required variables and dimensions, e.g., as used in the [ERA5 climatology example](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py).\n", + "\n", + "Note that if you supply a `template`, you will also typically need to specify the `chunks` argument in order to ensure that the data ends up appropriately chunked in the Zarr store." + ], + "metadata": {} }, { "cell_type": "markdown", - "id": "711026c7", - "metadata": {}, "source": [ - "Xarray operations like indexing and expand dimensions (see {py:meth}`xarray.Dataset.expand_dims`) are entirely lazy on this dataset, which makes it relatively straightforward to build up a Dataset with the required variables and dimensions, e.g., as used in the [ERA5 climatology example](https://github.com/google/xarray-beam/blob/main/examples/era5_climatology.py)." - ] + "```{warning}\n", + "Xarray-Beam does not use locks when writing data to Zarr. 
If multiple Beam chunks correspond to the same Zarr chunk, you will almost certainly end up with corrupted data due to concurrent writes. To avoid such issues, ensure your data is [chunked appropriately](rechunking.ipynb) before exporting to Zarr. \n", + "```" + ], + "metadata": {} } ], "metadata": { @@ -309,4 +300,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/docs/why-xarray-beam.md b/docs/why-xarray-beam.md new file mode 100644 index 0000000..8ba6e57 --- /dev/null +++ b/docs/why-xarray-beam.md @@ -0,0 +1,75 @@ +# Why Xarray-Beam + +## Our goals + +Xarray-Beam is a Python library for building +[Apache Beam](https://beam.apache.org/) pipelines with +[Xarray](http://xarray.pydata.org/en/stable/) datasets. + +The project aims to facilitate data transformations and analysis on large-scale +multi-dimensional labeled arrays, such as: + +- Ad-hoc computation on Xarray data, by dividing a `xarray.Dataset` into many + smaller pieces ("chunks"). +- Adjusting array chunks, using the + [Rechunker algorithm](https://rechunker.readthedocs.io/en/latest/algorithm.html) +- Ingesting large multi-dimensional array datasets into an analysis-ready, + cloud-optimized format, namely [Zarr](https://zarr.readthedocs.io/) (see + also [Pangeo Forge](https://github.com/pangeo-forge/pangeo-forge-recipes)) +- Calculating statistics (e.g., "climatology") across distributed datasets + with arbitrary groups. + +## Our approach + +In Xarray-Beam, distributed Xarray datasets are represented by Beam PCollections +of `(xarray_beam.Key, xarray.Dataset)` pairs and are manipulated by Beam +PTransforms, where {py:class}`~xarray_beam.Key` provides metadata specifying +variables and chunk offsets for each "chunk" in the unordered PCollection. +The chunking model is flexible, allowing datasets to be split across multiple +variables and/or into orthogonal contiguous "chunks" along dimensions. 
+ +Xarray-Beam does not (yet) include high-level abstractions like a "distributed +dataset" object. Users need to have a mental model for how their data pipeline +is distributed across many machines, which is facilitated by its direct +representation as a Beam pipeline. (In our experience, building such a mental +model is basically required to get good performance out of large-scale +pipelines, anyways.) + +Implementation-wise, Xarray-Beam is a _thin layer_ on top of existing libraries +for working with large-scale Xarray datasets. For example, it leverages +[Dask](https://dask.org/) for describing lazy arrays and for executing +multi-threaded computation on a single machine. + +## How does Dask compare? + +We love Dask! Xarray-Beam explores a different part of the design space for +distributed data pipelines than Xarray's built-in Dask integration: + +- Xarray-Beam is built around explicit manipulation of `(xarray_beam.Key, + xarray.Dataset)`. This requires more boilerplate but is also + more robust than generating distributed computation graphs in Dask using + Xarray's built-in API. +- Xarray-Beam distributes datasets by splitting them into many + `xarray.Dataset` chunks, rather than the chunks of NumPy arrays typically + used by Xarray with Dask (unless using + [xarray.map_blocks](http://xarray.pydata.org/en/stable/user-guide/dask.html#automatic-parallelization-with-apply-ufunc-and-map-blocks)). + Chunks of datasets is a more convenient data-model for writing ad-hoc whole + dataset transformations, but is potentially a bit less efficient. +- Beam ([like Spark](https://docs.dask.org/en/latest/spark.html)) was designed + around a higher-level model for distributed computation than Dask (although + Dask has been making + [progress in this direction](https://coiled.io/blog/dask-under-the-hood-scheduler-refactor/)). + Roughly speaking, this trade-off favors scalability over flexibility. 
+- Beam allows for executing distributed computation using multiple runners, + notably including Google Cloud Dataflow and Apache Spark. These runners are + more mature than Dask, and in many cases are supported as a service by major + commercial cloud providers. + +![Xarray-Beam datamodel vs Xarray-Dask](./_static/xarray-beam-vs-xarray-dask.png) + +These design choices are not set in stone. In particular, in the future we +_could_ imagine writing a high-level `xarray_beam.Dataset` that emulates the +`xarray.Dataset` API, similar to the popular high-level DataFrame APIs in Beam, +Spark and Dask. This could be built on top of the lower-level transformations +currently in Xarray-Beam, or alternatively could use a "chunks of NumPy arrays" +representation similar to that used by dask.array.