Skip to content

Commit

Permalink
CLEANUP: Got rid of makeExpression(); now part of getExpression() for…
Browse files Browse the repository at this point in the history
… Future
  • Loading branch information
HenrikBengtsson committed Dec 31, 2024
1 parent 46933a9 commit a9e358a
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 145 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.34.0-9098
Version: 1.34.0-9099
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
154 changes: 103 additions & 51 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -656,60 +656,112 @@ resolved.Future <- function(x, run = TRUE, ...) {
getExpression <- function(future, ...) UseMethod("getExpression")

#' @export
getExpression.Future <- function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, seed = future$seed, split = future$split, mc.cores = NULL, ...) {
debug <- getOption("future.debug", FALSE)
## mdebug("getExpression() ...")

if (is.null(split)) split <- FALSE
stop_if_not(is.logical(split), length(split) == 1L, !is.na(split))

version <- future$version
if (is.null(version)) {
warning(FutureWarning("Future version was not set. Using default %s",
sQuote(version)))
}

## Globals needed by the future
globals <- globals(future)

## Packages needed by the future
pkgs <- packages(future)
if (length(pkgs) > 0) {
if (debug) mdebugf("Packages needed by the future expression (n = %d): %s", length(pkgs), paste(sQuote(pkgs), collapse = ", "))
} else {
if (debug) mdebug("Packages needed by the future expression (n = 0): <none>")
}
getExpression.Future <- local({

## Future strategies
strategies <- plan("list")
stop_if_not(length(strategies) >= 1L)
tmpl_expr_evaluate2 <- bquote_compile({
## Evaluate future
future:::evalFuture(expr = quote(.(expr)), local = .(local), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), seed = .(seed), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals = .(globals), packages = .(pkgs), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), threads = .(threads), cleanup = .(cleanup))
})

## Pass down the default or the remain set of future strategies?
strategiesR <- strategies[-1]
## mdebugf("Number of remaining strategies: %d", length(strategiesR))

## Use default future strategy + identify packages needed by the futures
if (length(strategiesR) == 0L) {
if (debug) mdebug("Packages needed by future strategies (n = 0): <none>")
strategiesR <- "default"
} else {
## Identify package namespaces needed for strategies
pkgsS <- lapply(strategiesR, FUN = environment)
pkgsS <- lapply(pkgsS, FUN = environmentName)
pkgsS <- unique(unlist(pkgsS, use.names = FALSE))
## CLEANUP: Only keep those that are loaded in the current session
pkgsS <- intersect(pkgsS, loadedNamespaces())
if (debug) mdebugf("Packages needed by future strategies (n = %d): %s", length(pkgsS), paste(sQuote(pkgsS), collapse = ", "))
pkgs <- unique(c(pkgs, pkgsS))
}

expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, globals = globals, ..., seed = seed, packages = pkgs, mc.cores = mc.cores, version = version)
if (getOption("future.debug", FALSE)) mprint(expr)

## mdebug("getExpression() ... DONE")
function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, split = future$split, seed = future$seed, immediateConditions = FALSE, mc.cores = NULL, threads = NA_integer_, cleanup = TRUE, ...) {
debug <- getOption("future.debug", FALSE)
## mdebug("getExpression() ...")
if (is.null(split)) split <- FALSE
stop_if_not(is.logical(split), length(split) == 1L, !is.na(split))

version <- future$version
if (is.null(version)) {
warning(FutureWarning("Future version was not set. Using default %s",
sQuote(version)))
}

## Globals needed by the future
globals <- globals(future)

## Packages needed by the future
pkgs <- packages(future)
if (length(pkgs) > 0) {
if (debug) mdebugf("Packages needed by the future expression (n = %d): %s", length(pkgs), paste(sQuote(pkgs), collapse = ", "))
} else {
if (debug) mdebug("Packages needed by the future expression (n = 0): <none>")
}

expr
} ## getExpression()
## Future strategies
strategies <- plan("list")
stop_if_not(length(strategies) >= 1L)

## Pass down the default or the remain set of future strategies?
strategiesR <- strategies[-1]
## mdebugf("Number of remaining strategies: %d", length(strategiesR))

## Use default future strategy + identify packages needed by the futures
if (length(strategiesR) == 0L) {
if (debug) mdebug("Packages needed by future strategies (n = 0): <none>")
strategiesR <- getOption("future.plan", "sequential")
} else {
## Identify package namespaces needed for strategies
pkgsS <- lapply(strategiesR, FUN = environment)
pkgsS <- lapply(pkgsS, FUN = environmentName)
pkgsS <- unique(unlist(pkgsS, use.names = FALSE))
## CLEANUP: Only keep those that are loaded in the current session
pkgsS <- intersect(pkgsS, loadedNamespaces())
if (debug) mdebugf("Packages needed by future strategies (n = %d): %s", length(pkgsS), paste(sQuote(pkgsS), collapse = ", "))
pkgs <- unique(c(pkgs, pkgsS))
}

conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE)
muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE)
if (is.null(muffleInclude)) muffleInclude <- "^muffle"

