Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run PyDREAM with python 3.8 #17

Merged
merged 8 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: python
env:
- PYVER=2.7
- PYVER=3.8
- PYVER=3.6
- PYVER=3.7
before_install:
Expand All @@ -10,10 +10,8 @@ before_install:
- bash miniconda.sh -b -p $HOME/miniconda
- export PATH="/home/travis/miniconda/bin:$PATH"
- conda update --yes conda
- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose
- conda install --yes -c conda-forge -c alubbock python="$PYVER" numpy scipy nose cython
pysb
- pip install python-dateutil
- pip install -r requirements.txt
install:
- pip install .
script:
Expand All @@ -27,4 +25,4 @@ deploy:
on:
branch: master
tags: true
condition: $PYVER = 2.7
condition: $PYVER = 3.6
112 changes: 88 additions & 24 deletions pydream/Dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from . import Dream_shared_vars
from datetime import datetime
import traceback
import multiprocess as mp
import multiprocess.pool as mp_pool
import multiprocessing as mp
from multiprocessing import pool
import time

class Dream():
Expand Down Expand Up @@ -55,11 +55,18 @@ class Dream():
A model name to be used as a prefix when saving history and crossover value files.
hardboundaries : bool
Whether to relect point back into bounds of hard prior (i.e., if using a uniform prior, reflect points outside of boundaries back in, so you don't waste time looking at points with logpdf = -inf).
mp_context : a multiprocessing context or None. It will be used to launch the workers
ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False, crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10, p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False, crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False, model_name=False, hardboundaries=True, **kwargs):

#Set model and variable attributes (if no variables passed, set to all parameters)
def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossover=True, adapt_gamma=False,
crossover_burnin=None, DEpairs=1, lamb=.05, zeta=1e-12, history_thin=10, snooker=.10,
p_gamma_unity=.20, gamma_levels=1, start_random=True, save_history=True, history_file=False,
crossover_file=False, gamma_file=False, multitry=False, parallel=False, verbose=False,
model_name=False, hardboundaries=True, mp_context=None, **kwargs):

# Set Dream multiprocessing context
self.mp_context = mp_context
# Set model and variable attributes (if no variables passed, set to all parameters)
self.model = model
self.model_name = model_name
if variables is None:
Expand Down Expand Up @@ -846,11 +853,9 @@ def mt_evaluate_logps(self, parallel, multitry, proposed_pts, pfunc, ref=False):

#If using multi-try and running in parallel farm out proposed points to process pool.
if parallel:
p = mp.Pool(multitry)
args = list(zip([self]*multitry, np.squeeze(proposed_pts)))
logps = p.map(call_logp, args)
p.close()
p.join()
args = list(zip([self] * multitry, np.squeeze(proposed_pts)))
with pool.Pool(multitry, context=self.mp_context) as p:
logps = p.map(call_logp, args)
log_priors = [val[0] for val in logps]
log_likes = [val[1] for val in logps]

Expand Down Expand Up @@ -990,22 +995,81 @@ def metrop_select(mr, q, q0):
# Reject proposed value
return q0


class NoDaemonProcess(mp.Process):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
mp.Process.__init__(self, group, target, name, args, kwargs)

ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
# make 'daemon' attribute always return False
def _get_daemon(self):
# Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of
ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
# pool.Pool().Process(), and the type of the result varies based on the default
# multiprocessing context, so we need to dynamically patch the daemon property
class NonDaemonMixin(object):
@property
def daemon(self):
return False
def _set_daemon(self, value):

@daemon.setter
def daemon(self, val):
pass


try:
from multiprocessing import context


# Exists on all platforms
class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess):
pass


class NonDaemonSpawnContext(context.SpawnContext):
Process = NonDaemonSpawnProcess


_nondaemon_context_mapper = {
'spawn': NonDaemonSpawnContext()
}

# POSIX only
try:
class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess):
pass


class NonDaemonForkContext(context.ForkContext):
Process = NonDaemonForkProcess


_nondaemon_context_mapper['fork'] = NonDaemonForkContext()
except AttributeError:
pass
# POSIX only
try:
class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess):
pass


class NonDaemonForkServerContext(context.ForkServerContext):
Process = NonDaemonForkServerProcess


_nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext()
except AttributeError:
pass


class DreamPool(pool.Pool):
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
if context is None:
context = mp.get_context()
context = _nondaemon_context_mapper[context._name]
super(DreamPool, self).__init__(processes=processes,
initializer=initializer,
initargs=initargs,
maxtasksperchild=maxtasksperchild,
context=context)

except ImportError:
class NonDaemonProcess(NonDaemonMixin, mp.Process):
pass
daemon = property(_get_daemon, _set_daemon)

#A subclass of multiprocessing.pool.Pool that allows processes to launch child processes (this is necessary for Dream to use multi-try)
#Taken from http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class DreamPool(mp_pool.Pool):
def __init__(self, processes=None, initializer=None, initargs=None, maxtasksperchild=None):
mp_pool.Pool.__init__(self, processes, initializer, initargs, maxtasksperchild)
Process = NoDaemonProcess

class DreamPool(pool.Pool):
Process = NonDaemonProcess
ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
57 changes: 36 additions & 21 deletions pydream/core.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-

import numpy as np
import multiprocess as mp
import multiprocessing as mp
from . import Dream_shared_vars
from .Dream import Dream, DreamPool
from .model import Model
import traceback

def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, **kwargs):

