diff --git a/.Rbuildignore b/.Rbuildignore index 91fb891..c1b41a0 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -4,3 +4,4 @@ ^docs$ ^pkgdown$ ^\.github$ +^codecov\.yml$ diff --git a/.github/workflows/test-coverage.yaml b/.github/workflows/test-coverage.yaml new file mode 100644 index 0000000..9882260 --- /dev/null +++ b/.github/workflows/test-coverage.yaml @@ -0,0 +1,61 @@ +# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples +# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + +name: test-coverage.yaml + +permissions: read-all + +jobs: + test-coverage: + runs-on: ubuntu-latest + env: + GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} + + steps: + - uses: actions/checkout@v4 + + - uses: r-lib/actions/setup-r@v2 + with: + use-public-rspm: true + + - uses: r-lib/actions/setup-r-dependencies@v2 + with: + extra-packages: any::covr, any::xml2 + needs: coverage + + - name: Test coverage + run: | + cov <- covr::package_coverage( + quiet = FALSE, + clean = FALSE, + install_path = file.path(normalizePath(Sys.getenv("RUNNER_TEMP"), winslash = "/"), "package") + ) + covr::to_cobertura(cov) + shell: Rscript {0} + + - uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: ${{ github.event_name != 'pull_request' && true || false }} + file: ./cobertura.xml + plugin: noop + disable_search: true + token: ${{ secrets.CODECOV_TOKEN }} + + - name: Show testthat output + if: always() + run: | + ## -------------------------------------------------------------------- + find '${{ runner.temp }}/package' -name 'testthat.Rout*' -exec cat '{}' \; || true + shell: bash + + - name: Upload test results + if: failure() + uses: actions/upload-artifact@v4 + with: + name: coverage-test-failures + path: ${{ runner.temp }}/package diff --git a/DESCRIPTION b/DESCRIPTION index 81e6484..535f29f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -38,4 +38,4 @@ Imports: R6, rlang Suggests: - testthat + testthat (>= 3.0.0) diff --git a/NAMESPACE b/NAMESPACE index 74008b6..a2e310b 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -3,7 +3,7 @@ importFrom("callr", "r_session", "r_session_options") importFrom("cli", "cli_abort", "cli_bullets", "cli_div", "cli_end", "cli_fmt", "cli_text", "cli_warn", "cli_rule", "col_grey", "col_green", "col_red", "col_yellow" ) -importFrom("globals", "globalsOf") +importFrom("globals", "cleanup", "globalsOf") importFrom("later", "later", "run_now") importFrom("magrittr", "%>%", "%<>%") importFrom("parallelly", "availableCores") @@ -12,7 +12,7 @@ importFrom("promises", "promise_race", "promise_reduce", "promise_resolve", "then" ) importFrom("R6", "R6Class") importFrom("rlang", - "%||%", "as_environment", "as_function", "env_get", "env_has", "env_name", + "%||%", "as_environment", "as_function", "dots_list", "env_get", "env_has", "env_name", "is_character", "is_formula", "is_false", "is_function", "is_list", "is_named", "is_named2", "is_null", "is_na", "is_true", "is_scalar_character", "is_scalar_integerish", "is_scalar_logical", "is_string", "new_environment" ) diff --git a/R/future.r b/R/future.r index 75fc2a5..94298b1 100644 --- a/R/future.r +++ b/R/future.r @@ -82,7 +82,7 @@ #' } #' #' class(jobqueue) <- c('JobQueuePlan', class(jobqueue)) -#' attr(jobqueue, "tweakable") <- c('tmax', 'hooks', 'stop_id', 'copy_id', 'cpus', 'queue') +#' attr(jobqueue, "tweakable") <- c('timeout', 'hooks', 'stop_id', 'copy_id', 'cpus', 'queue') #' #' #' future_by_uid <- fastmap() @@ -180,7 +180,7 @@ #' future$job <- queue$run( #' expr = getExpression(future), #' vars = globals, -#' tmax = future[['tmax']], +#' timeout = future[['timeout']], #' hooks = future[['hooks']], #' reformat = future[['reformat']], #' cpus = future[['cpus']], diff --git a/R/job.r b/R/job.r index e7e57fb..cbd2b63 100644 --- a/R/job.r +++ b/R/job.r @@ -1,5 +1,7 @@ -#' Define a Job to Run on a Background Worker Process. +#' Define a Job in Isolation from Queues/Workers. +#' +#' #' #' @name Job #' @@ -10,10 +12,19 @@ #' @param vars A list of named variables to make available to `expr` during #' evaluation. #' -#' @param tmax A named numeric vector indicating the maximum number of +#' @param scan Should additional variables be added to `vars` based on +#' scanning `expr` for missing global variables? The default, +#' `scan = TRUE` always scans and adds to `vars`, set `scan = FALSE` to +#' never scan, and `scan = ` to look for +#' globals there. `vars` defined by the user are always left untouched. +#' +#' @param ignore A character vector of variable names that should NOT be added +#' to `vars` when `scan=TRUE`. +#' +#' @param timeout A named numeric vector indicating the maximum number of #' seconds allowed for each state the job passes through, or 'total' to #' apply a single timeout from 'submitted' to 'done'. Example: -#' `tmax = c(total = 2.5, running = 1)` will force-stop a job 2.5 +#' `timeout = c(total = 2.5, running = 1)` will force-stop a job 2.5 #' seconds after it is submitted, and also limits its time in the #' running state to just 1 second. #' @@ -41,8 +52,8 @@ #' \item{`'*'` - }{ Every time the state changes. } #' \item{`'.next'` - }{ Only one time, the next time the state changes. } #' \item{`'created'` - }{ After `Job$new()` initialization. } -#' \item{`'submitted'` - }{ Before `stop_id` and `copy_id` are resolved. } -#' \item{`'queued'` - }{ After `$queue` is assigned. } +#' \item{`'submitted'` - }{ After `$queue` is assigned. } +#' \item{`'queued'` - }{ After `stop_id` and `copy_id` are resolved. } #' \item{`'dispatched'` - }{ After `$worker` is assigned. } #' \item{`'starting'` - }{ Before evaluation begins. } #' \item{`'running'` - }{ After evaluation begins. } @@ -57,6 +68,8 @@ #' #' @param reason A message or other value to include in the 'interrupt' #' condition object that will be returned as the Job's result. +#' +#' @param ... Arbitrary named values to add to the returned Job object. #' #' @export #' @@ -71,8 +84,23 @@ Job <- R6Class( #' @description #' Creates a Job object defining how to run an expression on a background worker process. #' @return A Job object. - initialize = function (expr, vars = NULL, tmax = NULL, hooks = NULL, reformat = TRUE, cpus = 1L) - j_initialize(self, private, expr, vars, tmax, hooks, reformat, cpus), + initialize = function ( + expr, + vars = NULL, + scan = TRUE, + ignore = NULL, + envir = parent.frame(), + timeout = NULL, + hooks = NULL, + reformat = TRUE, + cpus = 1L, + ... ) { + + j_initialize( + self, private, + expr, vars, scan, ignore, envir, + timeout, hooks, reformat, cpus, ... ) + }, #' @description #' Print method for a Job. @@ -83,7 +111,12 @@ Job <- R6Class( #' @description #' Attach a callback function. #' @return A function that when called removes this callback from the Job. - on = function (state, func) j_on(self, private, state, func), + on = function (state, func) u_on(self, private, 'JH', state, func), + + #' @description + #' Blocks until the Job enters the given state. + #' @return This Job, invisibly. + wait = function (state = 'done') u_wait(self, private, state), #' @description #' Stop this Job. If the Job is running, the worker process will be rebooted. @@ -95,8 +128,11 @@ Job <- R6Class( .expr = NULL, .vars = NULL, + .scan = NULL, + .ignore = NULL, + .envir = NULL, .cpus = NULL, - .tmax = list(), + .timeout = list(), .hooks = list(), .reformat = list(), @@ -118,6 +154,18 @@ Job <- R6Class( #' environment before evaluation. vars = function (value) j_vars(private, value), + #' @field scan + #' Get or set whether to scan for missing `vars`. + scan = function (value) j_scan(private, value), + + #' @field ignore + #' Get or set a character vector of variable names to NOT add to `vars`. + ignore = function (value) j_ignore(private, value), + + #' @field envir + #' Get or set the environment where missing `vars` can be found. + envir = function (value) j_envir(private, value), + #' @field reformat #' Get or set the `function (job, output)` for transforming raw `callr` #' output to the Job's result. @@ -127,9 +175,9 @@ Job <- R6Class( #' Get or set the number of CPUs to reserve for evaluating `expr`. cpus = function (value) j_cpus(private, value), - #' @field tmax + #' @field timeout #' Get or set the time limits to apply to this Job. - tmax = function (value) j_tmax(self, private, value), + timeout = function (value) j_timeout(self, private, value), #' @field proxy #' Get or set the Job to proxy in place of running `expr`. @@ -140,16 +188,16 @@ Job <- R6Class( state = function (value) j_state(self, private, value), #' @field output - #' Get or set the Job's raw `callr` output (setting will change the Job's - #' state to 'done'). + #' Get or set the Job's raw `callr` output (assigning to `$output` will + #' change the Job's state to 'done'). output = function (value) j_output(self, private, value), #' @field result - #' Get the result of `expr`. Will block until Job is finished. + #' Result of `expr`. Will block until Job is finished. result = function () j_result(self, private), #' @field hooks - #' Get all currently registered callback hooks - a named list of functions. + #' Currently registered callback hooks as a named list of functions. hooks = function () private$.hooks, #' @field is_done @@ -157,20 +205,30 @@ Job <- R6Class( is_done = function () private$.is_done, #' @field uid - #' Returns a short string, e.g. 'J16', that uniquely identifies this Job. + #' A short string, e.g. 'J16', that uniquely identifies this Job. uid = function () private$.uid ) ) # Sanitize and track values for later use. -j_initialize <- function (self, private, expr, vars, tmax, hooks, reformat, cpus) { +j_initialize <- function (self, private, expr, vars, scan, ignore, envir, timeout, hooks, reformat, cpus, ...) { expr_subst <- substitute(expr, env = parent.frame()) private$.expr <- validate_expression(expr, expr_subst, null_ok = FALSE) + dots <- dots_list(..., .named = TRUE) + for (i in names(dots)) + self[[i]] <- dots[[i]] + + if (is_null(envir)) + envir <- parent.frame(n = 2) + self$vars <- vars - self$tmax <- tmax + self$scan <- scan + self$ignore <- ignore + self$envir <- envir + self$timeout <- timeout self$reformat <- reformat self$cpus <- cpus @@ -185,24 +243,7 @@ j_initialize <- function (self, private, expr, vars, tmax, hooks, reformat, cpus j_print <- function (self) { cli_text('{self$uid} {.cls {class(self)}} [{self$state}]') - return (invisible(off)) -} - - -j_on <- function (self, private, state, func) { - - state <- validate_string(state) - func <- validate_function(func, null_ok = FALSE) - - uid <- attr(func, '.uid') <- increment_uid('JH') - private$.hooks %<>% c(setNames(list(func), state)) - - off <- function () private$.hooks %<>% attr_ne('.uid', uid) - - if (state == self$state) func(self) - if (state == '*') func(self) - - return (invisible(off)) + return (invisible(self)) } @@ -217,7 +258,12 @@ j_stop <- function (self, private, reason) { j_output <- function (self, private, value) { if (missing(value)) { - while (!private$.is_done) run_now(timeoutSecs = 0.2) + + if (self$state == 'created') + if (inherits(self$queue, 'Queue')) + self$queue$submit(self) + + self$wait() # blocking return (private$.output) } @@ -244,9 +290,15 @@ j_result <- function (self, private) { if (is_false(reformat)) return (output) if (is_function(reformat)) return (reformat(output)) - if (!is_null(output[['error']])) { result <- output[['error']] } - else if (hasName(output, 'result')) { result <- output[['result']] } - else { result <- output } + if (hasName(output, 'error') && !is_null(output[['error']])) { + result <- output[['error']] + } + else if (hasName(output, 'result')) { + result <- output[['result']] + } + else { + result <- output + } return (result) } @@ -281,44 +333,41 @@ j_state <- function (self, private, value) { if (missing(value)) return (private$.state) - state <- validate_string(value) + new_state <- validate_string(value) + curr_state <- private$.state - # cli_text('Job {self$uid} state change: {private$.state} -> {state}') - - if (state == private$.state) return (NULL) - if (private$.is_done == 'done') - cli_abort("Job$state can't be changed from 'done' to '{state}'.") - if (state == 'done' && !private$.is_done) + if (new_state == curr_state) return (NULL) + if (curr_state == 'done') + cli_abort("Job$state can't be changed from 'done' to '{new_state}'.") + if (new_state == 'done' && !private$.is_done) cli_abort("Job$state can't be set to 'done' until Job$output is set.") - # Start the 'total' timeout when we leave the 'created' state. - if (private$.state == 'created') - if (!is_null(tmax <- private$.tmax[['total']])) { - msg <- 'total runtime exceeded {tmax} second{?s}' + private$.state <- new_state + + # Start the 'total' timeout when we enter the 'submitted' state. + if (new_state == 'submitted') + if (!is_null(timeout <- private$.timeout[['total']])) { + msg <- 'total runtime exceeded {timeout} second{?s}' msg <- cli_fmt(cli_text(msg)) - self$on('done', later(~self$stop(msg), delay = tmax)) + self$on('done', later(~self$stop(msg), delay = timeout)) } - private$.state <- state - hooks <- private$.hooks private$.hooks <- hooks[names(hooks) != '.next'] - hooks <- hooks[names(hooks) %in% c('*', '.next', state)] + hooks <- hooks[names(hooks) %in% c('*', '.next', new_state)] for (i in seq_along(hooks)) { func <- hooks[[i]] - # uid <- attr(func, '.uid', exact = TRUE) - # cli_text('Executing Job {self$uid} {.val {state}} callback hook {uid}.') if (!is_null(formals(func))) { func(self) } else if (is.primitive(func)) { func(self) } else { func() } } - # Start the timeout for this state, if present. - if (!is_null(tmax <- private$.tmax[[state]])) { - msg <- 'exceeded {.val {tmax}} second{?s} while in {.val {state}} state' + # Start the timeout for this new state, if present. + if (!is_null(timeout <- private$.timeout[[new_state]])) { + msg <- 'exceeded {.val {timeout}} second{?s} while in {.val {new_state}} state' msg <- cli_fmt(cli_text(msg)) - clear_timeout <- later(~{ self$stop(msg) }, delay = tmax) + clear_timeout <- later(~{ self$stop(msg) }, delay = timeout) self$on('.next', function (job) { clear_timeout() }) } } @@ -331,9 +380,24 @@ j_vars <- function (private, value) { private$.vars <- validate_list(value) } -j_tmax <- function (self, private, value) { - if (missing(value)) return (private$.tmax) - private$.tmax <- validate_tmax(value) +j_scan <- function (private, value) { + if (missing(value)) return (private$.scan) + private$.scan <- validate_logical(value) +} + +j_ignore <- function (private, value) { + if (missing(value)) return (private$.ignore) + private$.ignore <- validate_character_vector(value) +} + +j_envir <- function (private, value) { + if (missing(value)) return (private$.envir) + private$.envir <- validate_environment(value) +} + +j_timeout <- function (self, private, value) { + if (missing(value)) return (private$.timeout) + private$.timeout <- validate_timeout(value) } j_reformat <- function (private, value) { diff --git a/R/queue.r b/R/queue.r index cc371b4..c8e00f9 100644 --- a/R/queue.r +++ b/R/queue.r @@ -1,5 +1,5 @@ -#' Interruptible, Asynchronous Background Jobs. +#' Initialize and Run Jobs on a Set of Workers. #' #' @name Queue #' @@ -15,7 +15,7 @@ #' object which can be passed to `then()` to schedule work to be done on the #' result, when it is ready. #' -#' The [Job] object also has a `$stop()` element which can be called to return +#' The [Job] object also has a `$stop()` method which can be called to return #' a result immediately. If the job was actively running in a background R #' session, that process is killed and a new process is started to take its #' place. @@ -37,10 +37,19 @@ #' @param vars A list of named variables to make available to `expr` during #' evaluation. #' -#' @param tmax A named numeric vector indicating the maximum number of +#' @param scan Should additional variables be added to `vars` based on +#' scanning `expr` for missing global variables? The default, +#' `scan = TRUE` always scans and adds to `vars`, set `scan = FALSE` to +#' never scan, and `scan = ` to look for +#' globals there. `vars` defined by the user are always left untouched. +#' +#' @param ignore A character vector of variable names that should NOT be added +#' to `vars` by `scan`. +#' +#' @param timeout A named numeric vector indicating the maximum number of #' seconds allowed for each state the job passes through, or 'total' to #' apply a single timeout from 'submitted' to 'done'. Example: -#' `tmax = c(total = 2.5, running = 1)` will force-stop a job 2.5 +#' `timeout = c(total = 2.5, running = 1)` will force-stop a job 2.5 #' seconds after it is submitted, and also limits its time in the #' running state to just 1 second. #' @@ -75,8 +84,10 @@ #' #' @param job A [Job] object, as created by `Job$new()`. #' -#' @param start Should a Job be submitted to the Queue (`start = TRUE`) or -#' just created (`start = FALSE`)? +#' @param lazy When `lazy = FALSE` (the default), the job is submitted for +#' evaluation immediately. Otherwise it is lazily evaluated - that is, +#' only once `$result`, `$output`, or `$submit()` +#' is called. #' #' @param stop_id If an existing [Job] in the Queue has the same `stop_id`, #' that Job will be stopped and return an 'interrupt' condition object @@ -89,21 +100,24 @@ #' returning whatever result the earlier Job returns. `copy_id` can also #' be a `function (job)` that returns the `copy_id` to assign to a given #' Job. A `copy_id` of `NULL` disables this feature. -#' -#' @param scan Should additional variables be added to `vars` based on -#' scanning `expr` for missing global variables? By default, -#' `scan = is.null(vars)`, meaning if you set `vars = list()` then no -#' scan is done. Set `scan = TRUE` to always scan, `scan = FALSE` to -#' never scan, and `scan = ` to look for -#' globals there. When scanning, the worker's environment is taken into -#' account, and globals on the worker are favored over globals locally. -#' `vars` defined by the user are always left untouched. -#' -#' @param ignore A character vector of variable names that should NOT be added -#' to `vars` by `scan`. #' #' @param reason Passed to `$stop(reason)` for any Jobs currently #' managed by this Queue. +#' +#' @param state The Queue state that will trigger this function. One of: +#' \describe{ +#' \item{`'*'` - }{ Every time the state changes. } +#' \item{`'.next'` - }{ Only one time, the next time the state changes. } +#' \item{`'starting'` - }{ Workers are starting. } +#' \item{`'active'` - }{ Workers are ready. } +#' \item{`'stopped'` - }{ Shutdown is complete. } +#' \item{`'error'` - }{ Workers did not start cleanly. } +#' } +#' +#' @param func A function that accepts a Queue object as input. Return value +#' is ignored. +#' +#' @param ... Arbitrary named values to add to the returned Job object. #' #' @export #' @@ -122,9 +136,15 @@ Queue <- R6Class( #' not use more than `max_cpus` at once, assuming the `cpus` argument is #' properly set for each Job. #' - #' @param tmax,hooks,reformat,stop_id,copy_id - #' Defaults for this Queue's `$run()` method. Here, `stop_id` and - #' `copy_id` must be a `function (job)` or `NULL`. + #' @param scan,ignore,envir,timeout,hooks,reformat,cpus,stop_id,copy_id,lazy + #' Defaults for this Queue's `$run()` method. Here only, + #' + #' * `stop_id` and `copy_id` must be a `function (job)` or `NULL`. + #' * `hooks` can be a list of lists with the form: + #' `hooks = list(queue = list(), worker = list(), job = list())`. + #' * `hooks` can be prefixed with `q_` or `w_` to assign them to + #' the queue or workers, respectively, instead of to jobs, e.g. + #' `hooks = list(q_active = function (queue) {...}, * = ~{...})` #' #' @return A `Queue` object. initialize = function ( @@ -134,16 +154,22 @@ Queue <- R6Class( max_cpus = availableCores(omit = 1L), workers = ceiling(max_cpus * 1.2), options = r_session_options(), - tmax = NULL, + scan = TRUE, + ignore = NULL, + envir = NULL, + timeout = NULL, hooks = NULL, reformat = TRUE, + cpus = 1L, stop_id = NULL, - copy_id = NULL ) { + copy_id = NULL, + lazy = FALSE ) { q_initialize( self, private, - globals, packages, init, max_cpus, workers, options, - tmax, hooks, reformat, stop_id, copy_id ) + globals, packages, init, max_cpus, workers, options, + scan, ignore, envir, timeout, hooks, reformat, cpus, + stop_id, copy_id, lazy ) }, @@ -161,31 +187,24 @@ Queue <- R6Class( #' @return The new [Job] object. run = function ( expr, - vars = NULL, - scan = is.null(vars), - ignore = NULL, - tmax = NA, + vars = list(), + scan = NA, + ignore = NA, + envir = NA, + timeout = NA, hooks = NA, reformat = NA, - cpus = 1L, + cpus = NA, stop_id = NA, copy_id = NA, - start = TRUE ) { + lazy = NA, + ... ) { q_run( - self = self, - private = private, - expr = expr, - vars = vars, - scan = scan, - ignore = ignore, - tmax = tmax, - hooks = hooks, - reformat = reformat, - cpus = cpus, - stop_id = stop_id, - copy_id = copy_id, - start = start ) + self, private, + expr, vars, + scan, ignore, envir, timeout, hooks, reformat, cpus, + stop_id, copy_id, lazy, ... ) }, @@ -194,8 +213,17 @@ Queue <- R6Class( #' @return This Queue, invisibly. submit = function (job) q_submit(self, private, job), - - + + #' @description + #' Blocks until the Queue enters the given state. + #' @return This Queue, invisibly. + wait = function (state = 'active') u_wait(self, private, state), + + #' @description + #' Attach a callback function. + #' @return A function that when called removes this callback from the Queue. + on = function (state, func) u_on(self, private, 'QH', state, func), + #' @description #' Stop all jobs and workers. #' @return This Queue, invisibly. @@ -208,12 +236,13 @@ Queue <- R6Class( private = list( finalize = function (reason = 'queue was garbage collected') { - private$.state <- 'stopped' + private$set_state('stopped') fmap(private$.workers, 'stop', reason) fmap(private$.jobs, 'stop', reason) return (invisible(NULL)) }, + .hooks = list(), .uid = NULL, .jobs = list(), .workers = list(), @@ -222,21 +251,27 @@ Queue <- R6Class( n_workers = NULL, up_since = NULL, total_runs = 0L, - job_defaults = list(), + j_conf = list(), + w_conf = list(), max_cpus = NULL, - poll_startup = function () q__poll_startup(self, private), - dispatch = function (...) q__dispatch(self, private) + set_state = function (state) u__set_state(self, private, state), + poll_startup = function () q__poll_startup(self, private), + dispatch = function (...) q__dispatch(self, private) ), active = list( + #' @field hooks + #' A named list of currently registered callback hooks. + hooks = function () private$.hooks, + #' @field jobs #' List of [Job]s currently managed by this Queue. jobs = function () private$.jobs, #' @field workers - #' List of [Worker]s used to process Jobs. + #' List of [Worker]s used for processing Jobs. workers = function () private$.workers, #' @field uid @@ -256,26 +291,46 @@ Queue <- R6Class( q_initialize <- function ( self, private, - globals, packages, init, max_cpus, workers, options, - tmax, hooks, reformat, stop_id, copy_id ) { + globals, packages, init, max_cpus, workers, options, + scan, ignore, envir, timeout, hooks, reformat, cpus, + stop_id, copy_id, lazy ) { init_subst <- substitute(init, env = parent.frame()) + # Assign hooks by q_ and w_ prefixes + hooks <- validate_list(hooks, if_null = list()) + if (!all(names(hooks) %in% c('queue', 'worker', 'job'))) + hooks <- list( + job = hooks[grep('^[qw]_', names(hooks), invert = TRUE)], + queue = hooks[grep('^q_', names(hooks))], + worker = hooks[grep('^w_', names(hooks))] ) + hooks[['worker']] %<>% c(list('idle' = private$dispatch), .) + + # Queue configuration private$up_since <- Sys.time() private$.uid <- increment_uid('Q') - - private$globals <- validate_list(globals) - private$packages <- validate_character_vector(packages) - private$init <- validate_expression(init, init_subst) + private$.hooks <- validate_hooks(hooks[['queue']], 'QH') private$max_cpus <- validate_positive_integer(max_cpus, if_null = availableCores(omit = 1L)) private$n_workers <- validate_positive_integer(workers, if_null = ceiling(private$max_cpus * 1.2)) private$options <- validate_list(options, if_null = r_session_options()) - private$job_defaults[['tmax']] <- validate_tmax(tmax) - private$job_defaults[['hooks']] <- validate_hooks(hooks, 'JH') - private$job_defaults[['reformat']] <- validate_function(reformat, bool_ok = TRUE, if_null = TRUE) - private$job_defaults[['stop_id']] <- validate_function(stop_id) - private$job_defaults[['copy_id']] <- validate_function(copy_id) + # Worker configuration + private$w_conf[['globals']] <- validate_list(globals) + private$w_conf[['packages']] <- validate_character_vector(packages) + private$w_conf[['init']] <- validate_expression(init, init_subst) + private$w_conf[['hooks']] <- validate_hooks(hooks[['worker']], 'WH') + + # Job configuration defaults + private$j_conf[['scan']] <- validate_logical(scan) + private$j_conf[['ignore']] <- validate_character_vector(ignore) + private$j_conf[['envir']] <- validate_environment(envir, null_ok = TRUE) + private$j_conf[['timeout']] <- validate_timeout(timeout) + private$j_conf[['hooks']] <- validate_hooks(hooks[['job']], 'JH') + private$j_conf[['reformat']] <- validate_function(reformat, bool_ok = TRUE, if_null = TRUE) + private$j_conf[['cpus']] <- validate_positive_integer(cpus, if_null = 1L) + private$j_conf[['stop_id']] <- validate_function(stop_id) + private$j_conf[['copy_id']] <- validate_function(copy_id) + private$j_conf[['lazy']] <- validate_logical(lazy) private$poll_startup() @@ -283,15 +338,6 @@ q_initialize <- function ( } -# Stop all jobs and prevent more from being added. -q_shutdown <- function (self, private, reason) { - private$finalize(reason) - private$.workers <- list() - private$.jobs <- list() - return (invisible(self)) -} - - q_print <- function (self, private) { js <- map(private$.jobs, 'state') @@ -328,53 +374,41 @@ q_print <- function (self, private) { } -q_run <- function (self, private, expr, vars, scan, ignore, tmax, hooks, reformat, cpus, stop_id, copy_id, start) { +q_run <- function ( + self, private, + expr, vars, scan, ignore, envir, + timeout, hooks, reformat, cpus, stop_id, copy_id, lazy, ...) { expr_subst <- substitute(expr, env = parent.frame()) expr <- validate_expression(expr, expr_subst, null_ok = FALSE) - # Find globals in expr and add them to vars. - if (!is_false(scan)) { - - if (is_true(scan)) { envir <- parent.frame(n = 2) } - else { envir <- validate_environment(scan) } - - vars <- validate_list(vars, if_null = list()) - ignore <- validate_character_vector(ignore) + # Replace NA values with defaults from Queue$new() call. + j_conf <- private$j_conf - add <- globalsOf(expr, envir = envir, method = "liberal", mustExist = FALSE) - pkg <- sapply(attr(add, 'where'), env_name) - nms <- setdiff(names(add), c(names(vars), ignore, private$.loaded$globals)) - - for (nm in nms) - if (!identical(pkg[[nm]], private$.loaded$attached[[nm]])) - vars[[nm]] <- add[[nm]] - } + if (is_null(j_conf[['envir']])) + j_conf[['envir']] <- parent.frame(n = 2) - # Replace NA values with defaults from Queue$new() call. - if (is_na(tmax)) tmax <- private$job_defaults[['tmax']] - if (is_na(hooks)) hooks <- private$job_defaults[['hooks']] - if (is_na(reformat)) reformat <- private$job_defaults[['reformat']] + if (is_formula(stop_id)) stop_id <- as_function(stop_id) + if (is_formula(copy_id)) copy_id <- as_function(copy_id) job <- Job$new( expr = expr, - vars = vars, - tmax = tmax, - hooks = hooks, - reformat = reformat, - cpus = cpus ) - - # Compute stop_id and copy_id, but don't act on them yet. - for (id in c('stop_id', 'copy_id')) { - x <- get(id, inherits = FALSE) - if (is_na(x)) { x <- private$job_defaults[[id]] } - else if (is_formula(x)) { x <- as_function(x) } - job[[id]] <- if (is_function(x)) x(job) else x - } + vars = vars, + queue = self, + scan = if (is_na(scan)) j_conf[['scan']] else scan, + ignore = if (is_na(ignore)) j_conf[['ignore']] else ignore, + envir = if (is_na(envir)) j_conf[['envir']] else envir, + timeout = if (is_na(timeout)) j_conf[['timeout']] else timeout, + hooks = if (is_na(hooks)) j_conf[['hooks']] else hooks, + reformat = if (is_na(reformat)) j_conf[['reformat']] else reformat, + cpus = if (is_na(cpus)) j_conf[['cpus']] else cpus, + stop_id = if (is_na(stop_id)) j_conf[['stop_id']] else stop_id, + copy_id = if (is_na(copy_id)) j_conf[['copy_id']] else copy_id, + ... ) # Option to not start the job just yet. - if (is_true(start)) - self$submit(job) + if (is_na(lazy)) lazy <- j_conf[['lazy']] + if (is_false(lazy)) self$submit(job) return (invisible(job)) } @@ -385,21 +419,49 @@ q_submit <- function (self, private, job) { if (!inherits(job, 'Job')) cli_abort('`job` must be a Job object, not {.type {job}}.') - if (!(private$.state %in% c('starting', 'active'))) - cli_abort('Queue cannot accept new jobs. State is "{col_red(private$.state)}".') - job$queue <- self - job$state <- 'submitted' private$total_runs <- private$total_runs + 1L + job$state <- 'submitted' + if (job$state != 'submitted') + return (invisible(job)) + + # Find globals in expr and add them to vars. + if (is_true(job$scan)) { + + expr <- job$expr + vars <- job$vars %||% list() + envir <- job$envir + ignore <- job$ignore + + if (private$.state != 'active') self$wait() + globals <- private$.loaded$globals + attached <- private$.loaded$attached + + add <- globalsOf(expr, envir, method = "liberal", mustExist = FALSE) + add <- cleanup(add) + pkg <- sapply(attr(add, 'where'), env_name) + nms <- setdiff(names(add), c(names(vars), ignore, globals)) + + for (nm in nms) + if (!identical(pkg[[nm]], attached[[nm]])) + vars[[nm]] <- add[[nm]] + + job$vars <- vars + } + + # Compute stop_id and copy_id. + if (is_function(job$stop_id)) job$stop_id <- job$stop_id(job) + if (is_function(job$copy_id)) job$copy_id <- job$copy_id(job) + # Check for `stop_id` hash collision => stop the other job. - if (!is_null(job$stop_id)) - if (nz(stop_jobs <- get_eq(private$.jobs, 'stop_id', job$stop_id))) + if (!is_null(id <- job$stop_id)) + if (nz(stop_jobs <- get_eq(private$.jobs, 'stop_id', id))) fmap(stop_jobs, 'stop', 'duplicated stop_id') # Check for `copy_id` hash collision => proxy the other job. - if (!is_null(job$copy_id)) - if (nz(copy_jobs <- get_eq(private$.jobs, 'copy_id', job$copy_id))) + if (!is_null(id <- job$copy_id)) + if (nz(copy_jobs <- get_eq(private$.jobs, 'copy_id', id))) job$proxy <- copy_jobs[[1]] if (job$state == 'submitted') { @@ -412,6 +474,14 @@ q_submit <- function (self, private, job) { } +# Stop all jobs and prevent more from being added. +q_shutdown <- function (self, private, reason) { + private$finalize(reason) + private$.workers <- list() + private$.jobs <- list() + return (invisible(self)) +} + # Use any idle workers to run queued jobs. q__dispatch <- function (self, private) { @@ -424,17 +494,16 @@ q__dispatch <- function (self, private) { return (invisible(NULL)) # See how many remaining CPUs are available. - running <- get_eq(private$.workers, 'state', 'running') - free_cpus <- private$max_cpus - sum(map(running, 'cpus')) + running_jobs <- get_eq(private$.jobs, 'state', 'running') + free_cpus <- private$max_cpus - sum(map(running_jobs, 'cpus')) if (free_cpus < 1) return (invisible(NULL)) # Connect queued jobs to idle workers. - jobs <- get_eq(private$.jobs, 'state', 'queued') - workers <- get_eq(private$.workers, 'state', 'idle') - for (i in seq_len(min(length(workers), length(jobs)))) { - free_cpus <- free_cpus - jobs[[i]]$cpus - if (free_cpus < 0) break - workers[[i]]$run(jobs[[i]]) + workers <- get_eq(private$.workers, 'state', 'idle') + queued_jobs <- get_eq(private$.jobs, 'state', 'queued') + queued_jobs <- queued_jobs[cumsum(map(queued_jobs, 'cpus')) <= free_cpus] + for (i in seq_len(min(length(workers), length(queued_jobs)))) { + workers[[i]]$run(queued_jobs[[i]]) } return (invisible(NULL)) @@ -449,16 +518,14 @@ q__poll_startup <- function (self, private) { if (any(states == 'stopped')) { self$shutdown('worker process did not start cleanly') - private$.state <- 'error' + private$set_state('error') } else if (sum(states == 'idle') == private$n_workers) { - loaded <- private$.workers[[1]]$loaded - private$.loaded[['globals']] <- loaded[['globals']] - private$.loaded[['packages']] <- loaded[['packages']] + private$.loaded <- private$.workers[[1]]$loaded - private$.state <- 'active' + private$set_state('active') private$dispatch() } @@ -471,11 +538,11 @@ q__poll_startup <- function (self, private) { for (i in integer(n)) { worker <- Worker$new( - globals = private$globals, - packages = private$packages, - init = private$init, - options = private$options, - hooks = list('idle' = private$dispatch) ) + globals = private$w_conf[['globals']], + packages = private$w_conf[['packages']], + init = private$w_conf[['init']], + options = private$w_conf[['options']], + hooks = private$w_conf[['hooks']] ) private$.workers %<>% c(worker) } diff --git a/R/utils.r b/R/utils.r index 2b325f5..10946bc 100644 --- a/R/utils.r +++ b/R/utils.r @@ -2,39 +2,77 @@ .jobqueue_env <- new_environment() -new_promise <- function () { - promise(function (resolve, reject) { NULL }) + +u_wait <- function (self, private, state) { + + state <- validate_string(state) + curr <- private$.state + + if (state != curr) { + prev <- curr + while (TRUE) { + curr <- private$.state + if (curr != prev) + if (state %in% c('*', '.next', curr)) + break + run_now(timeoutSecs = 0.2) + } + } + + return (invisible(self)) +} + + +u_on <- function (self, private, prefix, state, func) { + + state <- validate_string(state) + func <- validate_function(func, null_ok = FALSE) + + uid <- attr(func, '.uid') <- increment_uid(prefix) + private$.hooks %<>% c(setNames(list(func), state)) + + off <- function () private$.hooks %<>% attr_ne('.uid', uid) + + if (state == self$state) func(self) + if (state == '*') func(self) + + return (invisible(off)) } -noop <- function (...) return (invisible(NULL)) -p_resolve <- function (promise, value) attr(promise, 'promise_impl')$resolve(value) -p_reject <- function (promise, reason) attr(promise, 'promise_impl')$reject(reason) -p_resolver <- function (promise) attr(promise, 'promise_impl')$resolve -p_rejecter <- function (promise) attr(promise, 'promise_impl')$reject -p_state <- function (promise) attr(promise, 'promise_impl')$.__enclos_env__$private$state -p_result <- function (promise) attr(promise, 'promise_impl')$.__enclos_env__$private$value -p_visible <- function (promise) attr(promise, 'promise_impl')$.__enclos_env__$private$visible -p_pending <- function (promise) (p_state(promise) == 'pending') -p_done <- function (promise) (p_state(promise) != 'pending') -p_stop <- function (promise, reason) p_resolve(promise, interrupted(reason)) +u__set_state <- function (self, private, state) { + + if (private$.state != state) { + + hooks <- private$.hooks + private$.hooks <- hooks[names(hooks) != '.next'] + hooks <- hooks[names(hooks) %in% c('*', '.next', state)] + + private$.state <- state + for (i in seq_along(hooks)) { + func <- hooks[[i]] + if (!is_null(formals(func))) { func(self) } + else if (is.primitive(func)) { func(self) } + else { func() } + } + + } + return (invisible(NULL)) +} interrupted <- function (reason = 'stopped') errorCondition(message = reason, class = 'interrupt') -increment_uid <- function (prefix, object = NULL) { - if (!is_null(attr(object, '.uid', exact = TRUE))) - return (attr(object, '.uid', exact = TRUE)) +increment_uid <- function (prefix) { nm <- paste0('uid_', prefix) value <- env_get(.jobqueue_env, nm, 1L) assign(nm, value + 1L, .jobqueue_env) return (paste0(prefix, value)) } -coan <- function (x, i = NULL) { - if (!is_null(i)) x <- names(x)[[i]] +coan <- function (x) { capture.output(as.name(x)) } @@ -82,8 +120,8 @@ idx_must_be <- function (expected) { if (!env_has(parent.frame(), ij)) break idx <- env_get(parent.frame(), ij, NULL) key <- names(value[[idx]]) %||% '' - if (nzchar(key)) { varname <- paste(varname, '$', coan(key)) } - else { varname <- paste(varname, '[[', idx, ']]') } + if (nzchar(key)) { varname <- paste0(varname, '$', coan(key)) } + else { varname <- paste0(varname, '[[', idx, ']]') } value <- value[[idx]] } diff --git a/R/validate.r b/R/validate.r index 4bcec17..7023900 100644 --- a/R/validate.r +++ b/R/validate.r @@ -3,10 +3,10 @@ validate_function <- function (value, bool_ok = FALSE, if_null = NULL, null_ok = if (is_function(value)) return (value) if (is_null(value) && is_true(null_ok)) return (if_null) if (is_scalar_logical(value) && is_true(bool_ok)) return (value) - varname <- substitute(value) - errmsg <- cant_cast('a function') - tryCatch(as_function(value), error = function (e) { - cli_abort(c(errmsg, 'x' = as.character(e) ))}) + varname <- substitute(value) + errmsg <- cant_cast('a function') + on_error <- function (e) { cli_abort(c(errmsg, 'x' = as.character(e) )) } + tryCatch(as_function(value), error = on_error, warning = on_error) } validate_expression <- function (value, subst, null_ok = TRUE) { @@ -41,7 +41,7 @@ validate_list <- function (value, null_ok = TRUE, if_null = NULL, of_type = NULL if (identical(of_type, 'numeric')) of_type %<>% c('integer') for (i in seq_along(value)) if (!inherits(value[[i]], of_type)) - idx_must_be(cli_fmt(cli_text('{.or {.val {of_type}}}'))) + cli_abort(idx_must_be(cli_fmt(cli_text('{.or {.val {of_type}}}')))) } if (!is_list(value)) value <- as.list(value) @@ -50,7 +50,12 @@ validate_list <- function (value, null_ok = TRUE, if_null = NULL, of_type = NULL } validate_hooks <- function (hooks, prefix = 'H') { - hooks <- validate_list(hooks) + + hooks <- validate_list(hooks, if_null = list()) + if (length(hooks) == 0) return (hooks) + + names(hooks) <- sub('^[qwj]_', '', names(hooks)) + for (i in seq_along(hooks)) { func <- validate_function(hooks[[i]], null_ok = FALSE) if (is_null(attr(func, '.uid', exact = TRUE))) @@ -61,59 +66,50 @@ validate_hooks <- function (hooks, prefix = 'H') { } -validate_tmax <- function (tmax) { +validate_timeout <- function (timeout) { - if (length(tmax) == 1 && !is_named(tmax)) names(tmax) <- 'total' - tmax <- validate_list(tmax) + timeout <- validate_list(timeout, if_null = list(), default = 'total') - if (length(dups <- unique(names(tmax)[duplicated(names(tmax))]))) - cli_abort('`tmax` cannot have duplicate names: {.val {dups}}') + if (length(dups <- unique(names(timeout)[duplicated(names(timeout))]))) + cli_abort('`timeout` cannot have duplicate names: {.val {dups}}') expected <- 'a single positive number or NULL' - for (i in seq_along(tmax)) { + for (i in seq_along(timeout)) { - key <- paste0('`tmax$', coan(names(tmax)[[i]]), '`') - val <- tryCatch( - expr = as.numeric(tmax[[i]]), - error = function (e) cli_abort(c( - cant_cast(expected, value = tmax[[i]], varname = key), - 'x' = as.character(e) ))) + key <- paste0('timeout$', coan(names(timeout)[[i]]), '') + on_error <- function (e) cli_abort(c( + cant_cast(expected, value = timeout[[i]], varname = key), + 'x' = as.character(e) )) - if (length(val) != 1) cli_abort('{key} must be {expected}, not {.type {tmax[[i]]}}') - if (val <= 0) cli_abort('{key} must be {expected}, not {.val {tmax[[i]]}}') + val <- tryCatch(as.numeric(timeout[[i]]), error = on_error, warning = on_error) - tmax[[i]] <- val + if (length(val) != 1) cli_abort('{`key`} must be {expected}, not {.type {timeout[[i]]}}') + if (val <= 0) cli_abort('{`key`} must be {expected}, not {.val {timeout[[i]]}}') + + timeout[[i]] <- val } - return (tmax) + return (timeout) } -validate_environment <- function (value) { +validate_environment <- function (value, null_ok = TRUE, if_null = NULL) { varname <- substitute(value) - errmsg <- cant_cast('an environment') - tryCatch(as_environment(value), error = function (e) { - cli_abort(c(errmsg, 'x' = as.character(e) ))}) -} - -validate_positive_numeric <- function (value, if_null = NULL, null_ok = TRUE) { + if (is_null(value) && is_true(null_ok)) return (if_null) - varname <- substitute(value) - errmsg <- must_be('a single positive number') - tryCatch( - expr = { - value <- as.numeric(value) - stopifnot(length(value) == 1) - stopifnot(is_true(value > 0)) - value - }, - error = function (e) { cli_abort(errmsg) }) + + errmsg <- cant_cast('an environment') + on_error <- function (e) { cli_abort(c(errmsg, 'x' = as.character(e) )) } + + tryCatch(as_environment(value), error = on_error, warning = on_error) } validate_positive_integer <- function (value, if_null = NULL, null_ok = TRUE) { if (is_null(value) && is_true(null_ok)) return (if_null) - varname <- substitute(value) - errmsg <- must_be('a single positive integer') + varname <- substitute(value) + errmsg <- must_be('a single positive integer') + on_error <- function (e) cli_abort(c(errmsg, 'x' = as.character(e) )) + tryCatch( expr = { value <- as.integer(value) @@ -121,7 +117,7 @@ validate_positive_integer <- function (value, if_null = NULL, null_ok = TRUE) { stopifnot(is_true(value > 0)) value }, - error = function (e) { cli_abort(errmsg) }) + error = on_error, warning = on_error) } validate_logical <- function (value) { @@ -137,32 +133,10 @@ validate_character_vector <- function (value, if_null = NULL) { cli_abort(must_be('a character vector')) } -validate_file <- function (value, if_null = NULL, mustWork = TRUE) { - if (is_null(value)) return (if_null) - value <- normalizePath(value, winslash = '/', mustWork = mustWork) - return (value) -} - - -validate_string <- function (value, null_ok = FALSE, zlen_ok = FALSE, na_ok = FALSE) { +validate_string <- function (value) { varname <- substitute(value) - if (is_null(value) && is_true(null_ok)) return (value) - if (is_na(value) && is_true(na_ok)) return (value) - if (!is_scalar_character(value)) cli_abort(must_be('a string')) - if (!nzchar(value) && is_false(zlen_ok)) cli_abort(cannot('be ""')) + if (!is_scalar_character(value) || is_na(value) || !nzchar(value)) + cli_abort(must_be('a string')) return (value) } -validate_string_options <- function (value, options) { - - if (is_string(value, options)) return (value) - - varname <- substitute(value) - if (is_scalar_character(value)) - cli_abort(c('!' = "`{varname}` must be one of {.val {options}}, not {.val {value}}.")) - cli_abort(c('!' = "`{varname}` must be one of {.val {options}}, not {.type {value}}.")) -} - - - - diff --git a/R/worker.r b/R/worker.r index 094a80b..488702b 100644 --- a/R/worker.r +++ b/R/worker.r @@ -1,5 +1,5 @@ -#' Evaluates a Job's Expression on a Background Process. +#' A Single Background Process for Running Jobs. #' #' @name Worker #' @@ -96,7 +96,12 @@ Worker <- R6Class( #' @description #' Attach a callback function. #' @return A function that when called removes this callback from the Worker. - on = function (state, func) w_on(self, private, state, func), + on = function (state, func) u_on(self, private, 'WH', state, func), + + #' @description + #' Blocks until the Worker enters the given state. + #' @return This Worker, invisibly. + wait = function (state = 'idle') u_wait(self, private, state), #' @description #' Assigns a Job to this Worker for evaluation on its background @@ -119,7 +124,7 @@ Worker <- R6Class( .job = NULL, config_file = NULL, - set_state = function (state) w__set_state(self, private, state), + set_state = function (state) u__set_state(self, private, state), next_job = function () w__next_job(self, private), poll_job = function () w__poll_job(self, private), poll_startup = function () w__poll_startup(self, private), @@ -195,20 +200,6 @@ w_print <- function (self) { } -w_on <- function (self, private, state, func) { - - state <- validate_string(state) - func <- validate_function(func, null_ok = FALSE) - - uid <- attr(func, '.uid') <- increment_uid('WH') - private$.hooks %<>% c(setNames(list(func), state)) - - off <- function () private$.hooks %<>% attr_ne('.uid', uid) - - return (invisible(off)) -} - - # Create a new 'callr::r_session' background process. w_start <- function (self, private) { @@ -275,28 +266,6 @@ w_run <- function (self, private, job) { return (invisible(self)) } -w__set_state <- function (self, private, state) { - - if (private$.state != state) { - - hooks <- private$.hooks - private$.hooks <- hooks[names(hooks) != '.next'] - hooks <- hooks[names(hooks) %in% c('*', '.next', state)] - - private$.state <- state - for (i in seq_along(hooks)) { - func <- hooks[[i]] - # uid <- attr(func, '.uid', exact = TRUE) - # cli_text('Executing Worker {self$uid} {.val {state}} callback hook {uid}.') - if (!is_null(formals(func))) { func(self) } - else if (is.primitive(func)) { func(self) } - else { func() } - } - - } - return (invisible(NULL)) -} - w__next_job <- function (self, private) { @@ -330,7 +299,7 @@ w__next_job <- function (self, private) { # Run the user's job on the r_session external process. # cli_text('Starting job {private$.job$uid} on {self$uid}.') private$.r_session$call( - args = list(private$.job$expr, private$.job$vars %||% list()), + args = list(private$.job$expr, private$.job$vars), func = function (expr, vars) { eval(expr = expr, envir = vars, enclos = .GlobalEnv) }) @@ -425,9 +394,9 @@ w__configure <- function (self, private) { func = function (config_file) { if (!is.null(config_file)) { - + config <- readRDS(config_file) - + for (i in seq_along(p <- config[['packages']])) require(package = p[[i]], character.only = TRUE) @@ -456,4 +425,3 @@ w__configure <- function (self, private) { return (NULL) } - diff --git a/README.md b/README.md index 93e0596..cb2ac66 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,16 @@ [![dev](https://github.com/cmmr/jobqueue/actions/workflows/R-CMD-check.yaml/badge.svg)](https://github.com/cmmr/jobqueue/actions/workflows/R-CMD-check.yaml) [![cran](https://www.r-pkg.org/badges/version/jobqueue)](https://CRAN.R-project.org/package=jobqueue) [![conda](https://anaconda.org/conda-forge/r-jobqueue/badges/version.svg)](https://anaconda.org/conda-forge/r-jobqueue) +[![Codecov test coverage](https://codecov.io/gh/cmmr/jobqueue/graph/badge.svg)](https://app.codecov.io/gh/cmmr/jobqueue) -The goal of jobqueue is to run interruptible R commands in background processes. + +The goals of jobqueue are to: + + * Run jobs in parallel on background processes. + * Allow jobs to be stopped at any point. + * Process job results with asynchronous callbacks. + ## Installation @@ -21,6 +28,7 @@ install.packages("pak") pak::pak("cmmr/jobqueue") ``` + ## Example ``` r @@ -33,6 +41,7 @@ job$result #> [1] "Hello world!" ``` + ## Asynchronous Callbacks ``` r @@ -70,6 +79,7 @@ as.promise(job)$then(callback) #> resolved with: 42 ``` + ## Stopping Jobs When a running job is stopped, the background process for it is @@ -94,16 +104,16 @@ will be returned in the condition object. #### Max Runtime ``` r -job <- q$run({ Sys.sleep(2); 'Zzzzz' }, tmax = 0.2) +job <- q$run({ Sys.sleep(2); 'Zzzzz' }, timeout = 0.2) job$result #> ``` Limits (in seconds) can be set on: -- the total 'submitted' to 'done' time: `tmax = 2` -- on a per-state basis: `tmax = list(queued = 1, running = 2)` -- or both: `tmax = list(total = 3, queued = 2, running = 2)` +- the total 'submitted' to 'done' time: `timeout = 2` +- on a per-state basis: `timeout = list(queued = 1, running = 2)` +- or both: `timeout = list(total = 3, queued = 2, running = 2)` #### Stop ID @@ -131,6 +141,7 @@ job2$result #> [1] "A" ``` + ## Variables #### Automaticly identified diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..04c5585 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,14 @@ +comment: false + +coverage: + status: + project: + default: + target: auto + threshold: 1% + informational: true + patch: + default: + target: auto + threshold: 1% + informational: true diff --git a/tests/testthat.r b/tests/testthat.r new file mode 100644 index 0000000..e21e494 --- /dev/null +++ b/tests/testthat.r @@ -0,0 +1,12 @@ +# This file is part of the standard setup for testthat. +# It is recommended that you do not modify it. +# +# Where should you do additional test configuration? +# Learn more about the roles of various files in: +# * https://r-pkgs.org/testing-design.html#sec-tests-files-overview +# * https://testthat.r-lib.org/articles/special-files.html + +library(testthat) +library(jobqueue) + +test_check("jobqueue") diff --git a/tests/testthat/test-job.r b/tests/testthat/test-job.r new file mode 100644 index 0000000..4147af9 --- /dev/null +++ b/tests/testthat/test-job.r @@ -0,0 +1,44 @@ + +test_that('job', { + + q <- expect_silent(Queue$new(workers = 1L)) + + job <- expect_silent(q$run( + expr = quote(2 + 3), + scan = FALSE, + cpus = NULL, + lazy = TRUE, + envir = NULL, + timeout = list('queued' = 1), + reformat = ~{ .$result * 2 })) + + expect_true(is.function(job$reformat)) + expect_equal(job$result, 10) + p <- expect_silent(as.promise(job)) + + job <- expect_silent(q$run({ 2 + 3 }, reformat = FALSE)) + p <- expect_silent(as.promise(job)) + expect_error(job$proxy <- 'not a Job') + expect_error(job$state <- 'done') + + expect_silent(job$wait()) + expect_error(job$state <- 'not done') + expect_silent(job$state <- 'done') + + expect_true(is.list(job$hooks)) + expect_true(is.list(job$timeout)) + expect_true(startsWith(job$uid, 'J')) + expect_no_error(job$print()) + + + job1 <- expect_silent(q$run({ Sys.sleep(1) }, hooks = list('submitted' = class))) + job2 <- expect_silent(Job$new({4})) + expect_silent(job2$proxy <- job1) + expect_error(job2$state <- 'not proxy') + expect_silent(job1$output <- "custom") + expect_equal(job1$result, job2$result) + + + expect_silent(q$shutdown()) +}) + diff --git a/tests/testthat/test-queue.r b/tests/testthat/test-queue.r new file mode 100644 index 0000000..1d2e564 --- /dev/null +++ b/tests/testthat/test-queue.r @@ -0,0 +1,135 @@ + +test_that('basic', { + + q <- expect_silent(Queue$new(workers = 1L)) + expect_equal(q$state, 'starting') + + expect_no_error(q$print()) + + job <- expect_silent(q$run({ 2 + 2 }, scan = FALSE)) + expect_false(job$is_done) + expect_equal(job$result, 4) + expect_equal(job$state, 'done') + expect_true(job$is_done) + + job <- expect_silent(q$run({ x + y }, vars = list(x = 3, y = 5))) + expect_equal(job$result, 8) + + z <- 2 + job <- expect_silent(q$run({ x + z }, vars = list(x = 3))) + expect_equal(job$result, 5) + + expect_equal(q$state, 'active') + expect_equal(length(q$workers), 1L) + + expect_true(is.list(q$hooks)) + expect_true(startsWith(q$uid, 'Q')) + expect_setequal(names(q$loaded), c('globals', 'attached')) + + expect_error(q$submit('not a Job')) + + expect_silent(q$shutdown()) + + expect_equal(q$state, 'stopped') + expect_equal(length(q$workers), 0) +}) + + + + +test_that('config', { + + e <- new.env(parent = emptyenv()) + + q <- expect_silent(Queue$new( + workers = 1L, + globals = list(x = 42), + packages = 'magrittr', + init = { y <- 37 }, + hooks = list( + 'q_active' = ~{ e$state = .$state }, + 'q_.next' = ~{ e$.next = .$state }, + 'q_*' = ~{ e$.star = .$state } + ))) + + q$on('stopped', function () { e$state = 'stopped' }) + q$on('stopped', class) # A primitive; for code coverage. + + x <- y <- 1 + job <- q$run({c(x, y) %>% sum}) + + expect_equal(job$result, 42 + 37) + expect_equal(e$state, 'active') + expect_equal(e$.next, 'active') + expect_equal(e$.star, 'active') + + expect_silent(q$shutdown()) + expect_equal(e$state, 'stopped') + expect_equal(e$.next, 'active') + expect_equal(e$.star, 'stopped') +}) + + +test_that('workers', { + + q <- expect_silent(Queue$new(workers = 2L, max_cpus = 3L)) + + q$run({ Sys.sleep(1) }, scan = FALSE) + q$run({ Sys.sleep(1) }, scan = FALSE) + q$run({ Sys.sleep(1) }, scan = FALSE) + + expect_equal(map(q$jobs, 'state'), rep('queued', 3)) + + q$wait() + expect_equal(map(q$jobs, 'state'), c('running', 'running', 'queued')) + expect_equal(map(q$workers, 'state'), rep('busy', 2)) + + expect_silent(q$shutdown()) +}) + + +test_that('max_cpus', { + + q <- expect_silent(Queue$new(workers = 3L, max_cpus = 2L)) + + q$run({ Sys.sleep(1) }, scan = FALSE) + q$run({ Sys.sleep(1) }, scan = FALSE) + q$run({ Sys.sleep(1) }, scan = FALSE) + + expect_equal(map(q$jobs, 'state'), rep('queued', 3)) + + q$wait() + q$run({ Sys.sleep(1) }, scan = FALSE) + + expect_equal(map(q$jobs, 'state'), c(rep('running', 2), rep('queued', 2))) + expect_equal(map(q$workers, 'state'), c('busy', 'busy', 'idle')) + + expect_silent(q$shutdown()) +}) + + +test_that('interrupt', { + + q <- expect_silent(Queue$new(workers = 1L, scan = FALSE)) + + job <- q$run({ Sys.sleep(1) }) + expect_silent(job$stop()) + expect_s3_class(job$result, class = c('interrupt', 'error', 'condition')) + + job <- q$run({ Sys.sleep(1) }, timeout = 0.1) + expect_s3_class(job$result, class = c('interrupt', 'error', 'condition')) + + job1 <- q$run({ Sys.sleep(1); 'A' }, stop_id = function (job) 123) + job2 <- q$run({ 'B' }, stop_id = ~{ 123 }) + + expect_s3_class(job1$result, class = c('interrupt', 'error', 'condition')) + expect_equal(job2$result, 'B') + + job1 <- q$run({ Sys.sleep(0.1); 'A' }, copy_id = ~{ 456 }) + job2 <- q$run({ 'B' }, copy_id = 456) + expect_equal(job1$result, 'A') + expect_equal(job2$result, 'A') + + expect_silent(q$shutdown()) +}) + diff --git a/tests/testthat/test-validate.r b/tests/testthat/test-validate.r new file mode 100644 index 0000000..70022d1 --- /dev/null +++ b/tests/testthat/test-validate.r @@ -0,0 +1,19 @@ + +test_that('validate', { + expect_error(validate_function('not a function')) + expect_error(validate_expression('not an expression', 'or call')) + expect_error(validate_list(list('not named'), named = TRUE)) + expect_error(validate_list(list(is = 'named'), named = FALSE)) + expect_error(validate_list(list(not = 'numeric'), of_type = 'numeric')) + expect_error(validate_timeout(list('dup' = "name", 'dup' = "name"))) + expect_error(validate_timeout(list('queued' = 1:5))) + expect_error(validate_timeout(list('queued' = 'not a number'))) + expect_error(validate_timeout(list('queued' = numeric(0)))) + expect_error(validate_timeout(list('queued' = NA_integer_))) + expect_error(validate_environment('not an environment')) + expect_error(validate_positive_integer('not an integer')) + expect_error(validate_logical('not a logical')) + expect_error(validate_character_vector(list('not a character vector'))) + expect_error(validate_string(list('not a string'))) +}) + diff --git a/tests/testthat/test-worker.r b/tests/testthat/test-worker.r new file mode 100644 index 0000000..d957f82 --- /dev/null +++ b/tests/testthat/test-worker.r @@ -0,0 +1,26 @@ + +test_that('worker', { + + w <- expect_silent(Worker$new()) + + expect_silent(w$on('starting', ~{ NULL })) + expect_silent(w$on('*', ~{ NULL })) + expect_silent(w$run(Job$new({ 1 }))) + + expect_error(w$start()) + expect_error(w$run('not a Job')) + expect_no_error(w$print()) + expect_true(is.list(w$hooks)) + expect_true(is.list(w$backlog)) + expect_true(is.null(w$job)) + expect_true(inherits(w$r_session, 'r_session')) + expect_true(startsWith(w$uid, 'W')) + + expect_silent(w$wait()) + expect_silent(w$run(Job$new({ 1 }))) + expect_silent(w$run(Job$new({ 1 }))) + + expect_silent(w$stop()) + expect_silent(w$run(Job$new({ 1 }))) +}) +