Skip to content

Commit

Permalink
Export the GEF.Block.Nimble code, and use the future_map function ins…
Browse files Browse the repository at this point in the history
…tead of the foreach function for the parallelization.
  • Loading branch information
Dongchen Zhang committed Jul 13, 2023
1 parent 4f0959f commit feb47ab
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 17 deletions.
2 changes: 1 addition & 1 deletion modules/assim.sequential/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export(Create_Site_PFT_CSV)
export(EnKF)
export(EnKF.MultiSite)
export(GEF)
export(GEF.Block.Nimble)
export(GEF.MultiSite)
export(GEF.MultiSite.Nimble)
export(Local.support)
Expand Down Expand Up @@ -59,7 +60,6 @@ import(furrr)
import(lubridate)
import(nimble)
importFrom(dplyr,"%>%")
importFrom(foreach,"%dopar%")
importFrom(lubridate,"%m+%")
importFrom(magrittr,"%>%")
importFrom(rlang,.data)
21 changes: 5 additions & 16 deletions modules/assim.sequential/R/Analysis_sda_block.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
##' @description This function provides the block-based MCMC sampling approach.
##'
##' @return It returns the `build.block.xy` object and the analysis results.
##' @importFrom foreach %dopar%
##' @importFrom dplyr %>%
analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov, t, nt, MCMC.args) {
#convert from vector values to block lists.
Expand Down Expand Up @@ -51,26 +50,16 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,
})

#parallel for loop over each block.
#initialize parallel settings.
cores <- parallel::detectCores()
cl <- parallel::makeCluster(cores)
doSNOW::registerDoSNOW(cl)

#progress bar
pb <- utils::txtProgressBar(min=1, max=length(block.list.all[[t]]), style=3)
progress <- function(n) utils::setTxtProgressBar(pb, n)
opts <- list(progress=progress)
PEcAn.logger::logger.info(paste0("Running MCMC ", "for ", length(block.list.all[[t]]), " blocks"))

if ("try-error" %in% class(try(block.list.all[[t]] <- foreach::foreach(block = block.list.all[[t]], .packages="Kendall", .options.snow=opts) %dopar% {
library(nimble)
return(MCMC_block_function(block))
}))) {
if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
PEcAn.logger::logger.error("Something wrong within the MCMC_block_function function.")
}
PEcAn.logger::logger.info("Completed!")

#convert from block lists to vector values.
V <- block.2.vector(block.list.all[[t]], X, H)
if ("try-error" %in% class(try(V <- block.2.vector(block.list.all[[t]], X, H)))) {
PEcAn.logger::logger.error("Something wrong within the block.2.vector function.")
}

#return values
return(list(block.list.all = block.list.all,
Expand Down
1 change: 1 addition & 0 deletions modules/assim.sequential/R/Nimble_codes.R
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ GEF.MultiSite.Nimble <- nimbleCode({
#GEF.Block.Nimble--This does the block-based SDA -------------------------------------
#' block-based TWEnF
#' @format TBD
#' @export
GEF.Block.Nimble <- nimbleCode({
#I think the blocked nimble has to be implemented and used instead of a long vector sampling.
#1) due to the convergence of X.mod.
Expand Down

0 comments on commit feb47ab

Please sign in to comment.