Skip to content

Commit

Permalink
Merge pull request #17 from LoLab-VU/dream_pool_all_python
Browse files Browse the repository at this point in the history
Run PyDREAM with python 3.8
  • Loading branch information
ortega2247 authored Feb 18, 2020
2 parents dd9cd08 + 278af49 commit e3cb561
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 69 deletions.
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: python
env:
- PYVER=2.7
- PYVER=3.8
- PYVER=3.6
- PYVER=3.7
before_install:
Expand All @@ -10,10 +10,8 @@ before_install:
- bash miniconda.sh -b -p $HOME/miniconda
- export PATH="/home/travis/miniconda/bin:$PATH"
- conda update --yes conda
- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose
- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose cython
pysb
- pip install python-dateutil
- pip install -r requirements.txt
install:
- pip install .
script:
Expand All @@ -27,4 +25,4 @@ deploy:
on:
branch: master
tags: true
condition: $PYVER = 2.7
condition: $PYVER = 3.6
109 changes: 85 additions & 24 deletions pydream/Dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from . import Dream_shared_vars
from datetime import datetime
import traceback
import multiprocess as mp
import multiprocess.pool as mp_pool
import multiprocessing as mp
from multiprocessing import pool
import time

class Dream():
Expand Down Expand Up @@ -55,11 +55,20 @@ class Dream():
A model name to be used as a prefix when saving history and crossover value files.
hardboundaries : bool
Whether to reflect points back into the bounds of a hard prior (i.e., if using a uniform prior, reflect points outside of the boundaries back in, so you don't waste time looking at points with logpdf = -inf).
mp_context : multiprocessing context or None.
Method used to start the processes. If it's None, the default context, which depends on the Python version and OS, is used.
For more information please check: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
"""

def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False, crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10, p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False, crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False, model_name=False, hardboundaries=True, **kwargs):

#Set model and variable attributes (if no variables passed, set to all parameters)
def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False,
crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10,
p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False,
crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False,
model_name=False, hardboundaries=True, mp_context=None, **kwargs):

# Set Dream multiprocessing context
self.mp_context = mp_context
# Set model and variable attributes (if no variables passed, set to all parameters)
self.model = model
self.model_name = model_name
if variables is None:
Expand Down Expand Up @@ -846,11 +855,9 @@ def mt_evaluate_logps(self, parallel, multitry, proposed_pts, pfunc, ref=False):

#If using multi-try and running in parallel farm out proposed points to process pool.
if parallel:
p = mp.Pool(multitry)
args = list(zip([self]*multitry, np.squeeze(proposed_pts)))
logps = p.map(call_logp, args)
p.close()
p.join()
args = list(zip([self] * multitry, np.squeeze(proposed_pts)))
with pool.Pool(multitry, context=self.mp_context) as p:
logps = p.map(call_logp, args)
log_priors = [val[0] for val in logps]
log_likes = [val[1] for val in logps]

Expand Down Expand Up @@ -990,22 +997,76 @@ def metrop_select(mr, q, q0):
# Reject proposed value
return q0


class NoDaemonProcess(mp.Process):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
mp.Process.__init__(self, group, target, name, args, kwargs)
# The following part uses source code from the file legacymultiproc.py from https://github.com/nipy/nipype
# copyright NIPY developers, licensed under the Apache 2.0 license.

# Pythons 3.4-3.7.0, and 3.7.1 have three different implementations of
# pool.Pool().Process(), and the type of the result varies based on the default
# multiprocessing context, so we need to dynamically patch the daemon property

# make 'daemon' attribute always return False
def _get_daemon(self):

class NonDaemonMixin(object):
@property
def daemon(self):
return False
def _set_daemon(self, value):

@daemon.setter
def daemon(self, val):
pass
daemon = property(_get_daemon, _set_daemon)

#A subclass of multiprocessing.pool.Pool that allows processes to launch child processes (this is necessary for Dream to use multi-try)
#Taken from http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class DreamPool(mp_pool.Pool):
def __init__(self, processes=None, initializer=None, initargs=None, maxtasksperchild=None):
mp_pool.Pool.__init__(self, processes, initializer, initargs, maxtasksperchild)
Process = NoDaemonProcess

# Concrete multiprocessing contexts are defined here; NonDaemonMixin (defined
# above) supplies the always-False `daemon` property for the Process classes.
from multiprocessing import context


# Exists on all platforms
class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess):
    """Spawn-based worker process whose `daemon` flag is forced to False,
    so that pool workers are allowed to launch child processes."""
    pass


class NonDaemonSpawnContext(context.SpawnContext):
    # Pools built from this context create non-daemonic workers.
    Process = NonDaemonSpawnProcess


# Maps a start-method name (the context's _name) to a non-daemonic context.
# 'spawn' exists everywhere; 'fork'/'forkserver' are added below on POSIX only.
_nondaemon_context_mapper = {
    'spawn': NonDaemonSpawnContext()
}

# POSIX only
try:
    class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess):
        """Fork-based worker process with `daemon` forced to False."""
        pass


    class NonDaemonForkContext(context.ForkContext):
        # Pools built from this context create non-daemonic workers.
        Process = NonDaemonForkProcess


    _nondaemon_context_mapper['fork'] = NonDaemonForkContext()
except AttributeError:
    # context.ForkProcess is absent on this platform (e.g. Windows);
    # the 'fork' start method is simply not registered.
    pass
# POSIX only
try:
    class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess):
        """Forkserver-based worker process with `daemon` forced to False."""
        pass


    class NonDaemonForkServerContext(context.ForkServerContext):
        # Pools built from this context create non-daemonic workers.
        Process = NonDaemonForkServerProcess


    _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext()
except AttributeError:
    # context.ForkServerProcess is absent on this platform (e.g. Windows);
    # the 'forkserver' start method is simply not registered.
    pass


class DreamPool(pool.Pool):
    """A multiprocessing.pool.Pool whose worker processes are non-daemonic.

    Non-daemonic workers are allowed to launch child processes themselves,
    which DREAM requires when multi-try sampling opens its own inner pool.

    Parameters mirror multiprocessing.pool.Pool; `context` selects the start
    method (spawn/fork/forkserver) and defaults to the interpreter's default.
    """

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        # Fall back to the platform's default start method when the caller
        # did not supply a context.
        base_context = context if context is not None else mp.get_context()
        # Swap in the non-daemonic twin of the chosen start method.
        nd_context = _nondaemon_context_mapper[base_context._name]
        super(DreamPool, self).__init__(processes=processes,
                                        initializer=initializer,
                                        initargs=initargs,
                                        maxtasksperchild=maxtasksperchild,
                                        context=nd_context)
85 changes: 52 additions & 33 deletions pydream/core.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-

import numpy as np
import multiprocess as mp
import multiprocessing as mp
from . import Dream_shared_vars
from .Dream import Dream, DreamPool
from .model import Model
import traceback

def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, **kwargs):

def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, mp_context=None, **kwargs):
"""Run DREAM given a set of parameters with priors and a likelihood function.
Parameters
Expand All @@ -28,6 +29,9 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True
tempering: Boolean, optional
Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False
mp_context: multiprocessing context or None.
Method used to start the processes. If it's None, the default context, which depends on the Python version and OS, is used.
For more information please check: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
kwargs:
Other arguments that will be passed to the Dream class on initialization. For more information, see Dream class.
Expand All @@ -51,30 +55,37 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
model = Model(likelihood=likelihood, sampled_parameters=parameters)

if restart:
step_instance = Dream(model=model, variables=parameters, history_file=kwargs['model_name']+'_DREAM_chain_history.npy', crossover_file=kwargs['model_name']+'_DREAM_chain_adapted_crossoverprob.npy', gamma_file=kwargs['model_name']+'_DREAM_chain_adapted_gammalevelprob.npy', verbose=verbose, **kwargs)
step_instance = Dream(model=model, variables=parameters,
history_file=kwargs['model_name'] + '_DREAM_chain_history.npy',
crossover_file=kwargs['model_name'] + '_DREAM_chain_adapted_crossoverprob.npy',
gamma_file=kwargs['model_name'] + '_DREAM_chain_adapted_gammalevelprob.npy',
verbose=verbose, mp_context=mp_context, **kwargs)
else:
step_instance = Dream(model=model, variables=parameters, verbose=verbose, **kwargs)
step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs)

pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start)
pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context)
try:
if tempering:

if tempering:

sampled_params, log_ps = _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose=verbose)

else:

if type(start) is list:
args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains)
sampled_params, log_ps = _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose=verbose)

else:
args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains))

returned_vals = pool.map(_sample_dream, args)
sampled_params = [val[0] for val in returned_vals]
log_ps = [val[1] for val in returned_vals]

if type(start) is list:
args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains)

else:
args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains))

returned_vals = pool.map(_sample_dream, args)
sampled_params = [val[0] for val in returned_vals]
log_ps = [val[1] for val in returned_vals]
finally:
pool.close()
pool.join()
return sampled_params, log_ps


def _sample_dream(args):

try:
Expand Down Expand Up @@ -236,7 +247,7 @@ def _sample_dream_pt_chain(args):

return q1, logprior1, loglike1, dream_instance

def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_context=None):

min_njobs = (2*len(step_instance.DEpairs))+1
if nchains < min_njobs:
Expand All @@ -262,23 +273,28 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
raise Exception('The size of the seeded starting history is insufficient. Increase nseedchains>=%s.' %str(min_nseedchains))

current_position_dim = nchains*step_instance.total_var_dimension
history_arr = mp.Array('d', [0]*int(arr_dim))
# Get context to define arrays
if mp_context is None:
ctx = mp.get_context(mp_context)
else:
ctx = mp_context
history_arr = ctx.Array('d', [0] * int(arr_dim))
if step_instance.history_file != False:
history_arr[0:len_old_history] = old_history.flatten()
nCR = step_instance.nCR
ngamma = step_instance.ngamma
crossover_setting = step_instance.CR_probabilities
crossover_probabilities = mp.Array('d', crossover_setting)
ncrossover_updates = mp.Array('d', [0]*nCR)
delta_m = mp.Array('d', [0]*nCR)
crossover_probabilities = ctx.Array('d', crossover_setting)
ncrossover_updates = ctx.Array('d', [0] * nCR)
delta_m = ctx.Array('d', [0] * nCR)
gamma_level_setting = step_instance.gamma_probabilities
gamma_probabilities = mp.Array('d', gamma_level_setting)
ngamma_updates = mp.Array('d', [0]*ngamma)
delta_m_gamma = mp.Array('d', [0]*ngamma)
current_position_arr = mp.Array('d', [0]*current_position_dim)
shared_nchains = mp.Value('i', nchains)
n = mp.Value('i', 0)
tf = mp.Value('c', b'F')
gamma_probabilities = ctx.Array('d', gamma_level_setting)
ngamma_updates = ctx.Array('d', [0] * ngamma)
delta_m_gamma = ctx.Array('d', [0] * ngamma)
current_position_arr = ctx.Array('d', [0] * current_position_dim)
shared_nchains = ctx.Value('i', nchains)
n = ctx.Value('i', 0)
tf = ctx.Value('c', b'F')

if step_instance.crossover_burnin == None:
step_instance.crossover_burnin = int(np.floor(niterations/10))
Expand All @@ -288,9 +304,12 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
print('Warning: start position provided but random_start set to True. Overrode random_start value and starting walk at provided start position.')
step_instance.start_random = False

p = DreamPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
#p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
#p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
p = DreamPool(nchains, context=ctx, initializer=_mp_dream_init,
initargs=(history_arr, current_position_arr, shared_nchains,
crossover_probabilities, ncrossover_updates, delta_m,
gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf,))
# p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
# p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))

return p

Expand Down
2 changes: 0 additions & 2 deletions pydream/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ def random(self, reseed=False):
else:
random_seed = None

random = self.dist.rvs(random_state=random_seed)

return self.dist.rvs(random_state=random_seed)

def prior(self, q0):
Expand Down
2 changes: 1 addition & 1 deletion pydream/tests/test_dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import unittest
from os import remove

import multiprocess as mp
import multiprocessing as mp
import numpy as np
import pydream.Dream_shared_vars
from pydream.Dream import Dream
Expand Down
3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def readme():
author='Erin Shockley',
author_email='[email protected]',
packages=['pydream'],
install_requires=['numpy', 'scipy', 'multiprocess'],
install_requires=['numpy', 'scipy'],
zip_safe=False,
test_suite='nose.collector',
tests_require=['nose'],
Expand Down

0 comments on commit e3cb561

Please sign in to comment.