def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None, restart=False, verbose=True, nverbose=10, tempering=False, mp_context=None, **kwargs):
"""Run DREAM given a set of parameters with priors and a likelihood function.

Parameters
Expand All @@ -28,6 +29,7 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True
tempering: Boolean, optional
Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False
mp_context: multiprocessing context or None. It will be used to launch the workers
ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
kwargs:
Other arguments that will be passed to the Dream class on initialization. For more information, see Dream class.

Expand All @@ -51,11 +53,15 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
model = Model(likelihood=likelihood, sampled_parameters=parameters)

if restart:
step_instance = Dream(model=model, variables=parameters, history_file=kwargs['model_name']+'_DREAM_chain_history.npy', crossover_file=kwargs['model_name']+'_DREAM_chain_adapted_crossoverprob.npy', gamma_file=kwargs['model_name']+'_DREAM_chain_adapted_gammalevelprob.npy', verbose=verbose, **kwargs)
step_instance = Dream(model=model, variables=parameters,
history_file=kwargs['model_name'] + '_DREAM_chain_history.npy',
crossover_file=kwargs['model_name'] + '_DREAM_chain_adapted_crossoverprob.npy',
gamma_file=kwargs['model_name'] + '_DREAM_chain_adapted_gammalevelprob.npy',
verbose=verbose, mp_context=mp_context, **kwargs)
else:
step_instance = Dream(model=model, variables=parameters, verbose=verbose, **kwargs)
step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs)

pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start)
pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context)

ortega2247 marked this conversation as resolved.
Show resolved Hide resolved
if tempering:

Expand All @@ -72,7 +78,8 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
returned_vals = pool.map(_sample_dream, args)
sampled_params = [val[0] for val in returned_vals]
log_ps = [val[1] for val in returned_vals]
ortega2247 marked this conversation as resolved.
Show resolved Hide resolved

pool.close()
pool.join()
return sampled_params, log_ps

def _sample_dream(args):
Expand Down Expand Up @@ -236,7 +243,7 @@ def _sample_dream_pt_chain(args):

return q1, logprior1, loglike1, dream_instance

def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_context=None):

min_njobs = (2*len(step_instance.DEpairs))+1
if nchains < min_njobs:
Expand All @@ -262,23 +269,28 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
raise Exception('The size of the seeded starting history is insufficient. Increase nseedchains>=%s.' %str(min_nseedchains))

current_position_dim = nchains*step_instance.total_var_dimension
history_arr = mp.Array('d', [0]*int(arr_dim))
# Get context to define arrays
if mp_context is None:
ctx = mp.get_context(mp_context)
else:
ctx = mp_context
history_arr = ctx.Array('d', [0] * int(arr_dim))
if step_instance.history_file != False:
history_arr[0:len_old_history] = old_history.flatten()
nCR = step_instance.nCR
ngamma = step_instance.ngamma
crossover_setting = step_instance.CR_probabilities
crossover_probabilities = mp.Array('d', crossover_setting)
ncrossover_updates = mp.Array('d', [0]*nCR)
delta_m = mp.Array('d', [0]*nCR)
crossover_probabilities = ctx.Array('d', crossover_setting)
ncrossover_updates = ctx.Array('d', [0] * nCR)
delta_m = ctx.Array('d', [0] * nCR)
gamma_level_setting = step_instance.gamma_probabilities
gamma_probabilities = mp.Array('d', gamma_level_setting)
ngamma_updates = mp.Array('d', [0]*ngamma)
delta_m_gamma = mp.Array('d', [0]*ngamma)
current_position_arr = mp.Array('d', [0]*current_position_dim)
shared_nchains = mp.Value('i', nchains)
n = mp.Value('i', 0)
tf = mp.Value('c', b'F')
gamma_probabilities = ctx.Array('d', gamma_level_setting)
ngamma_updates = ctx.Array('d', [0] * ngamma)
delta_m_gamma = ctx.Array('d', [0] * ngamma)
current_position_arr = ctx.Array('d', [0] * current_position_dim)
shared_nchains = ctx.Value('i', nchains)
n = ctx.Value('i', 0)
tf = ctx.Value('c', b'F')

if step_instance.crossover_burnin == None:
step_instance.crossover_burnin = int(np.floor(niterations/10))
Expand All @@ -288,9 +300,12 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None):
print('Warning: start position provided but random_start set to True. Overrode random_start value and starting walk at provided start position.')
step_instance.start_random = False

p = DreamPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
#p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
#p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
p = DreamPool(nchains, context=ctx, initializer=_mp_dream_init,
initargs=(history_arr, current_position_arr, shared_nchains,
crossover_probabilities, ncrossover_updates, delta_m,
gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf,))
# p = mp.pool.ThreadPool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))
# p = mp.Pool(nchains, initializer=_mp_dream_init, initargs=(history_arr, current_position_arr, shared_nchains, crossover_probabilities, ncrossover_updates, delta_m, gamma_probabilities, ngamma_updates, delta_m_gamma, n, tf, ))

return p

Expand Down
2 changes: 0 additions & 2 deletions pydream/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ def random(self, reseed=False):
else:
random_seed = None

random = self.dist.rvs(random_state=random_seed)

return self.dist.rvs(random_state=random_seed)

def prior(self, q0):
Expand Down
2 changes: 1 addition & 1 deletion pydream/tests/test_dream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import unittest
from os import remove

import multiprocess as mp
import multiprocessing as mp
import numpy as np
import pydream.Dream_shared_vars
from pydream.Dream import Dream
Expand Down
3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def readme():
author='Erin Shockley',
author_email='[email protected]',
packages=['pydream'],
install_requires=['numpy', 'scipy', 'multiprocess'],
install_requires=['numpy', 'scipy'],
zip_safe=False,
test_suite='nose.collector',
tests_require=['nose'],
Expand Down