if (immediateConditions && !is.null(conditionClasses)) {
immediateConditionClasses <- getOption("future.relay.immediate", "immediateCondition")
conditionClasses <- unique(c(conditionClasses, immediateConditionClasses))
attr(conditionClasses, "exclude") <- conditionClassesExclude
attr(conditionClasses, "muffleInclude") <- muffleInclude
} else {
immediateConditionClasses <- character(0L)
}

forwardOptions <- list(
## Assert globals when future is created (or at run time)?
future.globals.onMissing = getOption("future.globals.onMissing"),

## Pass down other future.* options
future.globals.maxSize = getOption("future.globals.maxSize"),
future.globals.method = getOption("future.globals.method"),
future.globals.onReference = getOption("future.globals.onReference"),
future.globals.resolve = getOption("future.globals.resolve"),
future.resolve.recursive = getOption("future.resolve.recursive"),
future.rng.onMisuse = getOption("future.rng.onMisuse"),
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),

future.fork.multithreading.enable = getOption("future.fork.multithreading.enable"),

future.globalenv.onMisuse = getOption("future.globalenv.onMisuse"),

future.makeExpression.skip = getOption("future.makeExpression.skip"),
future.makeExpression.skip.local = getOption("future.makeExpression.skip.local"),

## Other options relevant to making futures behave consistently
## across backends
width = getOption("width")
)

if (!is.null(mc.cores)) {
forwardOptions$mc.cores <- mc.cores
}

expr <- bquote_apply(tmpl_expr_evaluate2)

if (getOption("future.debug", FALSE)) mprint(expr)

## mdebug("getExpression() ... DONE")

expr
}
}) ## getExpression()


globals <- function(future, ...) UseMethod("globals")
Expand Down
93 changes: 0 additions & 93 deletions R/expressions.R
Original file line number Diff line number Diff line change
@@ -1,96 +1,3 @@
makeExpression <- local({
tmpl_expr_evaluate2 <- future:::bquote_compile({
## Evaluate future
future:::evalFuture(expr = quote(.(expr)), local = .(local), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals = .(globals), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), threads = .(threads), cleanup = .(cleanup))
})


function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL, threads = NA_integer_, cleanup = TRUE) {
if (version != "1.8") {
stop(FutureError("Internal error: Non-supported future expression version: ", version))
}

conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE)
muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE)
if (is.null(muffleInclude)) muffleInclude <- "^muffle"

if (immediateConditions && !is.null(conditionClasses)) {
immediateConditionClasses <- getOption("future.relay.immediate", "immediateCondition")
conditionClasses <- unique(c(conditionClasses, immediateConditionClasses))
attr(conditionClasses, "exclude") <- conditionClassesExclude
attr(conditionClasses, "muffleInclude") <- muffleInclude
} else {
immediateConditionClasses <- character(0L)
}

strategies <- plan("list")
strategiesR <- strategies[-1]
if (length(strategiesR) == 0L) {
strategiesR <- getOption("future.plan", sequential)
} else {
## Identify package namespaces needed for strategies
pkgsS <- lapply(strategiesR, FUN = environment)
pkgsS <- lapply(pkgsS, FUN = environmentName)
pkgsS <- unique(unlist(pkgsS, use.names = FALSE))
## CLEANUP: Only keep those that are loaded in the current session
pkgsS <- intersect(pkgsS, loadedNamespaces())
packages <- unique(c(packages, pkgsS))
}

if (is.function(strategiesR)) {
if (!inherits(strategiesR, "future")) {
stop(FutureError(sprintf("Argument 'strategiesR' is a function, but does not inherit 'future': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}
} else if (is.list(strategiesR)) {
for (kk in seq_along(strategiesR)) {
strategy <- strategiesR[[kk]]
if (!inherits(strategy, "future")) {
stop(FutureError(sprintf("Element #%d of list 'strategiesR' is a function, but does not inherit 'future': %s", kk, paste(sQuote(class(strategy)), collapse = ", "))))
}
}
} else if (is.character(strategiesR)) {
} else {
stop(FutureError(sprintf("Unknown value of argument 'strategiesR': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}

forwardOptions <- list(
## Assert globals when future is created (or at run time)?
future.globals.onMissing = getOption("future.globals.onMissing"),

## Pass down other future.* options
future.globals.maxSize = getOption("future.globals.maxSize"),
future.globals.method = getOption("future.globals.method"),
future.globals.onReference = getOption("future.globals.onReference"),
future.globals.resolve = getOption("future.globals.resolve"),
future.resolve.recursive = getOption("future.resolve.recursive"),
future.rng.onMisuse = getOption("future.rng.onMisuse"),
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),

future.fork.multithreading.enable = getOption("future.fork.multithreading.enable"),

future.globalenv.onMisuse = getOption("future.globalenv.onMisuse"),

future.makeExpression.skip = getOption("future.makeExpression.skip"),
future.makeExpression.skip.local = getOption("future.makeExpression.skip.local"),

## Other options relevant to making futures behave consistently
## across backends
width = getOption("width")
)

if (!is.null(mc.cores)) {
forwardOptions$mc.cores <- mc.cores
}

expr <- bquote_apply(tmpl_expr_evaluate2)

expr
}
}) ## makeExpression()



FutureEvalError <- function(...) {
ex <- FutureError(...)
class(ex) <- c("FutureEvalError", class(ex))
Expand Down

0 comments on commit a9e358a

Please sign in to comment.