diff --git a/NEWS.md b/NEWS.md index 70f7722f..433cd57a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,6 +4,7 @@ * Add new controller methods `autoscale()`, `descale()`, and `started()` to facilitate different kinds of Shiny apps. * Deprecate the `scale` and `throttle` methods of `controller$promise()`. `promise()` now always calls `autoscale()` to make sure one and only one auto-scaling loop is running asynchronously. Auto-scaling thus continues even after the promise resolves. * Add a second example vignette that simulates coin flips. +* Add a new `error` argument to `collect()` (#166). # crew 0.9.1 diff --git a/R/crew_controller.R b/R/crew_controller.R index 1f2bcf56..3a0be2aa 100644 --- a/R/crew_controller.R +++ b/R/crew_controller.R @@ -1018,19 +1018,19 @@ crew_class_controller <- R6::R6Class( #' within the last `seconds_interval` seconds. `FALSE` to auto-scale #' every time `scale()` is called. Throttling avoids #' overburdening the `mirai` dispatcher and other resources. - #' @param error Character of length 1, choice of action if + #' @param error `NULL` or character of length 1, choice of action if #' the popped task threw an error. Possible values: #' * `"stop"`: throw an error in the main R session instead of returning #' a value. #' * `"warn"`: throw a warning. - #' * `"silent"`: do nothing special. + #' * `NULL` or `"silent"`: do not react to errors. #' @param controllers Not used. Included to ensure the signature is #' compatible with the analogous method of controller groups. pop = function( scale = TRUE, collect = NULL, throttle = TRUE, - error = "silent", + error = NULL, controllers = NULL ) { crew_deprecate( @@ -1043,6 +1043,16 @@ crew_class_controller <- R6::R6Class( skip_cran = TRUE, frequency = "once" ) + if (!is.null(error)) { + crew_assert( + error, + is.character(.), + !anyNA(.), + nzchar(.), + length(.) == 1L, + error %in% c("stop", "warn", "silent") + ) + } if (scale) { .subset2(self, "scale")(throttle = throttle) } @@ -1091,22 +1101,21 @@ crew_class_controller <- R6::R6Class( return(out) } # nocov end - error_message <- .subset2(out, "error") on.exit({ index <- .subset2(out, "worker") private$.log$tasks[index] <- .subset2(log, "tasks")[index] + 1L private$.log$seconds[index] <- .subset2(log, "seconds")[index] + .subset2(out, "seconds") private$.log$errors[index] <- .subset2(log, "errors")[index] + - !anyNA(error_message) + !anyNA(.subset2(out, "error")) private$.log$warnings[index] <- .subset2(log, "warnings")[index] + !anyNA(.subset2(out, "warnings")) }, add = TRUE) - if (!anyNA(error_message)) { + if (!is.null(error) && !anyNA(.subset2(out, "error"))) { if (identical(error, "stop")) { - crew_error(message = error_message) + crew_error(message = .subset2(out, "error")) } else if (identical(error, "warn")) { - crew_warning(message = error_message) + crew_warning(message = .subset2(out, "error")) } } out @@ -1114,7 +1123,8 @@ crew_class_controller <- R6::R6Class( #' @description Pop all available task results and return them in a tidy #' `tibble`. #' @return A `tibble` of results and metadata of all resolved tasks, - #' with one row per task. + #' with one row per task. Returns `NULL` if there are no tasks + #' to collect. #' @param scale Logical of length 1, #' whether to automatically call `scale()` #' to auto-scale workers to meet the demand of the task load. @@ -1122,12 +1132,23 @@ crew_class_controller <- R6::R6Class( #' within the last `seconds_interval` seconds. `FALSE` to auto-scale #' every time `scale()` is called. Throttling avoids #' overburdening the `mirai` dispatcher and other resources. + #' @param error `NULL` or character of length 1, choice of action if + #' the popped task threw an error. Possible values: + #' * `"stop"`: throw an error in the main R session instead of returning + #' a value. + #' * `"warn"`: throw a warning. + #' * `NULL` or `"silent"`: do not react to errors. #' @param controllers Not used. Included to ensure the signature is #' compatible with the analogous method of controller groups. - collect = function(scale = TRUE, throttle = TRUE, controllers = NULL) { + collect = function( + scale = TRUE, + throttle = TRUE, + error = NULL, + controllers = NULL + ) { pop <- .subset2(self, "pop") results <- list() - while (!is.null(result <- pop(scale = FALSE))) { + while (!is.null(result <- pop(scale = FALSE, error = error))) { results[[length(results) + 1L]] <- result } out <- lapply(results, monad_tibble) diff --git a/R/crew_controller_group.R b/R/crew_controller_group.R index 23255a25..d3e02b15 100644 --- a/R/crew_controller_group.R +++ b/R/crew_controller_group.R @@ -711,19 +711,19 @@ crew_class_controller_group <- R6::R6Class( #' within the last `seconds_interval` seconds. `FALSE` to auto-scale #' every time `scale()` is called. Throttling avoids #' overburdening the `mirai` dispatcher and other resources. - #' @param error Character of length 1, choice of action if + #' @param error `NULL` or character of length 1, choice of action if #' the popped task threw an error. Possible values: #' * `"stop"`: throw an error in the main R session instead of returning #' a value. #' * `"warn"`: throw a warning. - #' * `"silent"`: do nothing special. + #' * `NULL` or `"silent"`: do not react to errors. #' @param controllers Character vector of controller names. #' Set to `NULL` to select all controllers. pop = function( scale = TRUE, collect = NULL, throttle = TRUE, - error = "silent", + error = NULL, controllers = NULL ) { control <- private$.select_controllers(controllers) @@ -751,11 +751,29 @@ crew_class_controller_group <- R6::R6Class( #' within the last `seconds_interval` seconds. `FALSE` to auto-scale #' every time `scale()` is called. Throttling avoids #' overburdening the `mirai` dispatcher and other resources. + #' @param error `NULL` or character of length 1, choice of action if + #' the popped task threw an error. Possible values: + #' * `"stop"`: throw an error in the main R session instead of returning + #' a value. + #' * `"warn"`: throw a warning. + #' * `NULL` or `"silent"`: do not react to errors. #' @param controllers Character vector of controller names. #' Set to `NULL` to select all controllers. - collect = function(scale = TRUE, throttle = TRUE, controllers = NULL) { + collect = function( + scale = TRUE, + throttle = TRUE, + error = NULL, + controllers = NULL + ) { control <- private$.select_controllers(controllers) - out <- map(control, ~.x$collect(scale = scale, throttle = throttle)) + out <- map( + control, + ~.x$collect( + scale = scale, + throttle = throttle, + error = error + ) + ) out <- tibble::new_tibble(data.table::rbindlist(out, use.names = FALSE)) if_any(nrow(out), out, NULL) }, diff --git a/man/crew_class_controller.Rd b/man/crew_class_controller.Rd index 2c73b04c..29e12988 100644 --- a/man/crew_class_controller.Rd +++ b/man/crew_class_controller.Rd @@ -874,7 +874,7 @@ Pop a completed task from the results data frame. scale = TRUE, collect = NULL, throttle = TRUE, - error = "silent", + error = NULL, controllers = NULL )}\if{html}{\out{}} } @@ -897,13 +897,13 @@ within the last \code{seconds_interval} seconds. \code{FALSE} to auto-scale every time \code{scale()} is called. Throttling avoids overburdening the \code{mirai} dispatcher and other resources.} -\item{\code{error}}{Character of length 1, choice of action if +\item{\code{error}}{\code{NULL} or character of length 1, choice of action if the popped task threw an error. Possible values: \itemize{ \item \code{"stop"}: throw an error in the main R session instead of returning a value. \item \code{"warn"}: throw a warning. -\item \code{"silent"}: do nothing special. +\item \code{NULL} or \code{"silent"}: do not react to errors. }} \item{\code{controllers}}{Not used. Included to ensure the signature is @@ -956,6 +956,7 @@ Pop all available task results and return them in a tidy \if{html}{\out{
}}\preformatted{crew_class_controller$collect( scale = TRUE, throttle = TRUE, + error = NULL, controllers = NULL )}\if{html}{\out{
}} } @@ -972,6 +973,15 @@ within the last \code{seconds_interval} seconds. \code{FALSE} to auto-scale every time \code{scale()} is called. Throttling avoids overburdening the \code{mirai} dispatcher and other resources.} +\item{\code{error}}{\code{NULL} or character of length 1, choice of action if +the popped task threw an error. Possible values: +\itemize{ +\item \code{"stop"}: throw an error in the main R session instead of returning +a value. +\item \code{"warn"}: throw a warning. +\item \code{NULL} or \code{"silent"}: do not react to errors. +}} + \item{\code{controllers}}{Not used. Included to ensure the signature is compatible with the analogous method of controller groups.} } @@ -979,7 +989,8 @@ compatible with the analogous method of controller groups.} } \subsection{Returns}{ A \code{tibble} of results and metadata of all resolved tasks, -with one row per task. +with one row per task. Returns \code{NULL} if there are no tasks +to collect. } } \if{html}{\out{
}} diff --git a/man/crew_class_controller_group.Rd b/man/crew_class_controller_group.Rd index 3786a78d..1d9c7a93 100644 --- a/man/crew_class_controller_group.Rd +++ b/man/crew_class_controller_group.Rd @@ -841,7 +841,7 @@ Pop a completed task from the results data frame. scale = TRUE, collect = NULL, throttle = TRUE, - error = "silent", + error = NULL, controllers = NULL )}\if{html}{\out{}} } @@ -860,13 +860,13 @@ within the last \code{seconds_interval} seconds. \code{FALSE} to auto-scale every time \code{scale()} is called. Throttling avoids overburdening the \code{mirai} dispatcher and other resources.} -\item{\code{error}}{Character of length 1, choice of action if +\item{\code{error}}{\code{NULL} or character of length 1, choice of action if the popped task threw an error. Possible values: \itemize{ \item \code{"stop"}: throw an error in the main R session instead of returning a value. \item \code{"warn"}: throw a warning. -\item \code{"silent"}: do nothing special. +\item \code{NULL} or \code{"silent"}: do not react to errors. }} \item{\code{controllers}}{Character vector of controller names. @@ -890,6 +890,7 @@ Pop all available task results and return them in a tidy \if{html}{\out{
}}\preformatted{crew_class_controller_group$collect( scale = TRUE, throttle = TRUE, + error = NULL, controllers = NULL )}\if{html}{\out{
}} } @@ -906,6 +907,15 @@ within the last \code{seconds_interval} seconds. \code{FALSE} to auto-scale every time \code{scale()} is called. Throttling avoids overburdening the \code{mirai} dispatcher and other resources.} +\item{\code{error}}{\code{NULL} or character of length 1, choice of action if +the popped task threw an error. Possible values: +\itemize{ +\item \code{"stop"}: throw an error in the main R session instead of returning +a value. +\item \code{"warn"}: throw a warning. +\item \code{NULL} or \code{"silent"}: do not react to errors. +}} + \item{\code{controllers}}{Character vector of controller names. Set to \code{NULL} to select all controllers.} } diff --git a/tests/testthat/test-crew_controller.R b/tests/testthat/test-crew_controller.R index 4d0eb366..6d859231 100644 --- a/tests/testthat/test-crew_controller.R +++ b/tests/testthat/test-crew_controller.R @@ -76,7 +76,7 @@ crew_test("controller walk()", { ) }) -crew_test("controller collect()", { +crew_test("controller collect() success", { skip_on_cran() skip_on_os("windows") on.exit({ @@ -94,6 +94,64 @@ crew_test("controller collect()", { expect_equal(nrow(out), 2L) expect_equal(as.character(out$result), rep("done", 2)) expect_null(x$collect()) + expect_crew_error(x$collect(error = "bad")) +}) + +crew_test("controller collect() silent error", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + x <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + expect_silent(out <- x$collect(error = "silent")) + expect_true("failure 1" %in% out$error) +}) + +crew_test("controller collect() error as warning", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + x <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + suppressWarnings( + expect_warning(x$collect(error = "warn"), class = "crew_warning") + ) +}) + +crew_test("controller collect() stop on error", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + x <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + expect_crew_error(x$collect(error = "stop")) }) crew_test("controller map() works", { diff --git a/tests/testthat/test-crew_controller_group.R b/tests/testthat/test-crew_controller_group.R index 4c19f223..125c32ac 100644 --- a/tests/testthat/test-crew_controller_group.R +++ b/tests/testthat/test-crew_controller_group.R @@ -399,6 +399,66 @@ crew_test("controller group collect() with two active controllers", { expect_null(x$collect()) }) +crew_test("controller group collect() silent error", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + a <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x <- crew_controller_group(a) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + expect_silent(out <- x$collect(error = "silent")) + expect_true("failure 1" %in% out$error) +}) + +crew_test("controller group collect() error as warning", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + a <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x <- crew_controller_group(a) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + suppressWarnings( + expect_warning(x$collect(error = "warn"), class = "crew_warning") + ) +}) + +crew_test("controller group collect() stop on error", { + skip_on_cran() + skip_on_os("windows") + on.exit({ + x$terminate() + rm(x) + gc() + crew_test_sleep() + }) + a <- crew_controller_local(workers = 1L, seconds_idle = 30L) + x <- crew_controller_group(a) + x$start() + x$push("success") + x$push(stop("failure 1")) + x$push(stop("failure 2")) + x$wait(mode = "all") + expect_crew_error(x$collect(error = "stop")) +}) + crew_test("controller group map() works", { skip_on_cran() skip_on_os("windows")