Skip to content

Commit

Permalink
Add QGraph dependencies to dimension clustering.
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleGower committed Oct 4, 2024
1 parent 6ec2c40 commit 56e2d00
Show file tree
Hide file tree
Showing 6 changed files with 564 additions and 119 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-46513.misc.rst
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Fixed test_clustered_quantum_graph.testClusters.
Moved validation of ClusteredQuantumGraph from tests to class definition.
Added the ability to follow QuantumGraph dependencies in dimension clustering, which enables clustering when dimension values aren't equal (e.g., group vs visit).
37 changes: 37 additions & 0 deletions doc/lsst.ctrl.bps/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,43 @@ Relevant Config Entries:
# requestCpus: N # Overrides for jobs in this cluster
# requestMemory: NNNN # MB, Overrides for jobs in this cluster
.. _bps-dimension_dependency:

User-defined Dimension Clustering with QuantumGraph Dependencies
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

There are instances where the dimensions aren't the same for quanta
that we want to put in the same cluster. In many cases, we can use
``equalDimensions`` to solve this problem. However, that only works if
the values are equal and only the dimension names differ (e.g., visit
and exposure). In the case of group and visit, the values aren't the
same. The QuantumGraph has dependencies between those quanta that
can be used instead of comparing dimension values.

Using the dependencies is an option per cluster definition. To enable it,
define ``find_dependency_method``. A subgraph of the pipeline graph is
made (i.e., a directed graph of the pipeline task labels specified for
the cluster). A value of ``sink`` says to use the dimensions of the sink
nodes in the subgraph and then find ancestors in the ``QuantumGraph`` to
complete the cluster. A value of ``source`` says to use the dimensions
of the source nodes in the subgraph and then find descendants in the
``QuantumGraph`` to complete the cluster. Generally, it doesn't matter
which direction is used, but the direction determines which dimension
values appear in the cluster names and thus job names.

.. code-block:: YAML
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.dimension_clustering
cluster:
# Repeat cluster subsection for however many clusters there are
# with or without find_dependency_method
clusterLabel1:
dimensions: visit, detector
pipetasks: getRegionTimeFromVisit, loadDiaCatalogs, diaPipe
find_dependency_method: sink
# requestCpus: N # Overrides for jobs in this cluster
# requestMemory: NNNN # MB, Overrides for jobs in this cluster
.. _bps-softlink:

