From 1e3d27bacb7b7e92da7c5c98174dd65871608d8f Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:12:41 +0200 Subject: [PATCH 01/34] refactor parallel.py --- pmda/parallel.py | 73 +++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 41 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 636c7d5b..e180b870 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -14,12 +14,10 @@ classes. """ -from __future__ import absolute_import, division from contextlib import contextmanager import warnings import time -from six.moves import range import MDAnalysis as mda from dask.delayed import delayed import dask @@ -35,7 +33,7 @@ class Timing(object): store various timeing results of obtained during a parallel analysis run """ - def __init__(self, io, compute, total, universe, prepare, + def __init__(self, io, compute, total, prepare, prepare_dask, conclude, wait=None, io_block=None, compute_block=None): self._io = io @@ -44,8 +42,8 @@ def __init__(self, io, compute, total, universe, prepare, self._compute_block = compute_block self._total = total self._cumulate = np.sum(io) + np.sum(compute) - self._universe = universe self._prepare = prepare + self._prepare_dask = prepare_dask self._conclude = conclude self._wait = wait @@ -83,16 +81,16 @@ def cumulate_time(self): """ return self._cumulate - @property - def universe(self): - """time to create a universe for each block""" - return self._universe - @property def prepare(self): """time to prepare""" return self._prepare + @property + def prepare_dask(self): + """time for blocks to start working""" + return self._prepare_dask + @property def conclude(self): """time to conclude""" @@ -189,7 +187,7 @@ def _reduce(res, result_single_frame): """ - def __init__(self, universe, atomgroups): + def __init__(self, universe): """Parameters ---------- Universe : :class:`~MDAnalysis.core.groups.Universe` @@ -207,10 +205,8 @@ def __init__(self, universe, atomgroups): :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`. """ + self._universe = universe self._trajectory = universe.trajectory - self._top = universe.filename - self._traj = universe.trajectory.filename - self._indices = [ag.indices for ag in atomgroups] @contextmanager def readonly_attributes(self): @@ -232,8 +228,8 @@ def __setattr__(self, key, val): # guards to stop people assigning to self when they shouldn't # if locked, the only attribute you can modify is _attr_lock # if self._attr_lock isn't set, default to unlocked - if key == '_attr_lock' or not getattr(self, '_attr_lock', False): - super(ParallelAnalysisBase, self).__setattr__(key, val) + if key == '_ts' or key == '_attr_lock' or not getattr(self, '_attr_lock', False): + super().__setattr__(key, val) else: # raise HalError("I'm sorry Dave, I'm afraid I can't do that") raise AttributeError("Can't set attribute at this time") @@ -253,7 +249,7 @@ def _prepare(self): """additional preparation to run""" pass # pylint: disable=unnecessary-pass - def _single_frame(self, ts, atomgroups): + def _single_frame(self): """Perform computation on a single trajectory frame. Must return computed values as a list. 
You can only **read** @@ -374,18 +370,17 @@ def run(self, blocks = [] _blocks = [] with self.readonly_attributes(): - for bslice in slices: - task = delayed( - self._dask_helper, pure=False)( - bslice, - self._indices, - self._top, - self._traj, ) - blocks.append(task) - # save the frame numbers for each block - _blocks.append(range(bslice.start, - bslice.stop, bslice.step)) - blocks = delayed(blocks) + with timeit() as prepare_dask: + for bslice in slices: + task = delayed( + self._dask_helper, pure=False)( + bslice) + blocks.append(task) + # save the frame numbers for each block + _blocks.append(range(bslice.start, + bslice.stop, bslice.step)) + blocks = delayed(blocks) + time_prepare_dask = prepare_dask.elapsed # record the time when scheduler starts working wait_start = time.time() @@ -404,23 +399,19 @@ def run(self, self.timing = Timing( np.hstack([el[1] for el in res]), np.hstack([el[2] for el in res]), total.elapsed, - np.array([el[3] for el in res]), time_prepare, + time_prepare, + time_prepare_dask, conclude.elapsed, # waiting time = wait_end - wait_start - np.array([el[4]-wait_start for el in res]), - np.array([el[5] for el in res]), - np.array([el[6] for el in res])) + np.array([el[3]-wait_start for el in res]), + np.array([el[4] for el in res]), + np.array([el[5] for el in res])) return self - def _dask_helper(self, bslice, indices, top, traj): + def _dask_helper(self, bslice): """helper function to actually setup dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() - # record time to generate universe and atom groups - with timeit() as b_universe: - u = mda.Universe(top, traj) - agroups = [u.atoms[idx] for idx in indices] - res = [] times_io = [] times_compute = [] @@ -431,16 +422,16 @@ def _dask_helper(self, bslice, indices, top, traj): with timeit() as b_io: # explicit instead of 'for ts in u.trajectory[bslice]' # so that we can get accurate timing. - ts = u.trajectory[i] + self._ts = self._trajectory[i] # record compute time per frame with timeit() as b_compute: - res = self._reduce(res, self._single_frame(ts, agroups)) + res = self._reduce(res, self._single_frame()) times_io.append(b_io.elapsed) times_compute.append(b_compute.elapsed) # calculate io and compute time per block return np.asarray(res), np.asarray(times_io), np.asarray( - times_compute), b_universe.elapsed, wait_end, np.sum( + times_compute), wait_end, np.sum( times_io), np.sum(times_compute) @staticmethod From 4435f29d5e96476fa87ebf18c489cfaee9e72937 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:23:08 +0200 Subject: [PATCH 02/34] refactor custom --- pmda/custom.py | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/pmda/custom.py b/pmda/custom.py index 23056def..9f451471 100644 --- a/pmda/custom.py +++ b/pmda/custom.py @@ -16,8 +16,6 @@ same universe and return a value. 
""" -from __future__ import absolute_import - from MDAnalysis.core.groups import AtomGroup from MDAnalysis.core.universe import Universe from MDAnalysis.coordinates.base import ProtoReader @@ -77,27 +75,15 @@ def __init__(self, function, universe, *args, **kwargs): """ self.function = function - - # collect all atomgroups with the same trajectory object as universe - trajectory = universe.trajectory - arg_ags = [] - self.other_args = [] - for arg in args: - if isinstance(arg, - AtomGroup) and arg.universe.trajectory == trajectory: - arg_ags.append(arg) - else: - self.other_args.append(arg) - - super(AnalysisFromFunction, self).__init__(universe, arg_ags) + super().__init__(universe) + self.args = args self.kwargs = kwargs def _prepare(self): self.results = [] - def _single_frame(self, ts, atomgroups): - args = atomgroups + self.other_args - return self.function(*args, **self.kwargs) + def _single_frame(self): + return self.function(*self.args, **self.kwargs) def _conclude(self): self.results = np.concatenate(self._results) From 198167326b06f072e39d95558d95cdfeebfd2cb1 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:32:47 +0200 Subject: [PATCH 03/34] pep8 --- pmda/parallel.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index e180b870..b4caca14 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -228,7 +228,11 @@ def __setattr__(self, key, val): # guards to stop people assigning to self when they shouldn't # if locked, the only attribute you can modify is _attr_lock # if self._attr_lock isn't set, default to unlocked - if key == '_ts' or key == '_attr_lock' or not getattr(self, '_attr_lock', False): + + # keys that can be changed + if key == '_ts' or \ + key == '_attr_lock' or \ + not getattr(self, '_attr_lock', False): super().__setattr__(key, val) else: # raise HalError("I'm sorry Dave, I'm afraid I can't do that") @@ -372,9 +376,7 @@ def run(self, with self.readonly_attributes(): with timeit() as prepare_dask: for bslice in slices: - task = delayed( - self._dask_helper, pure=False)( - bslice) + task = delayed(self._dask_helper, pure=False)(bslice) blocks.append(task) # save the frame numbers for each block _blocks.append(range(bslice.start, @@ -403,7 +405,7 @@ def run(self, time_prepare_dask, conclude.elapsed, # waiting time = wait_end - wait_start - np.array([el[3]-wait_start for el in res]), + np.array([el[3] - wait_start for el in res]), np.array([el[4] for el in res]), np.array([el[5] for el in res])) return self From 0a708579abf396956fd6ad2b7a2f53f72e8670a1 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:36:35 +0200 Subject: [PATCH 04/34] refactor rmsd --- pmda/rms/rmsd.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pmda/rms/rmsd.py b/pmda/rms/rmsd.py index c6358f58..340ab1a5 100644 --- a/pmda/rms/rmsd.py +++ b/pmda/rms/rmsd.py @@ -24,8 +24,6 @@ :inherited-members: """ -from __future__ import absolute_import - from MDAnalysis.analysis import rms import numpy as np @@ -127,7 +125,8 @@ class RMSD(ParallelAnalysisBase): """ def __init__(self, mobile, ref, superposition=True): universe = mobile.universe - super(RMSD, self).__init__(universe, (mobile, )) + super().__init__(universe) + self._mobile = mobile self._ref_pos = ref.positions.copy() self.superposition = superposition @@ -137,7 +136,7 @@ def _prepare(self): def _conclude(self): self.rmsd = np.vstack(self._results) - def _single_frame(self, ts, atomgroups): - return (ts.frame, 
ts.time, - rms.rmsd(atomgroups[0].positions, self._ref_pos, + def _single_frame(self): + return (self._ts.frame, self._ts.time, + rms.rmsd(self._mobile.positions, self._ref_pos, superposition=self.superposition)) From cafe65f42a408bbaf43e739afb7d24cab5de5abb Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:39:05 +0200 Subject: [PATCH 05/34] refactor rmsf --- pmda/rms/rmsf.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pmda/rms/rmsf.py b/pmda/rms/rmsf.py index ad563b55..60ea819b 100644 --- a/pmda/rms/rmsf.py +++ b/pmda/rms/rmsf.py @@ -24,9 +24,6 @@ MDAnalysis.analysis.rms.RMSF """ - -from __future__ import absolute_import, division - import numpy as np from pmda.parallel import ParallelAnalysisBase @@ -172,14 +169,12 @@ class RMSF(ParallelAnalysisBase): def __init__(self, atomgroup): u = atomgroup.universe - super(RMSF, self).__init__(u, (atomgroup, )) + super().__init__(u) self._atomgroup = atomgroup - self._top = u.filename - self._traj = u.trajectory.filename - def _single_frame(self, ts, atomgroups): + def _single_frame(self): # mean and sum of squares calculations done in _reduce() - return atomgroups[0] + return self._atomgroup def _conclude(self): """ From 185d19a790c97e4c86f05ebaf29090c998b4d06f Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 14:47:21 +0200 Subject: [PATCH 06/34] refactor contacts --- pmda/contacts.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pmda/contacts.py b/pmda/contacts.py index 419afbef..122a2642 100644 --- a/pmda/contacts.py +++ b/pmda/contacts.py @@ -237,7 +237,8 @@ def __init__(self, respective functions for reasonable values. """ universe = mobiles[0].universe - super(Contacts, self).__init__(universe, mobiles) + super().__init__(universe) + self._mobiles = mobiles if method == 'hard_cut': self.fraction_contacts = hard_cut_q @@ -267,13 +268,13 @@ def __init__(self, def _prepare(self): self.timeseries = None - def _single_frame(self, ts, atomgroups): - grA, grB = atomgroups + def _single_frame(self): + grA, grB = self._mobiles # compute distance array for a frame d = distance_array(grA.positions, grB.positions) y = np.empty(len(self.r0) + 1) - y[0] = ts.frame + y[0] = self._ts.frame for i, (initial_contacts, r0) in enumerate(zip(self.initial_contacts, self.r0)): # select only the contacts that were formed in the reference state From ef86b9dbcd3d077c6c22fb754cbb4756b922e3cb Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 16:14:10 +0200 Subject: [PATCH 07/34] refactor density --- pmda/density.py | 31 ++++++++++++------------------- pmda/parallel.py | 4 ++++ 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/pmda/density.py b/pmda/density.py index 461ae1f7..e11dd3e4 100644 --- a/pmda/density.py +++ b/pmda/density.py @@ -22,9 +22,6 @@ MDAnalysis.analysis.density.density_from_Universe """ - -from __future__ import absolute_import - import numpy as np from MDAnalysis.lib.util import fixedwidth_bins @@ -241,7 +238,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, parameters=None, gridcenter=None, xdim=None, ydim=None, zdim=None): u = atomgroup.universe - super(DensityAnalysis, self).__init__(u, (atomgroup, )) + super().__init__(u) self._atomgroup = atomgroup self._delta = delta self._atomselection = atomselection @@ -259,10 +256,15 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, elif not updating and atomselection is not None: raise ValueError("""With updating=False, the atomselection='{}' is 
not used and should be None""".format(atomselection)) + elif updating and atomselection is not None: + self._select_atomgroup = atomgroup.select_atoms(atomselection, + updating=True) + else: + self._select_atomgroup = atomgroup + def _prepare(self): - coord = self.current_coordinates(self._atomgroup, self._atomselection, - self._updating) + coord = self._select_atomgroup.positions if self._gridcenter is not None: # Generate a copy of smin/smax from coords to later check if the # defined box might be too small for the selection @@ -294,10 +296,10 @@ def _prepare(self): self._arange = arange self._bins = bins - def _single_frame(self, ts, atomgroups): - coord = self.current_coordinates(atomgroups[0], self._atomselection, - self._updating) - result = np.histogramdd(coord, bins=self._bins, range=self._arange, + def _single_frame(self): + result = np.histogramdd(self._select_atomgroup.positions, + bins=self._bins, + range=self._arange, normed=False) return result[0] @@ -330,12 +332,3 @@ def _reduce(res, result_single_frame): else: res += result_single_frame return res - - @staticmethod - def current_coordinates(atomgroup, atomselection, updating): - """Retrieves the current coordinates of all atoms in the chosen atom - selection. - Note: currently required to allow for updating selections""" - ag = atomgroup if not updating else atomgroup.select_atoms( - atomselection) - return ag.positions diff --git a/pmda/parallel.py b/pmda/parallel.py index b4caca14..ca24cd0f 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -408,6 +408,10 @@ def run(self, np.array([el[3] - wait_start for el in res]), np.array([el[4] for el in res]), np.array([el[5] for el in res])) + + # this is crucial if the analysis does not iterate over + # the whole trajectory. + self._trajectory.rewind() return self def _dask_helper(self, bslice): From c0b0bd6f93881f6b54c9e685374dbb5e74016b5c Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 16:33:32 +0200 Subject: [PATCH 08/34] refactor rdf --- pmda/rdf.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/pmda/rdf.py b/pmda/rdf.py index a8aabde6..9c5cae14 100644 --- a/pmda/rdf.py +++ b/pmda/rdf.py @@ -25,9 +25,6 @@ :inherited-members: """ - -from __future__ import absolute_import, division - import numpy as np from MDAnalysis.lib import distances @@ -93,8 +90,9 @@ class InterRDF(ParallelAnalysisBase): def __init__(self, g1, g2, nbins=75, range=(0.0, 15.0), exclusion_block=None): u = g1.universe - super(InterRDF, self).__init__(u, (g1, g2)) + super().__init__(u) + self._atomgroups = (g1, g2) # collect all atomgroups with the same trajectory object as universe self.nA = len(g1) self.nB = len(g2) @@ -117,13 +115,12 @@ def _prepare(self): # Set the max range to filter the search radius self._maxrange = self.rdf_settings['range'][1] - def _single_frame(self, ts, atomgroups): - g1, g2 = atomgroups - u = g1.universe + def _single_frame(self): + g1, g2 = self._atomgroups pairs, dist = distances.capped_distance(g1.positions, g2.positions, self._maxrange, - box=u.dimensions) + box=self._universe.dimensions) # If provided exclusions, create a mask of _result which # lets us take these out. 
if self._exclusion_block is not None: @@ -132,7 +129,7 @@ def _single_frame(self, ts, atomgroups): mask = np.where(idxA != idxB)[0] dist = dist[mask] count = np.histogram(dist, **self.rdf_settings)[0] - volume = u.trajectory.ts.volume + volume = self._universe.trajectory.ts.volume return np.array([count, np.array(volume, dtype=np.float64)]) @@ -261,7 +258,9 @@ def __init__(self, u, ags, for pair in ags: atomgroups.append(pair[0]) atomgroups.append(pair[1]) - super(InterRDF_s, self).__init__(u, atomgroups) + super().__init__(u) + + self._atomgroups = atomgroups # List of pairs of AtomGroups self._density = density @@ -289,22 +288,22 @@ def _prepare(self): self.volume = 0.0 self._maxrange = self.rdf_settings['range'][1] - def _single_frame(self, ts, atomgroups): - ags = [[atomgroups[2*i], atomgroups[2*i+1]] for i in range(self.n)] + def _single_frame(self): + ags = [[self._atomgroups[2*i], self._atomgroups[2*i+1]] + for i in range(self.n)] count = [np.zeros((ag1.n_atoms, ag2.n_atoms, self.len), dtype=np.float64) for ag1, ag2 in ags] for i, (ag1, ag2) in enumerate(ags): - u = ag1.universe pairs, dist = distances.capped_distance(ag1.positions, ag2.positions, self._maxrange, - box=u.dimensions) + box=self._universe.dimensions) for j, (idx1, idx2) in enumerate(pairs): count[i][idx1, idx2, :] = np.histogram(dist[j], **self.rdf_settings)[0] - volume = u.trajectory.ts.volume + volume = self._universe.trajectory.ts.volume return np.array([np.array(count), np.array(volume, dtype=np.float64)]) From bfa629cafed01a6361f10b0710dcd6520af780b9 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 16:49:25 +0200 Subject: [PATCH 09/34] refactor HBonds --- pmda/hbond_analysis.py | 34 +++++++++--------------- pmda/parallel.py | 2 +- pmda/test/test_hydrogenbonds_analysis.py | 4 +-- 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/pmda/hbond_analysis.py b/pmda/hbond_analysis.py index 82125141..936ed61b 100644 --- a/pmda/hbond_analysis.py +++ b/pmda/hbond_analysis.py @@ -24,8 +24,6 @@ :members: """ -from __future__ import absolute_import, division - import numpy as np from MDAnalysis.lib.distances import capped_distance, calc_angles @@ -172,7 +170,8 @@ def __init__(self, universe, donors_sel=None, hydrogens_sel=None, """ ag = universe.atoms - super(HydrogenBondAnalysis, self).__init__(universe, (ag, )) + super().__init__(universe) + self._atomgroup = ag self.donors_sel = donors_sel self.hydrogens_sel = hydrogens_sel self.acceptors_sel = acceptors_sel @@ -218,7 +217,7 @@ def guess_hydrogens(self, selection='all', max_mass=1.1, min_charge=0.3): :attr:`hydrogens_sel`. """ - u = self._universe() + u = self._universe ag = u.select_atoms(selection) hydrogens_ag = ag[ np.logical_and( @@ -282,7 +281,7 @@ def guess_donors(self, selection='all', max_charge=-0.5): hydrogens_sel = self.guess_hydrogens() else: hydrogens_sel = self.hydrogens_sel - u = self._universe() + u = self._universe hydrogens_ag = u.select_atoms(hydrogens_sel) ag = hydrogens_ag.residues.atoms.select_atoms( @@ -335,7 +334,7 @@ def guess_acceptors(self, selection='all', max_charge=-0.5): attribute :attr:`acceptors_sel`. 
""" - u = self._universe() + u = self._universe ag = u.select_atoms(selection) acceptors_ag = ag[ag.charges < max_charge] acceptors_list = np.unique( @@ -393,7 +392,7 @@ def _get_dh_pairs(self, u): return donors, hydrogens def _prepare(self): - u = mda.Universe(self._top, self._traj) + u = self._universe self.hbonds = [] self.frames = np.arange(self.start, self.stop, self.step) self.timesteps = (self.frames*u.trajectory.dt) + u.trajectory[0].time @@ -410,10 +409,9 @@ def _prepare(self): self._donors_ids = donors.ids self._hydrogens_ids = hydrogens.ids - def _single_frame(self, ts, atomgroups): - u = atomgroups[0].universe - - box = ts.dimensions + def _single_frame(self): + u = self._universe + box = self._ts.dimensions # Update donor-hydrogen pairs if necessary if self.update_selections: @@ -459,7 +457,7 @@ def _single_frame(self, ts, atomgroups): # Store data on hydrogen bonds found at this frame hbonds = [[], [], [], [], [], []] - hbonds[0].extend(np.full_like(hbond_donors, ts.frame)) + hbonds[0].extend(np.full_like(hbond_donors, self._ts.frame)) hbonds[1].extend(hbond_donors.ids) hbonds[2].extend(hbond_hydrogens.ids) hbonds[3].extend(hbond_acceptors.ids) @@ -508,7 +506,7 @@ def count_by_type(self): resname and atom type of the donor and acceptor atoms in a hydrogen bond. """ - u = self._universe() + u = self._universe d = u.atoms[self.hbonds[:, 1].astype(np.int)] a = u.atoms[self.hbonds[:, 3].astype(np.int)] @@ -541,7 +539,7 @@ def count_by_ids(self): hydrogen atom id and acceptor atom id in a hydrogen bond. """ - u = self._universe() + u = self._universe d = u.atoms[self.hbonds[:, 1].astype(np.int)] h = u.atoms[self.hbonds[:, 2].astype(np.int)] a = u.atoms[self.hbonds[:, 3].astype(np.int)] @@ -558,14 +556,6 @@ def count_by_ids(self): return unique_hbonds - def _universe(self): - # A Universe containing position information is needed for guessing - # donors and acceptors. - u = mda.Universe(self._top) - if not hasattr(u.atoms, 'positions'): - u.load_new(self._positions) - return u - @staticmethod def _reduce(res, result_single_frame): """ Use numpy array append to combine results""" diff --git a/pmda/parallel.py b/pmda/parallel.py index ca24cd0f..45b617e3 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -236,7 +236,7 @@ def __setattr__(self, key, val): super().__setattr__(key, val) else: # raise HalError("I'm sorry Dave, I'm afraid I can't do that") - raise AttributeError("Can't set attribute at this time") + raise AttributeError("Can't set '{}' at this time".format(key)) def _conclude(self): """Finalise the results you've gathered. 
diff --git a/pmda/test/test_hydrogenbonds_analysis.py b/pmda/test/test_hydrogenbonds_analysis.py index 2917b523..e7851175 100644 --- a/pmda/test/test_hydrogenbonds_analysis.py +++ b/pmda/test/test_hydrogenbonds_analysis.py @@ -104,7 +104,7 @@ def test_count_by_ids(self, h): def test_universe(self, h, universe): ref = universe.atoms.positions h.run(n_jobs=4, n_blocks=4) - u = h._universe() + u = h._universe assert_array_almost_equal(u.atoms.positions, ref) @@ -269,5 +269,5 @@ def test_universe(self): universe = MDAnalysis.Universe(GRO) ref = universe.atoms.positions h = HydrogenBondAnalysis(universe, **self.kwargs) - u = h._universe() + u = h._universe assert_array_almost_equal(u.atoms.positions, ref) From 495033fd8fa0e7533c243c4e723b203452fc7a35 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 15 Jul 2020 20:04:41 +0200 Subject: [PATCH 10/34] leaflet broken --- pmda/leaflet.py | 59 ++++++++++++++++++++++----------------- pmda/parallel.py | 2 +- pmda/test/test_leaflet.py | 19 ++++++------- 3 files changed, 42 insertions(+), 38 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index b0e0bbed..e889b0d1 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -19,8 +19,6 @@ :inherited-members: """ -from __future__ import absolute_import, division - import numpy as np import dask.bag as db import networkx as nx @@ -70,11 +68,11 @@ class LeafletFinder(ParallelAnalysisBase): """ - def __init__(self, universe, atomgroups): - self._atomgroup = atomgroups - self._results = list() + def __init__(self, universe, atomgroup): + super().__init__(universe) - super(LeafletFinder, self).__init__(universe, (atomgroups,)) + self._atomgroup = atomgroup + self._results = list() def _find_connected_components(self, data, cutoff=15.0): """Perform the Connected Components discovery for the atoms in data. @@ -99,6 +97,7 @@ def _find_connected_components(self, data, cutoff=15.0): """ # pylint: disable=unsubscriptable-object + # raise TypeError(data[0]) window, index = data[0] num = window[0].shape[0] i_index = index[0] @@ -157,7 +156,7 @@ def _find_connected_components(self, data, cutoff=15.0): return comp # pylint: disable=arguments-differ - def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, + def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): """Perform computation on a single trajectory frame. @@ -187,27 +186,37 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, """ # Get positions of the atoms in the atomgroup and find their number. - atoms = ts.positions[atomgroups.indices] + atoms = self._atomgroup.positions matrix_size = atoms.shape[0] arranged_coord = list() part_size = int(matrix_size / n_jobs) # Partition the data based on a 2-dimensional partitioning - for i in range(1, matrix_size + 1, part_size): - for j in range(i, matrix_size + 1, part_size): - arranged_coord.append(([atoms[i - 1:i - 1 + part_size], - atoms[j - 1:j - 1 + part_size]], - [i, j])) + i = np.linspace(0, matrix_size, n_jobs+1).astype(int) + print(i) + if len(i) == 2: + arranged_coord.append(([atoms, + atoms], + [1,1])) + else: + for index_i in range(len(i)-1): + j = i[1:] + for index_j in range(len(j)-1): + arranged_coord.append(([atoms[i[index_i]:i[index_i+1]], + atoms[j[index_j]:j[index_j+1]]], + [i[index_i]+1, j[index_j]+1])) # Distribute the data over the available cores, apply the map function # and execute. 
- parAtoms = db.from_sequence(arranged_coord, - npartitions=len(arranged_coord)) - parAtomsMap = parAtoms.map_partitions(self._find_connected_components, - cutoff=cutoff) + with timeit() as prepare_dask: + parAtoms = db.from_sequence(arranged_coord, + npartitions=len(arranged_coord)) + parAtomsMap = parAtoms.map_partitions(self._find_connected_components, + cutoff=cutoff) + self.prepare_dask_total += prepare_dask.elapsed Components = parAtomsMap.compute(**scheduler_kwargs) - # Gather the results and start the reduction. TODO: think if it can go # to the private _reduce method of the based class. result = list(Components) + raise TypeError(Components) # Create the overall connected components of the graph while len(result) != 0: @@ -277,11 +286,11 @@ def run(self, if scheduler == 'multiprocessing': scheduler_kwargs['num_workers'] = n_jobs - with timeit() as b_universe: - universe = mda.Universe(self._top, self._traj) - start, stop, step = self._trajectory.check_slice_indices( start, stop, step) + + self.prepare_dask_total = 0 + with timeit() as total: with timeit() as prepare: self._prepare() @@ -291,13 +300,11 @@ def run(self, times_io = [] for frame in range(start, stop, step): with timeit() as b_io: - ts = universe.trajectory[frame] + self._ts = self._universe.trajectory[frame] times_io.append(b_io.elapsed) with timeit() as b_compute: components = self. \ - _single_frame(ts=ts, - atomgroups=self._atomgroup, - scheduler_kwargs=scheduler_kwargs, + _single_frame(scheduler_kwargs=scheduler_kwargs, n_jobs=n_jobs, cutoff=cutoff) timings.append(b_compute.elapsed) @@ -308,7 +315,7 @@ def run(self, self._conclude() self.timing = Timing(times_io, np.hstack(timings), total.elapsed, - b_universe.elapsed, prepare.elapsed, + prepare.elapsed, self.prepare_dask_total, conclude.elapsed) return self diff --git a/pmda/parallel.py b/pmda/parallel.py index 45b617e3..51205743 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -230,7 +230,7 @@ def __setattr__(self, key, val): # if self._attr_lock isn't set, default to unlocked # keys that can be changed - if key == '_ts' or \ + if key in ['_ts', 'prepare_dask_total'] or \ key == '_attr_lock' or \ not getattr(self, '_attr_lock', False): super().__setattr__(key, val) diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index cd7153bd..39d5ae90 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -27,30 +27,27 @@ def correct_values(self): np.array([36634]), np.array([36507, 36761, 38285, 39174]), np.array([36634]), - np.array([36507, 36761, 37650, 38285, 39174, 39936]), + np.array([36507, 36761, 37650, 38285, 39174, 39936])] np.array([36634]), - np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), - np.array([36634]), - np.array([36507, 36761]), - np.array([36634])] + # np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), + # np.array([36634]), + # np.array([36507, 36761]), + # np.array([36634])] @pytest.fixture() def correct_values_single_frame(self): return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] # XFAIL for 2 jobs needs to be fixed! 
- @pytest.mark.parametrize('n_jobs', [ - pytest.param(-1, marks=pytest.mark.xfail), - 1, - pytest.param(2, marks=pytest.mark.xfail), - ]) + @pytest.mark.parametrize('n_jobs', (-1, 1, 2)) def test_leaflet(self, universe, correct_values, n_jobs): lipid_heads = universe.select_atoms("name P and resname POPG") universe.trajectory.rewind() leaflets = leaflet.LeafletFinder(universe, lipid_heads) - leaflets.run(n_jobs=n_jobs) + leaflets.run(n_jobs=n_jobs, stop=3) results = [atoms.indices for atomgroup in leaflets.results for atoms in atomgroup] + print(results) [assert_almost_equal(x, y, err_msg="error: leaflets should match " + "test values") for x, y in zip(results, correct_values)] From 87002237faafa18163d4879ea038e4aed2e12304 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 13:00:56 +0200 Subject: [PATCH 11/34] build mdanalysis on serialize_io --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f308baf6..5d29ff74 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,14 +19,14 @@ env: # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" + - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&subdirectory=package git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false From c8e99734354904c52dbbff60b5d8a01055c37cfe Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 13:32:53 +0200 Subject: [PATCH 12/34] build mdanalysis on serialize_io fix --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 5d29ff74..d9497c4f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ env: - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&subdirectory=package git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io&#egg=mdanalysistests&subdirectory=testsuite" + - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis#egg=mdanalysis&subdirectory=package@serialize_io git+https://github.com/yuxuanzhuang/mdanalysis#egg=mdanalysistests&subdirectory=testsuite@serialize_io" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false From 356cfb9929d50a9f5134ed966268a8d5f4d15fe7 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 13:43:10 +0200 Subject: 
[PATCH 13/34] push leaflet fix back --- pmda/test/test_leaflet.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index 39d5ae90..cd7153bd 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -27,27 +27,30 @@ def correct_values(self): np.array([36634]), np.array([36507, 36761, 38285, 39174]), np.array([36634]), - np.array([36507, 36761, 37650, 38285, 39174, 39936])] + np.array([36507, 36761, 37650, 38285, 39174, 39936]), np.array([36634]), - # np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), - # np.array([36634]), - # np.array([36507, 36761]), - # np.array([36634])] + np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), + np.array([36634]), + np.array([36507, 36761]), + np.array([36634])] @pytest.fixture() def correct_values_single_frame(self): return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] # XFAIL for 2 jobs needs to be fixed! - @pytest.mark.parametrize('n_jobs', (-1, 1, 2)) + @pytest.mark.parametrize('n_jobs', [ + pytest.param(-1, marks=pytest.mark.xfail), + 1, + pytest.param(2, marks=pytest.mark.xfail), + ]) def test_leaflet(self, universe, correct_values, n_jobs): lipid_heads = universe.select_atoms("name P and resname POPG") universe.trajectory.rewind() leaflets = leaflet.LeafletFinder(universe, lipid_heads) - leaflets.run(n_jobs=n_jobs, stop=3) + leaflets.run(n_jobs=n_jobs) results = [atoms.indices for atomgroup in leaflets.results for atoms in atomgroup] - print(results) [assert_almost_equal(x, y, err_msg="error: leaflets should match " + "test values") for x, y in zip(results, correct_values)] From 58fea5d172257c5697678724ed4c4f31b2fd463a Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 13:49:04 +0200 Subject: [PATCH 14/34] travis fix --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index d9497c4f..b738e645 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ env: # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" + #- CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True From dfd35882fcbdc6ace626fd6b58c8f0a2b37a4c16 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 13:56:43 +0200 Subject: [PATCH 15/34] travis fix --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b738e645..7b8ee952 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ env: - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis#egg=mdanalysis&subdirectory=package@serialize_io git+https://github.com/yuxuanzhuang/mdanalysis#egg=mdanalysistests&subdirectory=testsuite@serialize_io" + - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysis&subdirectory=package 
git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false From 09218822839953c0770cee3ce67762e5e531d4e3 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 14:46:01 +0200 Subject: [PATCH 16/34] test parallel --- pmda/test/test_parallel.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index a0bed833..0acb29f1 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -24,23 +24,23 @@ def test_timeing(): io = np.arange(5) compute = np.arange(5) + 1 total = 5 - universe = np.arange(2) prepare = 3 + prepare_dask = 4 conclude = 6 wait = 12 io_block = np.sum(io) compute_block = np.sum(compute) timing = parallel.Timing(io, compute, total, - universe, prepare, conclude, wait, + prepare, prepare_dask, conclude, wait, io_block, compute_block,) np.testing.assert_equal(timing.io, io) np.testing.assert_equal(timing.compute, compute) np.testing.assert_equal(timing.total, total) - np.testing.assert_equal(timing.universe, universe) np.testing.assert_equal(timing.cumulate_time, np.sum(io) + np.sum(compute)) np.testing.assert_equal(timing.prepare, prepare) + np.testing.assert_equal(timing.prepare_dask, prepare_dask) np.testing.assert_equal(timing.conclude, conclude) np.testing.assert_equal(timing.wait, wait) np.testing.assert_equal(timing.io_block, io_block) @@ -50,7 +50,8 @@ def test_timeing(): class NoneAnalysis(parallel.ParallelAnalysisBase): def __init__(self, atomgroup): universe = atomgroup.universe - super(NoneAnalysis, self).__init__(universe, (atomgroup, )) + super().__init__(universe) + self._atomgroup = atomgroup def _prepare(self): pass @@ -58,8 +59,8 @@ def _prepare(self): def _conclude(self): self.res = np.hstack(self._results) - def _single_frame(self, ts, atomgroups): - return ts.frame + def _single_frame(self): + return self._ts.frame @pytest.fixture @@ -72,7 +73,7 @@ def analysis(): @pytest.mark.parametrize('n_jobs', (1, 2)) def test_all_frames(analysis, n_jobs): analysis.run(n_jobs=n_jobs) - u = mda.Universe(analysis._top, analysis._traj) + u = analysis._universe assert len(analysis.res) == u.trajectory.n_frames @@ -84,7 +85,7 @@ def test_sub_frames(analysis, n_jobs): @pytest.mark.parametrize('n_jobs', (1, 2, 3)) def test_no_frames(analysis, n_jobs): - u = mda.Universe(analysis._top, analysis._traj) + u = analysis._universe n_frames = u.trajectory.n_frames with pytest.warns(UserWarning): analysis.run(start=n_frames, stop=n_frames+1, n_jobs=n_jobs) @@ -103,7 +104,7 @@ def test_scheduler(analysis, scheduler): def test_nframes_less_nblocks_warning(analysis): - u = mda.Universe(analysis._top, analysis._traj) + u = analysis._universe n_frames = u.trajectory.n_frames with pytest.warns(UserWarning): analysis.run(stop=2, n_blocks=4, n_jobs=2) @@ -125,7 +126,7 @@ def test_guess_nblocks(analysis): @pytest.mark.parametrize('n_blocks', np.arange(1, 11)) def test_blocks(analysis, n_blocks): analysis.run(n_blocks=n_blocks) - u = mda.Universe(analysis._top, analysis._traj) + u = analysis._universe n_frames = u.trajectory.n_frames start, stop, step = u.trajectory.check_slice_indices( None, None, None) @@ -138,7 +139,7 @@ def test_blocks(analysis, n_blocks): def test_attrlock(): u = mda.Universe(PSF, DCD) - pab = parallel.ParallelAnalysisBase(u, (u.atoms,)) + pab = parallel.ParallelAnalysisBase(u) # Should initially be allowed to set attributes 
pab.thing1 = 24 From 49288e88875c9b7fbaf81ba9982a688aa566f924 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 17:03:47 +0200 Subject: [PATCH 17/34] timing test --- pmda/parallel.py | 2 +- pmda/test/test_parallel.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 51205743..5ac67346 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -390,7 +390,7 @@ def run(self, # hack to handle n_frames == 0 in this framework if len(res) == 0: # everything else wants list of block tuples - res = [([], [], [], 0, wait_start, 0, 0)] + res = [([], [], [], wait_start, 0, 0)] # record conclude time with timeit() as conclude: self._results = np.asarray([el[0] for el in res]) diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 0acb29f1..c2b0eeaf 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -96,7 +96,6 @@ def test_no_frames(analysis, n_jobs): np.testing.assert_equal(analysis.timing.io_block, [0]) np.testing.assert_equal(analysis.timing.compute_block, [0]) np.testing.assert_equal(analysis.timing.wait, [0]) - assert analysis.timing.universe == 0 def test_scheduler(analysis, scheduler): From 9a0e0c580f558493a073d88cec955e497fcd5d70 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 16 Jul 2020 17:36:59 +0200 Subject: [PATCH 18/34] leaflet fix --- pmda/leaflet.py | 34 ++++++++++++++-------------------- pmda/rdf.py | 4 ++-- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index e889b0d1..64c6a38e 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -19,6 +19,8 @@ :inherited-members: """ +from __future__ import absolute_import, division + import numpy as np import dask.bag as db import networkx as nx @@ -68,12 +70,12 @@ class LeafletFinder(ParallelAnalysisBase): """ - def __init__(self, universe, atomgroup): + def __init__(self, universe, atomgroups): super().__init__(universe) - - self._atomgroup = atomgroup + self._atomgroup = atomgroups self._results = list() + def _find_connected_components(self, data, cutoff=15.0): """Perform the Connected Components discovery for the atoms in data. @@ -97,7 +99,6 @@ def _find_connected_components(self, data, cutoff=15.0): """ # pylint: disable=unsubscriptable-object - # raise TypeError(data[0]) window, index = data[0] num = window[0].shape[0] i_index = index[0] @@ -191,32 +192,25 @@ def _single_frame(self, scheduler_kwargs, n_jobs, arranged_coord = list() part_size = int(matrix_size / n_jobs) # Partition the data based on a 2-dimensional partitioning - i = np.linspace(0, matrix_size, n_jobs+1).astype(int) - print(i) - if len(i) == 2: - arranged_coord.append(([atoms, - atoms], - [1,1])) - else: - for index_i in range(len(i)-1): - j = i[1:] - for index_j in range(len(j)-1): - arranged_coord.append(([atoms[i[index_i]:i[index_i+1]], - atoms[j[index_j]:j[index_j+1]]], - [i[index_i]+1, j[index_j]+1])) + for i in range(1, matrix_size + 1, part_size): + for j in range(i, matrix_size + 1, part_size): + arranged_coord.append(([atoms[i - 1:i - 1 + part_size], + atoms[j - 1:j - 1 + part_size]], + [i, j])) # Distribute the data over the available cores, apply the map function # and execute. 
with timeit() as prepare_dask: parAtoms = db.from_sequence(arranged_coord, npartitions=len(arranged_coord)) - parAtomsMap = parAtoms.map_partitions(self._find_connected_components, - cutoff=cutoff) + parAtomsMap = parAtoms.map_partitions( + self._find_connected_components, + cutoff=cutoff) self.prepare_dask_total += prepare_dask.elapsed Components = parAtomsMap.compute(**scheduler_kwargs) + # Gather the results and start the reduction. TODO: think if it can go # to the private _reduce method of the based class. result = list(Components) - raise TypeError(Components) # Create the overall connected components of the graph while len(result) != 0: diff --git a/pmda/rdf.py b/pmda/rdf.py index 9c5cae14..ad5fb89a 100644 --- a/pmda/rdf.py +++ b/pmda/rdf.py @@ -289,8 +289,8 @@ def _prepare(self): self._maxrange = self.rdf_settings['range'][1] def _single_frame(self): - ags = [[self._atomgroups[2*i], self._atomgroups[2*i+1]] - for i in range(self.n)] + ags = [[self._atomgroups[2 * i], self._atomgroups[2 * i + 1]] + for i in range(self.n)] count = [np.zeros((ag1.n_atoms, ag2.n_atoms, self.len), dtype=np.float64) for ag1, ag2 in ags] for i, (ag1, ag2) in enumerate(ags): From 187463bbf2dff87efcc8eff745d8e5f4817358dd Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Fri, 17 Jul 2020 10:43:47 +0200 Subject: [PATCH 19/34] pep8 --- pmda/density.py | 1 - pmda/leaflet.py | 1 - pmda/rdf.py | 17 ++++++++++------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pmda/density.py b/pmda/density.py index e11dd3e4..65050f5c 100644 --- a/pmda/density.py +++ b/pmda/density.py @@ -262,7 +262,6 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None, else: self._select_atomgroup = atomgroup - def _prepare(self): coord = self._select_atomgroup.positions if self._gridcenter is not None: diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 64c6a38e..a0e0b10e 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -75,7 +75,6 @@ def __init__(self, universe, atomgroups): self._atomgroup = atomgroups self._results = list() - def _find_connected_components(self, data, cutoff=15.0): """Perform the Connected Components discovery for the atoms in data. diff --git a/pmda/rdf.py b/pmda/rdf.py index ad5fb89a..2ff887a7 100644 --- a/pmda/rdf.py +++ b/pmda/rdf.py @@ -124,8 +124,8 @@ def _single_frame(self): # If provided exclusions, create a mask of _result which # lets us take these out. 
if self._exclusion_block is not None: - idxA = pairs[:, 0]//self._exclusion_block[0] - idxB = pairs[:, 1]//self._exclusion_block[1] + idxA = pairs[:, 0] // self._exclusion_block[0] + idxB = pairs[:, 1] // self._exclusion_block[1] mask = np.where(idxA != idxB)[0] dist = dist[mask] count = np.histogram(dist, **self.rdf_settings)[0] @@ -147,7 +147,7 @@ def _conclude(self, ): # Volume in each radial shell vol = np.power(self.edges[1:], 3) - np.power(self.edges[:-1], 3) - vol *= 4/3.0 * np.pi + vol *= 4 / 3.0 * np.pi # Average number density box_vol = self.volume / self.n_frames @@ -289,15 +289,18 @@ def _prepare(self): self._maxrange = self.rdf_settings['range'][1] def _single_frame(self): - ags = [[self._atomgroups[2 * i], self._atomgroups[2 * i + 1]] - for i in range(self.n)] + ags = [ + [self._atomgroups[2 * i], + self._atomgroups[2 * i + 1]] + for i in range(self.n)] count = [np.zeros((ag1.n_atoms, ag2.n_atoms, self.len), dtype=np.float64) for ag1, ag2 in ags] for i, (ag1, ag2) in enumerate(ags): pairs, dist = distances.capped_distance(ag1.positions, ag2.positions, self._maxrange, - box=self._universe.dimensions) + box=self._universe + .dimensions) for j, (idx1, idx2) in enumerate(pairs): count[i][idx1, idx2, :] = np.histogram(dist[j], @@ -312,7 +315,7 @@ def _conclude(self): self.volume = np.sum(self._results[:, 1]) # Volume in each radial shell vol = np.power(self.edges[1:], 3) - np.power(self.edges[:-1], 3) - vol *= 4/3.0 * np.pi + vol *= 4 / 3.0 * np.pi # Empty lists to restore indices, RDF rdf = [] From 8a420401e68317e41335adfda794b9d6a6fdf01f Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Fri, 17 Jul 2020 12:58:18 +0200 Subject: [PATCH 20/34] make sure universe before atomgroup --- pmda/contacts.py | 2 -- pmda/custom.py | 26 ++++++++++---------------- pmda/parallel.py | 30 ++++++++++++++---------------- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/pmda/contacts.py b/pmda/contacts.py index 122a2642..1f01ceef 100644 --- a/pmda/contacts.py +++ b/pmda/contacts.py @@ -182,8 +182,6 @@ def is_any_closer(r, r0, dist=2.5): :inherited-members: """ -from __future__ import absolute_import, division - import MDAnalysis as mda from MDAnalysis.analysis.contacts import (contact_matrix, hard_cut_q, radius_cut_q, soft_cut_q) diff --git a/pmda/custom.py b/pmda/custom.py index 9f451471..0f1da8b0 100644 --- a/pmda/custom.py +++ b/pmda/custom.py @@ -16,7 +16,6 @@ same universe and return a value. """ -from MDAnalysis.core.groups import AtomGroup from MDAnalysis.core.universe import Universe from MDAnalysis.coordinates.base import ProtoReader import numpy as np @@ -42,7 +41,7 @@ class AnalysisFromFunction(ParallelAnalysisBase): >>> return mda.analysis.align.rotation_matrix(mobile, ref)[0] >>> # now run an analysis using the standalone function >>> rot = AnalysisFromFunction(rotation_matrix, - trajectory, mobile, ref).run() + universe, mobile, ref).run() >>> print(rot.results) Raises @@ -62,18 +61,14 @@ def __init__(self, function, universe, *args, **kwargs): to be 'mobile' Atomgroups if they belong to the same universe. All other Atomgroups are assumed to be reference. Here 'mobile' means they will be iterated over. - Universe : :class:`~MDAnalysis.core.groups.Universe` + universe : :class:`~MDAnalysis.core.groups.Universe` a :class:`MDAnalysis.core.groups.Universe` (the - `atomgroups` must belong to this Universe) + `atomgroups` in other args must belong to this Universe) *args : list arguments for ``function`` **kwargs : dict - keyword arguments for ``function``. 
keyword arguments with name - 'universe' or 'atomgroups' will be ignored! Mobile atomgroups to - analyze can not be passed as keyword arguments currently. - + keyword arguments for ``function``. """ - self.function = function super().__init__(universe) self.args = args @@ -118,7 +113,7 @@ def analysis_class(function): >>> def RotationMatrix(mobile, ref): >>> return mda.analysis.align.rotation_matrix(mobile, ref)[0] - >>> rot = RotationMatrix(u.trajectory, mobile, ref, step=2).run() + >>> rot = RotationMatrix(u, mobile, ref, step=2).run() >>> print(rot.results) See Also @@ -130,13 +125,12 @@ def analysis_class(function): class WrapperClass(AnalysisFromFunction): """Custom Analysis Function""" - def __init__(self, trajectory=None, *args, **kwargs): - if not (isinstance(trajectory, ProtoReader) or isinstance( - trajectory, Universe)): - print(type(trajectory)) + def __init__(self, universe=None, *args, **kwargs): + if not isinstance(universe, Universe): + print(type(universe)) raise ValueError( - "First argument needs to be an MDAnalysis reader object.") - super(WrapperClass, self).__init__(function, trajectory, *args, + "First argument needs to be an MDAnalysis Universe.") + super().__init__(function, universe, *args, **kwargs) return WrapperClass diff --git a/pmda/parallel.py b/pmda/parallel.py index 5ac67346..6de027af 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -88,7 +88,7 @@ def prepare(self): @property def prepare_dask(self): - """time for blocks to start working""" + """time to submit jobs to dask""" return self._prepare_dask @property @@ -134,7 +134,7 @@ class ParallelAnalysisBase(object): class NewAnalysis(ParallelAnalysisBase): def __init__(self, atomgroup, parameter): self._ag = atomgroup - super(NewAnalysis, self).__init__(atomgroup.universe, + super().__init__(atomgroup.universe, self._ag) def _single_frame(self, ts, agroups): @@ -190,13 +190,10 @@ def _reduce(res, result_single_frame): def __init__(self, universe): """Parameters ---------- - Universe : :class:`~MDAnalysis.core.groups.Universe` + universe : :class:`~MDAnalysis.core.groups.Universe` a :class:`MDAnalysis.core.groups.Universe` (the `atomgroups` must belong to this Universe) - atomgroups : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` - atomgroups that are iterated in parallel - Attributes ---------- _results : list @@ -258,19 +255,10 @@ def _single_frame(self): Must return computed values as a list. You can only **read** from member variables stored in ``self``. Changing them during - a run will result in undefined behavior. `ts` and any of the + a run will result in undefined behavior. `self._ts` and any of the atomgroups can be changed (but changes will be overwritten when the next time step is read). - Parameters - ---------- - ts : :class:`~MDAnalysis.coordinates.base.Timestep` - The current coordinate frame (time step) in the - trajectory. - atomgroups : tuple - Tuple of :class:`~MDAnalysis.core.groups.AtomGroup` - instances that are updated to the current frame. 
- Returns ------- values : anything @@ -445,3 +433,13 @@ def _reduce(res, result_single_frame): """ 'append' action for a time series""" res.append(result_single_frame) return res + + def __getstate__(self): + # To make sure Universe is pickled before Atomgroup, + base_dict = self.__dict__ + universe_dict = {} + for key, item in base_dict.items(): + if(isinstance(item, mda.Universe)): + universe_dict[key] = item + universe_dict.update(base_dict) + return universe_dict From 053225bed989bc9399b28d0bf08f1197185fbdbc Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Sun, 9 Aug 2020 18:22:30 +0200 Subject: [PATCH 21/34] change travis back --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7b8ee952..f308baf6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,14 +19,14 @@ env: # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - #- CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" + #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - - PIP_DEPENDENCIES="git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysis&subdirectory=package git+https://github.com/yuxuanzhuang/mdanalysis.git@serialize_io#egg=mdanalysistests&subdirectory=testsuite" + #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false From 83becd717cf94a56ab6cf7b34e8079bd065111f3 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 13 Aug 2020 18:47:28 +0200 Subject: [PATCH 22/34] travis to develop --- .travis.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index f308baf6..3b903a01 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,15 +18,15 @@ env: - SETUP_CMD="pmda --pep8 --cov pmda" # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES - #- CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - #- CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" + - CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" + - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + # - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" + # - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - 
CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test # files for analysis.rdf are available in release 0.19.0) - #- PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" + - PIP_DEPENDENCIES="git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite" - NUMPY_VERSION=stable - BUILD_CMD='python setup.py develop' - CODECOV=false From 666d6905b3650e0ed1ae80ade7a923ce1e14bab5 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Mon, 17 Aug 2020 15:42:02 +0200 Subject: [PATCH 23/34] turn to dask collection --- pmda/parallel.py | 229 +++++++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 105 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 6de027af..fc774769 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -21,7 +21,11 @@ import MDAnalysis as mda from dask.delayed import delayed import dask +from dask.base import DaskMethodsMixin import dask.distributed +from dask.utils import funcname +from dask.base import tokenize +from dask.highlevelgraph import HighLevelGraph from joblib import cpu_count import numpy as np @@ -102,7 +106,7 @@ def wait(self): return self._wait -class ParallelAnalysisBase(object): +class ParallelAnalysisBase(DaskMethodsMixin): """Base class for defining parallel multi frame analysis The class it is designed as a template for creating multiframe analyses. @@ -204,6 +208,13 @@ def __init__(self, universe): """ self._universe = universe self._trajectory = universe.trajectory + # _dsk keeps the dask graph + # (which is a dict of tuples of functions) + self._dsk = {} + # _keys keeps the desired results + # in a nested list that represent the outputs of the graph + self._keys = [] + self._job_prepared = False @contextmanager def readonly_attributes(self): @@ -272,6 +283,52 @@ def _single_frame(self): """ raise NotImplementedError + def prepare_jobs(self, + start=None, + stop=None, + step=None, + n_blocks=None): + start, stop, step = self._universe.trajectory.check_slice_indices(start, + stop, step) + n_frames = len(range(start, stop, step)) + + self.start, self.stop, self.step = start, stop, step + + self.n_frames = n_frames + + # record prepare time + with timeit() as prepare: + self._prepare() + self.time_prepare = prepare.elapsed + + if n_blocks is None: + n_blocks = cpu_count() + if n_frames == 0: + warnings.warn("run() analyses no frames: check start/stop/step") + if n_frames < n_blocks: + warnings.warn("run() uses more blocks than frames: " + "decrease n_blocks") + self.n_blocks = n_blocks + + slices = make_balanced_slices(n_frames, n_blocks, + start=start, stop=stop, step=step) + + self._blocks = [] + with timeit() as prepare_dask: + for bslice in slices: + self._frame_index = bslice + self._keys.append(self._append_job_to_dsk(self._map_chunk, + bslice)) + + # save the frame numbers for each block + self._blocks.append(range(bslice.start, + bslice.stop, + bslice.step)) + + self.time_prepare_dask = prepare_dask.elapsed + self._job_prepared = True + return self + def run(self, start=None, stop=None, @@ -295,115 +352,22 @@ def run(self, n_blocks : int, optional number of blocks to divide trajectory into. If ``None`` set equal to n_jobs or number of available workers in scheduler. 
- """ - # are we using a distributed scheduler or should we use - # multiprocessing? - scheduler = dask.config.get('scheduler', None) - if scheduler is None: - # maybe we can grab a global worker - try: - scheduler = dask.distributed.worker.get_client() - except ValueError: - pass - - if n_jobs == -1: - n_jobs = cpu_count() - - # we could not find a global scheduler to use and we ask for a single - # job. Therefore we run this on the single threaded scheduler for - # debugging. - if scheduler is None and n_jobs == 1: - scheduler = 'synchronous' - - # fall back to multiprocessing, we tried everything - if scheduler is None: - scheduler = 'processes' - - if n_blocks is None: - if scheduler == 'processes': - n_blocks = n_jobs - elif isinstance(scheduler, dask.distributed.Client): - n_blocks = len(scheduler.ncores()) - else: - n_blocks = 1 - warnings.warn( - "Couldn't guess ideal number of blocks from scheduler. " - "Setting n_blocks=1. " - "Please provide `n_blocks` in call to method.") - - scheduler_kwargs = {'scheduler': scheduler} - if scheduler == 'processes': - scheduler_kwargs['num_workers'] = n_jobs - - start, stop, step = self._trajectory.check_slice_indices(start, - stop, step) - n_frames = len(range(start, stop, step)) - - self.start, self.stop, self.step = start, stop, step - - self.n_frames = n_frames - - if n_frames == 0: - warnings.warn("run() analyses no frames: check start/stop/step") - if n_frames < n_blocks: - warnings.warn("run() uses more blocks than frames: " - "decrease n_blocks") - - slices = make_balanced_slices(n_frames, n_blocks, - start=start, stop=stop, step=step) - - # record total time with timeit() as total: - # record prepare time - with timeit() as prepare: - self._prepare() - time_prepare = prepare.elapsed - blocks = [] - _blocks = [] - with self.readonly_attributes(): - with timeit() as prepare_dask: - for bslice in slices: - task = delayed(self._dask_helper, pure=False)(bslice) - blocks.append(task) - # save the frame numbers for each block - _blocks.append(range(bslice.start, - bslice.stop, bslice.step)) - blocks = delayed(blocks) - time_prepare_dask = prepare_dask.elapsed - - # record the time when scheduler starts working - wait_start = time.time() - res = blocks.compute(**scheduler_kwargs) - # hack to handle n_frames == 0 in this framework - if len(res) == 0: - # everything else wants list of block tuples - res = [([], [], [], wait_start, 0, 0)] - # record conclude time - with timeit() as conclude: - self._results = np.asarray([el[0] for el in res]) - # save the frame numbers for all blocks - self._blocks = _blocks - self._conclude() - # put all time information into the timing object - self.timing = Timing( - np.hstack([el[1] for el in res]), - np.hstack([el[2] for el in res]), total.elapsed, - time_prepare, - time_prepare_dask, - conclude.elapsed, - # waiting time = wait_end - wait_start - np.array([el[3] - wait_start for el in res]), - np.array([el[4] for el in res]), - np.array([el[5] for el in res])) + if not self._job_prepared: + self.prepare_jobs(start, stop, step, n_blocks) + + self.wait_start = time.time() + _ = self.compute() + # empty the dask jobs + self._dsk = {} + self._keys = [] + self.timing._total = total.elapsed - # this is crucial if the analysis does not iterate over - # the whole trajectory. 
- self._trajectory.rewind() return self - def _dask_helper(self, bslice): - """helper function to actually setup dask graph""" + def _map_chunk(self, bslice): + """function to setup chunk dask graph""" # wait_end needs to be first line for accurate timing wait_end = time.time() res = [] @@ -412,6 +376,7 @@ def _dask_helper(self, bslice): # NOTE: bslice.stop cannot be None! Always make sure # that it comes from _trajectory.check_slice_indices()! for i in range(bslice.start, bslice.stop, bslice.step): + self._frame_index = i # record io time per frame with timeit() as b_io: # explicit instead of 'for ts in u.trajectory[bslice]' @@ -443,3 +408,57 @@ def __getstate__(self): universe_dict[key] = item universe_dict.update(base_dict) return universe_dict + + def __dask_graph__(self): + return self._dsk + + def __dask_keys__(self): + return self._keys + + __dask_scheduler__ = staticmethod(dask.multiprocessing.get) + + def __dask_postcompute__(self): + return self._post_reduce, () + + def _post_reduce(self, res): + self._results = list(res) + # hack to handle n_frames == 0 in this framework + if len(res) == 0: + # everything else wants list of block tuples + res = [([], [], [], self.wait_start, 0, 0)] + # record conclude time + with timeit() as conclude: + self._results = np.asarray([el[0] for el in res]) + # save the frame numbers for all blocks + self._conclude() + # put all time information into the timing object + self.timing = Timing( + np.hstack([el[1] for el in res]), + np.hstack([el[2] for el in res]), 0, + self.time_prepare, + self.time_prepare_dask, + conclude.elapsed, + # waiting time = wait_end - wait_start + np.array([el[3] - self.wait_start for el in res]), + np.array([el[4] for el in res]), + np.array([el[5] for el in res])) + + # this is crucial if the analysis does not iterate over + # the whole trajectory. + self._trajectory.rewind() + self._dsk = {} + self._keys = [] + + def __dask_postpersist__(self): + return self._post_reduce, (self._keys) + + def __dask_tokenize__(self): + return tuple(self._keys) + + def _append_job_to_dsk(self, func, *args, **kwargs): + name = "%s-%s" % (funcname(func), tokenize(func, + args, + kwargs, + self._frame_index)) + self._dsk[name] = (func, *args) + return name From 1e0013e6c6d4b8d5c663a43d18ac67e54aa071d3 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Mon, 17 Aug 2020 16:27:10 +0200 Subject: [PATCH 24/34] n_frame modify --- pmda/parallel.py | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index fc774769..3bb9911e 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -19,13 +19,11 @@ import time import MDAnalysis as mda -from dask.delayed import delayed import dask from dask.base import DaskMethodsMixin import dask.distributed from dask.utils import funcname from dask.base import tokenize -from dask.highlevelgraph import HighLevelGraph from joblib import cpu_count import numpy as np @@ -288,6 +286,20 @@ def prepare_jobs(self, stop=None, step=None, n_blocks=None): + """Prepare the jobs + + Parameters + ---------- + start : int, optional + start frame of analysis + stop : int, optional + stop frame of analysis + step : int, optional + number of frames to skip between each analysed frame + n_blocks : int, optional + number of blocks to divide trajectory into. If ``None`` set equal + to n_jobs or number of available workers in scheduler. 
+ """ start, stop, step = self._universe.trajectory.check_slice_indices(start, stop, step) n_frames = len(range(start, stop, step)) @@ -304,10 +316,11 @@ def prepare_jobs(self, if n_blocks is None: n_blocks = cpu_count() if n_frames == 0: - warnings.warn("run() analyses no frames: check start/stop/step") + warnings.warn("analyses no frames: check start/stop/step") if n_frames < n_blocks: - warnings.warn("run() uses more blocks than frames: " - "decrease n_blocks") + warnings.warn("uses more blocks than frames: " + "will decrease n_blocks") + n_blocks = n_frames self.n_blocks = n_blocks slices = make_balanced_slices(n_frames, n_blocks, @@ -317,6 +330,7 @@ def prepare_jobs(self, with timeit() as prepare_dask: for bslice in slices: self._frame_index = bslice + self._keys.append(self._append_job_to_dsk(self._map_chunk, bslice)) @@ -327,6 +341,7 @@ def prepare_jobs(self, self.time_prepare_dask = prepare_dask.elapsed self._job_prepared = True + self.wait_start = time.time() return self def run(self, @@ -357,8 +372,10 @@ def run(self, if not self._job_prepared: self.prepare_jobs(start, stop, step, n_blocks) - self.wait_start = time.time() - _ = self.compute() + scheduler_kwargs = {'num_workers': n_jobs} + + _ = self.compute(**scheduler_kwargs) + # empty the dask jobs self._dsk = {} self._keys = [] @@ -409,19 +426,21 @@ def __getstate__(self): universe_dict.update(base_dict) return universe_dict + # dask-related functions + def __dask_graph__(self): return self._dsk def __dask_keys__(self): return self._keys + # it uses multiprocessing scheduler in default __dask_scheduler__ = staticmethod(dask.multiprocessing.get) def __dask_postcompute__(self): return self._post_reduce, () def _post_reduce(self, res): - self._results = list(res) # hack to handle n_frames == 0 in this framework if len(res) == 0: # everything else wants list of block tuples @@ -448,17 +467,23 @@ def _post_reduce(self, res): self._trajectory.rewind() self._dsk = {} self._keys = [] + self._job_prepared = False def __dask_postpersist__(self): - return self._post_reduce, (self._keys) + # we don't need persist implementation. + raise NotImplementedError def __dask_tokenize__(self): return tuple(self._keys) def _append_job_to_dsk(self, func, *args, **kwargs): + # the name has to be identical for each jobs + # args, current_frame_index, func will all be used for tokenization. 
name = "%s-%s" % (funcname(func), tokenize(func, args, kwargs, self._frame_index)) self._dsk[name] = (func, *args) + # return name to be added to self._keys + # which means this function itself does not modify the _keys return name From f9c89e6ab054e80faa979d7c6073ff0e6d87ca55 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 12:01:47 +0200 Subject: [PATCH 25/34] remove getstate --- pmda/parallel.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 6de027af..ee9907d3 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -433,13 +433,3 @@ def _reduce(res, result_single_frame): """ 'append' action for a time series""" res.append(result_single_frame) return res - - def __getstate__(self): - # To make sure Universe is pickled before Atomgroup, - base_dict = self.__dict__ - universe_dict = {} - for key, item in base_dict.items(): - if(isinstance(item, mda.Universe)): - universe_dict[key] = item - universe_dict.update(base_dict) - return universe_dict From 61bce8f2b8057f5a300268757dae9a96faea965c Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 12:21:51 +0200 Subject: [PATCH 26/34] pep8 --- pmda/leaflet.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index a0e0b10e..eac91da7 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -202,8 +202,8 @@ def _single_frame(self, scheduler_kwargs, n_jobs, parAtoms = db.from_sequence(arranged_coord, npartitions=len(arranged_coord)) parAtomsMap = parAtoms.map_partitions( - self._find_connected_components, - cutoff=cutoff) + self._find_connected_components, + cutoff=cutoff) self.prepare_dask_total += prepare_dask.elapsed Components = parAtomsMap.compute(**scheduler_kwargs) @@ -297,9 +297,10 @@ def run(self, times_io.append(b_io.elapsed) with timeit() as b_compute: components = self. 
\ - _single_frame(scheduler_kwargs=scheduler_kwargs, - n_jobs=n_jobs, - cutoff=cutoff) + _single_frame( + scheduler_kwargs=scheduler_kwargs, + n_jobs=n_jobs, + cutoff=cutoff) timings.append(b_compute.elapsed) leaflet1 = self._atomgroup[components[0]] leaflet2 = self._atomgroup[components[1]] From cb99fc8d909c4115a869178fc005c76fcf0a5de0 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 12:28:22 +0200 Subject: [PATCH 27/34] update setup.py --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 0d465d87..8658c394 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ }, packages=find_packages(), install_requires=[ - 'MDAnalysis>=1.0.0', + 'MDAnalysis>=2.0.0', 'dask>=0.18', 'distributed', 'six', @@ -58,5 +58,5 @@ ], tests_require=[ 'pytest', - 'MDAnalysisTests>=1.0.0', # keep + 'MDAnalysisTests>=2.0.0', # keep ], ) From d95add1ac83a0aa65ec29bade495f544a10de686 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 12:58:57 +0200 Subject: [PATCH 28/34] pep8 warning --- setup.cfg | 3 +++ 1 file changed, 3 insertions(+) diff --git a/setup.cfg b/setup.cfg index 63d555a6..f16d01d8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,6 +12,8 @@ ignore = N806, N803, N802, I100 pep8ignore = setup.py ALL docs/conf.py ALL +filterwarnings = + ignore::pytest.PytestDeprecationWarning [versioneer] VCS = git @@ -20,3 +22,4 @@ versionfile_source = pmda/_version.py versionfile_build = pmda/_version.py tag_prefix = parentdir_prefix = pmda + From a5052829f4e3139703ca946f0bd8f17c8a0d9b89 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 13:09:27 +0200 Subject: [PATCH 29/34] travis --- .travis.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3b903a01..3051275e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,10 +18,10 @@ env: - SETUP_CMD="pmda --pep8 --cov pmda" # mdanalysis develop from source (see below), which needs # minimal CONDA_MDANALYSIS_DEPENDENCIES - - CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" - - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" - # - CONDA_MDANALYSIS_DEPENDENCIES="mdanalysis mdanalysistests" - # - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" + # - CONDA_DEPENDENCIES="mdanalysis mdanalysistests dask joblib pytest-pep8 mock codecov cython hypothesis sphinx" + # - CONDA_MDANALYSIS_DEPENDENCIES="cython mmtf-python six biopython networkx scipy griddataformats gsd hypothesis" + - CONDA_MDANALYSIS_DEPENDENCIES="mmtf-python biopython networkx cython matplotlib scipy griddataformats hypothesis gsd" + - CONDA_DEPENDENCIES="${CONDA_MDANALYSIS_DEPENDENCIES} dask distributed joblib pytest-pep8 mock codecov" - CONDA_CHANNELS='conda-forge' - CONDA_CHANNEL_PRIORITY=True # install development version of MDAnalysis (needed until the test From 608a803d9146b2ec0e0e4127bd4f160a411442ad Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 13:23:52 +0200 Subject: [PATCH 30/34] setup reverse --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 8658c394..0d465d87 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ }, packages=find_packages(), install_requires=[ - 'MDAnalysis>=2.0.0', + 'MDAnalysis>=1.0.0', 'dask>=0.18', 'distributed', 'six', @@ -58,5 +58,5 @@ ], tests_require=[ 'pytest', - 
'MDAnalysisTests>=2.0.0', # keep + 'MDAnalysisTests>=1.0.0', # keep ], ) From 18988c596c278e26e82593a1230306cca7cbd892 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 14:34:27 +0200 Subject: [PATCH 31/34] pep --- pmda/custom.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pmda/custom.py b/pmda/custom.py index 0f1da8b0..cab045a6 100644 --- a/pmda/custom.py +++ b/pmda/custom.py @@ -130,7 +130,9 @@ def __init__(self, universe=None, *args, **kwargs): print(type(universe)) raise ValueError( "First argument needs to be an MDAnalysis Universe.") - super().__init__(function, universe, *args, - **kwargs) + super().__init__(function, + universe, + *args, + **kwargs) return WrapperClass From 61a34c72422912d211b6138bc67dc0fcaf4dcc4b Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 15:40:40 +0200 Subject: [PATCH 32/34] setup py3 --- CHANGELOG | 8 ++++++++ setup.py | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 89d9b03d..ef488ff4 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -33,6 +33,14 @@ Fixes Changes * requires MDAnalysis >= 1.0.0 (#122) * dropped official support for Python 3.5 (2.7 and >= 3.6 are supported) + * requires MDAnalysis >= 2.0.0 (#132) + * dropped official support for Python<=3.5 + * with support of serialization of Universe, in ParallelAnalysisBase: (#132) + - we pickle/unpickle Universes, instead of creating new ones. + - __init__ only takes Universe as arguments + - _single_frame takes no extra arguments. + - timing for generating universe is ditched. + - timing for creating dask graph is added. 10/14/2019 VOD555, nawtrey diff --git a/setup.py b/setup.py index 0d465d87..a81550d2 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,16 @@ # Released under the GNU Public Licence, v2 or any higher version from setuptools import setup, find_packages +import sys import versioneer +# Make sure I have the right Python version. +if sys.version_info[:2] < (3, 6): + print('PMDA requires Python 3.6 or better. 
Python {0:d}.{1:d} detected'.format(* + sys.version_info[:2])) + print('Please upgrade your version of Python.') + sys.exit(-1) + with open('README.rst', 'r') as f: long_description = f.read() @@ -33,8 +41,6 @@ 'Programming Language :: Python', 'Topic :: Scientific/Engineering', 'Topic :: Software Development :: Libraries :: Python Modules', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', @@ -48,7 +54,8 @@ }, packages=find_packages(), install_requires=[ - 'MDAnalysis>=1.0.0', + # 'MDAnalysis>=2.0.0', + 'mdanalysis @ git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package' 'dask>=0.18', 'distributed', 'six', @@ -58,5 +65,6 @@ ], tests_require=[ 'pytest', - 'MDAnalysisTests>=1.0.0', # keep + # 'MDAnalysisTests>=2.0.0', # keep + 'mdanalysistests @ git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysistests&subdirectory=testsuite' ], ) From 7bf68f5c9027568dcb8801079b6a6c34c5eacfdc Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Wed, 19 Aug 2020 16:18:58 +0200 Subject: [PATCH 33/34] rewind doc --- CHANGELOG | 6 +++--- pmda/parallel.py | 6 ++++-- setup.py | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index ef488ff4..d3479f72 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -34,11 +34,11 @@ Changes * requires MDAnalysis >= 1.0.0 (#122) * dropped official support for Python 3.5 (2.7 and >= 3.6 are supported) * requires MDAnalysis >= 2.0.0 (#132) - * dropped official support for Python<=3.5 + * dropped official support for Python<=3.5 (#132) * with support of serialization of Universe, in ParallelAnalysisBase: (#132) - we pickle/unpickle Universes, instead of creating new ones. - - __init__ only takes Universe as arguments - - _single_frame takes no extra arguments. + - __init__ only takes Universe as the argument. + - _single_frame takes no argument. - timing for generating universe is ditched. - timing for creating dask graph is added. diff --git a/pmda/parallel.py b/pmda/parallel.py index ee9907d3..3ea6dcc5 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -397,8 +397,10 @@ def run(self, np.array([el[4] for el in res]), np.array([el[5] for el in res])) - # this is crucial if the analysis does not iterate over - # the whole trajectory. + # Make sure the trajectory is reset to its initial state + # if we are not running the analysis through the whole trajectory. + # With this, we get the same result (state of the trajectory) from + # ParallelAnalysisBase and MDAnalysis.AnalysisBase.
self._trajectory.rewind() return self diff --git a/setup.py b/setup.py index a81550d2..54cd9810 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ packages=find_packages(), install_requires=[ # 'MDAnalysis>=2.0.0', - 'mdanalysis @ git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package' + 'mdanalysis @ git+https://github.com/MDAnalysis/mdanalysis#egg=mdanalysis&subdirectory=package', 'dask>=0.18', 'distributed', 'six', From 186bfbf8aa80cc1bc31e7153e79bd10bff7bff60 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhuang Date: Thu, 20 Aug 2020 12:39:40 +0200 Subject: [PATCH 34/34] change tokenize to uuid --- pmda/parallel.py | 71 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 39b71821..c0ba6643 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -15,16 +15,17 @@ """ from contextlib import contextmanager +from joblib import cpu_count +import time import warnings +import uuid -import time import MDAnalysis as mda import dask from dask.base import DaskMethodsMixin import dask.distributed from dask.utils import funcname from dask.base import tokenize -from joblib import cpu_count import numpy as np from .util import timeit, make_balanced_slices @@ -282,10 +283,11 @@ def _single_frame(self): raise NotImplementedError def prepare(self, - start=None, - stop=None, - step=None, - n_blocks=None): + start=None, + stop=None, + step=None, + n_jobs=1, + n_blocks=None): """Prepare the jobs Parameters @@ -313,8 +315,38 @@ def prepare(self, self._prepare() self.time_prepare = prepare_time.elapsed + # get global scheduler + scheduler = dask.config.get('scheduler', None) + if scheduler == 'processes': + self.__dask_scheduler__ = dask.multiprocessing.get + elif scheduler == 'synchronous': + self.__dask_scheduler__ = dask.get + elif isinstance(scheduler, dask.distributed.Client): + self.__dask_scheduler__ = dask.distributed.Client.get + elif scheduler is not None: + raise ValueError(f"PMDA doesn't support other" + f"schedulers: {scheduler}") + + if n_jobs == -1: + n_jobs = cpu_count() + + if scheduler is None and n_jobs == 1: + # synchronous + self.__dask_scheduler__ = dask.get + + # setting n_blocks if n_blocks is None: - n_blocks = cpu_count() + if self.__dask_scheduler__ == dask.multiprocessing.get: + n_blocks = n_jobs + elif self.__dask_scheduler__ == dask.distributed.Client.get: + n_blocks = len(dask.distributed.worker.get_client().ncores()) + else: + n_blocks = 1 + warnings.warn( + "Couldn't guess ideal number of blocks from scheduler. " + "Setting n_blocks=1. " + "Please provide `n_blocks` in call to method.") + if n_frames == 0: warnings.warn("analyses no frames: check start/stop/step") if n_frames < n_blocks: @@ -368,7 +400,7 @@ def run(self, """ with timeit() as total: if not self._job_prepared: - self.prepare(start, stop, step, n_blocks) + self.prepare(start, stop, step, n_jobs, n_blocks) if n_jobs == -1: n_jobs = cpu_count() @@ -467,13 +499,22 @@ def __dask_postpersist__(self): def __dask_tokenize__(self): return tuple(self._keys) - def _append_job_to_dsk(self, func, *args, **kwargs): - # the name has to be identical for each jobs - # args, current_frame_index, func will all be used for tokenization. - name = "%s-%s" % (funcname(func), tokenize(func, - args, - kwargs, - self._frame_index)) + def _append_job_to_dsk(self, func, *args, pure=False, **kwargs): + # If pure is True, a consistent hash function is tried on the input. 
# If False (default), then a unique identifier is always used. + + # When True, args, current_frame_index, func will all be + # used for tokenization. This is much slower and requires the class + # to be pickled during this process. + # (any reason to use tokenize instead of uuid?) + + if not pure: + name = "%s-%s" % (funcname(func), str(uuid.uuid4())) + else: + name = "%s-%s" % (funcname(func), tokenize(func, + args, + kwargs, + self._frame_index)) self._dsk[name] = (func, *args) # return name to be added to self._keys # which means this function itself does not modify the _keys
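Illustration (not part of the patch series): a minimal sketch of how a user-defined analysis is written against the refactored ParallelAnalysisBase described in the patches and CHANGELOG above. The class name RadiusOfGyration, the "protein" selection, and the file names topol.tpr/traj.xtc are placeholders; only ParallelAnalysisBase, _single_frame, _conclude, the default _reduce, run, and the per-frame self._ts attribute come from this series, and storing an AtomGroup on the instance assumes the MDAnalysis >= 2.0.0 Universe/AtomGroup serialization required by setup.py above.

import numpy as np
import MDAnalysis as mda
from pmda.parallel import ParallelAnalysisBase


class RadiusOfGyration(ParallelAnalysisBase):
    # illustrative per-frame analysis under the refactored API
    def __init__(self, atomgroup):
        # keep the AtomGroup on the instance; only the Universe is handed to
        # the base class, which is now pickled/unpickled for the workers
        # instead of being rebuilt from topology/trajectory file names
        self._atomgroup = atomgroup
        super().__init__(atomgroup.universe)

    def _single_frame(self):
        # no (ts, atomgroups) arguments anymore; the current Timestep is
        # available as self._ts
        return self._ts.frame, self._atomgroup.radius_of_gyration()

    def _conclude(self):
        # self._results holds one array of per-frame values per block
        self.results = np.vstack(self._results)


u = mda.Universe("topol.tpr", "traj.xtc")   # hypothetical input files
rgyr = RadiusOfGyration(u.select_atoms("protein"))
rgyr.run(n_jobs=4, n_blocks=4)   # multiprocessing scheduler by default
print(rgyr.results)

If a dask.distributed Client is active, or dask.config.set(scheduler='processes') / dask.config.set(scheduler='synchronous') is in effect, the prepare() shown in the last patch maps it to the corresponding dask scheduler (Client.get, dask.multiprocessing.get, or dask.get) instead of the multiprocessing default.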