From b01f26fe118247813b3c8d859b6cc90e2ea1600e Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 14:21:10 -0500 Subject: [PATCH 01/24] bump MPI --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 2190bc8..7f5c7d5 100644 --- a/Project.toml +++ b/Project.toml @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Graphs = "1" -MPI = "0.16, 0.19" +MPI = "0.16, 0.19, 0.20" StaticArrays = "1" julia = "1.6" Measurements = "2" From fa975bb41410c3d91699c63abdf3c36b4ba595c9 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 14:33:26 -0500 Subject: [PATCH 02/24] adapt MPI.jl from DFTK.jl --- src/utility/MPI.jl | 17 +++++++++++++++++ src/utility/utility.jl | 2 ++ 2 files changed, 19 insertions(+) create mode 100644 src/utility/MPI.jl diff --git a/src/utility/MPI.jl b/src/utility/MPI.jl new file mode 100644 index 0000000..07506cc --- /dev/null +++ b/src/utility/MPI.jl @@ -0,0 +1,17 @@ +# Convenience functions for working with MPI, adapted from DFTK.jl +import MPI + +""" +Number of processors used in MPI. Can be called without ensuring initialization. +""" +mpi_nprocs(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_size(comm)) +mpi_master(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm) == 0) + +mpi_sum(arr, comm::MPI.Comm) = MPI.Allreduce(arr, +, comm) +mpi_sum!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, +, comm) +mpi_min(arr, comm::MPI.Comm) = MPI.Allreduce(arr, min, comm) +mpi_min!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, min, comm) +mpi_max(arr, comm::MPI.Comm) = MPI.Allreduce(arr, max, comm) +mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) +mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) +mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) \ No newline at end of file diff --git a/src/utility/utility.jl b/src/utility/utility.jl index 6a9ffeb..cace79a 100644 --- a/src/utility/utility.jl +++ b/src/utility/utility.jl @@ -11,6 +11,8 @@ export StopWatch, check include("color.jl") export black, red, green, yellow, blue, magenta, cyan, white +include("MPI.jl") + export progressBar, locate, smooth, rescale """ progressBar(step, total) From 62fe3f4408622d82e9c285e829fed0866fc42397 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 14:37:12 -0500 Subject: [PATCH 03/24] remove Measurements dependence --- Project.toml | 4 +--- src/MCIntegration.jl | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Project.toml b/Project.toml index 7f5c7d5..3884e24 100644 --- a/Project.toml +++ b/Project.toml @@ -8,7 +8,6 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" -Measurements = "eff96d63-e80a-5855-80a2-b1b0885c5ab7" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -18,10 +17,9 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Graphs = "1" MPI = "0.16, 0.19, 0.20" +ProgressMeter = "1" StaticArrays = "1" julia = "1.6" -Measurements = "2" -ProgressMeter = "1" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/src/MCIntegration.jl b/src/MCIntegration.jl index c523945..239b2fe 100644 --- a/src/MCIntegration.jl +++ b/src/MCIntegration.jl @@ -5,7 +5,7 @@ using Random using Graphs using Test using ProgressMeter -using Measurements +# using 
Measurements const RNG = Random.GLOBAL_RNG const TINY = eps(Float64(0)) * 1e50 # 4.940656458412466e-274 From 5df81577f5563c47654f7d45b1cd6b0143b0fa5d Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 18:55:28 -0500 Subject: [PATCH 04/24] thread new --- src/main.jl | 175 +++++++++++++++++++++++++--------------- src/utility/MPI.jl | 17 ---- src/utility/parallel.jl | 118 +++++++++++++++++++++++++++ src/utility/utility.jl | 22 +---- 4 files changed, 231 insertions(+), 101 deletions(-) delete mode 100644 src/utility/MPI.jl create mode 100644 src/utility/parallel.jl diff --git a/src/main.jl b/src/main.jl index 4f121b3..e75d01d 100644 --- a/src/main.jl +++ b/src/main.jl @@ -13,6 +13,7 @@ ignore::Int=adapt ? 1 : 0, measure::Union{Nothing,Function}=nothing, measurefreq::Int=1, + inplace::Bool=false, kwargs... ) @@ -43,6 +44,7 @@ - `measure`: measurement function, See [`Vegas.montecarlo`](@ref), [`VegasMC.montecarlo`](@ref) and [`MCMC.montecarlo`](@ref) for more details. - `measurefreq`: how often perform the measurement for ever `measurefreq` MC steps. If a measurement is expansive, you may want to make the measurement less frequent. - `inplace`: whether to use the inplace version of the integrand. Default is `false`, which is more convenient for integrand with a few return values but may cause type instability. Only useful for the :vegas and :vegasmc solver. +- `parallel`: :auto will automatically choose the best parallelization mode. :mpi will use MPI.jl to run the MC in parallel. :thread will use Threads.@threads to run the MC in parallel. Default is :auto. - `kwargs`: Keyword arguments. The supported keywords include, * `measure` and `measurefreq`: measurement function and how frequent it is called. * If `config` is `nothing`, you may need to provide arguments for the `Configuration` constructor, check [`Configuration`](@ref) docs for more details. @@ -71,6 +73,7 @@ function integrate(integrand::Function; measure::Union{Nothing,Function}=nothing, measurefreq::Int=1, inplace::Bool=false, # whether to use the inplace version of the integrand + parallel::Symbol=:auto, # :auto, :mpi, or :thread, or :serial kwargs... ) if isnothing(config) @@ -89,87 +92,72 @@ function integrate(integrand::Function; ########### initialized MPI ####################################### (MPI.Initialized() == false) && MPI.Init() comm = MPI.COMM_WORLD - Nworker = MPI.Comm_size(comm) # number of MPI workers - rank = MPI.Comm_rank(comm) # rank of current MPI worker - root = 0 # rank of the root worker - # MPI.Barrier(comm) - ######### construct configurations for each block ################ - if block > Nworker - block = (block ÷ Nworker) * Nworker # make Nblock % size ==0, error estimation assumes this relation - else - block = Nworker # each worker should handle at least one block - end + ############# figure out the best parallelization mode ############ + parallel = MCUtility.choose_parallel(parallel) + + Nworker = MCUtility.nproc() # actual number of workers + Nthread = MCUtility.nthreads() # numebr of threads, each thread require its own workspace + + ############# figure out evaluations in each block ################ + block = _standardize_block(block, Nworker) @assert block % Nworker == 0 nevalperblock = neval ÷ block # number of evaluations per block - # nevalperblock = neval # number of evaluations per block - - results = [] - #In the MPI mode, progress will only need to track the progress of the root worker. 
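+    # A usage sketch for the new `parallel` keyword documented above (mirrors the
+    # thread test added later in this series; the `(x, c)` integrand signature and
+    # keyword defaults are taken from those tests, the rest is illustrative):
+    #
+    #     X = Continuous(0.0, 1.0)
+    #     res = integrate((x, c) -> x[1]^2; var = (X,), dof = [[1,],],
+    #                     neval = 10^5, solver = :vegas, parallel = :thread)
+    #
+    # MPI parallelism needs no keyword: launching the same script with
+    # `mpiexec -n N julia script.jl` distributes the blocks over the N ranks.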
+ ########## initialize the progress bar ############################ + #In the MPI/thread mode, progress will only need to track the progress of the root worker. Ntotal = niter * block ÷ Nworker progress = Progress(Ntotal; dt=(print >= 0 ? (0.5 + print) : 0.5), enabled=(print >= -1), showspeed=true, desc="Total iterations * blocks $(Ntotal): ", output=printio) + # initialize temp variables + configs = [deepcopy(config) for i in 1:Nthread] # configurations for each worker + obsSum = [[zero(o) for o in config.observable] for _ in 1:Nthread] # sum of observables for each worker + obsSquaredSum = [[zero(o) for o in config.observable] for _ in 1:Nthread] # sum of squared observables for each worker + summedConfig = [deepcopy(config) for i in 1:Nthread] # summed configuration for each thread + startTime = time() - for iter in 1:niter + results=[] - obsSum = [zero(o) for o in config.observable] - obsSquaredSum = [zero(o) for o in config.observable] - # summed configuration of all blocks, but changes in each iteration - summedConfig = deepcopy(config) - - for i = 1:block - # MPI thread rank will run the block with the indexes: rank, rank+Nworker, rank+2Nworker, ... - (i % Nworker != rank) && continue - - clearStatistics!(config) # reset statistics - - if solver == :vegasmc - config = VegasMC.montecarlo(config, integrand, nevalperblock, print, save, timer, debug; - measure=measure, measurefreq=measurefreq, inplace=inplace) - elseif solver == :vegas - config = Vegas.montecarlo(config, integrand, nevalperblock, print, save, timer, debug; - measure=measure, measurefreq=measurefreq, inplace=inplace) - elseif solver == :mcmc - config = MCMC.montecarlo(config, integrand, nevalperblock, print, save, timer, debug; - measure=measure, measurefreq=measurefreq) - else - error("Solver $solver is not supported!") - end + for iter in 1:niter - addConfig!(summedConfig, config) # collect statistics from the config of each block to summedConfig + for i in 1:Nthread + fill!(obsSum[i], zero(eltype(obsSum[1]))) + fill!(obsSquaredSum[i], zero(eltype(obsSquaredSum[1]))) + clearStatistics!(summedConfig[i]) + end - if (config.normalization > 0.0) == false #in case config.normalization is not a number - error("normalization of block $i is $(config.normalization), which is not positively defined!") + if parallel == :thread + Threads.@threads for i = 1:block + _block(configs, obsSum, obsSquaredSum, summedConfig, + integrand, nevalperblock, print, saver, timer, debug, + measure, measurefreq, inplace, parallel) end - - for o in 1:config.N - if obsSum[o] isa AbstractArray - m = config.observable[o] ./ config.normalization - obsSum[o] += m - obsSquaredSum[o] += (eltype(m) <: Complex) ? (@. (real(m))^2 + (imag(m))^2 * 1im) : m .^ 2 - else - m = config.observable[o] / config.normalization - obsSum[o] += m - obsSquaredSum[o] += (eltype(m) <: Complex) ? 
(real(m))^2 + (imag(m))^2 * 1im : m^2 - end + else + for i = 1:block + _block(configs, obsSum, obsSquaredSum, summedConfig, + integrand, nevalperblock, print, saver, timer, debug, + measure, measurefreq, inplace, parallel) end + end - if MPI.Comm_rank(comm) == root - (print >= -1) && next!(progress) - end + for i in 2:Nthread + obsSum[1] += obsSum[i] + obsSquaredSum[1] += obsSquaredSum[i] + addConfig(summedConfig[1], summedConfig[i]) end + #################### collect statistics #################################### - obsSum = [MCUtility.MPIreduce(osum) for osum in obsSum] - obsSquaredSum = [MCUtility.MPIreduce(osumsq) for osumsq in obsSquaredSum] + obsSum[1] = [MCUtility.MPIreduce(osum) for osum in obsSum[1]] + obsSquaredSum[1] = [MCUtility.MPIreduce(osumsq) for osumsq in obsSquaredSum[1]] # collect all statistics to summedConfig of the root worker - MPIreduceConfig!(summedConfig, root, comm) + MPIreduceConfig!(summedConfig[1], root, comm) - if MPI.Comm_rank(comm) == root + + if MCUtility.mpi_master() # only the master process will output results, no matter parallel = :mpi or :thread or :serial ##################### Extract Statistics ################################ # println("mean: ", mean, ", std: ", std) - mean, std = _mean_std(obsSum, obsSquaredSum, block) - push!(results, (mean, std, summedConfig)) + mean, std = _mean_std(obsSum[1], obsSquaredSum[1], block) + push!(results, (mean, std, summedConfig[1])) ################### self-learning ########################################## (solver == :mcmc || solver == :vegasmc) && doReweight!(summedConfig, gamma, reweight_goal) @@ -180,15 +168,17 @@ function integrate(integrand::Function; # broadcast the reweight and var.histogram of the summedConfig of the root worker to two targets: # 1. config of the root worker # 2. config of the other workers - config.reweight = MPI.bcast(summedConfig.reweight, root, comm) # broadcast reweight factors to all workers + config.reweight = MPI.bcast(summedConfig[1].reweight, root, comm) # broadcast reweight factors to all workers for (vi, var) in enumerate(config.var) - _bcast_histogram!(var, summedConfig.var[vi], config, adapt) + _bcast_histogram!(var, summedConfig[1].var[vi], config, adapt) end + #TODO: replace this with a more efficient way + configs = [deepcopy(config) for i in 1:Nthread] # configurations for each worker ################################################################################ end ########################## output results ############################## - if MPI.Comm_rank(comm) == root + if MCUtility.mpi_master() # only the master process will output results, no matter parallel = :mpi or :thread or :serial result = Result(results, ignore) if print >= 0 report(result) @@ -198,6 +188,63 @@ function integrate(integrand::Function; end end +function _standardize_block(nblock, Nworker) + ######### construct configurations for each block ################ + if nblock > Nworker + nblock = (nblock ÷ Nworker) * Nworker # make Nblock % size ==0, error estimation assumes this relation + else + nblock = Nworker # each worker should handle at least one block + end + return nblock +end + +function _block(configs, obsSum, obsSquaredSum, summedConfig, + integrand, nevalperblock, print, saver, timer, debug, + measure, measurefreq, inplace, parallel) + + # rank core will run the block with the indexes: rank, rank+Nworker, rank+2Nworker, ... 
+ rank = MCUtility.rank(parallel) + (i % Nworker != rank ) && continue + + config_n = configs[rank] # configuration for the worker with rank `rank` + clearStatistics!(config_n) # reset statistics + + if solver == :vegasmc + config_n = VegasMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + measure=measure, measurefreq=measurefreq, inplace=inplace) + elseif solver == :vegas + config_n = Vegas.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + measure=measure, measurefreq=measurefreq, inplace=inplace) + elseif solver == :mcmc + config_n = MCMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + measure=measure, measurefreq=measurefreq) + else + error("Solver $solver is not supported!") + end + + addConfig!(summedConfig[rank], config_n) # collect statistics from the config of each block to summedConfig + + if (config_n.normalization > 0.0) == false #in case config.normalization is not a number + error("normalization of block $i is $(config_n.normalization), which is not positively defined!") + end + + for o in 1:config.N + if obsSum[rank][o] isa AbstractArray + m = config_n.observable[o] ./ config_n.normalization + obsSum[rank][o] += m + obsSquaredSum[rank][o] += (eltype(m) <: Complex) ? (@. (real(m))^2 + (imag(m))^2 * 1im) : m .^ 2 + else + m = config_n.observable[o] / config_n.normalization + obsSum[rank][o] += m + obsSquaredSum[rank][o] += (eltype(m) <: Complex) ? (real(m))^2 + (imag(m))^2 * 1im : m^2 + end + end + + if MCUtility.is_root_worker(parallel) + (print >= -1) && next!(progress) + end +end + function _bcast_histogram!(target::V, source::V, config, adapt) where {V} comm = MPI.COMM_WORLD root = 0 # rank of the root worker diff --git a/src/utility/MPI.jl b/src/utility/MPI.jl deleted file mode 100644 index 07506cc..0000000 --- a/src/utility/MPI.jl +++ /dev/null @@ -1,17 +0,0 @@ -# Convenience functions for working with MPI, adapted from DFTK.jl -import MPI - -""" -Number of processors used in MPI. Can be called without ensuring initialization. -""" -mpi_nprocs(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_size(comm)) -mpi_master(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm) == 0) - -mpi_sum(arr, comm::MPI.Comm) = MPI.Allreduce(arr, +, comm) -mpi_sum!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, +, comm) -mpi_min(arr, comm::MPI.Comm) = MPI.Allreduce(arr, min, comm) -mpi_min!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, min, comm) -mpi_max(arr, comm::MPI.Comm) = MPI.Allreduce(arr, max, comm) -mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) -mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) -mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) \ No newline at end of file diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl new file mode 100644 index 0000000..9b4b190 --- /dev/null +++ b/src/utility/parallel.jl @@ -0,0 +1,118 @@ +# Convenience functions for working with MPI, adapted from DFTK.jl +""" +Number of processors used in MPI. Can be called without ensuring initialization. 
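+
+# Example
+
+A minimal sketch (both helpers initialize MPI on demand, so they are safe to call early):
+
+    Nworker = mpi_nprocs()                        # total number of MPI ranks
+    mpi_master() && println("root of $Nworker")   # print from the root rank only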
+""" +mpi_nprocs(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_size(comm)) +mpi_master(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm) == 0) +mpi_rank(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm)) + +mpi_sum(arr, comm::MPI.Comm) = MPI.Allreduce(arr, +, comm) +mpi_sum!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, +, comm) +mpi_min(arr, comm::MPI.Comm) = MPI.Allreduce(arr, min, comm) +mpi_min!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, min, comm) +mpi_max(arr, comm::MPI.Comm) = MPI.Allreduce(arr, max, comm) +mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) +mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) +mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) + +function MPIreduce(data) + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + if Nworker == 1 #no parallelization + return data + end + if typeof(data) <: AbstractArray + MPI.Reduce!(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return data + else + result = [data,] # MPI.Reduce works for array only + MPI.Reduce!(result, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return result[1] + end +end + +function choose_parallel(parallel::Symbol) + if parallel == :auto + if mpi_nprocs() > 1 + return :mpi + else + if Threads.nthreads() > 1 + return :thread # use threads only if MPI is not available + else + return :serial # use threads only if MPI is not available + end + end + else + return parallel + end +end + +function check_parallel(parallel::Symbol) + if parallel == :mpi + # @assert mpi_nprocs() > 1 "MPI is not available" + # julia may start with -t N, but we don't want to use threads + # should work for MPI worker >=1 + return + elseif parallel == :thread + @assert mpi_nprocs() == 1 "MPI and threads cannot be used together" + elseif parallel == :serial + @assert mpi_nprocs() == 1 "MPI should not be used for serial calculations" + # julia may start with -t N, but we don't want to use threads + else + error("Unknown parallelization mode: $(parallel)") + end +end + +# actual number of threads used +function nthreads(parallel::Symbol) + if parallel == :thread + return Threads.nthreads() + else + return 1 + end +end + +function is_root_worker(parallel::Symbol) # only one thread of each MPI worker is the root + return mpi_master() && (Threads.threadid() == 1) +end + +# function root(parallel::Symbol) # only one thread of each MPI worker is the root +# if parallel == :mpi +# return mpi_master() +# elseif parallel == :thread +# return 1 +# elseif parallel == :serial +# return 1 +# else +# error("Unknown parallelization mode: $(parallel)") +# end +# end + +# rank of the current worker for both MPI and threads +function rank(parallel::Symbol) + if parallel == :mpi + return mpi_rank() + elseif parallel == :thread + return Threads.threadid() + elseif parallel == :serial + return 1 + else + error("Unknown parallelization mode: $(parallel)") + end +end + +# number of total workers for both MPI and threads +function nproc(parallel::Symbol) + if parallel == :mpi + return mpi_nprocs() + elseif parallel == :thread + return Threads.nthreads() + elseif parallel == :serial + return 1 + else + error("Unknown parallelization mode: $(parallel)") + end +end \ No newline at end of file diff --git a/src/utility/utility.jl b/src/utility/utility.jl index cace79a..4d17dae 100644 --- 
a/src/utility/utility.jl +++ b/src/utility/utility.jl @@ -4,6 +4,7 @@ Utility data structures and functions module MCUtility using Test using ..MPI +using ..Threads include("stopwatch.jl") export StopWatch, check @@ -11,7 +12,7 @@ export StopWatch, check include("color.jl") export black, red, green, yellow, blue, magenta, cyan, white -include("MPI.jl") +include("parallel.jl") export progressBar, locate, smooth, rescale """ @@ -36,25 +37,6 @@ function progressBar(step, total) return str end -function MPIreduce(data) - comm = MPI.COMM_WORLD - Nworker = MPI.Comm_size(comm) # number of MPI workers - rank = MPI.Comm_rank(comm) # rank of current MPI worker - root = 0 # rank of the root worker - - if Nworker == 1 #no parallelization - return data - end - if typeof(data) <: AbstractArray - MPI.Reduce!(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks - return data - else - result = [data,] # MPI.Reduce works for array only - MPI.Reduce!(result, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks - return result[1] - end -end - function test_type_stability(f, args) try @inferred f(args...) From b9626beeff5acc2526b9d0f438d3a6ccf070225a Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 22:31:00 -0500 Subject: [PATCH 05/24] vegas failed the test --- src/configuration.jl | 3 +- src/main.jl | 47 +++++++++++++--------- src/mcmc/montecarlo.jl | 5 ++- src/utility/parallel.jl | 86 +++++++++++++++++++++-------------------- test/montecarlo.jl | 2 +- 5 files changed, 78 insertions(+), 65 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index afc7d5f..55ddde2 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -230,7 +230,8 @@ function addConfig!(c::Configuration, ic::Configuration) end end -function MPIreduceConfig!(c::Configuration, root, comm) +function MPIreduceConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) + function histogram_reduce!(var::Variable) if var isa Dist.CompositeVar for v in var.vars diff --git a/src/main.jl b/src/main.jl index e75d01d..d8f8aaa 100644 --- a/src/main.jl +++ b/src/main.jl @@ -73,7 +73,7 @@ function integrate(integrand::Function; measure::Union{Nothing,Function}=nothing, measurefreq::Int=1, inplace::Bool=false, # whether to use the inplace version of the integrand - parallel::Symbol=:auto, # :auto, :mpi, or :thread, or :serial + parallel::Symbol=:nothread, # :auto, :mpi, or :thread, or :serial kwargs... 
) if isnothing(config) @@ -96,8 +96,8 @@ function integrate(integrand::Function; ############# figure out the best parallelization mode ############ parallel = MCUtility.choose_parallel(parallel) - Nworker = MCUtility.nproc() # actual number of workers - Nthread = MCUtility.nthreads() # numebr of threads, each thread require its own workspace + Nworker = MCUtility.nworker(parallel) # actual number of workers + Nthread = MCUtility.nthreads(parallel) # numebr of threads, threads share the same memory ############# figure out evaluations in each block ################ block = _standardize_block(block, Nworker) @@ -128,18 +128,20 @@ function integrate(integrand::Function; if parallel == :thread Threads.@threads for i = 1:block - _block(configs, obsSum, obsSquaredSum, summedConfig, - integrand, nevalperblock, print, saver, timer, debug, + _block!(i ,configs, obsSum, obsSquaredSum, summedConfig, solver, progress, + integrand, nevalperblock, print, save, timer, debug, measure, measurefreq, inplace, parallel) end else for i = 1:block - _block(configs, obsSum, obsSquaredSum, summedConfig, - integrand, nevalperblock, print, saver, timer, debug, + _block!(i, configs, obsSum, obsSquaredSum, summedConfig, solver, progress, + integrand, nevalperblock, print, save, timer, debug, measure, measurefreq, inplace, parallel) end end + # println(configs[1].normalization) + for i in 2:Nthread obsSum[1] += obsSum[i] obsSquaredSum[1] += obsSquaredSum[i] @@ -150,7 +152,7 @@ function integrate(integrand::Function; obsSum[1] = [MCUtility.MPIreduce(osum) for osum in obsSum[1]] obsSquaredSum[1] = [MCUtility.MPIreduce(osumsq) for osumsq in obsSquaredSum[1]] # collect all statistics to summedConfig of the root worker - MPIreduceConfig!(summedConfig[1], root, comm) + MPIreduceConfig!(summedConfig[1]) if MCUtility.mpi_master() # only the master process will output results, no matter parallel = :mpi or :thread or :serial @@ -160,7 +162,7 @@ function integrate(integrand::Function; push!(results, (mean, std, summedConfig[1])) ################### self-learning ########################################## - (solver == :mcmc || solver == :vegasmc) && doReweight!(summedConfig, gamma, reweight_goal) + (solver == :mcmc || solver == :vegasmc) && doReweight!(summedConfig[1], gamma, reweight_goal) end ######################## syncronize between works ############################## @@ -168,7 +170,8 @@ function integrate(integrand::Function; # broadcast the reweight and var.histogram of the summedConfig of the root worker to two targets: # 1. config of the root worker # 2. config of the other workers - config.reweight = MPI.bcast(summedConfig[1].reweight, root, comm) # broadcast reweight factors to all workers + # config.reweight = MPI.bcast(summedConfig[1].reweight, root, comm) # broadcast reweight factors to all workers + config.reweight = MCUtility.MPIbcast(summedConfig[1].reweight) for (vi, var) in enumerate(config.var) _bcast_histogram!(var, summedConfig[1].var[vi], config, adapt) end @@ -198,37 +201,43 @@ function _standardize_block(nblock, Nworker) return nblock end -function _block(configs, obsSum, obsSquaredSum, summedConfig, - integrand, nevalperblock, print, saver, timer, debug, +function _block!(iblock, configs, obsSum, obsSquaredSum, summedConfig, solver, progress, + integrand, nevalperblock, print, save, timer, debug, measure, measurefreq, inplace, parallel) # rank core will run the block with the indexes: rank, rank+Nworker, rank+2Nworker, ... 
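+    # For example (a sketch assuming 1-based ranks from MCUtility.rank and
+    # Nworker = MCUtility.nworker(parallel) == 2, per the check below):
+    #     worker with rank 1 runs iblock = 2, 4, 6, ...   (iblock % 2 == 0)
+    #     worker with rank 2 runs iblock = 1, 3, 5, ...   (iblock % 2 == 1)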
rank = MCUtility.rank(parallel) - (i % Nworker != rank ) && continue + Nworker = MCUtility.nworker(parallel) + # println(iblock, " ", rank, " ", Nworker) + + (iblock % Nworker != rank-1 ) && return config_n = configs[rank] # configuration for the worker with rank `rank` clearStatistics!(config_n) # reset statistics if solver == :vegasmc - config_n = VegasMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + VegasMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; measure=measure, measurefreq=measurefreq, inplace=inplace) elseif solver == :vegas - config_n = Vegas.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + Vegas.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; measure=measure, measurefreq=measurefreq, inplace=inplace) elseif solver == :mcmc - config_n = MCMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; + MCMC.montecarlo(config_n, integrand, nevalperblock, print, save, timer, debug; measure=measure, measurefreq=measurefreq) else error("Solver $solver is not supported!") end - addConfig!(summedConfig[rank], config_n) # collect statistics from the config of each block to summedConfig + # println(config_n.normalization) + if (config_n.normalization > 0.0) == false #in case config.normalization is not a number error("normalization of block $i is $(config_n.normalization), which is not positively defined!") end - for o in 1:config.N + addConfig!(summedConfig[rank], config_n) # collect statistics from the config of each block to summedConfig + + for o in 1:config_n.N if obsSum[rank][o] isa AbstractArray m = config_n.observable[o] ./ config_n.normalization obsSum[rank][o] += m @@ -240,7 +249,7 @@ function _block(configs, obsSum, obsSquaredSum, summedConfig, end end - if MCUtility.is_root_worker(parallel) + if MCUtility.is_root(parallel) (print >= -1) && next!(progress) end end diff --git a/src/mcmc/montecarlo.jl b/src/mcmc/montecarlo.jl index 55fabde..2601d94 100644 --- a/src/mcmc/montecarlo.jl +++ b/src/mcmc/montecarlo.jl @@ -121,9 +121,9 @@ function montecarlo(config::Configuration{N,V,P,O,T}, integrand::Function, neval end end if (state.curr != config.norm) && state.probability ≈ 0.0 - error("Cannot find the variables that makes the $(state.curr) integrand nonzero!") + error("Cannot find the variables that makes the #$(state.curr) integrand nonzero!") elseif (state.curr != config.norm) && state.probability < TINY - @warn("Cannot find the variables that makes the $(state.curr) integrand >1e-10!") + @warn("Cannot find the variables that makes the #$(state.curr) integrand >1e-10!") end # updates = [changeIntegrand,] # TODO: sample changeVariable more often @@ -208,6 +208,7 @@ function initialize!(config::Configuration{N,V,P,O,T}, integrand, state) where { if curr != config.norm # config.weights[curr] = integrand_wrap(curr, config, integrand) state.weight = (length(config.var) == 1) ? integrand(curr, config.var[1], config) : integrand(curr, config.var, config) + # println(config.var[1][1], ", ", config.var[1][2], ", ", integrand(curr, config.var[1], config)) state.probability = abs(state.weight) * config.reweight[curr] else state.weight = zero(T) diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl index 9b4b190..532bf5e 100644 --- a/src/utility/parallel.jl +++ b/src/utility/parallel.jl @@ -4,6 +4,7 @@ Number of processors used in MPI. Can be called without ensuring initialization. 
""" mpi_nprocs(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_size(comm)) mpi_master(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm) == 0) +mpi_root(comm=MPI.COMM_WORLD) = 0 mpi_rank(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm)) mpi_sum(arr, comm::MPI.Comm) = MPI.Allreduce(arr, +, comm) @@ -34,37 +35,51 @@ function MPIreduce(data) end end +function MPIbcast(data) + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + if Nworker == 1 #no parallelization + return data + end + if typeof(data) <: AbstractArray + return MPI.bcast(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + else + # MPI.Reduce works for array only + result = MPI.bcast([data, ], MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return result[1] + end +end + function choose_parallel(parallel::Symbol) if parallel == :auto - if mpi_nprocs() > 1 - return :mpi + if Threads.nthreads() > 1 + return :thread # use threads only if MPI is not available else - if Threads.nthreads() > 1 - return :thread # use threads only if MPI is not available - else - return :serial # use threads only if MPI is not available - end + return :nothread # use threads only if MPI is not available end else return parallel end end -function check_parallel(parallel::Symbol) - if parallel == :mpi - # @assert mpi_nprocs() > 1 "MPI is not available" - # julia may start with -t N, but we don't want to use threads - # should work for MPI worker >=1 - return - elseif parallel == :thread - @assert mpi_nprocs() == 1 "MPI and threads cannot be used together" - elseif parallel == :serial - @assert mpi_nprocs() == 1 "MPI should not be used for serial calculations" - # julia may start with -t N, but we don't want to use threads - else - error("Unknown parallelization mode: $(parallel)") - end -end +# function check_parallel(parallel::Symbol) +# if parallel == :mpi +# # @assert mpi_nprocs() > 1 "MPI is not available" +# # julia may start with -t N, but we don't want to use threads +# # should work for MPI worker >=1 +# return +# elseif parallel == :thread +# @assert mpi_nprocs() == 1 "MPI and threads cannot be used together" +# elseif parallel == :serial +# @assert mpi_nprocs() == 1 "MPI should not be used for serial calculations" +# # julia may start with -t N, but we don't want to use threads +# else +# error("Unknown parallelization mode: $(parallel)") +# end +# end # actual number of threads used function nthreads(parallel::Symbol) @@ -75,7 +90,8 @@ function nthreads(parallel::Symbol) end end -function is_root_worker(parallel::Symbol) # only one thread of each MPI worker is the root +# only one thread of each MPI worker is the root +function is_root(parallel::Symbol) return mpi_master() && (Threads.threadid() == 1) end @@ -93,26 +109,12 @@ end # rank of the current worker for both MPI and threads function rank(parallel::Symbol) - if parallel == :mpi - return mpi_rank() - elseif parallel == :thread - return Threads.threadid() - elseif parallel == :serial - return 1 - else - error("Unknown parallelization mode: $(parallel)") - end + #if thread is off, then nthreads must be one. 
Only mpi_rank contributes + # mpi_rank() always start with 0 + return Threads.threadid()*(nthreads(parallel)-1)+mpi_rank()+1 end # number of total workers for both MPI and threads -function nproc(parallel::Symbol) - if parallel == :mpi - return mpi_nprocs() - elseif parallel == :thread - return Threads.nthreads() - elseif parallel == :serial - return 1 - else - error("Unknown parallelization mode: $(parallel)") - end +function nworker(parallel::Symbol) + return mpi_nprocs()*nthreads(parallel) end \ No newline at end of file diff --git a/test/montecarlo.jl b/test/montecarlo.jl index b0729b2..50d256a 100644 --- a/test/montecarlo.jl +++ b/test/montecarlo.jl @@ -3,7 +3,7 @@ Demostrate do syntax for Monte Carlo simulation. """ function Sphere1(neval, alg) X = Continuous(0.0, 1.0) - f(x, c) = (X[1]^2 + X[2]^2 < 1.0) ? 1.0 : 0.0 + f(x, c) = (x[1]^2 + x[2]^2 < 1.0) ? 1.0 : 0.0 f(idx, x, c)::Float64 = f(x, c) return integrate(f; var=(X,), dof=[[2,],], neval=neval, print=-1, solver=alg) end From 9e7880f2f885077fe04ba2407868252c5a993f1d Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 23:13:12 -0500 Subject: [PATCH 06/24] add tests for mpi --- src/utility/parallel.jl | 26 +++++++++----- test/mpi.jl | 12 +++++++ test/mpi_test.jl | 78 +++++++++++++++++++++++++++++++++++++++++ test/thread.jl | 12 +++++++ 4 files changed, 120 insertions(+), 8 deletions(-) create mode 100644 test/mpi.jl create mode 100644 test/mpi_test.jl diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl index 532bf5e..efab281 100644 --- a/src/utility/parallel.jl +++ b/src/utility/parallel.jl @@ -16,7 +16,7 @@ mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) -function MPIreduce(data) +function MPIreduce(data, op = MPI.SUM) comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers rank = MPI.Comm_rank(comm) # rank of current MPI worker @@ -26,12 +26,22 @@ function MPIreduce(data) return data end if typeof(data) <: AbstractArray - MPI.Reduce!(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks - return data + return MPI.Reduce(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks else - result = [data,] # MPI.Reduce works for array only - MPI.Reduce!(result, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks - return result[1] + # result = [data,] # MPI.Reduce works for array only + return MPI.Reduce(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + # return output + end +end + +function MPIreduce!(data::AbstractArray, op = MPI.SUM) + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + if Nworker > 1 + MPI.Reduce!(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks end end @@ -45,10 +55,10 @@ function MPIbcast(data) return data end if typeof(data) <: AbstractArray - return MPI.bcast(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return MPI.bcast(data, root, comm) # root node gets the sum of observables from all blocks else # MPI.Reduce works for array only - result = MPI.bcast([data, ], MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + result = MPI.bcast([data, ], root, comm) # root node gets the 
sum of observables from all blocks return result[1] end end diff --git a/test/mpi.jl b/test/mpi.jl new file mode 100644 index 0000000..b5510cb --- /dev/null +++ b/test/mpi.jl @@ -0,0 +1,12 @@ +using MPI +using Test + +@testset "MPI" begin + n = 2 # number of processes + mpiexec() do exe # MPI wrapper + run(`$exe -n $n $(Base.julia_cmd()) mpi_test.jl`) + # alternatively: + # p = run(ignorestatus(`...`)) + # @test success(p) + end +end \ No newline at end of file diff --git a/test/mpi_test.jl b/test/mpi_test.jl new file mode 100644 index 0000000..81c1e3f --- /dev/null +++ b/test/mpi_test.jl @@ -0,0 +1,78 @@ +using Test +using MPI +using MCIntegration +const MCUtility = MCIntegration.MCUtility + +@testset "MPI reduce" begin + (MPI.Initialized() == false) && MPI.Init() + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + a = [1, 2, 3] + a = MCUtility.MPIreduce(a) + if rank == root + @test a == [Nworker, 2Nworker, 3Nworker] + end + + b = 1 + b = MCUtility.MPIreduce(b) + if rank == root + @test b == Nworker + end +end + +@testset "MPI bcast" begin + (MPI.Initialized() == false) && MPI.Init() + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + a = [1, 2, 3] .* rank + a = MCUtility.MPIbcast(a) + if rank != root + @test a == [0, 0, 0] + end + + b = 1 .* rank + b = MCUtility.MPIbcast(b) + if rank != root + @test b == 0 + end +end + +@testset "MPI more" begin + (MPI.Initialized() == false) && MPI.Init() + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + X = Continuous(0.0, 1.0) + Y = Continuous(0.0, 1.0) + Z = Continuous(0.0, 1.0) + X.histogram[1] = 1.0 + Y.histogram[1] = 1.0 + Z.histogram[1] = 1.0 + cvar = CompositeVar(Y, Z) + config = Configuration(var = (X, cvar), dof=[[1, 1], ]) + config.normalization = 1.0 + config.visited[1] = 1.0 + config.propose[1, 1, 1] = 1.0 + config.accept[1, 1, 1] = 1.0 + + MCIntegration.MPIreduceConfig!(config) + if rank == root + @test config.normalization == Nworker + @test config.visited[1] == Nworker + @test config.propose[1, 1, 1] == Nworker + @test config.accept[1, 1, 1] == Nworker + @test config.var[1].histogram[1] == Nworker + cvar = config.var[2] + @test cvar[1].histogram[1] == Nworker + @test cvar[2].histogram[1] == Nworker + end + +end \ No newline at end of file diff --git a/test/thread.jl b/test/thread.jl index 81202e4..82af0f3 100644 --- a/test/thread.jl +++ b/test/thread.jl @@ -14,4 +14,16 @@ result = integrate((idx, X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:mcmc) check(result, 1 / (1 + i)) end + +end + +@testset "inner thread test" begin + if Threads.nthreads() == 1 + @warn "There is only one thread currently. You may want run julia with -t auto to enable multithreading." 
+ end + X = Continuous(0.0, 1.0) + i = 2 + result = integrate((x, c) -> (x[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegas, parallel = :thread) + println(i, " -> ", result.mean, "+-", result.stdev) + check(result, 1 / (1 + i)) end \ No newline at end of file From ebbd632603d7a13742e90ccbb5b7193faccd7eb0 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 23:48:32 -0500 Subject: [PATCH 07/24] add tests for mpi --- src/configuration.jl | 26 ++++++------- src/utility/parallel.jl | 19 +++++++-- test/mpi_test.jl | 86 +++++++++++++++++++++++++++++++++++------ 3 files changed, 103 insertions(+), 28 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index 55ddde2..988855f 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -238,30 +238,28 @@ function MPIreduceConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) histogram_reduce!(v) end else - histogram = MPI.Reduce(var.histogram, MPI.SUM, root, comm) - if MPI.Comm_rank(comm) == root - var.histogram = histogram - end + MCUtility.MPIreduce!(var.histogram) end end ########## variable that could be a number ############## - neval = MPI.Reduce(c.neval, MPI.SUM, root, comm) - normalization = MPI.Reduce(c.normalization, MPI.SUM, root, comm) - observable = [MPI.Reduce(c.observable[o], MPI.SUM, root, comm) for o in eachindex(c.observable)] - if MPI.Comm_rank(comm) == root - c.neval = neval - c.normalization = normalization - c.observable = observable + c.neval = MCUtility.MPIreduce(c.neval) + c.normalization = MCUtility.MPIreduce(c.normalization) + for o in eachindex(c.observable) + if c.observable[o] isa AbstractArray + MCUtility.MPIreduce!(c.observable[o]) # avoid memory allocation + else + c.observable[o] = MCUtility.MPIreduce(c.observable[o]) + end end for v in c.var histogram_reduce!(v) end ########## variable that are vectors ############## - MPI.Reduce!(c.visited, MPI.SUM, root, comm) - MPI.Reduce!(c.propose, MPI.SUM, root, comm) - MPI.Reduce!(c.accept, MPI.SUM, root, comm) + MCUtility.MPIreduce!(c.visited) + MCUtility.MPIreduce!(c.propose) + MCUtility.MPIreduce!(c.accept) end function report(config::Configuration, total_neval=nothing) diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl index efab281..b74af42 100644 --- a/src/utility/parallel.jl +++ b/src/utility/parallel.jl @@ -26,10 +26,12 @@ function MPIreduce(data, op = MPI.SUM) return data end if typeof(data) <: AbstractArray - return MPI.Reduce(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + d = MPI.Reduce(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return rank == root ? d : data else - # result = [data,] # MPI.Reduce works for array only - return MPI.Reduce(data, MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + # MPI.Reduce works for array only in old verison of MPI.jl + d = MPI.Reduce([data, ], MPI.SUM, root, comm) # root node gets the sum of observables from all blocks + return rank == root ? 
d[1] : data # return output end end @@ -63,6 +65,17 @@ function MPIbcast(data) end end +function MPIbcast!(data::AbstractArray) + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + if Nworker > 1 #no parallelization + return MPI.Bcast!(data, root, comm) # root node gets the sum of observables from all blocks + end +end + function choose_parallel(parallel::Symbol) if parallel == :auto if Threads.nthreads() > 1 diff --git a/test/mpi_test.jl b/test/mpi_test.jl index 81c1e3f..9eb4501 100644 --- a/test/mpi_test.jl +++ b/test/mpi_test.jl @@ -11,15 +11,28 @@ const MCUtility = MCIntegration.MCUtility root = 0 # rank of the root worker a = [1, 2, 3] - a = MCUtility.MPIreduce(a) + aa = MCUtility.MPIreduce(a) if rank == root - @test a == [Nworker, 2Nworker, 3Nworker] + @test aa == [Nworker, 2Nworker, 3Nworker] + @test a == [1, 2, 3] + else + @test aa == a #non-root returns nothing end b = 1 - b = MCUtility.MPIreduce(b) + bb = MCUtility.MPIreduce(b) + if rank == root + @test b == 1 + @test bb == Nworker + else + @test bb == b #non-root returns nothing + end + + # inplace + a = [1, 2, 3] + MCUtility.MPIreduce!(a) if rank == root - @test b == Nworker + @test a == [Nworker, 2Nworker, 3Nworker] end end @@ -31,19 +44,33 @@ end root = 0 # rank of the root worker a = [1, 2, 3] .* rank - a = MCUtility.MPIbcast(a) + aa = MCUtility.MPIbcast(a) if rank != root - @test a == [0, 0, 0] + @test aa == [0, 0, 0] + @test a == [1, 2, 3] .* rank + else + @test a == aa end - b = 1 .* rank - b = MCUtility.MPIbcast(b) + b = rank + bb = MCUtility.MPIbcast(b) if rank != root - @test b == 0 + @test bb == 0 + @test b == rank + else + @test b == bb end + + # inplace + a = [1, 2, 3] .* rank + MCUtility.MPIbcast!(a) + if rank != root + @test a == [0, 0, 0] + end + end -@testset "MPI more" begin +@testset "MPI reduce config" begin (MPI.Initialized() == false) && MPI.Init() comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers @@ -74,5 +101,42 @@ end @test cvar[1].histogram[1] == Nworker @test cvar[2].histogram[1] == Nworker end +end + +# @testset "MPI bcast histogram" begin +# (MPI.Initialized() == false) && MPI.Init() +# comm = MPI.COMM_WORLD +# Nworker = MPI.Comm_size(comm) # number of MPI workers +# rank = MPI.Comm_rank(comm) # rank of current MPI worker +# root = 0 # rank of the root worker + +# X = Continuous(0.0, 1.0) +# Y = Continuous(0.0, 1.0) +# Z = Continuous(0.0, 1.0) +# cvar = CompositeVar(Y, Z) + +# if rank == root +# X.histogram[1] = rank +# Y.histogram[1] = rank +# Z.histogram[1] = rank +# else +# X.histogram[1] = rank +# Y.histogram[1] = rank +# Z.histogram[1] = rank +# end + +# config = Configuration(var = (X, cvar), dof=[[1, 1], ]) + +# MCIntegration._bcast_histogram(config) -end \ No newline at end of file +# if rank == root +# @test config.normalization == Nworker +# @test config.visited[1] == Nworker +# @test config.propose[1, 1, 1] == Nworker +# @test config.accept[1, 1, 1] == Nworker +# @test config.var[1].histogram[1] == Nworker +# cvar = config.var[2] +# @test cvar[1].histogram[1] == Nworker +# @test cvar[2].histogram[1] == Nworker +# end +# end \ No newline at end of file From 686a63f1e77723076f08b382976ae2b6b940631b Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Mon, 7 Nov 2022 23:50:19 -0500 Subject: [PATCH 08/24] add tests for mpi --- test/mpi_test.jl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/mpi_test.jl b/test/mpi_test.jl 
index 9eb4501..488a2ab 100644 --- a/test/mpi_test.jl +++ b/test/mpi_test.jl @@ -84,7 +84,8 @@ end Y.histogram[1] = 1.0 Z.histogram[1] = 1.0 cvar = CompositeVar(Y, Z) - config = Configuration(var = (X, cvar), dof=[[1, 1], ]) + obs = [1.0,] + config = Configuration(var = (X, cvar), dof=[[1, 1], ], obs=obs) config.normalization = 1.0 config.visited[1] = 1.0 config.propose[1, 1, 1] = 1.0 @@ -92,6 +93,7 @@ end MCIntegration.MPIreduceConfig!(config) if rank == root + @test config.observable[1] == Nworker @test config.normalization == Nworker @test config.visited[1] == Nworker @test config.propose[1, 1, 1] == Nworker From 0fb7fd106e3dc8762e3433587520fd6a103135ea Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 11:18:58 -0500 Subject: [PATCH 09/24] add MPIbcastConfig --- src/configuration.jl | 31 +++++++++++- src/utility/parallel.jl | 22 +++++++++ test/mpi_test.jl | 106 ++++++++++++++++++++-------------------- 3 files changed, 105 insertions(+), 54 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index 988855f..6a8d133 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -231,6 +231,11 @@ function addConfig!(c::Configuration, ic::Configuration) end function MPIreduceConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) + # Need to reduce from workers: + # neval + # var.histogram + # visited, propose, accept + # normalization, observable function histogram_reduce!(var::Variable) if var isa Dist.CompositeVar @@ -243,8 +248,8 @@ function MPIreduceConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) end ########## variable that could be a number ############## - c.neval = MCUtility.MPIreduce(c.neval) - c.normalization = MCUtility.MPIreduce(c.normalization) + c.neval = MCUtility.MPIreduce(c.neval) # reduce the amount of the commuication + c.normalization = MCUtility.MPIreduce(c.normalization) # reduce the amount of the commuication for o in eachindex(c.observable) if c.observable[o] isa AbstractArray MCUtility.MPIreduce!(c.observable[o]) # avoid memory allocation @@ -262,6 +267,28 @@ function MPIreduceConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) MCUtility.MPIreduce!(c.accept) end +function MPIbcastConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) + # need to broadcast from root to workers: + # reweight + # var.histogram + function histogram_bcast!(var::Variable) + if var isa Dist.CompositeVar + for v in var.vars + histogram_bcast!(v) + end + else + MCUtility.MPIbcast!(var.histogram) + end + end + + ########## variable that could be a number ############## + MCUtility.MPIbcast(c.reweight) + + for v in c.var + histogram_bcast!(v) + end +end + function report(config::Configuration, total_neval=nothing) neval, visited, reweight, propose, accept = config.neval, config.visited, config.reweight, config.propose, config.accept var, neighbor = config.var, config.neighbor diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl index b74af42..676ce3b 100644 --- a/src/utility/parallel.jl +++ b/src/utility/parallel.jl @@ -16,6 +16,12 @@ mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) +""" + function MPIreduce(data, op = MPI.SUM) + +Reduce data from MPI workers to root with the operation `op`. `data` can be an array or a scalar. +The root node returns the reduced data with the operation `op`, and other nodes return their own data. 
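+
+# Example
+
+A sketch mirroring `test/mpi_test.jl` in this series (run under `mpiexec -n N julia ...`):
+
+    a = MPIreduce([1, 2, 3])   # root rank gets [N, 2N, 3N]; other ranks get the input back
+    b = MPIreduce(1)           # scalars are reduced the same way; root gets N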
+""" function MPIreduce(data, op = MPI.SUM) comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers @@ -36,6 +42,11 @@ function MPIreduce(data, op = MPI.SUM) end end +""" + function MPIreduce!(data::AbstractArray, op = MPI.SUM) + +Reduce data from MPI workers to root with the operation `op`. `data` should be an array. +""" function MPIreduce!(data::AbstractArray, op = MPI.SUM) comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers @@ -47,6 +58,12 @@ function MPIreduce!(data::AbstractArray, op = MPI.SUM) end end +""" + function MPIbcast(data) + +Broadcast data from MPI root to other nodes. `data` can be an array or a scalar. +The root node its own data, and other nodes return the broadcasted data from the root. +""" function MPIbcast(data) comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers @@ -65,6 +82,11 @@ function MPIbcast(data) end end +""" + function MPIbcast!(data::AbstractArray) + +Broadcast data from MPI root to other nodes. `data` is an array. +""" function MPIbcast!(data::AbstractArray) comm = MPI.COMM_WORLD Nworker = MPI.Comm_size(comm) # number of MPI workers diff --git a/test/mpi_test.jl b/test/mpi_test.jl index 488a2ab..da2bbbf 100644 --- a/test/mpi_test.jl +++ b/test/mpi_test.jl @@ -80,65 +80,67 @@ end X = Continuous(0.0, 1.0) Y = Continuous(0.0, 1.0) Z = Continuous(0.0, 1.0) - X.histogram[1] = 1.0 - Y.histogram[1] = 1.0 - Z.histogram[1] = 1.0 + X.histogram[1] = 1.1 + Y.histogram[1] = 1.2 + Z.histogram[1] = 1.3 cvar = CompositeVar(Y, Z) obs = [1.0,] config = Configuration(var = (X, cvar), dof=[[1, 1], ], obs=obs) - config.normalization = 1.0 - config.visited[1] = 1.0 - config.propose[1, 1, 1] = 1.0 - config.accept[1, 1, 1] = 1.0 + config.neval = 101 + config.normalization = 1.1 + config.visited[1] = 1.2 + config.propose[1, 1, 1] = 1.3 + config.accept[1, 1, 1] = 1.4 MCIntegration.MPIreduceConfig!(config) if rank == root @test config.observable[1] == Nworker - @test config.normalization == Nworker - @test config.visited[1] == Nworker - @test config.propose[1, 1, 1] == Nworker - @test config.accept[1, 1, 1] == Nworker - @test config.var[1].histogram[1] == Nworker - cvar = config.var[2] - @test cvar[1].histogram[1] == Nworker - @test cvar[2].histogram[1] == Nworker + @test config.neval == Nworker*101 + @test config.normalization ≈ Nworker*1.1 + @test config.visited[1] ≈ Nworker*1.2 + @test config.propose[1, 1, 1] ≈ Nworker*1.3 + @test config.accept[1, 1, 1] ≈ Nworker*1.4 + + @test config.var[1].histogram[1] ≈ Nworker*1.1 # X + cvar = config.var[2] #compositevar + @test cvar[1].histogram[1] ≈ Nworker *1.2 #Y + @test cvar[2].histogram[1] ≈ Nworker*1.3 #Z end end -# @testset "MPI bcast histogram" begin -# (MPI.Initialized() == false) && MPI.Init() -# comm = MPI.COMM_WORLD -# Nworker = MPI.Comm_size(comm) # number of MPI workers -# rank = MPI.Comm_rank(comm) # rank of current MPI worker -# root = 0 # rank of the root worker - -# X = Continuous(0.0, 1.0) -# Y = Continuous(0.0, 1.0) -# Z = Continuous(0.0, 1.0) -# cvar = CompositeVar(Y, Z) - -# if rank == root -# X.histogram[1] = rank -# Y.histogram[1] = rank -# Z.histogram[1] = rank -# else -# X.histogram[1] = rank -# Y.histogram[1] = rank -# Z.histogram[1] = rank -# end - -# config = Configuration(var = (X, cvar), dof=[[1, 1], ]) - -# MCIntegration._bcast_histogram(config) - -# if rank == root -# @test config.normalization == Nworker -# @test config.visited[1] == Nworker -# @test config.propose[1, 1, 1] == Nworker -# @test config.accept[1, 1, 1] == Nworker -# @test 
config.var[1].histogram[1] == Nworker -# cvar = config.var[2] -# @test cvar[1].histogram[1] == Nworker -# @test cvar[2].histogram[1] == Nworker -# end -# end \ No newline at end of file +@testset "MPI bcast histogram" begin + (MPI.Initialized() == false) && MPI.Init() + comm = MPI.COMM_WORLD + Nworker = MPI.Comm_size(comm) # number of MPI workers + rank = MPI.Comm_rank(comm) # rank of current MPI worker + root = 0 # rank of the root worker + + X = Continuous(0.0, 1.0) + Y = Continuous(0.0, 1.0) + Z = Continuous(0.0, 1.0) + cvar = CompositeVar(Y, Z) + + if rank == root + X.histogram[1] = 1.1 + Y.histogram[1] = 1.2 + Z.histogram[1] = 1.3 + else + X.histogram[1] = rank + Y.histogram[1] = rank + Z.histogram[1] = rank + end + + config = Configuration(var = (X, cvar), dof=[[1, 1], ]) + config.reweight = [1.1, 1.2] + + MCIntegration.MPIbcastConfig!(config) + + if rank != root + @test config.reweight ≈ [1.1, 1.2] + + @test config.var[1].histogram[1] ≈ 1.1 # X + cvar = config.var[2] #compositevar + @test cvar[1].histogram[1] ≈ 1.2 #Y + @test cvar[2].histogram[1] ≈ 1.3 #Z + end +end \ No newline at end of file From 64760acd97e75dc32c03e510c40504311a3c99c0 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 12:25:42 -0500 Subject: [PATCH 10/24] main loop still has a bug --- src/configuration.jl | 12 ++++++++ src/distribution/variable.jl | 53 +++++++++++++++++++++++++++++++++++ src/main.jl | 54 +++++++++++++++++++++--------------- 3 files changed, 97 insertions(+), 22 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index 6a8d133..0927516 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -289,6 +289,18 @@ function MPIbcastConfig!(c::Configuration, root=0, comm=MPI.COMM_WORLD) end end +function bcastConfig!(dest::Configuration, src::Configuration) + # need to broadcast from root to workers: + # reweight + # var.histogram + ########## variable that could be a number ############## + dest.reweight .= src.reweight + + for i in 1:length(dest.var) + copy!(dest.var[i], src.var[i]) + end +end + function report(config::Configuration, total_neval=nothing) neval, visited, reweight, propose, accept = config.neval, config.visited, config.reweight, config.propose, config.accept var, neighbor = config.var, config.neighbor diff --git a/src/distribution/variable.jl b/src/distribution/variable.jl index 76d53c5..46b3dc0 100644 --- a/src/distribution/variable.jl +++ b/src/distribution/variable.jl @@ -33,6 +33,17 @@ function Base.setindex!(Var::FermiK{D}, v, i::Int) where {D} end Base.lastindex(Var::FermiK{D}) where {D} = size(Var.data)[2] # return index, not the value +function Base.copy!(dest::FermiK{D}, src::FermiK{D}) where {D} + dest.data .= src.data + dest.kF = src.kF + dest.δk = src.δk + dest.maxK = src.maxK + dest.offset = src.offset + dest.prob .= src.prob + dest.histogram .= src.histogram + return dest +end + mutable struct RadialFermiK <: Variable data::Vector{Float64} kF::Float64 @@ -147,6 +158,21 @@ function accumulate!(T::Continuous, idx::Int, weight=1.0) end end +function Base.copy!(dest::Continuous, src::Continuous) + dest.data .= src.data + dest.gidx .= src.gidx + dest.prob .= src.prob + dest.lower = src.lower + dest.range = src.range + dest.offset = src.offset + dest.grid = src.grid + dest.inc .= src.inc + dest.histogram .= src.histogram + dest.alpha = src.alpha + dest.adapt = src.adapt + return dest +end + """ Vegas adaptive map @@ -287,6 +313,21 @@ function train!(T::Discrete) clearStatistics!(T) end +function Base.copy!(T::Discrete, T2::Discrete) + T.data 
.= T2.data + T.lower = T2.lower + T.upper = T2.upper + T.prob .= T2.prob + T.size = T2.size + T.offset = T2.offset + T.histogram .= T2.histogram + T.accumulation .= T2.accumulation + T.distribution .= T2.distribution + T.alpha = T2.alpha + T.adapt = T2.adapt + return T +end + mutable struct CompositeVar{V} <: Variable vars::V prob::Vector{Float64} @@ -338,6 +379,18 @@ Base.lastindex(Var::CompositeVar) = length(Var.vars) # return index, not the val Base.iterate(cvar::CompositeVar) = Base.iterate(cvar.vars) Base.iterate(cvar::CompositeVar, state) = Base.iterate(cvar.vars, state) +function Base.copy!(T::CompositeVar, T2::CompositeVar) + for i in 1:length(T.vars) + copy!(T.vars[i], T2.vars[i]) + end + T.prob .= T2.prob + T.offset = T2.offset + T.adapt = T2.adapt + T.size = T2.size + T._prob_cache = T2._prob_cache + return T +end + function accumulate!(vars::CompositeVar, idx, weight) for v in vars.vars accumulate!(v, idx, weight) diff --git a/src/main.jl b/src/main.jl index d8f8aaa..97b34b3 100644 --- a/src/main.jl +++ b/src/main.jl @@ -171,12 +171,22 @@ function integrate(integrand::Function; # 1. config of the root worker # 2. config of the other workers # config.reweight = MPI.bcast(summedConfig[1].reweight, root, comm) # broadcast reweight factors to all workers - config.reweight = MCUtility.MPIbcast(summedConfig[1].reweight) - for (vi, var) in enumerate(config.var) - _bcast_histogram!(var, summedConfig[1].var[vi], config, adapt) + # config.reweight = MCUtility.MPIbcast(summedConfig[1].reweight) + # for (vi, var) in enumerate(config.var) + # _bcast_histogram!(var, summedConfig[1].var[vi], config, adapt) + # end + MPIbcastConfig!(summedConfig[1]) + + for config in configs + bcastConfig!(config, summedConfig[1]) + if adapt + for v in config.var + Dist.train!(v) + Dist.initialize!(v, config) + # println(v.grid[2]) + end + end end - #TODO: replace this with a more efficient way - configs = [deepcopy(config) for i in 1:Nthread] # configurations for each worker ################################################################################ end @@ -202,8 +212,8 @@ function _standardize_block(nblock, Nworker) end function _block!(iblock, configs, obsSum, obsSquaredSum, summedConfig, solver, progress, - integrand, nevalperblock, print, save, timer, debug, - measure, measurefreq, inplace, parallel) + integrand::Function, nevalperblock, print, save, timer, debug::Bool, + measure::Union{Nothing, Function}, measurefreq, inplace, parallel) # rank core will run the block with the indexes: rank, rank+Nworker, rank+2Nworker, ... 
rank = MCUtility.rank(parallel) @@ -254,21 +264,21 @@ function _block!(iblock, configs, obsSum, obsSquaredSum, summedConfig, solver, p end end -function _bcast_histogram!(target::V, source::V, config, adapt) where {V} - comm = MPI.COMM_WORLD - root = 0 # rank of the root worker - if target isa Dist.CompositeVar - for (vi, v) in enumerate(target.vars) - _bcast_histogram!(v, source.vars[vi], config, adapt) - end - else - target.histogram = MPI.bcast(source.histogram, root, comm) - if adapt - Dist.train!(target) - Dist.initialize!(target, config) - end - end -end +# function _bcast_histogram!(target::V, source::V, config, adapt) where {V} +# comm = MPI.COMM_WORLD +# root = 0 # rank of the root worker +# if target isa Dist.CompositeVar +# for (vi, v) in enumerate(target.vars) +# _bcast_histogram!(v, source.vars[vi], config, adapt) +# end +# else +# target.histogram = MPI.bcast(source.histogram, root, comm) +# if adapt +# Dist.train!(target) +# Dist.initialize!(target, config) +# end +# end +# end function _mean_std(obsSum, obsSquaredSum, block) function _sqrt(x) From 124a7822a8ea50372e204234560c5bfc1aad3aec Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 12:43:23 -0500 Subject: [PATCH 11/24] main loop still has a bug --- src/main.jl | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main.jl b/src/main.jl index 97b34b3..489516b 100644 --- a/src/main.jl +++ b/src/main.jl @@ -156,11 +156,6 @@ function integrate(integrand::Function; if MCUtility.mpi_master() # only the master process will output results, no matter parallel = :mpi or :thread or :serial - ##################### Extract Statistics ################################ - # println("mean: ", mean, ", std: ", std) - mean, std = _mean_std(obsSum[1], obsSquaredSum[1], block) - push!(results, (mean, std, summedConfig[1])) - ################### self-learning ########################################## (solver == :mcmc || solver == :vegasmc) && doReweight!(summedConfig[1], gamma, reweight_goal) end @@ -183,10 +178,16 @@ function integrate(integrand::Function; for v in config.var Dist.train!(v) Dist.initialize!(v, config) - # println(v.grid[2]) + println(v.grid[2]) end end end + + if MCUtility.mpi_master() # only the master process will output results, no matter parallel = :mpi or :thread or :serial + ##################### Extract Statistics ################################ + mean, std = _mean_std(obsSum[1], obsSquaredSum[1], block) + push!(results, (mean, std, configs[1])) # configs has tried grid + end ################################################################################ end From 34ab47e6063205a99b25e27c49ad2dc73cb2c95f Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 12:55:38 -0500 Subject: [PATCH 12/24] MPI works, but thread still has a bug --- src/configuration.jl | 12 +++++++++++- src/main.jl | 7 +++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index 0927516..b6c3afc 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -296,8 +296,18 @@ function bcastConfig!(dest::Configuration, src::Configuration) ########## variable that could be a number ############## dest.reweight .= src.reweight + function histogram_bcast!(dest::Variable, src::Variable) + if dest isa Dist.CompositeVar + for i in 1:length(dest.vars) + histogram_bcast!(dest.vars[i], src.vars[i]) + end + else + dest.histogram .= src.histogram + end + end + for i in 1:length(dest.var) - copy!(dest.var[i], src.var[i]) + 
histogram_bcast!(dest.var[i], src.var[i]) end end diff --git a/src/main.jl b/src/main.jl index 489516b..e7e5d3b 100644 --- a/src/main.jl +++ b/src/main.jl @@ -121,8 +121,8 @@ function integrate(integrand::Function; for iter in 1:niter for i in 1:Nthread - fill!(obsSum[i], zero(eltype(obsSum[1]))) - fill!(obsSquaredSum[i], zero(eltype(obsSquaredSum[1]))) + fill!(obsSum[i], zero(obsSum[1][1])) + fill!(obsSquaredSum[i], zero(obsSquaredSum[1][1])) clearStatistics!(summedConfig[i]) end @@ -145,7 +145,7 @@ function integrate(integrand::Function; for i in 2:Nthread obsSum[1] += obsSum[i] obsSquaredSum[1] += obsSquaredSum[i] - addConfig(summedConfig[1], summedConfig[i]) + addConfig!(summedConfig[1], summedConfig[i]) end #################### collect statistics #################################### @@ -178,7 +178,6 @@ function integrate(integrand::Function; for v in config.var Dist.train!(v) Dist.initialize!(v, config) - println(v.grid[2]) end end end From 1c9ea324faec614a24ed5574282d72e6f5037db5 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 15:14:14 -0500 Subject: [PATCH 13/24] now thread works --- src/configuration.jl | 5 +++ src/main.jl | 72 ++++++++++++++++++++++------------------- src/statistics.jl | 36 ++++++++++++++------- src/utility/parallel.jl | 69 +++++++++++---------------------------- test/thread.jl | 6 ++-- 5 files changed, 90 insertions(+), 98 deletions(-) diff --git a/src/configuration.jl b/src/configuration.jl index b6c3afc..7dc6c5d 100644 --- a/src/configuration.jl +++ b/src/configuration.jl @@ -167,6 +167,11 @@ function Configuration(; ) end +function reset_seed!(cfg::Configuration, seed::Int) + cfg.seed = seed + cfg.rng = MersenneTwister(seed) +end + function _neighbor(neighbor, Nd) if isnothing(neighbor) # By default, only the order-1 and order+1 diagrams are considered to be the neighbors diff --git a/src/main.jl b/src/main.jl index e7e5d3b..a15afaf 100644 --- a/src/main.jl +++ b/src/main.jl @@ -93,27 +93,28 @@ function integrate(integrand::Function; (MPI.Initialized() == false) && MPI.Init() comm = MPI.COMM_WORLD - ############# figure out the best parallelization mode ############ - parallel = MCUtility.choose_parallel(parallel) - - Nworker = MCUtility.nworker(parallel) # actual number of workers - Nthread = MCUtility.nthreads(parallel) # numebr of threads, threads share the same memory + # numebr of threads within MCIntegration, threads share the same memory + # >1 only in the :thread parallel mode + Nthread = MCUtility.nthreads(parallel) ############# figure out evaluations in each block ################ - block = _standardize_block(block, Nworker) - @assert block % Nworker == 0 - nevalperblock = neval ÷ block # number of evaluations per block + nevalperblock, block = _standardize_block(neval, block) + @assert block % MCUtility.mpi_nprocs() == 0 ########## initialize the progress bar ############################ #In the MPI/thread mode, progress will only need to track the progress of the root worker. - Ntotal = niter * block ÷ Nworker + Ntotal = niter * block ÷ MCUtility.nworker(parallel) progress = Progress(Ntotal; dt=(print >= 0 ? 
(0.5 + print) : 0.5), enabled=(print >= -1), showspeed=true, desc="Total iterations * blocks $(Ntotal): ", output=printio) # initialize temp variables - configs = [deepcopy(config) for i in 1:Nthread] # configurations for each worker + configs = [deepcopy(config) for _ in 1:Nthread] # configurations for each thread + summedConfig = [deepcopy(config) for _ in 1:Nthread] # summed configuration for each thread from different blocks obsSum = [[zero(o) for o in config.observable] for _ in 1:Nthread] # sum of observables for each worker obsSquaredSum = [[zero(o) for o in config.observable] for _ in 1:Nthread] # sum of squared observables for each worker - summedConfig = [deepcopy(config) for i in 1:Nthread] # summed configuration for each thread + + for _config in configs + reset_seed!(_config, rand(config.rng, 1:1000000)) + end startTime = time() results=[] @@ -126,17 +127,19 @@ function integrate(integrand::Function; clearStatistics!(summedConfig[i]) end - if parallel == :thread - Threads.@threads for i = 1:block - _block!(i ,configs, obsSum, obsSquaredSum, summedConfig, solver, progress, - integrand, nevalperblock, print, save, timer, debug, - measure, measurefreq, inplace, parallel) - end - else - for i = 1:block - _block!(i, configs, obsSum, obsSquaredSum, summedConfig, solver, progress, - integrand, nevalperblock, print, save, timer, debug, - measure, measurefreq, inplace, parallel) + for _ in MCUtility.mpi_nprocs() + if parallel == :thread + Threads.@threads for _ in 1:block/MCUtility.mpi_nprocs() + _block!(configs, obsSum, obsSquaredSum, summedConfig, solver, progress, + integrand, nevalperblock, print, save, timer, debug, + measure, measurefreq, inplace, parallel) + end + else + for _ in 1:block/MCUtility.mpi_nprocs() + _block!(configs, obsSum, obsSquaredSum, summedConfig, solver, progress, + integrand, nevalperblock, print, save, timer, debug, + measure, measurefreq, inplace, parallel) + end end end @@ -201,28 +204,31 @@ function integrate(integrand::Function; end end -function _standardize_block(nblock, Nworker) +function _standardize_block(neval, nblock) ######### construct configurations for each block ################ + @assert neval > nblock "neval=$neval should be larger than nblock = $nblock" + # standardize nblock according to MPI workers + Nworker = MCUtility.mpi_nprocs() # number of MPI workers if nblock > Nworker - nblock = (nblock ÷ Nworker) * Nworker # make Nblock % size ==0, error estimation assumes this relation + # make Nblock % nworker ==0, error estimation assumes this relation + nblock = (nblock ÷ Nworker) * Nworker else nblock = Nworker # each worker should handle at least one block end - return nblock + + nevalperblock = neval ÷ nblock # make sure each block has the same nevalperblock, error estimation assumes this relation + return nevalperblock, nblock end -function _block!(iblock, configs, obsSum, obsSquaredSum, summedConfig, solver, progress, +function _block!(configs, obsSum, obsSquaredSum, summedConfig, + solver, progress, integrand::Function, nevalperblock, print, save, timer, debug::Bool, measure::Union{Nothing, Function}, measurefreq, inplace, parallel) - # rank core will run the block with the indexes: rank, rank+Nworker, rank+2Nworker, ... 
- rank = MCUtility.rank(parallel) - Nworker = MCUtility.nworker(parallel) - # println(iblock, " ", rank, " ", Nworker) - - (iblock % Nworker != rank-1 ) && return + rank = MCUtility.threadid(parallel) + # println(rank) - config_n = configs[rank] # configuration for the worker with rank `rank` + config_n = configs[rank] # configuration for the worker with thread id `rank` clearStatistics!(config_n) # reset statistics if solver == :vegasmc diff --git a/src/statistics.jl b/src/statistics.jl index 04885c6..5027d45 100644 --- a/src/statistics.jl +++ b/src/statistics.jl @@ -87,7 +87,23 @@ end function Base.show(io::IO, result::Result) # print(io, summary(result.config)) - print(io, report(result; verbose=-1)) + # print(io, report(result; verbose=-1, io = io)) + for i in 1:result.config.N + info = "$i" + m, e, chi2 = first(result.mean[i]), first(result.stdev[i]), first(result.chi2[i]) + if result.dof == 0 + print(io, green("Integral $info = $m ± $e")) + else + print(io, green("Integral $info = $m ± $e (chi2/dof = $(round(chi2, sigdigits=3)))")) + end + if i < result.config.N + print(io, "\n") + end + end +end + +function Base.show(io::IO, ::MIME"text/plain", result::Result) + Base.show(io, result) end """ @@ -102,7 +118,7 @@ It will first print the configuration from the last iteration, then print the we - pick: The pick function is used to select one of the observable to be printed. The return value of pick function must be a Number. - name: name of each picked observable. If name is not given, the index of the pick function will be used. """ -function report(result::Result, ignore=result.ignore; pick::Union{Function,AbstractVector}=obs -> first(obs), name=nothing, verbose=0) +function report(result::Result, ignore=result.ignore; pick::Union{Function,AbstractVector}=obs -> first(obs), name=nothing, verbose=0, io::IO=Base.stdout) if isnothing(name) == false name = collect(name) end @@ -116,29 +132,27 @@ function report(result::Result, ignore=result.ignore; pick::Union{Function,Abstr # bar = "---------------------------------------------------------------------------------------------------------------------------" barbar = "==================================== Integral $info ================================================" bar = "-------------------------------------------------------------------------------------------------------" - println(barbar) - println(yellow(@sprintf("%6s %-32s %-32s %16s", "iter", " integral", " wgt average", "chi2/dof"))) - println(bar) + println(io, barbar) + println(io, yellow(@sprintf("%6s %-32s %-32s %16s", "iter", " integral", " wgt average", "chi2/dof"))) + println(io, bar) for iter in 1:length(result.iterations) m0, e0 = p(result.iterations[iter][1][i]), p(result.iterations[iter][2][i]) m, e, chi2 = average(result.iterations, i; init=ignore_iter + 1, max=iter) m, e, chi2 = p(m), p(e), p(chi2) iterstr = iter <= ignore_iter ? 
"ignore" : "$iter" sm0, sm = tostring(m0, e0), tostring(m, e) - println(@sprintf("%6s %36s %36s %16.4f", iterstr, sm0, sm, abs(chi2))) + println(io, @sprintf("%6s %36s %36s %16.4f", iterstr, sm0, sm, abs(chi2))) end - println(bar) + println(io, bar) else m, e, chi2 = p(result.mean[i]), p(result.stdev[i]), p(result.chi2[i]) if result.dof == 0 - println(green("Integral $info = $m ± $e")) + println(io, green("Integral $info = $m ± $e")) else - println(green("Integral $info = $m ± $e (chi2/dof = $(round(chi2, sigdigits=3)))")) + println(io, green("Integral $info = $m ± $e (chi2/dof = $(round(chi2, sigdigits=3)))")) end end - # println() end - # end end """ diff --git a/src/utility/parallel.jl b/src/utility/parallel.jl index 676ce3b..06b3959 100644 --- a/src/utility/parallel.jl +++ b/src/utility/parallel.jl @@ -7,15 +7,6 @@ mpi_master(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm) == 0) mpi_root(comm=MPI.COMM_WORLD) = 0 mpi_rank(comm=MPI.COMM_WORLD) = (MPI.Init(); MPI.Comm_rank(comm)) -mpi_sum(arr, comm::MPI.Comm) = MPI.Allreduce(arr, +, comm) -mpi_sum!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, +, comm) -mpi_min(arr, comm::MPI.Comm) = MPI.Allreduce(arr, min, comm) -mpi_min!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, min, comm) -mpi_max(arr, comm::MPI.Comm) = MPI.Allreduce(arr, max, comm) -mpi_max!(arr, comm::MPI.Comm) = MPI.Allreduce!(arr, max, comm) -mpi_mean(arr, comm::MPI.Comm) = mpi_sum(arr, comm) ./ mpi_nprocs(comm) -mpi_mean!(arr, comm::MPI.Comm) = (mpi_sum!(arr, comm); arr ./= mpi_nprocs(comm)) - """ function MPIreduce(data, op = MPI.SUM) @@ -98,34 +89,6 @@ function MPIbcast!(data::AbstractArray) end end -function choose_parallel(parallel::Symbol) - if parallel == :auto - if Threads.nthreads() > 1 - return :thread # use threads only if MPI is not available - else - return :nothread # use threads only if MPI is not available - end - else - return parallel - end -end - -# function check_parallel(parallel::Symbol) -# if parallel == :mpi -# # @assert mpi_nprocs() > 1 "MPI is not available" -# # julia may start with -t N, but we don't want to use threads -# # should work for MPI worker >=1 -# return -# elseif parallel == :thread -# @assert mpi_nprocs() == 1 "MPI and threads cannot be used together" -# elseif parallel == :serial -# @assert mpi_nprocs() == 1 "MPI should not be used for serial calculations" -# # julia may start with -t N, but we don't want to use threads -# else -# error("Unknown parallelization mode: $(parallel)") -# end -# end - # actual number of threads used function nthreads(parallel::Symbol) if parallel == :thread @@ -137,26 +100,30 @@ end # only one thread of each MPI worker is the root function is_root(parallel::Symbol) - return mpi_master() && (Threads.threadid() == 1) + if parallel == :thread + return mpi_master() && (Threads.threadid() == 1) + else + return mpi_master() + end end -# function root(parallel::Symbol) # only one thread of each MPI worker is the root -# if parallel == :mpi -# return mpi_master() -# elseif parallel == :thread -# return 1 -# elseif parallel == :serial -# return 1 -# else -# error("Unknown parallelization mode: $(parallel)") -# end -# end - # rank of the current worker for both MPI and threads function rank(parallel::Symbol) #if thread is off, then nthreads must be one. 
Only mpi_rank contributes # mpi_rank() always start with 0 - return Threads.threadid()*(nthreads(parallel)-1)+mpi_rank()+1 + if parallel == :thread + return Threads.threadid()+((nthreads(parallel)-1)*mpi_rank()) + else + return mpi_rank()+1 + end +end + +function threadid(parallel::Symbol) + if parallel == :thread + return Threads.threadid() + else + return 1 + end end # number of total workers for both MPI and threads diff --git a/test/thread.jl b/test/thread.jl index 82af0f3..b1d3790 100644 --- a/test/thread.jl +++ b/test/thread.jl @@ -6,12 +6,12 @@ Threads.@threads for i in 1:3 println("test thread: ", i) X = Continuous(0.0, 1.0) - result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegas) + result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegas, parallel = :nothread) println(i, " -> ", result.mean, "+-", result.stdev) check(result, 1 / (1 + i)) - result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegasmc) + result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegasmc, parallel = :nothread) check(result, 1 / (1 + i)) - result = integrate((idx, X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:mcmc) + result = integrate((idx, X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:mcmc, parallel = :nothread) check(result, 1 / (1 + i)) end From bc8bd6530019f909c470d4d3e0558bd7b96ebd5a Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 15:32:38 -0500 Subject: [PATCH 14/24] add MPI test --- .github/workflows/CI.yml | 24 ++++++++++++++++++++++++ src/main.jl | 12 +++--------- test/runtests.jl | 1 + 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 721823a..4b4aeff 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -34,7 +34,31 @@ jobs: ${{ runner.os }}-test- ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 + - name: Install MPI dependencies + shell: bash + run: | + julia -e ' + using Pkg; Pkg.add("MPI"); using MPI; MPI.install_mpiexecjl() + ' + - uses: julia-actions/julia-runtest@v1 + + # - name: Execute MPI-parallel tests + # run: | + # julia --project -e ' + # using Pkg; Pkg.build(); Pkg.precompile() + # Pkg.add("MPI"); using MPI; MPI.install_mpiexecjl() + # Pkg.test(; test_args=["quick"]) + # ' + # $HOME/.julia/bin/mpiexecjl -np 8 julia --check-bounds=yes --depwarn=yes --project --color=yes -e 'using Pkg; Pkg.test(coverage=true)' + # if: ${{ matrix.payload == 'mpi' }} + # continue-on-error: ${{ matrix.version == 'nightly' }} + + - uses: julia-actions/julia-processcoverage@v1 + - uses: codecov/codecov-action@v2 + with: + file: lcov.info + - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v1 with: diff --git a/src/main.jl b/src/main.jl index a15afaf..dfd43a5 100644 --- a/src/main.jl +++ b/src/main.jl @@ -113,7 +113,7 @@ function integrate(integrand::Function; obsSquaredSum = [[zero(o) for o in config.observable] for _ in 1:Nthread] # sum of squared observables for each worker for _config in configs - reset_seed!(_config, rand(config.rng, 1:1000000)) + reset_seed!(_config, rand(config.rng, 1:1000000)) # reset the seed for each thread end startTime = time() @@ -165,17 +165,11 @@ function integrate(integrand::Function; ######################## syncronize between works ############################## - # broadcast the 
reweight and var.histogram of the summedConfig of the root worker to two targets: - # 1. config of the root worker - # 2. config of the other workers - # config.reweight = MPI.bcast(summedConfig[1].reweight, root, comm) # broadcast reweight factors to all workers - # config.reweight = MCUtility.MPIbcast(summedConfig[1].reweight) - # for (vi, var) in enumerate(config.var) - # _bcast_histogram!(var, summedConfig[1].var[vi], config, adapt) - # end + # broadcast the reweight and var.histogram of the summedConfig[1] from the root to all workers MPIbcastConfig!(summedConfig[1]) for config in configs + # broadcast the reweight and var.histogram of the summedConfig[1] to config of all threads bcastConfig!(config, summedConfig[1]) if adapt for v in config.var diff --git a/test/runtests.jl b/test/runtests.jl index 5e103aa..112ae00 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -29,6 +29,7 @@ if isempty(ARGS) include("thread.jl") include("bubble.jl") include("bubble_FermiK.jl") + include("mpi.jl") else include(ARGS[1]) end From a13979587234f152ad411606a4af61999e13b99b Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 15:42:06 -0500 Subject: [PATCH 15/24] add thread test --- .github/workflows/CI.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 4b4aeff..a7acce0 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -42,7 +42,8 @@ jobs: ' - uses: julia-actions/julia-runtest@v1 - + env: + JULIA_NUM_THREADS: 4 # - name: Execute MPI-parallel tests # run: | # julia --project -e ' From bf510a4dabd7848dcbea2ea7c71ab0b2992e499a Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 15:53:50 -0500 Subject: [PATCH 16/24] fix thread test --- test/thread.jl | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/test/thread.jl b/test/thread.jl index b1d3790..b038a8c 100644 --- a/test/thread.jl +++ b/test/thread.jl @@ -1,18 +1,38 @@ -@testset "Threads" begin +@testset "outer Threads" begin if Threads.nthreads() == 1 @warn "There is only one thread currently. You may want run julia with -t auto to enable multithreading." 
end + mean = zeros(3, 3) + error = zeros(3, 3) Threads.@threads for i in 1:3 - println("test thread: ", i) + id = Threads.threadid() + println("test thread: $id for the index $i") X = Continuous(0.0, 1.0) + result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegas, parallel = :nothread) println(i, " -> ", result.mean, "+-", result.stdev) - check(result, 1 / (1 + i)) + mean[i, 1] = result.mean + error[i, 1] = result.stdev + result = integrate((X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegasmc, parallel = :nothread) - check(result, 1 / (1 + i)) + println(i, " -> ", result.mean, "+-", result.stdev) + mean[i, 2] = result.mean + error[i, 2] = result.stdev + result = integrate((idx, X, c) -> (X[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:mcmc, parallel = :nothread) - check(result, 1 / (1 + i)) + println(i, " -> ", result.mean, "+-", result.stdev) + mean[i, 3] = result.mean + error[i, 3] = result.stdev + end + + for i in 1:3 + println("test outer threads for vegas") + check(mean[i, 1], error[i, 1], 1 / (1 + i)) + println("test outer threads for vegasmc") + check(mean[i, 2], error[i, 2], 1 / (1 + i)) + println("test outer threads for mcmc") + check(mean[i, 3], error[i, 3], 1 / (1 + i)) end end @@ -26,4 +46,12 @@ end result = integrate((x, c) -> (x[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegas, parallel = :thread) println(i, " -> ", result.mean, "+-", result.stdev) check(result, 1 / (1 + i)) + + result = integrate((x, c) -> (x[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:vegasmc, parallel = :thread) + println(i, " -> ", result.mean, "+-", result.stdev) + check(result, 1 / (1 + i)) + + result = integrate((id, x, c) -> (x[1])^i; var=(Continuous(0.0, 1.0),), dof=[[1,],], print=-1, solver=:mcmc, parallel = :thread) + println(i, " -> ", result.mean, "+-", result.stdev) + check(result, 1 / (1 + i)) end \ No newline at end of file From b8428042d04fd5605acfce304d776945fb1f5e50 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 16:19:00 -0500 Subject: [PATCH 17/24] add more tests --- src/main.jl | 18 ++--------------- test/runtests.jl | 1 + test/statistics.jl | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 16 deletions(-) create mode 100644 test/statistics.jl diff --git a/src/main.jl b/src/main.jl index dfd43a5..31cf8c6 100644 --- a/src/main.jl +++ b/src/main.jl @@ -264,22 +264,8 @@ function _block!(configs, obsSum, obsSquaredSum, summedConfig, end end -# function _bcast_histogram!(target::V, source::V, config, adapt) where {V} -# comm = MPI.COMM_WORLD -# root = 0 # rank of the root worker -# if target isa Dist.CompositeVar -# for (vi, v) in enumerate(target.vars) -# _bcast_histogram!(v, source.vars[vi], config, adapt) -# end -# else -# target.histogram = MPI.bcast(source.histogram, root, comm) -# if adapt -# Dist.train!(target) -# Dist.initialize!(target, config) -# end -# end -# end - +#obsSum or obsSquaredSum can be scalar or vector of float or complex +#the return value is always a vector of float or complex function _mean_std(obsSum, obsSquaredSum, block) function _sqrt(x) return x < 0.0 ? 0.0 : sqrt(x) diff --git a/test/runtests.jl b/test/runtests.jl index 112ae00..bbecd2e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -25,6 +25,7 @@ end # Write your tests here. 
if isempty(ARGS) include("utility.jl") + include("statistics.jl") include("montecarlo.jl") include("thread.jl") include("bubble.jl") diff --git a/test/statistics.jl b/test/statistics.jl new file mode 100644 index 0000000..3e0b3aa --- /dev/null +++ b/test/statistics.jl @@ -0,0 +1,49 @@ +using StatsBase + +@testset "Statistics" begin + + function series(block) + @assert block > 1 + timeseries = rand(block) + _sum = sum(timeseries) + _sqaure_sum = sum(timeseries .^ 2) + return timeseries, _sum, _sqaure_sum, StatsBase.mean(timeseries), std(timeseries)/sqrt(block) + # standard deviation of the mean already contains a factor 1/sqrt(block-1) + end + + block = 10 + series1, _sum1, _sqaure_sum1, mean1, error1 = series(10) + series2, _sum2, _sqaure_sum2, mean2, error2 = series(10) + + #two scalar integrals + obsSum = [_sum1, _sum2] + obsSquaredSum = [_sqaure_sum1, _sqaure_sum2] + + mean, error = MCIntegration._mean_std(obsSum, obsSquaredSum, block) + @test mean[1] ≈ mean1 + @test mean[2] ≈ mean2 + @test error[1] ≈ error1 + @test error[2] ≈ error2 + + #one 2-vector integral + mean, error = MCIntegration._mean_std([obsSum, ], [obsSquaredSum, ], block) + @test mean[1][1] ≈ mean1 + @test mean[1][2] ≈ mean2 + @test error[1][1] ≈ error1 + @test error[1][2] ≈ error2 + + #one complex integral in a vector + _csum = _sum1 + im * _sum2 + _csqaure_sum = _sqaure_sum1 + im * _sqaure_sum2 + mean, error = MCIntegration._mean_std([_csum, ], [_csqaure_sum, ], block) + @test mean[1] ≈ mean1 + mean2*im + @test error[1] ≈ error1 + error2*im + + #on complex integrals + _csum = _sum1 + im * _sum2 + _csqaure_sum = _sqaure_sum1 + im * _sqaure_sum2 + mean, error = MCIntegration._mean_std(_csum, _csqaure_sum, block) + @test mean[1] ≈ mean1 + mean2*im + @test error[1] ≈ error1 + error2*im + +end \ No newline at end of file From 27484000005c63073b62571981eb951ed69551ab Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 16:33:44 -0500 Subject: [PATCH 18/24] working on readme --- README.md | 11 +++++++++++ src/main.jl | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9d61c50..52a376f 100644 --- a/README.md +++ b/README.md @@ -169,6 +169,7 @@ Moreover, packed variables usually indicate nontrivial correlations between thei # Parallelization +## MPI MCIntegration supports MPI parallelization. To run your code in MPI mode, simply use the command ```bash mpiexec julia -n #NCPU ./your_script.jl @@ -178,3 +179,13 @@ where `#NCPU` is the number of workers. Internally, the MC sampler will send the Note that you need to install the package [MPI.jl](https://github.com/JuliaParallel/MPI.jl) to use the MPI mode. See this [link](https://juliaparallel.github.io/MPI.jl/stable/configuration/) for the instruction on the configuration. The user essentially doesn't need to write additional code to support the parallelization. The only tricky part is the output: only the function `MCIntegratoin.integrate` of the root node returns meaningful estimates, while other workers simply returns `nothing`. + +## Multi-threading + +MCIntegration supports multi-threading with or without MPI. To run your code with multiple threads, start the julia with +```bash +julia -t #NCPU ./your_script.jl +``` +Note that all threads will share the same memory, the user-defined `integrand` and `measure` functions should be implemented in a thread safe way (for example, be very careful about reading any data if another thread might write to it). 
We recommend the user to read the Julia official [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/). + +With multiple threads, you have an option to parallelize the blocks in the function call `integrate` by providing a key argument `parallel = :thread`. For example, \ No newline at end of file diff --git a/src/main.jl b/src/main.jl index 31cf8c6..b9afe48 100644 --- a/src/main.jl +++ b/src/main.jl @@ -44,7 +44,7 @@ - `measure`: measurement function, See [`Vegas.montecarlo`](@ref), [`VegasMC.montecarlo`](@ref) and [`MCMC.montecarlo`](@ref) for more details. - `measurefreq`: how often perform the measurement for ever `measurefreq` MC steps. If a measurement is expansive, you may want to make the measurement less frequent. - `inplace`: whether to use the inplace version of the integrand. Default is `false`, which is more convenient for integrand with a few return values but may cause type instability. Only useful for the :vegas and :vegasmc solver. -- `parallel`: :auto will automatically choose the best parallelization mode. :mpi will use MPI.jl to run the MC in parallel. :thread will use Threads.@threads to run the MC in parallel. Default is :auto. +- `parallel`: :thread will use Threads.@threads to run different blocks in parallel. Default is :nothread. - `kwargs`: Keyword arguments. The supported keywords include, * `measure` and `measurefreq`: measurement function and how frequent it is called. * If `config` is `nothing`, you may need to provide arguments for the `Configuration` constructor, check [`Configuration`](@ref) docs for more details. @@ -73,7 +73,7 @@ function integrate(integrand::Function; measure::Union{Nothing,Function}=nothing, measurefreq::Int=1, inplace::Bool=false, # whether to use the inplace version of the integrand - parallel::Symbol=:nothread, # :auto, :mpi, or :thread, or :serial + parallel::Symbol=:nothread, # :thread or :nothread kwargs... ) if isnothing(config) From 4a5062259070e80d3b657af4afd653d09469b5f4 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 22:58:24 -0500 Subject: [PATCH 19/24] add readme for thread support --- README.md | 29 +++++++++++++++++++++++++---- docs/src/index.md | 36 ++++++++++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 52a376f..644c3a1 100644 --- a/README.md +++ b/README.md @@ -168,9 +168,10 @@ The packed variables will be sampled all together in the Markov-chain based solv Moreover, packed variables usually indicate nontrivial correlations between their distributions. In the future, it will be interesting to learn such correlation so that one can sample the packed variables more efficiently. # Parallelization +MCIntegration supports both MPI and multi-thread parallelization. You can even mix them if necessary. ## MPI -MCIntegration supports MPI parallelization. To run your code in MPI mode, simply use the command +To run your code in MPI mode, simply use the command, ```bash mpiexec julia -n #NCPU ./your_script.jl ``` @@ -182,10 +183,30 @@ The user essentially doesn't need to write additional code to support the parall ## Multi-threading -MCIntegration supports multi-threading with or without MPI. To run your code with multiple threads, start the julia with +MCIntegration supports multi-threading with or without MPI. 
To run your code with multiple threads, start Julia with ```bash julia -t #NCPU ./your_script.jl ``` -Note that all threads will share the same memory, the user-defined `integrand` and `measure` functions should be implemented in a thread safe way (for example, be very careful about reading any data if another thread might write to it). We recommend the user to read the Julia official [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/). +Note that all threads will share the same memory. The user-defined `integrand` and `measure` functions should be implemented thread-safe (for example, be very careful about reading any data if another thread might write to it). We recommend the user read Julia's official [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/). -With multiple threads, you have an option to parallelize the blocks in the function call `integrate` by providing a key argument `parallel = :thread`. For example, \ No newline at end of file +There are two different ways to parallelize your code with multiple threads. + +1. If you need to evaluate multiple integrals, each thread can call the function `MCIntegration.integrate` to do one integral. In the following example, we use three threads to evaluate three integrals altogether. Note that only three threads will be used even if you initialize Julia with more than three threads. +```julia +julia> Threads.@threads for i = 1:3 + println("Thread $(Threads.threadid()) returns ", integrate((x, c) -> x[1]^i, print=-2)) + end +Thread 2 returns Integral 1 = 0.24995156136254149 ± 6.945088534643841e-5 (chi2/dof = 2.95) +Thread 3 returns Integral 1 = 0.3334287563137184 ± 9.452648803649706e-5 (chi2/dof = 1.35) +Thread 1 returns Integral 1 = 0.5000251243601586 ± 0.00013482206569391864 (chi2/dof = 1.58) +``` + +2. Only the main thread calls the function `MCIntegration.integrate`, then parallelize the internal blocks with multiple threads. To do that, you need to call the function `MCIntegration.integrate` with a key argument `parallel = :thread`. This approach will utilize all Julia threads. For example, +```julia +julia> for i = 1:3 + println("Thread $(Threads.threadid()) return ", integrate((x, c) -> x[1]^i, print=-2, parallel=:thread)) + end +Thread 1 return Integral 1 = 0.5001880440214347 ± 0.00015058935731086765 (chi2/dof = 0.397) +Thread 1 return Integral 1 = 0.33341068551139696 ± 0.00010109649819894601 (chi2/dof = 1.94) +Thread 1 return Integral 1 = 0.24983868976137244 ± 8.546009018501706e-5 (chi2/dof = 1.54) +``` diff --git a/docs/src/index.md b/docs/src/index.md index aa66831..0263446 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -197,8 +197,10 @@ Moreover, packed variables usually indicate nontrivial correlations between thei over the infinite interval ``(-\infty, \infty)`` is zero. # Parallelization +MCIntegration supports both MPI and multi-thread parallelization. You can even mix them if necessary. -MCIntegration supports MPI parallelization. To run your code in MPI mode, simply use the command +## MPI +To run your code in MPI mode, simply use the command, ```bash mpiexec julia -n #NCPU ./your_script.jl ``` @@ -206,4 +208,34 @@ where `#NCPU` is the number of workers. Internally, the MC sampler will send the Note that you need to install the package [MPI.jl](https://github.com/JuliaParallel/MPI.jl) to use the MPI mode. See this [link](https://juliaparallel.github.io/MPI.jl/stable/configuration/) for the instruction on the configuration. 
-The user essentially doesn't need to write additional code to support the parallelization. The only tricky part is the output: only the function `MCIntegratoin.integrate` of the root node returns meaningful estimates, while other workers simply returns `nothing`. +The user essentially doesn't need to write additional code to support the parallelization. The only tricky part is the output: only the function `MCIntegratoin.integrate` of the root node returns meaningful estimates, while other workers simply returns `nothing`. + +## Multi-threading + +MCIntegration supports multi-threading with or without MPI. To run your code with multiple threads, start Julia with +```bash +julia -t #NCPU ./your_script.jl +``` +Note that all threads will share the same memory. The user-defined `integrand` and `measure` functions should be implemented thread-safe (for example, be very careful about reading any data if another thread might write to it). We recommend the user read Julia's official [documentation](https://docs.julialang.org/en/v1/manual/multi-threading/). + +There are two different ways to parallelize your code with multiple threads. + +1. If you need to evaluate multiple integrals, each thread can call the function `MCIntegration.integrate` to do one integral. In the following example, we use three threads to evaluate three integrals altogether. Note that only three threads will be used even if you initialize Julia with more than three threads. +```julia +julia> Threads.@threads for i = 1:3 + println("Thread $(Threads.threadid()) returns ", integrate((x, c) -> x[1]^i, print=-2)) + end +Thread 2 returns Integral 1 = 0.24995156136254149 ± 6.945088534643841e-5 (chi2/dof = 2.95) +Thread 3 returns Integral 1 = 0.3334287563137184 ± 9.452648803649706e-5 (chi2/dof = 1.35) +Thread 1 returns Integral 1 = 0.5000251243601586 ± 0.00013482206569391864 (chi2/dof = 1.58) +``` + +2. Only the main thread calls the function `MCIntegration.integrate`, then parallelize the internal blocks with multiple threads. To do that, you need to call the function `MCIntegration.integrate` with a key argument `parallel = :thread`. This approach will utilize all Julia threads. 
For example, +```julia +julia> for i = 1:3 + println("Thread $(Threads.threadid()) return ", integrate((x, c) -> x[1]^i, print=-2, parallel=:thread)) + end +Thread 1 return Integral 1 = 0.5001880440214347 ± 0.00015058935731086765 (chi2/dof = 0.397) +Thread 1 return Integral 1 = 0.33341068551139696 ± 0.00010109649819894601 (chi2/dof = 1.94) +Thread 1 return Integral 1 = 0.24983868976137244 ± 8.546009018501706e-5 (chi2/dof = 1.54) +``` \ No newline at end of file From 4e3c01dfa5def583cfadbd8fc25d239a142e91d8 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 23:07:36 -0500 Subject: [PATCH 20/24] fix test dependence --- test/Project.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 test/Project.toml diff --git a/test/Project.toml b/test/Project.toml new file mode 100644 index 0000000..9ab86e9 --- /dev/null +++ b/test/Project.toml @@ -0,0 +1,2 @@ +[deps] +StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" From e33ee86f55e3ffac65ccd298037325df3236a5c5 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 23:07:57 -0500 Subject: [PATCH 21/24] update version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 3884e24..7cb3156 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "MCIntegration" uuid = "ea1e2de9-7db7-4b42-91ee-0cd1bf6df167" authors = ["Kun Chen", "Xiansheng Cai", "Pengcheng Hou"] -version = "0.3.1" +version = "0.3.2" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" From e2e165066a507cdd4ba79e6ecc9d034c5661e7ea Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 23:12:58 -0500 Subject: [PATCH 22/24] fix test dependence --- test/Project.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/Project.toml b/test/Project.toml index 9ab86e9..1971ce4 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -1,2 +1,9 @@ [deps] +LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +MCIntegration = "ea1e2de9-7db7-4b42-91ee-0cd1bf6df167" +MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" +Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" From f0224954a2899de885dea6222aa4ad0421a918ef Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Tue, 8 Nov 2022 23:14:28 -0500 Subject: [PATCH 23/24] test --- src/distribution/variable.jl | 54 ------------------------------------ 1 file changed, 54 deletions(-) diff --git a/src/distribution/variable.jl b/src/distribution/variable.jl index 46b3dc0..1f34c96 100644 --- a/src/distribution/variable.jl +++ b/src/distribution/variable.jl @@ -33,17 +33,6 @@ function Base.setindex!(Var::FermiK{D}, v, i::Int) where {D} end Base.lastindex(Var::FermiK{D}) where {D} = size(Var.data)[2] # return index, not the value -function Base.copy!(dest::FermiK{D}, src::FermiK{D}) where {D} - dest.data .= src.data - dest.kF = src.kF - dest.δk = src.δk - dest.maxK = src.maxK - dest.offset = src.offset - dest.prob .= src.prob - dest.histogram .= src.histogram - return dest -end - mutable struct RadialFermiK <: Variable data::Vector{Float64} kF::Float64 @@ -158,22 +147,6 @@ function accumulate!(T::Continuous, idx::Int, weight=1.0) end end -function Base.copy!(dest::Continuous, src::Continuous) - dest.data .= src.data - dest.gidx .= src.gidx - dest.prob .= src.prob - dest.lower = src.lower - dest.range = src.range - dest.offset = src.offset - dest.grid = 
src.grid - dest.inc .= src.inc - dest.histogram .= src.histogram - dest.alpha = src.alpha - dest.adapt = src.adapt - return dest -end - - """ Vegas adaptive map """ @@ -313,21 +286,6 @@ function train!(T::Discrete) clearStatistics!(T) end -function Base.copy!(T::Discrete, T2::Discrete) - T.data .= T2.data - T.lower = T2.lower - T.upper = T2.upper - T.prob .= T2.prob - T.size = T2.size - T.offset = T2.offset - T.histogram .= T2.histogram - T.accumulation .= T2.accumulation - T.distribution .= T2.distribution - T.alpha = T2.alpha - T.adapt = T2.adapt - return T -end - mutable struct CompositeVar{V} <: Variable vars::V prob::Vector{Float64} @@ -379,18 +337,6 @@ Base.lastindex(Var::CompositeVar) = length(Var.vars) # return index, not the val Base.iterate(cvar::CompositeVar) = Base.iterate(cvar.vars) Base.iterate(cvar::CompositeVar, state) = Base.iterate(cvar.vars, state) -function Base.copy!(T::CompositeVar, T2::CompositeVar) - for i in 1:length(T.vars) - copy!(T.vars[i], T2.vars[i]) - end - T.prob .= T2.prob - T.offset = T2.offset - T.adapt = T2.adapt - T.size = T2.size - T._prob_cache = T2._prob_cache - return T -end - function accumulate!(vars::CompositeVar, idx, weight) for v in vars.vars accumulate!(v, idx, weight) From 74708bab80df1cc23bbc5390c42382087db09947 Mon Sep 17 00:00:00 2001 From: Kun Chen Date: Wed, 9 Nov 2022 10:12:42 -0500 Subject: [PATCH 24/24] fix types in readme --- README.md | 2 +- docs/src/index.md | 2 +- src/main.jl | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 644c3a1..79e2574 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,7 @@ MCIntegration supports both MPI and multi-thread parallelization. You can even m ## MPI To run your code in MPI mode, simply use the command, ```bash -mpiexec julia -n #NCPU ./your_script.jl +mpiexec -n #NCPU julia ./your_script.jl ``` where `#NCPU` is the number of workers. Internally, the MC sampler will send the blocks (controlled by the argument `Nblock`, see above example code) to different workers, then collect the estimates in the root node. diff --git a/docs/src/index.md b/docs/src/index.md index 0263446..a78e546 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -202,7 +202,7 @@ MCIntegration supports both MPI and multi-thread parallelization. You can even m ## MPI To run your code in MPI mode, simply use the command, ```bash -mpiexec julia -n #NCPU ./your_script.jl +mpiexec -n #NCPU julia ./your_script.jl ``` where `#NCPU` is the number of workers. Internally, the MC sampler will send the blocks (controlled by the argument `Nblock`, see above example code) to different workers, then collect the estimates in the root node. diff --git a/src/main.jl b/src/main.jl index b9afe48..402198e 100644 --- a/src/main.jl +++ b/src/main.jl @@ -14,6 +14,7 @@ measure::Union{Nothing,Function}=nothing, measurefreq::Int=1, inplace::Bool=false, + parallel::Symbol=:nothread, kwargs... )
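
---

Below is a minimal usage sketch of the user-facing workflow that this patch series documents. It is not part of any commit above: it only restates the launch commands and the `integrate(...; parallel = :thread)` keyword introduced in the README/docs diffs, and the integrand and printed check are illustrative assumptions.

```julia
# Sketch of the workflow documented in this series (integrand is illustrative).
# Thread mode:  julia -t 4 script.jl        together with parallel = :thread
# MPI mode:     mpiexec -n 4 julia script.jl
using MCIntegration

# ∫₀¹ x^2 dx = 1/3; with parallel = :thread the blocks are distributed
# over the available Julia threads inside a single integrate() call.
result = integrate((x, c) -> x[1]^2;
    var=(Continuous(0.0, 1.0),), dof=[[1,],],
    print=-1, parallel=:thread)

# Under MPI only the root rank returns a Result; the other ranks return nothing.
if !isnothing(result)
    println("Integral 1 = ", result.mean, " ± ", result.stdev)
end
```

As the README diff above notes, in thread mode every block runs under `Threads.@threads`, so any user-supplied `integrand` and `measure` functions must be thread-safe.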