WMS-id softlink
Expand Down
273 changes: 211 additions & 62 deletions python/lsst/ctrl/bps/quantum_clustering_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Functions that convert QuantumGraph into ClusteredQuantumGraph.
"""
"""Functions that convert QuantumGraph into ClusteredQuantumGraph."""
import logging
import re
from collections import defaultdict
from typing import Any
from uuid import UUID

from lsst.pipe.base import QuantumGraph
from networkx import DiGraph, is_directed_acyclic_graph, topological_sort
from lsst.pipe.base import QuantumGraph, QuantumNode
from networkx import DiGraph, ancestors, descendants, is_directed_acyclic_graph, topological_sort

from . import BpsConfig, ClusteredQuantumGraph, QuantaCluster

Expand Down Expand Up @@ -119,8 +118,8 @@ def _check_clusters_tasks(
cluster_labels: `list` [`str`]
Dependency ordered list of cluster labels (includes
single quantum clusters).
ordered_tasks : `dict` [`str`, `list` [`str`]]
Mapping of cluster label to ordered list of task labels.
ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
Mapping of cluster label to task subgraph.
Raises
------
Expand Down Expand Up @@ -162,7 +161,7 @@ def _check_clusters_tasks(
if cluster_tasks_in_qgraph:
# Ensure have list of tasks in dependency order.
quantum_subgraph = label_graph.subgraph(cluster_tasks_in_qgraph)
ordered_tasks[cluster_label] = list(topological_sort(quantum_subgraph))
ordered_tasks[cluster_label] = quantum_subgraph

clustered_task_graph.add_node(cluster_label)

Expand All @@ -171,7 +170,8 @@ def _check_clusters_tasks(
if label not in used_labels:
task_to_cluster[label] = label
clustered_task_graph.add_node(label)
ordered_tasks[label] = [label]
# ordered_tasks[label] = [label]
ordered_tasks[label] = label_graph.subgraph([label])

# Create dependencies between clusters.
for edge in task_graph.edges:
Expand Down Expand Up @@ -219,14 +219,24 @@ def dimension_clustering(config: BpsConfig, qgraph: QuantumGraph, name: str) ->
for cluster_label in cluster_labels:
_LOG.debug("cluster = %s", cluster_label)
if cluster_label in cluster_section:
add_dim_clusters(
cluster_section[cluster_label],
cluster_label,
qgraph,
ordered_tasks,
cqgraph,
quantum_to_cluster,
)
if "find_dependency_method" in cluster_section[cluster_label]:
add_dim_clusters_dependency(
cluster_section[cluster_label],
cluster_label,
qgraph,
ordered_tasks,
cqgraph,
quantum_to_cluster,
)
else:
add_dim_clusters(
cluster_section[cluster_label],
cluster_label,
qgraph,
ordered_tasks,
cqgraph,
quantum_to_cluster,
)
else:
add_clusters_per_quantum(config, cluster_label, qgraph, cqgraph, quantum_to_cluster)

Expand Down Expand Up @@ -283,7 +293,7 @@ def add_dim_clusters(
cluster_config: BpsConfig,
cluster_label: str,
qgraph: QuantumGraph,
ordered_tasks: dict[str, list[str]],
ordered_tasks: dict[str, DiGraph],
cqgraph: ClusteredQuantumGraph,
quantum_to_cluster: dict[UUID, str],
) -> None:
Expand All @@ -297,8 +307,8 @@ def add_dim_clusters(
Cluster label for which to add clusters.
qgraph : `lsst.pipe.base.QuantumGraph`
QuantumGraph providing quanta for the clusters.
ordered_tasks : `dict` [`str`, `list` [`str`]]
Mapping of cluster label to ordered list of task labels.
ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
Mapping of cluster label to task label subgraph.
cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
The ClusteredQuantumGraph to which the new 1-quantum
clusters are added (modified in method).
Expand All @@ -321,51 +331,17 @@ def add_dim_clusters(
_LOG.debug("template = %s", template)

new_clusters = []
for task_label in ordered_tasks[cluster_label]:
for task_label in topological_sort(ordered_tasks[cluster_label]):
# Determine cluster for each node
for uuid, quantum in qgraph.get_task_quanta(task_label).items():
# Gather info for cluster name template into a dictionary.
info: dict[str, Any] = {"node_number": uuid}

missing_info = set()
assert quantum.dataId is not None, "Quantum DataId cannot be None" # for mypy
data_id_info = dict(quantum.dataId.mapping)
for dim_name in cluster_dims:
_LOG.debug("dim_name = %s", dim_name)
if dim_name in data_id_info:
info[dim_name] = data_id_info[dim_name]
else:
missing_info.add(dim_name)
if equal_dims:
for pair in [pt.strip() for pt in equal_dims.split(",")]:
dim1, dim2 = pair.strip().split(":")
if dim1 in cluster_dims and dim2 in data_id_info:
info[dim1] = data_id_info[dim2]
missing_info.remove(dim1)
elif dim2 in cluster_dims and dim1 in data_id_info:
info[dim2] = data_id_info[dim1]
missing_info.remove(dim2)

info["label"] = cluster_label
_LOG.debug("info for template = %s", info)

if missing_info:
raise RuntimeError(
f"Quantum {uuid} ({data_id_info}) missing dimensions: {','.join(missing_info)}; "
f"required for cluster {cluster_label}"
)

# Use dictionary plus template format string to create name.
# To avoid # key errors from generic patterns, use defaultdict.
cluster_name = template.format_map(defaultdict(lambda: "", info))
cluster_name = re.sub("_+", "_", cluster_name)

# Some dimensions contain slash which must be replaced.
cluster_name = re.sub("/", "_", cluster_name)
_LOG.debug("cluster_name = %s", cluster_name)
task_def = qgraph.findTaskDefByLabel(task_label)
assert task_def is not None # for mypy
for node in qgraph.getNodesForTask(task_def):
cluster_name, info = get_cluster_name_from_node(
node, cluster_dims, "cluster1", template, equal_dims
)

# Save mapping for use when creating dependencies.
quantum_to_cluster[uuid] = cluster_name
quantum_to_cluster[node.nodeId] = cluster_name

# Add cluster to the ClusteredQuantumGraph.
# Saving NodeId instead of number because QuantumGraph API
Expand All @@ -375,7 +351,7 @@ def add_dim_clusters(
else:
cluster = QuantaCluster(cluster_name, cluster_label, info)
cqgraph.add_cluster(cluster)
cluster.add_quantum(uuid, task_label)
cluster.add_quantum(node.nodeId, task_label)
new_clusters.append(cluster)

for cluster in new_clusters:
Expand Down Expand Up @@ -421,3 +397,176 @@ def add_cluster_dependencies(
qnode.quantum.dataId,
)
raise


def add_dim_clusters_dependency(
cluster_config: BpsConfig,
cluster_label: str,
qgraph: QuantumGraph,
ordered_tasks: dict[str, DiGraph],
cqgraph: ClusteredQuantumGraph,
quantum_to_cluster: dict[UUID, str],
) -> None:
"""Add clusters for a cluster label to a ClusteredQuantumGraph using
QuantumGraph dependencies as well as dimension values to help when
some do not have particular dimension value.
Parameters
----------
cluster_config : `lsst.ctrl.bps.BpsConfig`
BPS configuration for specific cluster label.
cluster_label : `str`
Cluster label for which to add clusters.
qgraph : `lsst.pipe.base.QuantumGraph`
QuantumGraph providing quanta for the clusters.
ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
Mapping of cluster label to task label subgraph.
cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
The ClusteredQuantumGraph to which the new
clusters are added (modified in method).
quantum_to_cluster : `dict` [ `str`, `str` ]
Mapping of quantum node id to which cluster it was added
(modified in method).
"""
cluster_dims = []
if "dimensions" in cluster_config:
cluster_dims = [d.strip() for d in cluster_config["dimensions"].split(",")]
_LOG.debug("cluster_dims = %s", cluster_dims)
equal_dims = cluster_config.get("equalDimensions", None)

found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
if not found:
if cluster_dims:
template = f"{cluster_label}_" + "_".join(f"{{{dim}}}" for dim in cluster_dims)
else:
template = cluster_label

Check warning on line 442 in python/lsst/ctrl/bps/quantum_clustering_funcs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/quantum_clustering_funcs.py#L442

Added line #L442 was not covered by tests
_LOG.debug("template = %s", template)

method = cluster_config["find_dependency_method"]
match method:
case "source":
dim_labels = [
node for node, in_degree in ordered_tasks[cluster_label].in_degree() if in_degree == 0
]
case "sink":
dim_labels = [
node for node, out_degree in ordered_tasks[cluster_label].out_degree() if out_degree == 0
]
case _:
raise RuntimeError("Invalid find_dependency_of (method)")

new_clusters = []
for task_label in dim_labels:
task_def = qgraph.findTaskDefByLabel(task_label)
assert task_def is not None # for mypy
for node in qgraph.getNodesForTask(task_def):
cluster_name, info = get_cluster_name_from_node(
node, cluster_dims, cluster_label, template, equal_dims
)

# Add cluster to the ClusteredQuantumGraph.
# Saving NodeId instead of number because QuantumGraph API
# requires it for creating per-job QuantumGraphs.
if cluster_name in cqgraph:
cluster = cqgraph.get_cluster(cluster_name)
else:
cluster = QuantaCluster(cluster_name, cluster_label, info)
cqgraph.add_cluster(cluster)
cluster.add_quantum(node.nodeId, task_label)

# Save mapping for use when creating dependencies.
quantum_to_cluster[node.nodeId] = cluster_name

# use dependencies to find other quantum to add
match method:
case "source":
possible_nodes = descendants(qgraph.graph, node)
case "sink":
possible_nodes = ancestors(qgraph.graph, node)
for node2 in possible_nodes:
if node2.taskDef.label in ordered_tasks[cluster_label]:
cluster.add_quantum(node2.nodeId, node2.taskDef.label)
quantum_to_cluster[node2.nodeId] = cluster_name
else:
_LOG.debug(
"label (%s) not in ordered_tasks. Not adding possible quantum %s",
node2.taskDef.label,
node2.nodeId,
)

new_clusters.append(cluster)

for cluster in new_clusters:
add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster)


def get_cluster_name_from_node(
    node: QuantumNode,
    cluster_dims: list[str],
    cluster_label: str,
    template: str,
    equal_dims: str | None,
) -> tuple[str, dict[str, Any]]:
    """Get the cluster name in which to add the given node.

    Parameters
    ----------
    node : `lsst.pipe.base.QuantumNode`
        QuantumNode from which to create the cluster.
    cluster_dims : `list` [ `str` ]
        List of dimension names to be used when clustering.
    cluster_label : `str`
        Cluster label.
    template : `str`
        Template for the cluster name.
    equal_dims : `str` or `None`
        Configuration describing any dimensions to be considered equal:
        comma-separated ``dim1:dim2`` pairs.  `None` or an empty string
        means there are no such dimensions.

    Returns
    -------
    cluster_name : `str`
        Name of the cluster in which to add the given node.
    info : `dict` [`str`, `~typing.Any`]
        Values used to fill in the name template; also the information
        needed if creating a new cluster.

    Raises
    ------
    RuntimeError
        Raised if the quantum's data ID has no value for one of the
        clustering dimensions, even after applying ``equal_dims``.
    """
    # Gather info for cluster name template into a dictionary.
    info: dict[str, Any] = {"node_number": node.nodeId}

    missing_info = set()
    assert node.quantum.dataId is not None  # for mypy
    data_id_info = dict(node.quantum.dataId.mapping)
    for dim_name in cluster_dims:
        _LOG.debug("dim_name = %s", dim_name)
        if dim_name in data_id_info:
            info[dim_name] = data_id_info[dim_name]
        else:
            missing_info.add(dim_name)

    if equal_dims:
        for pair in [pt.strip() for pt in equal_dims.split(",")]:
            dim1, dim2 = pair.strip().split(":")
            # Use discard, not remove: the clustering dimension may already
            # have been found directly in the data ID (so it was never marked
            # missing), in which case remove would raise KeyError.
            if dim1 in cluster_dims and dim2 in data_id_info:
                info[dim1] = data_id_info[dim2]
                missing_info.discard(dim1)
            elif dim2 in cluster_dims and dim1 in data_id_info:
                info[dim2] = data_id_info[dim1]
                missing_info.discard(dim2)

    info["label"] = cluster_label
    _LOG.debug("info for template = %s", info)

    if missing_info:
        # Sorted so that the message does not depend on set ordering.
        raise RuntimeError(
            f"Quantum {node.nodeId} ({data_id_info}) missing dimensions: {','.join(sorted(missing_info))}; "
            f"required for cluster {cluster_label}"
        )

    # Use dictionary plus template format string to create name.
    # To avoid key errors from generic patterns, use defaultdict.
    cluster_name = template.format_map(defaultdict(lambda: "", info))
    cluster_name = re.sub("_+", "_", cluster_name)

    # Some dimensions contain slash which must be replaced.
    cluster_name = re.sub("/", "_", cluster_name)
    _LOG.debug("cluster_name = %s", cluster_name)

    return cluster_name, info
Loading

0 comments on commit 56e2d00

Please sign in to comment.