---
execute:
freeze: auto
---
# High-performance computing (the old way) {#hpc}
```{r, message = FALSE, warning = FALSE, echo = FALSE}
knitr::opts_knit$set(root.dir = fs::dir_create(tempfile()))
knitr::opts_chunk$set(collapse = TRUE, comment = "#>", eval = TRUE)
```
```{r, message = FALSE, warning = FALSE, echo = FALSE, eval = TRUE}
library(targets)
library(tarchetypes)
```
:::{.callout-tip}
## Performance
See the [performance chapter](#performance) for options, settings, and other choices to make parallel and distributed pipelines more efficient.
:::
`targets` supports parallel and distributed computing with the `tar_make_clustermq()` and `tar_make_future()` functions. These functions are like `tar_make()`, but they allow multiple targets to run simultaneously over parallel workers.^[Please use `tar_make_clustermq()` or `tar_make_future()` instead of multiple concurrent calls to `tar_make()`. The former is the proper way to use parallel computing in `targets`, and the latter will probably break the [data store](#data).] These workers can be processes on your local machine, or they can be jobs on a computing cluster. The main process automatically sends a target to a worker as soon as
1. The worker is available, and
1. All the target's upstream dependency targets have been checked or built.
Consider the following sketch of a pipeline.
```{r, echo = FALSE, eval = TRUE}
tar_script({
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
})
```
```{r, eval = FALSE}
# _targets.R
library(targets)
library(tarchetypes)
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
```{r, eval = TRUE}
# R console
tar_visnetwork()
```
When we run this pipeline with high-performance computing, `targets` automatically knows to wait for `data` to finish running before moving on to the other targets. Once `data` is finished, it moves on to targets `fast_fit` and `slow_fit`. If `fast_fit` finishes before `slow_fit`, target `plot_1` begins even as `slow_fit` is still running.^[Unlike [`drake`](https://github.com/ropensci/drake), `targets` applies this behavior not only to stem targets, but also to branches of patterns.]
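The dispatch rule above can be sketched with a toy simulation in base R. This is only an illustration of the two conditions (a free worker and finished upstream dependencies), not how `targets` is implemented internally. The function `simulate_schedule()` and the runtimes are hypothetical.

```{r, eval = FALSE}
# Toy model of the dispatch rule. Not the internal algorithm of targets.
deps <- list(
  data = character(0),
  fast_fit = "data",
  slow_fit = "data",
  plot_1 = "fast_fit",
  plot_2 = "slow_fit"
)
# Hypothetical runtimes in arbitrary time units.
runtime <- c(data = 1, fast_fit = 1, slow_fit = 5, plot_1 = 1, plot_2 = 1)

simulate_schedule <- function(deps, runtime, workers = 2) {
  start <- c()   # start time of each launched target
  finish <- c()  # finish time of each completed target
  running <- c() # finish times of targets currently on a worker
  clock <- 0
  while (length(finish) < length(deps)) {
    # A target is ready if it has not started and all its dependencies finished.
    is_ready <- vapply(names(deps), function(name) {
      !(name %in% names(start)) && all(deps[[name]] %in% names(finish))
    }, logical(1))
    ready <- names(deps)[is_ready]
    # Send ready targets to whichever workers are free.
    free <- workers - length(running)
    for (name in head(ready, free)) {
      start[name] <- clock
      running[name] <- clock + runtime[[name]]
    }
    # Advance the clock to the next completion and release that worker.
    clock <- min(running)
    done <- names(running)[running == clock]
    finish[done] <- running[done]
    running <- running[setdiff(names(running), done)]
  }
  data.frame(
    target = names(start),
    start = unname(start),
    finish = unname(finish[names(start)])
  )
}

schedule <- simulate_schedule(deps, runtime, workers = 2)
print(schedule)
```

With these made-up runtimes, `plot_1` starts at time 2, well before `slow_fit` finishes at time 6, and `plot_2` has to wait until then.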
The following sections cover the implementation details of parallel and distributed pipelines in `targets`.
:::{.callout-tip}
## Start small
Even if you plan to create a large-scale heavy-duty pipeline with hundreds of time-consuming parallel targets, it is best to start small. First, create a version of the pipeline with a small number of quick-to-run targets, follow directions in the [walkthrough](#walkthrough) chapter to inspect it, and run it locally with `tar_make()`. Do not scale up to the full-sized pipeline until you are sure everything is working in the downsized pipeline that runs locally.
:::
## Clustermq
[`tar_make_clustermq()`](https://docs.ropensci.org/targets/reference/tar_make_clustermq.html) uses the [`clustermq`](https://mschubert.github.io/clustermq/) package, and prior familiarity with [`clustermq`](https://mschubert.github.io/clustermq/) is extremely helpful for configuring `targets` and diagnosing errors. So before you use [`tar_make_clustermq()`](https://docs.ropensci.org/targets/reference/tar_make_clustermq.html), please read the documentation at <https://mschubert.github.io/clustermq/> and try out [`clustermq`](https://mschubert.github.io/clustermq/) directly. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with [`clustermq`](https://mschubert.github.io/clustermq/) on your scheduler without `targets`. And if you later experience issues with [`tar_make_clustermq()`](https://docs.ropensci.org/targets/reference/tar_make_clustermq.html), try to isolate the problem by creating a [reproducible example](https://www.tidyverse.org/help/) that uses [`clustermq`](https://mschubert.github.io/clustermq/) and not `targets`. Peeling back layers can help isolate problems and point toward specific solutions, and `targets` is usually one of the outer layers.
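As a minimal sketch of trying `clustermq` on its own, the snippet below sends a toy function to a single worker with `clustermq::Q()`. It assumes the local `"multiprocess"` scheduler. On a cluster, you would substitute your own scheduler and template file. The function `double_it()` is a hypothetical placeholder.

```{r, eval = FALSE}
# R console
# Assumption: a local scheduler. Replace with "slurm", "sge", etc. on a cluster.
options(clustermq.scheduler = "multiprocess")
library(clustermq)
double_it <- function(x) x * 2 # Hypothetical toy function.
result <- Q(double_it, x = 1:3, n_jobs = 1) # One worker, three function calls.
str(result)
```

If this call hangs or errors, the problem most likely lies in the `clustermq` or scheduler configuration rather than in `targets`.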
### Persistent workers
`tar_make_clustermq()` uses persistent workers. A persistent worker is an R process that launches early in the pipeline and stays running until the whole pipeline starts to wind down. A persistent worker usually runs multiple targets during its lifecycle, and it is not possible to precisely predict in advance which targets will be assigned to which workers. The video clip below visualizes the concept.
<script src="https://fast.wistia.com/embed/medias/ycczhxwkjw.jsonp" async></script><script src="https://fast.wistia.com/assets/external/E-v1.js" async></script><div class="wistia_responsive_padding" style="padding:56.21% 0 0 0;position:relative;"><div class="wistia_responsive_wrapper" style="height:100%;left:0;position:absolute;top:0;width:100%;"><div class="wistia_embed wistia_async_ycczhxwkjw videoFoam=true" style="height:100%;position:relative;width:100%"><div class="wistia_swatch" style="height:100%;left:0;opacity:0;overflow:hidden;position:absolute;top:0;transition:opacity 200ms;width:100%;"><img src="https://fast.wistia.com/embed/medias/ycczhxwkjw/swatch" style="filter:blur(5px);height:100%;object-fit:contain;width:100%;" alt="" onload="this.parentNode.style.opacity=1;" /></div></div></div></div>
### Clustermq installation
Persistent workers require the [`clustermq`](https://github.com/mschubert/clustermq) R package, which in turn requires [ZeroMQ](http://zeromq.org/). Please refer to the [`clustermq` installation guide](https://github.com/mschubert/clustermq/blob/master/README.md#installation) for specific instructions.
### Automatic configuration with `use_targets()`
If `clustermq` is installed, then the `use_targets()` function configures `clustermq` automatically. `use_targets()` tries to detect the cluster you are using and write the appropriate `_targets.R` settings and `clustermq` configuration files commensurate with your system resources. On most systems, this should allow you to run `tar_make_clustermq()` with no extra configuration. If so, you can skip the rest of the `clustermq` section. If not, read on to learn how to configure `clustermq` for `targets`.
### Clustermq local configuration
To utilize parallel computing on your local machine, set the `clustermq.scheduler` option to `"multicore"` or `"multiprocess"` in your `_targets.R` file. See the [configuration instructions](https://mschubert.github.io/clustermq/articles/userguide.html#configuration) for details.
```{r, eval = FALSE}
# _targets.R
options(clustermq.scheduler = "multiprocess")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_clustermq()` with the appropriate number of workers. These workers are local R processes that build the outdated targets in the pipeline concurrently. The workers launch as soon as there is an outdated target with `deployment = "worker"`, and they continue running until there are no more targets to run.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
### Clustermq remote configuration
A cluster is a collection of multiple computers that work together to run computationally demanding workloads. A cluster is capable of running far more tasks simultaneously than a local workstation like a PC or laptop. When configured for a cluster, `tar_make_clustermq()` uses `clustermq` to run persistent workers as long-running distributed jobs. To manually set the configuration,
1. Choose a [scheduler listed here](https://mschubert.github.io/clustermq/articles/userguide.html#configuration) that corresponds to your cluster's resource manager.
1. Create a [template file](https://github.com/mschubert/clustermq/tree/master/inst) that configures the computing requirements and other settings for the cluster.
Supply the scheduler option and template file to the `clustermq.scheduler` and `clustermq.template` global options in your target script file (default: `_targets.R`).
```{r, eval = FALSE}
# _targets.R
options(clustermq.scheduler = "sge", clustermq.template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Above, `sge.tmpl` refers to a template file like the one below.
```
## From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }} # Worker name.
#$ -t 1-{{ n_jobs }} # Submit workers as an array.
#$ -j y # Combine stdout and stderr into one worker log file.
#$ -o /dev/null # Worker log files.
#$ -cwd # Use project root as working directory.
#$ -V # Use environment variables.
module load R/3.6.3 # Needed if R is an environment module on the cluster.
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")' # Leave alone.
```
Then, run `tar_make_clustermq()` as before. Instead of running locally, the workers will run as jobs on the cluster. Because they are persistent workers, they launch as soon as there is an outdated target with `deployment = "worker"`, and they continue running until they complete their current targets and there are no more targets to assign.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
See the examples [linked from here](https://docs.ropensci.org/targets/index.html#examples) to see how this setup works in real-world projects.
### Clustermq template file configuration
In addition to configuration options hard-coded in the template file, you can supply custom computing resources with the `resources` argument of `tar_option_set()`. As an example, let's use a wildcard for the number of cores per worker on an SGE cluster. In the template file, supply the `{{ num_cores }}` wildcard to the `-pe smp` flag.
```
#$ -pe smp {{ num_cores | 1 }} # Number of cores per worker (defaults to 1)
#$ -N {{ job_name }}
#$ -t 1-{{ n_jobs }}
#$ -j y
#$ -o /dev/null
#$ -cwd
#$ -V
module load R/3.6.3
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")'
```
Then, supply the value of `num_cores` to the `resources` option from within the target script file (default: `_targets.R`). In older versions of `targets`, `resources` was a named list. In `targets` 0.5.0.9000 and above, please create the `resources` argument with helpers `tar_resources()` and `tar_resources_clustermq()`.
```{r, eval = FALSE}
# _targets.R
# With older versions of targets:
# tar_option_set(resources = list(num_cores = 2))
# With targets >= 0.5.0.9000:
tar_option_set(
resources = tar_resources(
clustermq = tar_resources_clustermq(template = list(num_cores = 2))
)
)
list(
tar_target(...),
... # more targets
)
```
Finally, call `tar_make_clustermq()` normally.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
This particular use case comes up when you have custom parallel computing within targets and need to take advantage of multiple cores.
:::{.callout-warning}
## `clustermq` resources
Remember: with `tar_make_clustermq()`, the workers are *persistent*, which means there is not a one-to-one correspondence between workers and targets. In the `resources` argument of `tar_option_set()`, the `clustermq`-specific resources apply to the workers, not the targets directly. So in the `resources` argument of `tar_target()`, any `clustermq`-specific resources will be ignored. `tar_option_set()` is the only way to set `clustermq` resources.
:::
## Future
[`tar_make_future()`](https://docs.ropensci.org/targets/reference/tar_make_future.html) uses the [`future`](https://future.futureverse.org/) package, and prior familiarity with [`future`](https://future.futureverse.org/) is extremely helpful for configuring `targets` and diagnosing errors. So before you use [`tar_make_future()`](https://docs.ropensci.org/targets/reference/tar_make_future.html), please read the documentation at <https://future.futureverse.org/> and try out [`future`](https://future.futureverse.org/) directly, ideally with backends like [`future.callr`](https://future.callr.futureverse.org/) and possibly [`future.batchtools`](https://future.batchtools.futureverse.org/). If you plan to use a scheduler like SLURM or SGE, please configure and experiment with [`future`](https://future.futureverse.org/) on your scheduler without `targets`. And if you later experience issues with [`tar_make_future()`](https://docs.ropensci.org/targets/reference/tar_make_future.html), try to isolate the problem by creating a [reproducible example](https://www.tidyverse.org/help/) that uses [`future`](https://future.futureverse.org/) and not `targets`. Same goes for [`future.batchtools`](https://future.batchtools.futureverse.org/) if applicable. Peeling back layers can help isolate problems and point toward specific solutions, and `targets` is usually one of the outer layers.
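As a minimal sketch of trying `future` on its own, the snippet below resolves one future with the `callr` plan from `future.callr` and checks that it ran in a separate R process. It assumes both packages are installed. The variable name `worker_pid` is just for illustration.

```{r, eval = FALSE}
# R console
library(future)
plan(future.callr::callr)   # Each future runs in its own R process.
f <- future(Sys.getpid())   # Ask the worker for its process ID.
worker_pid <- value(f)      # Block until the future resolves.
worker_pid != Sys.getpid()  # The worker is a different process than this session.
plan(sequential)            # Restore the default plan.
```

If this works locally, the next step is to repeat the experiment with the `future.batchtools` plan for your scheduler before bringing `targets` into the picture.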
### Transient workers
`tar_make_future()` runs transient workers. That means each target gets its own worker which initializes when the target begins and terminates when the target ends. The following video clip demonstrates the concept.
<script src="https://fast.wistia.com/embed/medias/340yvlp515.jsonp" async></script><script src="https://fast.wistia.com/assets/external/E-v1.js" async></script><div class="wistia_responsive_padding" style="padding:56.21% 0 0 0;position:relative;"><div class="wistia_responsive_wrapper" style="height:100%;left:0;position:absolute;top:0;width:100%;"><div class="wistia_embed wistia_async_340yvlp515 videoFoam=true" style="height:100%;position:relative;width:100%"><div class="wistia_swatch" style="height:100%;left:0;opacity:0;overflow:hidden;position:absolute;top:0;transition:opacity 200ms;width:100%;"><img src="https://fast.wistia.com/embed/medias/340yvlp515/swatch" style="filter:blur(5px);height:100%;object-fit:contain;width:100%;" alt="" onload="this.parentNode.style.opacity=1;" /></div></div></div></div><br>
### Automatic configuration with `use_targets()`
If `future` and `future.batchtools` are installed, then the `use_targets()` function configures `future` automatically. `use_targets()` tries to detect the cluster you are using and write the appropriate `_targets.R` settings and, if appropriate, the `batchtools` configuration file commensurate with your system resources. On most systems, this should allow you to run `tar_make_future()` with no extra configuration. If so, you can skip the rest of the `future` section. If not, read on to learn how to configure `future` for `targets`.
### Future installation
Install the [`future`](https://github.com/HenrikBengtsson/future) package.
```{r, eval = FALSE}
install.packages("future")
```
If you intend to use a cluster, be sure to install the [`future.batchtools`](https://github.com/HenrikBengtsson/future.batchtools) package too.
```{r, eval = FALSE}
install.packages("future.batchtools")
```
The [`future`](https://github.com/HenrikBengtsson/future) ecosystem contains even more packages that extend [`future`](https://github.com/HenrikBengtsson/future)'s parallel computing functionality, such as [`future.callr`](https://github.com/HenrikBengtsson/future.callr).
```{r, eval = FALSE}
install.packages("future.callr")
```
### Future locally
If you are manually configuring `future`/`future.batchtools` for your pipeline, declare a `future` plan in your target script file (default: `_targets.R`). The `callr` plan from the [`future.callr`](https://future.callr.futureverse.org) package is recommended.^[Some alternative local future plans are [listed here](https://github.com/HenrikBengtsson/future/#controlling-how-futures-are-resolved).]
It is crucial to call `future::plan()` in the target script file itself. A plan defined interactively before invoking `tar_make_future()` is not used by the pipeline.
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.callr)
plan(callr)
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` with the desired number of workers. Here, the `workers` argument specifies the maximum number of transient workers to allow at a given time. Some `future` plans also have optional `workers` arguments that set their own caps.
```{r, eval = FALSE}
# R console
tar_make_future(workers = 2)
```
### Future remotely
To run transient workers on a cluster, first install the [`future.batchtools`](https://github.com/HenrikBengtsson/future.batchtools) package. Then, set one of [these plans](https://github.com/HenrikBengtsson/future.batchtools#choosing-batchtools-backend) in your target script file (default: `_targets.R`).
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Here, our template file `sge.tmpl` is configured for `batchtools`.
```
#!/bin/bash
#$ -cwd # Run in the current working directory.
#$ -j y # Direct stdout and stderr to the same file.
#$ -o <%= log.file %> # log file
#$ -V # Use environment variables.
#$ -N <%= job.name %> # job name
module load R/3.6.3 # Needed if R is an environment module on the cluster.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")' # Leave alone.
exit 0 # Leave alone.
```
### Future configuration
The `tar_target()`, `tar_target_raw()`, and `tar_option_set()` functions accept a `resources` argument.^[The `resources` of `tar_target()` defaults to `tar_option_get("resources")`. You can set the default value for all targets using `tar_option_set()`.] For example, if our `batchtools` template file has a wildcard for the number of cores for a job,
```
#!/bin/bash
#$ -pe smp <%= if (is.null(resources[["num_cores"]])) 1 else resources[["num_cores"]] %> # Wildcard for cores per job (defaults to 1).
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
module load R/3.6.3
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0
```
then you can set the number of cores for an individual target using a target-specific `future` plan. In the case below, maybe the slow model needs 2 cores to run fast enough. Because of the `resources[["num_cores"]]` placeholder in the above template file, we can control the number of cores in each target through its local plan.^[In older versions of `targets`, `resources` was a named list. In `targets` version 0.5.0.9000 and above, please create the `resources` argument with helpers `tar_resources()` and `tar_resources_future()`.]
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
# With older versions of targets:
# tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2)),
# With targets >= 0.5.0.9000:
tar_target(
slow_fit,
fit_slow_model(data),
resources = tar_resources(
future = tar_resources_future(
plan = tweak(
batchtools_sge,
template = "sge.tmpl",
resources = list(num_cores = 2)
)
)
)
),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` as usual.
```{r, eval = FALSE}
# R console
tar_make_future(workers = 2)
```
## Advanced
Functions `tar_target()`, `tar_target_raw()`, and `tar_option_set()` support advanced configuration options for heavy-duty pipelines that require high-performance computing.
* `deployment`: With the `deployment` argument, you can choose to run some targets locally on the main process instead of on a high-performance computing worker. This option is suitable for lightweight targets such as R Markdown reports where runtime is quick and a cluster would be excessive.
* `memory`: Choose whether to retain a target in memory or remove it from memory whenever it is not needed at the moment. This is a tradeoff between memory consumption and storage read speeds, and like all of the options listed here, you can set it on a target-by-target basis. The default settings consume a lot of memory to avoid frequently reading from storage. To keep memory usage down to a minimum, set `memory = "transient"` and `garbage_collection = TRUE` in `tar_target()` or `tar_option_set()`. For cloud-based dynamic files such as `format = "aws_file"`, this memory policy applies to temporary local copies of the file in `_targets/scratch/`: `"persistent"` means they remain until the end of the pipeline, and `"transient"` means they get deleted from the file system as soon as possible. The former conserves bandwidth, and the latter conserves local storage.
* `garbage_collection`: Choose whether to run `base::gc()` just before running the target.
* `storage`: Choose whether the parallel workers or the main process is responsible for saving the target's value. For slow network file systems on clusters, `storage = "main"` is often faster for small numbers of targets. For large numbers of targets or low-bandwidth connections between the main and workers, `storage = "worker"` is often faster. Always choose `storage = "main"` if the workers do not have access to the file system with the `_targets/` data store.
* `retrieval`: Choose whether the parallel workers or the main process is responsible for reading dependency targets from disk. This should usually match whatever you choose for `storage` (default). Always choose `retrieval = "main"` if the workers do not have access to the file system with the `_targets/` data store.
* `format`: If your pipeline has large computation, it may also have large data. Consider setting the `format` argument to help `targets` store and retrieve your data faster.
* `error`: Set `error` to `"continue"` to let the rest of the pipeline keep running even if a target encounters an error.
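
As a rough sketch of how these options combine, the target script below sets pipeline-wide defaults with `tar_option_set()` and overrides a few of them for individual targets. The particular choices are illustrative assumptions rather than recommendations, and the accepted values may differ across versions of `targets`.

```{r, eval = FALSE}
# _targets.R
library(targets)
# Pipeline-wide defaults (illustrative choices only).
tar_option_set(
  storage = "worker",   # Workers save their own results.
  retrieval = "worker", # Workers read their own dependencies.
  error = "continue"    # Keep going if a target errors.
)
list(
  tar_target(data, get_data()),
  tar_target(
    slow_fit,
    fit_slow_model(data),
    memory = "transient",     # Unload the value when it is no longer needed.
    garbage_collection = TRUE # Run gc() before building this target.
  ),
  # A lightweight target that runs on the main process instead of a worker.
  tar_target(plot_2, make_plot(slow_fit), deployment = "main")
)
```

Setting the defaults once in `tar_option_set()` and overriding them per target keeps the exceptions visible next to the targets that need them.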
## Cloud computing
Right now, `targets` does not have built-in cloud-based distributed computing support. However, future development plans include seamless integration with [AWS Batch](https://aws.amazon.com/batch/). As a temporary workaround, it is possible to deploy a burstable [SLURM](https://slurm.schedmd.com/documentation.html) cluster using [AWS ParallelCluster](https://aws.amazon.com/hpc/parallelcluster/) and leverage `targets`' [existing support for traditional schedulers](#hpc).