From fea3b6cf010d1419ecdb17308552c225d20c3965 Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 10:47:15 -0600 Subject: [PATCH 1/8] run nosetests for 40 mins --- .travis.yml | 6 ++-- pydream/Dream.py | 87 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 76 insertions(+), 17 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6a732ce..1b67db5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: python env: -- PYVER=2.7 +- PYVER=3.8 - PYVER=3.6 - PYVER=3.7 before_install: @@ -10,7 +10,7 @@ before_install: - bash miniconda.sh -b -p $HOME/miniconda - export PATH="/home/travis/miniconda/bin:$PATH" - conda update --yes conda -- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose +- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose cython pysb - pip install python-dateutil - pip install -r requirements.txt @@ -27,4 +27,4 @@ deploy: on: branch: master tags: true - condition: $PYVER = 2.7 + condition: $PYVER = 3.6 diff --git a/pydream/Dream.py b/pydream/Dream.py index 7e2d4e2..59851e4 100644 --- a/pydream/Dream.py +++ b/pydream/Dream.py @@ -990,22 +990,81 @@ def metrop_select(mr, q, q0): # Reject proposed value return q0 - -class NoDaemonProcess(mp.Process): - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): - mp.Process.__init__(self, group, target, name, args, kwargs) - # make 'daemon' attribute always return False - def _get_daemon(self): +# Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of +# pool.Pool().Process(), and the type of the result varies based on the default +# multiprocessing context, so we need to dynamically patch the daemon property +class NonDaemonMixin(object): + @property + def daemon(self): return False - def _set_daemon(self, value): + + @daemon.setter + def daemon(self, val): + pass + + +try: + from multiprocessing import context + + + # Exists on all platforms + class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess): + pass + + + class NonDaemonSpawnContext(context.SpawnContext): + Process = NonDaemonSpawnProcess + + + _nondaemon_context_mapper = { + 'spawn': NonDaemonSpawnContext() + } + + # POSIX only + try: + class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess): + pass + + + class NonDaemonForkContext(context.ForkContext): + Process = NonDaemonForkProcess + + + _nondaemon_context_mapper['fork'] = NonDaemonForkContext() + except AttributeError: + pass + # POSIX only + try: + class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess): + pass + + + class NonDaemonForkServerContext(context.ForkServerContext): + Process = NonDaemonForkServerProcess + + + _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext() + except AttributeError: + pass + + + class DreamPool(mp.pool.Pool): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None, context=None): + if context is None: + context = mp.get_context() + context = _nondaemon_context_mapper[context._name] + super(DreamPool, self).__init__(processes=processes, + initializer=initializer, + initargs=initargs, + maxtasksperchild=maxtasksperchild, + context=context) + +except ImportError: + class NonDaemonProcess(NonDaemonMixin, mp.Process): pass - daemon = property(_get_daemon, _set_daemon) -#A subclass of multiprocessing.pool.Pool that allows processes to launch child processes (this is necessary for Dream to use multi-try) -#Taken from http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic -class DreamPool(mp_pool.Pool): - def __init__(self, processes=None, initializer=None, initargs=None, maxtasksperchild=None): - mp_pool.Pool.__init__(self, processes, initializer, initargs, maxtasksperchild) - Process = NoDaemonProcess + class DreamPool(mp.pool.Pool): + Process = NonDaemonProcess \ No newline at end of file From dbaa05aeba99f8fbe7894f595822648d240366fc Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 10:57:42 -0600 Subject: [PATCH 2/8] use python multiprocessing package --- pydream/Dream.py | 4 ++-- pydream/core.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pydream/Dream.py b/pydream/Dream.py index 59851e4..ec327a3 100644 --- a/pydream/Dream.py +++ b/pydream/Dream.py @@ -5,8 +5,8 @@ from . import Dream_shared_vars from datetime import datetime import traceback -import multiprocess as mp -import multiprocess.pool as mp_pool +import multiprocessing as mp +import multiprocessing.pool as mp_pool import time class Dream(): diff --git a/pydream/core.py b/pydream/core.py index 87fa663..6a960e3 100644 --- a/pydream/core.py +++ b/pydream/core.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import numpy as np -import multiprocess as mp +import multiprocessing as mp from . import Dream_shared_vars from .Dream import Dream, DreamPool from .model import Model From 21b5d6f586e095f5e12a52178b807ae31242818d Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 16:56:33 -0600 Subject: [PATCH 3/8] Remove multiprocess.Support python 3.7-3.8. Provide options to change multiprocessing context --- .travis.yml | 2 -- pydream/Dream.py | 30 ++++++++++++++++------------ pydream/core.py | 52 ++++++++++++++++++++++++++++++------------------ setup.py | 2 +- 4 files changed, 51 insertions(+), 35 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1b67db5..56b7375 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,8 +12,6 @@ before_install: - conda update --yes conda - conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose cython pysb -- pip install python-dateutil -- pip install -r requirements.txt install: - pip install . script: diff --git a/pydream/Dream.py b/pydream/Dream.py index ec327a3..a06358f 100644 --- a/pydream/Dream.py +++ b/pydream/Dream.py @@ -6,7 +6,6 @@ from datetime import datetime import traceback import multiprocessing as mp -import multiprocessing.pool as mp_pool import time class Dream(): @@ -55,11 +54,18 @@ class Dream(): A model name to be used as a prefix when saving history and crossover value files. hardboundaries : bool Whether to relect point back into bounds of hard prior (i.e., if using a uniform prior, reflect points outside of boundaries back in, so you don't waste time looking at points with logpdf = -inf). + mp_context : a multiprocessing context or None. It will be used to launch the workers """ - - def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False, crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10, p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False, crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False, model_name=False, hardboundaries=True, **kwargs): - #Set model and variable attributes (if no variables passed, set to all parameters) + def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False, + crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10, + p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False, + crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False, + model_name=False, hardboundaries=True, mp_context=None, **kwargs): + + # Set Dream multiprocessing context + self.mp_context = mp_context + # Set model and variable attributes (if no variables passed, set to all parameters) self.model = model self.model_name = model_name if variables is None: @@ -846,11 +852,9 @@ def mt_evaluate_logps(self, parallel, multitry, proposed_pts, pfunc, ref=False): #If using multi-try and running in parallel farm out proposed points to process pool. if parallel: - p = mp.Pool(multitry) - args = list(zip([self]*multitry, np.squeeze(proposed_pts))) - logps = p.map(call_logp, args) - p.close() - p.join() + args = list(zip([self] * multitry, np.squeeze(proposed_pts))) + with mp.pool.Pool(multitry, context=self.mp_context) as p: + logps = p.map(call_logp, args) log_priors = [val[0] for val in logps] log_likes = [val[1] for val in logps] @@ -1056,10 +1060,10 @@ def __init__(self, processes=None, initializer=None, initargs=(), context = mp.get_context() context = _nondaemon_context_mapper[context._name] super(DreamPool, self).__init__(processes=processes, - initializer=initializer, - initargs=initargs, - maxtasksperchild=maxtasksperchild, - context=context) + initializer=initializer, + initargs=initargs, + maxtasksperchild=maxtasksperchild, + context=context) except ImportError: class NonDaemonProcess(NonDaemonMixin, mp.Process): diff --git a/pydream/core.py b/pydream/core.py index 6a960e3..1bd7402 100644 --- a/pydream/core.py +++ b/pydream/core.py @@ -7,7 +7,8 @@ from .model import Model import traceback -def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, **kwargs): + +def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, mp_context=None, **kwargs): """Run DREAM given a set of parameters with priors and a likelihood function. Parameters @@ -28,6 +29,7 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True tempering: Boolean, optional Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False + mp_context: multiprocessing context or None. It will be used to launch the workers kwargs: Other arguments that will be passed to the Dream class on initialization. For more information, see Dream class. @@ -51,11 +53,15 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, model = Model(likelihood=likelihood, sampled_parameters=parameters) if restart: - step_instance = Dream(model=model, variables=parameters, history_file=kwargs['model_name']+'_DREAM_chain_history.npy', crossover_file=kwargs['model_name']+'_DREAM_chain_adapted_crossoverprob.npy', gamma_file=kwargs['model_name']+'_DREAM_chain_adapted_gammalevelprob.npy', verbose=verbose, **kwargs) + step_instance = Dream(model=model, variables=parameters, + history_file=kwargs['model_name'] + '_DREAM_chain_history.npy', + crossover_file=kwargs['model_name'] + '_DREAM_chain_adapted_crossoverprob.npy', + gamma_file=kwargs['model_name'] + '_DREAM_chain_adapted_gammalevelprob.npy', + verbose=verbose, mp_context=mp_context, **kwargs) else: - step_instance = Dream(model=model, variables=parameters, verbose=verbose, **kwargs) + step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs) - pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start) + pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context) if tempering: @@ -236,7 +242,7 @@ def _sample_dream_pt_chain(args): return q1, logprior1, loglike1, dream_instance -def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None): +def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_context=None): min_njobs = (2*len(step_instance.DEpairs))+1 if nchains < min_njobs: @@ -262,23 +268,28 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None): raise Exception('The size of the seeded starting history is insufficient. Increase nseedchains>=%s.' %str(min_nseedchains)) current_position_dim = nchains*step_instance.total_var_dimension - history_arr = mp.Array('d', [0]*int(arr_dim)) + # Get context to define arrays + if mp_context is None: + ctx = mp.get_context(mp_context) + else: + ctx = mp_context + history_arr = ctx.Array('d', [0] * int(arr_dim)) if step_instance.history_file != False: history_arr[0:len_old_history] = old_history.flatten() nCR = step_instance.nCR ngamma = step_instance.ngamma crossover_setting = step_instance.CR_probabilities - crossover_probabilities = mp.Array('d', crossover_setting) - ncrossover_updates = mp.Array('d', [0]*nCR) - delta_m = mp.Array('d', [0]*nCR) + crossover_probabilities = ctx.Array('d', crossover_setting) + ncrossover_updates = ctx.Array('d', [0] * nCR) + delta_m = ctx.Array('d', [0] * nCR) gamma_level_setting = step_instance.gamma_probabilities - gamma_probabilities = mp.Array('d', gamma_level_setting) - ngamma_updates = mp.Array('d', [0]*ngamma) - delta_m_gamma = mp.Array('d', [0]*ngamma) - current_position_arr = mp.Array('d', [0]*current_position_dim) - shared_nchains = mp.Value('i', nchains) - n = mp.Value('i', 0) - tf = mp.Value('c', b'F') + gamma_probabilities = ctx.Array('d', gamma_level_setting) + ngamma_updates = ctx.Array('d', [0] * ngamma) + delta_m_gamma = ctx.Array('d', [0] * ngamma) + current_position_arr = ctx.Array('d', [0] * current_position_dim) + shared_nchains = ctx.Value('i', nchains) + n = ctx.Value('i', 0) + tf = ctx.Value('c', b'F') if step_instance.crossover_burnin == None: step_instance.crossover_burnin = int(np.floor(niterations/10)) @@ -288,9 +299,12 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None): print('Warning: start position provided but random_start set to True. Overrode random_start value and starting walk at provided start position.') step_instance.start_random = False - p = DreamPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, )) - #p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, )) - #p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, )) + p = DreamPool(nchains, context=ctx, initializer=_mp_dream_init, + initargs=(history_arr, current_position_arr, shared_nchains, + crossover_probabilities, ncrossover_updates, delta_m, + gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf,)) + # p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, )) + # p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, )) return p diff --git a/setup.py b/setup.py index 010a868..3afd465 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def readme(): author='Erin Shockley', author_email='erin.shockley@vanderbilt.edu', packages=['pydream'], - install_requires=['numpy', 'scipy', 'multiprocess'], + install_requires=['numpy', 'scipy'], zip_safe=False, test_suite='nose.collector', tests_require=['nose'], From 635f28491fd6df042952a37a6623a189d6546615 Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 17:01:33 -0600 Subject: [PATCH 4/8] remove requirements. Update multiprocess to multiprocessing --- pydream/tests/test_dream.py | 2 +- requirements.txt | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) delete mode 100644 requirements.txt diff --git a/pydream/tests/test_dream.py b/pydream/tests/test_dream.py index 8d1ea7c..49532f7 100644 --- a/pydream/tests/test_dream.py +++ b/pydream/tests/test_dream.py @@ -8,7 +8,7 @@ import unittest from os import remove -import multiprocess as mp +import multiprocessing as mp import numpy as np import pydream.Dream_shared_vars from pydream.Dream import Dream diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 1168839..0000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -dill==0.3.1.1 -multiprocess==0.70.9 -wheel==0.24.0 From bb39edea5034099661ed53a083a571f81c3ca0b7 Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 17:07:58 -0600 Subject: [PATCH 5/8] from multiprocessing import pool --- pydream/Dream.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pydream/Dream.py b/pydream/Dream.py index a06358f..434d18d 100644 --- a/pydream/Dream.py +++ b/pydream/Dream.py @@ -6,6 +6,7 @@ from datetime import datetime import traceback import multiprocessing as mp +from multiprocessing import pool import time class Dream(): @@ -853,7 +854,7 @@ def mt_evaluate_logps(self, parallel, multitry, proposed_pts, pfunc, ref=False): #If using multi-try and running in parallel farm out proposed points to process pool. if parallel: args = list(zip([self] * multitry, np.squeeze(proposed_pts))) - with mp.pool.Pool(multitry, context=self.mp_context) as p: + with pool.Pool(multitry, context=self.mp_context) as p: logps = p.map(call_logp, args) log_priors = [val[0] for val in logps] log_likes = [val[1] for val in logps] @@ -1053,7 +1054,7 @@ class NonDaemonForkServerContext(context.ForkServerContext): pass - class DreamPool(mp.pool.Pool): + class DreamPool(pool.Pool): def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): if context is None: @@ -1070,5 +1071,5 @@ class NonDaemonProcess(NonDaemonMixin, mp.Process): pass - class DreamPool(mp.pool.Pool): + class DreamPool(pool.Pool): Process = NonDaemonProcess \ No newline at end of file From 22d31fafe221052ae87330857ff32646fc5f6feb Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Thu, 13 Feb 2020 17:10:27 -0600 Subject: [PATCH 6/8] remove repeated random sampling --- pydream/parameters.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pydream/parameters.py b/pydream/parameters.py index 2c7a35e..c75b934 100644 --- a/pydream/parameters.py +++ b/pydream/parameters.py @@ -32,8 +32,6 @@ def random(self, reseed=False): else: random_seed = None - random = self.dist.rvs(random_state=random_seed) - return self.dist.rvs(random_state=random_seed) def prior(self, q0): From 91fcae695dd28db0d41b294f4c2810c386c6e6ad Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Mon, 17 Feb 2020 17:17:31 -0600 Subject: [PATCH 7/8] Close and join main multiprocessing pool --- pydream/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pydream/core.py b/pydream/core.py index 1bd7402..e563311 100644 --- a/pydream/core.py +++ b/pydream/core.py @@ -78,7 +78,8 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, returned_vals = pool.map(_sample_dream, args) sampled_params = [val[0] for val in returned_vals] log_ps = [val[1] for val in returned_vals] - + pool.close() + pool.join() return sampled_params, log_ps def _sample_dream(args): From 278af49025628e86480a6682a4d829afd6424261 Mon Sep 17 00:00:00 2001 From: ortega2247 Date: Mon, 17 Feb 2020 21:29:42 -0600 Subject: [PATCH 8/8] applied requested changes --- pydream/Dream.py | 93 +++++++++++++++++++++++------------------------- pydream/core.py | 34 ++++++++++-------- 2 files changed, 64 insertions(+), 63 deletions(-) diff --git a/pydream/Dream.py b/pydream/Dream.py index 434d18d..81e34ec 100644 --- a/pydream/Dream.py +++ b/pydream/Dream.py @@ -55,7 +55,9 @@ class Dream(): A model name to be used as a prefix when saving history and crossover value files. hardboundaries : bool Whether to relect point back into bounds of hard prior (i.e., if using a uniform prior, reflect points outside of boundaries back in, so you don't waste time looking at points with logpdf = -inf). - mp_context : a multiprocessing context or None. It will be used to launch the workers + mp_context : multiprocessing context or None. + Method used to to start the processes. If it's None, the default context, which depends in Python version and OS, is used. + For more information please check: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods """ def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False, @@ -995,10 +997,14 @@ def metrop_select(mr, q, q0): # Reject proposed value return q0 +# The following part uses source code from the file legacymultiproc.py from https://github.com/nipy/nipype +# copyright NIPY developers, licensed under the Apache 2.0 license. -# Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of +# Pythons 3.4-3.7.0, and 3.7.1 have three different implementations of # pool.Pool().Process(), and the type of the result varies based on the default # multiprocessing context, so we need to dynamically patch the daemon property + + class NonDaemonMixin(object): @property def daemon(self): @@ -1009,67 +1015,58 @@ def daemon(self, val): pass -try: - from multiprocessing import context +from multiprocessing import context - # Exists on all platforms - class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess): - pass +# Exists on all platforms +class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess): + pass - class NonDaemonSpawnContext(context.SpawnContext): - Process = NonDaemonSpawnProcess +class NonDaemonSpawnContext(context.SpawnContext): + Process = NonDaemonSpawnProcess - _nondaemon_context_mapper = { - 'spawn': NonDaemonSpawnContext() - } +_nondaemon_context_mapper = { + 'spawn': NonDaemonSpawnContext() +} - # POSIX only - try: - class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess): - pass +# POSIX only +try: + class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess): + pass - class NonDaemonForkContext(context.ForkContext): - Process = NonDaemonForkProcess + class NonDaemonForkContext(context.ForkContext): + Process = NonDaemonForkProcess - _nondaemon_context_mapper['fork'] = NonDaemonForkContext() - except AttributeError: + _nondaemon_context_mapper['fork'] = NonDaemonForkContext() +except AttributeError: + pass +# POSIX only +try: + class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess): pass - # POSIX only - try: - class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess): - pass - - class NonDaemonForkServerContext(context.ForkServerContext): - Process = NonDaemonForkServerProcess + class NonDaemonForkServerContext(context.ForkServerContext): + Process = NonDaemonForkServerProcess - _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext() - except AttributeError: - pass - - class DreamPool(pool.Pool): - def __init__(self, processes=None, initializer=None, initargs=(), - maxtasksperchild=None, context=None): - if context is None: - context = mp.get_context() - context = _nondaemon_context_mapper[context._name] - super(DreamPool, self).__init__(processes=processes, - initializer=initializer, - initargs=initargs, - maxtasksperchild=maxtasksperchild, - context=context) - -except ImportError: - class NonDaemonProcess(NonDaemonMixin, mp.Process): - pass + _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext() +except AttributeError: + pass - class DreamPool(pool.Pool): - Process = NonDaemonProcess \ No newline at end of file +class DreamPool(pool.Pool): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None, context=None): + if context is None: + context = mp.get_context() + context = _nondaemon_context_mapper[context._name] + super(DreamPool, self).__init__(processes=processes, + initializer=initializer, + initargs=initargs, + maxtasksperchild=maxtasksperchild, + context=context) diff --git a/pydream/core.py b/pydream/core.py index e563311..baf8a37 100644 --- a/pydream/core.py +++ b/pydream/core.py @@ -29,7 +29,9 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True tempering: Boolean, optional Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False - mp_context: multiprocessing context or None. It will be used to launch the workers + mp_context: multiprocessing context or None. + Method used to to start the processes. If it's None, the default context, which depends in Python version and OS, is used. + For more information please check: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods kwargs: Other arguments that will be passed to the Dream class on initialization. For more information, see Dream class. @@ -62,26 +64,28 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs) pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context) + try: + if tempering: - if tempering: - - sampled_params, log_ps = _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose=verbose) - - else: - - if type(start) is list: - args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains) + sampled_params, log_ps = _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose=verbose) else: - args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains)) - returned_vals = pool.map(_sample_dream, args) - sampled_params = [val[0] for val in returned_vals] - log_ps = [val[1] for val in returned_vals] - pool.close() - pool.join() + if type(start) is list: + args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains) + + else: + args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains)) + + returned_vals = pool.map(_sample_dream, args) + sampled_params = [val[0] for val in returned_vals] + log_ps = [val[1] for val in returned_vals] + finally: + pool.close() + pool.join() return sampled_params, log_ps + def _sample_dream(args): try: