Skip to content

Commit

Permalink
fix GC problems and long graph construction times for large datasets.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmoerman committed Jun 11, 2018
1 parent d4abc60 commit 482ce85
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 29 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
:alt: Documentation Status
:target: http://arboreto.readthedocs.io/en/latest/?badge=latest

.. image:: https://img.shields.io/badge/pypi-0.1.4-blue.svg
.. image:: https://img.shields.io/badge/pypi-0.1.5-blue.svg
:alt: PyPI package
:target: https://pypi.python.org/pypi?:action=display&name=arboreto&version=0.1.4
:target: https://pypi.python.org/pypi?:action=display&name=arboreto&version=0.1.5

----

Expand Down
1 change: 1 addition & 0 deletions arboreto/algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def diy(expression_data,
seed=seed)

if verbose:
print('{} partitions'.format(graph.npartitions))
print('computing dask graph')

return client \
Expand Down
46 changes: 26 additions & 20 deletions arboreto/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,60 +357,64 @@ def target_gene_indices(gene_names,
raise ValueError("Unable to interpret target_genes.")


_GRN_SCHEMA = make_meta({'TF': str, 'target': str, 'importance': float})
_META_SCHEMA = make_meta({'target': str, 'n_estimators': int})


def create_graph(expression_matrix,
gene_names,
tf_names,
regressor_type,
regressor_kwargs,
client=None,
client,
target_genes='all',
limit=None,
include_meta=False,
early_stop_window_length=EARLY_STOP_WINDOW_LENGTH,
repartition_multiplier=1,
seed=DEMON_SEED):
"""
Main API function. Create a Dask computation graph.
Note: fixing the GC problems was fixed by 2 changes: [1] and [2] !!!
:param expression_matrix: numpy matrix. Rows are observations and columns are genes.
:param gene_names: list of gene names. Each entry corresponds to the expression_matrix column with same index.
:param tf_names: list of transcription factor names. Should have a non-empty intersection with gene_names.
:param regressor_type: regressor type. Case insensitive.
:param regressor_kwargs: dict of key-value pairs that configures the regressor.
:param client: a dask.distributed client instance.
* Recommended to use in a distributed environment!
* Used to scatter-broadcast the tf matrix to the workers instead of simply wrapping in a delayed().
* If None, the tf_matrix will be wrapped in a delayed(), suboptimal in a distributed setting.
:param target_genes: either int, 'all' or a collection that is a subset of gene_names.
:param limit: optional number of top regulatory links to return. Default None.
:param include_meta: Also return the meta DataFrame. Default False.
:param early_stop_window_length: window length of the early stopping monitor.
:param repartition_multiplier: multiplier
:param seed: (optional) random seed for the regressors. Default 666.
:return: if include_meta is False, returns a Dask graph that computes the links DataFrame.
If include_meta is True, returns a tuple: the links DataFrame and the meta DataFrame.
"""

assert expression_matrix.shape[1] == len(gene_names)
assert client, "client is required"

tf_matrix, tf_matrix_gene_names = to_tf_matrix(expression_matrix, gene_names, tf_names)

if client is None:
delayed_or_future_tf_matrix = delayed(tf_matrix, pure=True)
else:
delayed_or_future_tf_matrix = client.scatter(tf_matrix, broadcast=True)

delayed_tf_matrix_gene_names = delayed(tf_matrix_gene_names, pure=True)
future_tf_matrix = client.scatter(tf_matrix, broadcast=True)
# [1] wrap in a list of 1 -> unsure why but Matt. Rocklin does this often...
[future_tf_matrix_gene_names] = client.scatter([tf_matrix_gene_names], broadcast=True)

delayed_link_dfs = [] # collection of delayed link DataFrames
delayed_meta_dfs = [] # collection of delayed meta DataFrame

for target_gene_index in target_gene_indices(gene_names, target_genes):
target_gene_name = gene_names[target_gene_index]
target_gene_expression = expression_matrix[:, target_gene_index]
target_gene_name = delayed(gene_names[target_gene_index], pure=True)
target_gene_expression = delayed(expression_matrix[:, target_gene_index], pure=True)

if include_meta:
delayed_link_df, delayed_meta_df = delayed(infer_partial_network, pure=True, nout=2)(
regressor_type, regressor_kwargs,
delayed_or_future_tf_matrix, delayed_tf_matrix_gene_names,
future_tf_matrix, future_tf_matrix_gene_names,
target_gene_name, target_gene_expression, include_meta, early_stop_window_length, seed)

if delayed_link_df is not None:
Expand All @@ -419,29 +423,31 @@ def create_graph(expression_matrix,
else:
delayed_link_df = delayed(infer_partial_network, pure=True)(
regressor_type, regressor_kwargs,
delayed_or_future_tf_matrix, delayed_tf_matrix_gene_names,
future_tf_matrix, future_tf_matrix_gene_names,
target_gene_name, target_gene_expression, include_meta, early_stop_window_length, seed)

if delayed_link_df is not None:
delayed_link_dfs.append(delayed_link_df)

# gather the DataFrames into one distributed DataFrame
all_links_df = from_delayed(delayed_link_dfs,
meta=make_meta({'TF': str, 'target': str, 'importance': float}))
all_meta_df = from_delayed(delayed_meta_dfs,
meta=make_meta({'target': str, 'n_estimators': int}))
all_links_df = from_delayed(delayed_link_dfs, meta=_GRN_SCHEMA)
all_meta_df = from_delayed(delayed_meta_dfs, meta=_META_SCHEMA)

# optionally limit the number of resulting regulatory links, descending by top importance
if limit:
maybe_limited_links_df = all_links_df.nlargest(limit, columns=['importance'])
else:
maybe_limited_links_df = all_links_df

# optionally return the meta DataFrame as well
# [2] repartition to nr of workers -> important to avoid GC problems!
# see: http://dask.pydata.org/en/latest/dataframe-performance.html#repartition-to-reduce-overhead
n_parts = len(client.ncores()) * repartition_multiplier

if include_meta:
return maybe_limited_links_df, all_meta_df
return maybe_limited_links_df.repartition(npartitions=n_parts), \
all_meta_df.repartition(npartitions=n_parts)
else:
return maybe_limited_links_df
return maybe_limited_links_df.repartition(npartitions=n_parts)


class EarlyStopMonitor:
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
# built documents.
#
# The short X.Y version.
version = '0.1.4'
version = '0.1.5'
# The full version, including alpha/beta/rc tags.
release = '0.1.4'
release = '0.1.5'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
4 changes: 2 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
:alt: Documentation Status
:target: http://arboreto.readthedocs.io/en/latest/?badge=latest

.. image:: https://img.shields.io/badge/pypi-0.1.4-blue.svg
.. image:: https://img.shields.io/badge/pypi-0.1.5-blue.svg
:alt: PyPI package
:target: https://pypi.python.org/pypi?:action=display&name=arboreto&version=0.1.4
:target: https://pypi.python.org/pypi?:action=display&name=arboreto&version=0.1.5

----

Expand Down
2 changes: 1 addition & 1 deletion docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Check out the installation:
$ pip show arboreto
Name: arboreto
Version: 0.1.4
Version: 0.1.5
Summary: Scalable gene regulatory network inference using tree-based ensemble regressors
Home-page: https://github.com/tmoerman/arboreto
Author: Thomas Moerman
Expand Down
2 changes: 1 addition & 1 deletion docs/lcb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ We obviously need Arboreto (make sure you have the latest version):
$ pip show arboreto
Name: arboreto
Version: 0.1.4
Version: 0.1.5
Summary: Scalable gene regulatory network inference using tree-based ensemble regressors
Home-page: https://github.com/tmoerman/arboreto
Author: Thomas Moerman
Expand Down
Loading

0 comments on commit 482ce85

Please sign in to comment.