Fix merging of runs #482

Merged · 7 commits · Aug 30, 2024
Changes from all commits
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -9,7 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
### Fixed
- Fix the issue with merge_runs when dynamic runs with different numbers of batches are merged ( #481, reported by @rodleiva, fixed by @segasai)
- Fix the issue with leaking file descriptors when using the dynesty pool, which can lead to a 'too many open files' problem ( #379, reported by @bencebecsy, fixed by @segasai)

## [2.1.4] - 2024-06-26
### Added
@@ -19,7 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Fix the way the additional arguments are treated when working with dynesty's pool. Previously they could only be passed through the dynesty.pool.Pool() constructor. Now they can still be provided directly to the sampler (not recommended) ( #464, reported by @eteq, fixed by @segasai)
- Change the .ptp() method to the np.ptp() function, as the method is deprecated in numpy 2.0 ( #478, reported and patched by @joezuntz)
- Fix an error when run_nested() is used several times (i.e. with the maxiter option) while using blob=True ( #475, reported by @carlosRmelo)

## [2.1.3] - 2023-10-04
### Added
399 changes: 291 additions & 108 deletions demos/Demo 3 - Errors.ipynb

Large diffs are not rendered by default.

241 changes: 110 additions & 131 deletions py/dynesty/utils.py
@@ -1865,6 +1865,50 @@ def kld_error(res,
return kld


def _prepare_for_merge(res):
"""
Internal method used to prepare a run for merging with another run.
    It takes a results object and returns a dictionary with basic run
    info and the number of live points at each iteration.
"""
# Initialize the first ("base") run.
run_info = dict(id=res.samples_id,
u=res.samples_u,
v=res.samples,
logl=res.logl,
nc=res.ncall,
it=res.samples_it,
blob=res.blob)
nrun = len(run_info['id'])

# Number of live points throughout the run.
if res.isdynamic():
run_nlive = res.samples_n
else:
niter, nlive = res.niter, res.nlive
if nrun == niter:
run_nlive = np.ones(niter, dtype=int) * nlive
elif nrun == (niter + nlive):
# this is the case where the last live points are added
# one by one in the end of the run
run_nlive = np.minimum(np.arange(nrun, 0, -1), nlive)
else:
            raise ValueError("Final number of samples differs from number of "
                             "iterations and number of live points.")

# Batch information (if available).
    # note we also check for the existence of batch_bounds
    # because unravel_run makes 'static' runs of 1 live point
# but some will have bounds
if res.isdynamic() or 'batch_bounds' in res.keys():
run_info['batch'] = res.samples_batch
run_info['bounds'] = res.batch_bounds
else:
run_info['batch'] = np.zeros(nrun, dtype=int)
run_info['bounds'] = np.array([(-np.inf, np.inf)])
return run_nlive, run_info
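
As intuition for the static-run branch above, here is a minimal sketch (illustrative values only, not part of the patch) of how the live-point counts are reconstructed when the final live points were recorded one by one:

import numpy as np

# Hypothetical static run: 100 iterations plus the final 20 live points
# appended one by one at the end, so nrun == niter + nlive.
niter, nlive = 100, 20
nrun = niter + nlive
run_nlive = np.minimum(np.arange(nrun, 0, -1), nlive)
print(run_nlive[:3])   # [20 20 20] -- constant through the main run
print(run_nlive[-3:])  # [3 2 1]    -- ramps down over the trailing live points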


def _merge_two(res1, res2, compute_aux=False):
"""
    Internal method used to merge two runs with differing (possibly variable)
@@ -1891,156 +1935,91 @@ def _merge_two(res1, res2, compute_aux=False):
nested sampling run.

"""
base_nlive, base_info = _prepare_for_merge(res1)
new_nlive, new_info = _prepare_for_merge(res2)
base_nsamples = len(base_info['id'])
new_nsamples = len(new_info['id'])
# Initialize our new combined run.
combined_info = dict()
for curk in [
'id', 'u', 'v', 'logl', 'logvol', 'logwt', 'logz', 'logzvar', 'h',
'nc', 'it', 'n', 'batch', 'blob'
]:
combined_info[curk] = []

# These are merged batch bounds
combined_bounds = np.unique(np.concatenate(
(base_info['bounds'], new_info['bounds'])),
axis=0)
# Here we try to find where the new bounds are in the combined bounds
new_bound_map = {}
base_bound_map = {}
for i in range(len(new_info['bounds'])):
new_bound_map[i] = np.where(
np.all(new_info['bounds'][i] == combined_bounds, axis=1))[0][0]
for i in range(len(base_info['bounds'])):
base_bound_map[i] = np.where(
np.all(base_info['bounds'][i] == combined_bounds, axis=1))[0][0]

base_lowedge = np.min(base_info['bounds'][base_info['batch']])
new_lowedge = np.min(new_info['bounds'][new_info['batch']])
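    # Illustrative example (not part of the patch): with
    # base_info['bounds'] == [[-inf, inf]] and
    # new_info['bounds'] == [[-inf, inf], [-5., 0.]], np.unique yields
    # combined_bounds == [[-inf, inf], [-5., 0.]], giving
    # base_bound_map == {0: 0} and new_bound_map == {0: 0, 1: 1}, so each
    # sample's batch index can be remapped into the merged bound list.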

    # Iteratively walk through both sets of samples to simulate
    # a combined run.
    combined_nsamples = base_nsamples + new_nsamples
    # Start our counters at the beginning of each set of dead points.
    base_idx, new_idx = 0, 0
    for i in range(combined_nsamples):
        # Attempt to step along our samples. If we're out of samples,
        # set values to defaults.
        if base_idx < base_nsamples:
            base_cur_logl = base_info['logl'][base_idx]
            base_cur_nlive = base_nlive[base_idx]
        else:
            base_cur_logl = np.inf
            base_cur_nlive = 0
            # TODO this is potentially incorrect
            # It is not clear what nlive should be when we
            # are past the end of one of the runs
        if new_idx < new_nsamples:
            new_cur_logl = new_info['logl'][new_idx]
            new_cur_nlive = new_nlive[new_idx]
        else:
            new_cur_logl = np.inf
            new_cur_nlive = 0

        if base_cur_logl > new_lowedge and new_cur_logl > base_lowedge:
            # If our samples from both runs are past each other's
            # lower log-likelihood bound, both runs are now "active".
            cur_nlive = base_cur_nlive + new_cur_nlive
        elif base_cur_logl <= new_lowedge:
            # If instead our collection of dead points from the "base" run
            # are below the bound, just use those.
            cur_nlive = base_cur_nlive
        else:
            # Our collection of dead points from the "new" run
            # are below the bound, so just use those.
            cur_nlive = new_cur_nlive

        # Increment our position along depending on
        # which dead point (saved or new) is worse.
        if base_cur_logl <= new_cur_logl:
            add_idx = base_idx
            from_run = base_info
            from_map = base_bound_map
            base_idx += 1
        else:
            add_idx = new_idx
            from_run = new_info
            from_map = new_bound_map
            new_idx += 1
        combined_info['batch'].append(from_map[from_run['batch'][add_idx]])

        for curk in ['id', 'u', 'v', 'logl', 'nc', 'it', 'blob']:
            combined_info[curk].append(from_run[curk][add_idx])

        combined_info['n'].append(cur_nlive)

plateau_mode = False
plateau_counter = 0
@@ -2072,16 +2051,16 @@ def _merge_two(res1, res2, compute_aux=False):
if plateau_counter == 0:
plateau_mode = False
# Compute sampling efficiency.
    eff = 100. * combined_nsamples / sum(combined_info['nc'])

# Save results.
    r = dict(niter=combined_nsamples,
ncall=np.asarray(combined_info['nc']),
eff=eff,
samples=np.asarray(combined_info['v']),
logl=np.asarray(combined_info['logl']),
logvol=np.asarray(combined_info['logvol']),
             batch_bounds=np.asarray(combined_bounds),
blob=np.asarray(combined_info['blob']))

for curk in ['id', 'it', 'n', 'u', 'batch']:
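The rewritten loop above is essentially a two-pointer merge over the dead points of both runs, ordered by log-likelihood, while tracking how many live points are active at each step. A self-contained sketch of that idea (simplified: both runs are assumed active over the full likelihood range, so the batch-bound branching is omitted):

import numpy as np

def merge_walk(logl_a, nlive_a, logl_b, nlive_b):
    # Merge two sorted dead-point sequences, recording the combined
    # number of live points at each output sample.
    ia, ib = 0, 0
    merged_logl, merged_nlive = [], []
    for _ in range(len(logl_a) + len(logl_b)):
        # Past the end of a run, its next logl is +inf and it
        # contributes no live points (mirroring the patch's defaults).
        la = logl_a[ia] if ia < len(logl_a) else np.inf
        lb = logl_b[ib] if ib < len(logl_b) else np.inf
        na = nlive_a[ia] if ia < len(logl_a) else 0
        nb = nlive_b[ib] if ib < len(logl_b) else 0
        # Advance whichever run holds the worse (lower-logl) dead point.
        if la <= lb:
            merged_logl.append(la)
            ia += 1
        else:
            merged_logl.append(lb)
            ib += 1
        merged_nlive.append(na + nb)
    return merged_logl, merged_nlive

logl, n = merge_walk([-3., -1., 0.], [10, 10, 10], [-2., -0.5], [5, 5])
print(logl)  # [-3.0, -2.0, -1.0, -0.5, 0.0]
print(n)     # [15, 15, 15, 15, 10]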
28 changes: 23 additions & 5 deletions tests/test_gau.py
@@ -164,13 +164,31 @@ def test_gaussian():
assert (np.abs(logz - g.logz_truth) < sig * results.logzerr[-1])
res_comb = dyfunc.merge_runs([result_list[0]])
res_comb = dyfunc.merge_runs(result_list)
    assert (np.abs(res_comb['logz'][-1] - g.logz_truth)
            < sig * results['logzerr'][-1])
# check summary
res = sampler.results
res.summary()


def test_merge():
rstate = get_rstate()
g = Gaussian()
sampler1 = dynesty.DynamicNestedSampler(g.loglikelihood,
g.prior_transform,
g.ndim,
nlive=nlive,
rstate=rstate)
sampler1.run_nested(print_progress=printing, maxbatch=1)
sampler2 = dynesty.DynamicNestedSampler(g.loglikelihood,
g.prior_transform,
g.ndim,
nlive=nlive,
rstate=rstate)
sampler2.run_nested(print_progress=printing, maxbatch=2)
dyfunc.merge_runs((sampler1.results, sampler2.results))
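    # The merge should succeed even though the two runs have a different
    # number of batches (the regression reported in #481).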


def test_generator():
# Test that we can use the sampler as a generator
rstate = get_rstate()
@@ -239,9 +257,9 @@ def test_bounding_sample(bound, sample):
print(sampler.citations)


@pytest.mark.parametrize("bound,sample",
itertools.product(
['single', 'multi', 'balls', 'cubes'], ['unif']))
@pytest.mark.parametrize(
"bound,sample",
itertools.product(['single', 'multi', 'balls', 'cubes'], ['unif']))
def test_bounding_bootstrap(bound, sample):
# check various bounding methods with bootstrap

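For context on what these tests exercise: merge_runs is the public entry point that repeatedly applies the pairwise merge. A minimal end-to-end usage sketch (the toy Gaussian problem here is assumed for illustration, not taken from the test suite):

import numpy as np
import dynesty
from dynesty import utils as dyfunc

def loglike(x):
    # unit Gaussian toy log-likelihood
    return -0.5 * np.sum(x**2)

def prior_transform(u):
    # map the unit cube to [-10, 10] in each dimension
    return 20.0 * u - 10.0

runs = []
for seed in [1, 2]:
    sampler = dynesty.NestedSampler(loglike, prior_transform, 2,
                                    nlive=100,
                                    rstate=np.random.default_rng(seed))
    sampler.run_nested(print_progress=False)
    runs.append(sampler.results)

# Combine the independent runs into a single, higher-resolution run.
combined = dyfunc.merge_runs(runs)
print(combined['logz'][-1], combined['logzerr'][-1])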
24 changes: 24 additions & 0 deletions tests/test_misc.py
@@ -726,6 +726,30 @@ def test_ncall(dynamic):
assert samp.ncall == L.ncall


@pytest.mark.parametrize('bounds', [[-10, -9], [-2, -1], [-1, 0]])
def test_merge(bounds):
    # test that merging two runs works correctly when one run
    # has a really narrow batch with a lot of points
ndim = 2
rstate = get_rstate()
sampler1 = dynesty.DynamicNestedSampler(loglike,
prior_transform,
ndim,
nlive=nlive,
rstate=rstate)
sampler1.run_nested(maxbatch=0, nlive_init=50, print_progress=printing)
sampler2 = dynesty.DynamicNestedSampler(loglike,
prior_transform,
ndim,
nlive=nlive,
rstate=rstate)
sampler2.run_nested(maxbatch=0, nlive_init=51, print_progress=printing)
sampler2.add_batch(mode='manual', logl_bounds=bounds, nlive=1000)
xres = dyutil.merge_runs([sampler1.results, sampler2.results])
stds = xres.samples_equal().std(axis=0)
assert np.all(np.abs(stds - 1) < 0.1)


def test_quantile():
rstate = get_rstate()
with pytest.raises(Exception):