diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ea9e473..e88f2bb 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -4,6 +4,6 @@ repos:
     hooks:
       - id: black
   - repo: https://github.com/PyCQA/flake8.git
-    rev: 4.0.1
+    rev: 6.1.0
     hooks:
       - id: flake8
diff --git a/README.md b/README.md
index e5d0781..4329080 100644
--- a/README.md
+++ b/README.md
@@ -10,25 +10,83 @@ stores from completely independent processes easier.
 
 ## Usage
 
-The primary use-case for this is something like this. Say you have a lot of
-netCDF files output from a simulation or observational dataset that you would
-like to stitch together into a zarr store. If you have a way of combining
-those files lazily -- i.e. opening them into dask-backed arrays -- into a
-single dataset, then you can write contiguous "partitions" of that dataset out
-via independent processes. A "partition" corresponds to a contiguous group of
-dask "chunks." I.e. it can correspond to one or more chunks across any number
-of dimensions. A key detail is no partition straddles any dask chunks; this
-makes writing from independent processes completely safe.
+The primary use-case is something like this. Say you have a lot of netCDF files
+output from a simulation or observational dataset that you would like to stitch
+together into a zarr store. If you have a way of combining those files lazily —
+i.e. opening them into dask-backed arrays — into a single dataset with maybe
+some additional light computations, then you can write contiguous "partitions"
+of that dataset out via independent processes. A "partition" corresponds to a
+contiguous group of dask "chunks." I.e. it can correspond to one or more chunks
+across any number of dimensions. A key detail is no partition straddles any
+dask chunks; this makes writing from independent processes completely safe.
 
 `xpartition` provides an accessor called `partition` that implements
-`initialize_store` and `write` methods. The pattern is to have some function
-that constructs the dataset lazily, then call `initialize_store`, and finally
-in a set of separate processes, call `write`.
+`initialize_store` and `write` methods. The pattern is to have some code that
+constructs the dataset lazily, then call `initialize_store`, and finally in a
+set of separate processes, call `write`.
 
-The process might look something like this. Assume through some external
-package we have a function that can construct a dataset lazily. Let's write a
-couple command-line interfaces that initialize the store and write a
-partition. We'll start with one called `initialize_store.py`:
+### Simple serial example
+
+Before illustrating a use-case of `xpartition` on a cluster, we can start with a
+simple serial example. From this example it should be straightforward to
+imagine how to extend this to various distributed computing platforms, whether
+HPC or cloud-based, to do the same thing in parallel.
+
+Assume through some external package we have a function that can construct a
+dataset lazily. To incrementally write it to zarr using `xpartition` we would
+only need to do the following:
+
+```python
+import xpartition
+
+from external import construct_lazy_dataset
+
+store = "store.zarr"
+partitions = 16
+partition_dims = ["tile", "time"]
+
+ds = construct_lazy_dataset()
+ds.partition.initialize_store(store)
+for partition in range(partitions):
+    ds.partition.write(store, partitions, partition_dims, partition)
+```
+
+The `partition_dims` describe the dimensions over which to partition the
+dataset; if chunks exist along dimensions that are not among the partition
+dimensions, then they will all be grouped together. If you are not particular
+about this, simply using `ds.dims` will also work out of the box.
+
+### Parallelization using `multiprocessing`
+
+A parallel example can easily be illustrated using the built-in
+`multiprocessing` library; something similar could be done with `dask.bag`:
+
+```python
+import multiprocessing
+
+import xpartition
+
+from external import construct_lazy_dataset
+
+store = "store.zarr"
+partitions = 16
+partition_dims = ["tile", "time"]
+
+ds = construct_lazy_dataset()
+ds.partition.initialize_store(store)
+with multiprocessing.get_context("spawn").Pool(partitions) as pool:
+    pool.map(
+        ds.partition.mappable_write(store, partitions, partition_dims),
+        range(partitions)
+    )
+```
+
+### Parallelization using a SLURM array job
+
+Finally, the example below describes how one might use `xpartition` on an HPC
+cluster using a SLURM array job. We first start by writing a couple
+command-line interfaces that initialize the store and write a partition. We'll
+start with one called `initialize_store.py`:
 
 ```python
 import argparse
@@ -72,7 +130,7 @@ ds = construct_lazy_dataset()
 ds.partition.write(args.store, args.ranks, dims, args.rank)
 ```
 
-Now let's write a couple bash scripts. The first will be a SLURM array job
+Now we can write a couple bash scripts. The first will be a SLURM array job
 that writes all the partitions. The second will be a "main" script that
 controls the whole workflow.
 
@@ -130,8 +188,8 @@ out of memory errors ([this issue](https://github.com/dask/distributed/issues/63
 is perhaps a good summary of the state of things currently in dask). Breaking
 the problem down in the way that `xpartition` does, allows you to gain the benefits
 of dask's laziness on each independent process, while working in a distributed
-environment. *In an ideal world we wouldn't need a package like this -- we would
-let dask and dask distributed handle everything -- but in practice that does not
+environment. *In an ideal world we wouldn't need a package like this — we would
+let dask and dask distributed handle everything — but in practice that does not
 work perfectly yet.*
 
 ## Installation
diff --git a/setup.cfg b/setup.cfg
index 151badb..51ba293 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,12 +43,17 @@ setup_requires =
     setuptools_scm
 
 [flake8]
-ignore = 
-    E203 # whitespace before ':' - doesn't work well with black
-    E402 # module level import not at top of file
-    E501 # line too long - let black worry about that
-    E731 # do not assign a lambda expression, use a def
-    W503 # line break before binary operator
+ignore =
+    # whitespace before ':' - doesn't work well with black
+    E203
+    # module level import not at top of file
+    E402
+    # line too long - let black worry about that
+    E501
+    # do not assign a lambda expression, use a def
+    E731
+    # line break before binary operator
+    W503
 exclude =
     .eggs
     doc
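
The README text added above notes that "something similar could be done with `dask.bag`." For reference, here is a minimal sketch of that variant. It is not part of the diff; it assumes the same hypothetical `construct_lazy_dataset` helper and the `mappable_write` accessor method shown in the README examples:

```python
# Sketch only: a dask.bag analogue of the multiprocessing example above.
# `construct_lazy_dataset` is the same hypothetical external helper used in
# the README examples; it is not part of xpartition itself.
import dask.bag as db

import xpartition

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)

# One bag element per partition index; mappable_write returns a callable of the
# partition index, so it can be mapped directly over the bag and computed with
# a process-based scheduler so the writes run in separate processes.
bag = db.from_sequence(range(partitions), npartitions=partitions)
bag.map(ds.partition.mappable_write(store, partitions, partition_dims)).compute(
    scheduler="processes"
)
```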