diff --git a/python/dgl/graphbolt/minibatch.py b/python/dgl/graphbolt/minibatch.py
index acaa95b50163..71ca7d07c34a 100644
--- a/python/dgl/graphbolt/minibatch.py
+++ b/python/dgl/graphbolt/minibatch.py
@@ -562,49 +562,14 @@ def _to(x, device):
         def apply_to(x, device):
             return recursive_apply(x, lambda x: _to(x, device))
 
-        if self.seed_nodes is not None and self.compacted_node_pairs is None:
-            # Node related tasks.
-            transfer_attrs = [
-                "labels",
-                "sampled_subgraphs",
-                "node_features",
-                "edge_features",
-            ]
-            if self.labels is None:
-                # Layerwise inference
-                transfer_attrs.append("seed_nodes")
-        elif self.seed_nodes is None and self.compacted_node_pairs is not None:
-            # Link/edge related tasks.
-            transfer_attrs = [
-                "labels",
-                "compacted_node_pairs",
-                "compacted_negative_srcs",
-                "compacted_negative_dsts",
-                "sampled_subgraphs",
-                "node_features",
-                "edge_features",
-            ]
-        elif self.seeds is not None:
-            # Node/link/edge related tasks.
-            transfer_attrs = [
-                "labels",
-                "sampled_subgraphs",
-                "node_features",
-                "edge_features",
-                "compacted_seeds",
-                "indexes",
-                "seeds",
-            ]
-        else:
-            # Otherwise copy all the attributes to the device.
-            transfer_attrs = get_attributes(self)
+        transfer_attrs = get_attributes(self)
 
         for attr in transfer_attrs:
            # Only copy member variables.
            try:
-                # For read-only attributes such as blocks and
-                # node_pairs_with_labels, setattr will throw an AttributeError.
-                # We catch these exceptions and skip those attributes.
+                # For read-only attributes such as blocks, setattr will throw
+                # an AttributeError. We catch these exceptions and skip those
+                # attributes.
                 setattr(self, attr, apply_to(getattr(self, attr), device))
             except AttributeError:
                 continue
diff --git a/tests/python/pytorch/graphbolt/test_base.py b/tests/python/pytorch/graphbolt/test_base.py
index 6f3010b4fe40..7cd50b8eb8b7 100644
--- a/tests/python/pytorch/graphbolt/test_base.py
+++ b/tests/python/pytorch/graphbolt/test_base.py
@@ -29,132 +29,6 @@ def test_CopyTo():
         assert data.seeds.device.type == "cuda"
 
 
-@pytest.mark.parametrize(
-    "task",
-    [
-        "node_classification",
-        "node_inference",
-        "link_prediction",
-        "edge_classification",
-        "extra_attrs",
-    ],
-)
-@unittest.skipIf(F._default_context_str == "cpu", "CopyTo needs GPU to test")
-def test_CopyToWithMiniBatches_original(task):
-    N = 16
-    B = 2
-    if task == "node_classification" or task == "extra_attrs":
-        itemset = gb.ItemSet(
-            (torch.arange(N), torch.arange(N)), names=("seed_nodes", "labels")
-        )
-    elif task == "node_inference":
-        itemset = gb.ItemSet(torch.arange(N), names="seed_nodes")
-    elif task == "link_prediction":
-        itemset = gb.ItemSet(
-            (
-                torch.arange(2 * N).reshape(-1, 2),
-                torch.arange(3 * N).reshape(-1, 3),
-            ),
-            names=("node_pairs", "negative_dsts"),
-        )
-    elif task == "edge_classification":
-        itemset = gb.ItemSet(
-            (torch.arange(2 * N).reshape(-1, 2), torch.arange(N)),
-            names=("node_pairs", "labels"),
-        )
-    graph = gb_test_utils.rand_csc_graph(100, 0.15, bidirection_edge=True)
-
-    features = {}
-    keys = [("node", None, "a"), ("node", None, "b")]
-    features[keys[0]] = gb.TorchBasedFeature(torch.randn(200, 4))
-    features[keys[1]] = gb.TorchBasedFeature(torch.randn(200, 4))
-    feature_store = gb.BasicFeatureStore(features)
-
-    datapipe = gb.ItemSampler(itemset, batch_size=B)
-    datapipe = gb.NeighborSampler(
-        datapipe,
-        graph,
-        fanouts=[torch.LongTensor([2]) for _ in range(2)],
-    )
-    if task != "node_inference":
-        datapipe = gb.FeatureFetcher(
-            datapipe,
-            feature_store,
-            ["a"],
-        )
-
-    if task == "node_classification":
-        copied_attrs = [
-            "node_features",
-            "edge_features",
-            "sampled_subgraphs",
-            "labels",
-            "blocks",
-            "seeds",
-        ]
-    elif task == "node_inference":
-        copied_attrs = [
-            "seeds",
-            "sampled_subgraphs",
-            "blocks",
-            "labels",
-        ]
-    elif task == "link_prediction" or task == "edge_classification":
-        copied_attrs = [
-            "labels",
-            "compacted_seeds",
-            "sampled_subgraphs",
-            "indexes",
-            "node_features",
-            "edge_features",
-            "blocks",
-            "seeds",
-        ]
-    elif task == "extra_attrs":
-        copied_attrs = [
-            "node_features",
-            "edge_features",
-            "sampled_subgraphs",
-            "labels",
-            "blocks",
-            "seed_nodes",
-            "seeds",
-        ]
-
-    def test_data_device(datapipe):
-        for data in datapipe:
-            for attr in dir(data):
-                var = getattr(data, attr)
-                if isinstance(var, Mapping):
-                    var = var[next(iter(var))]
-                elif isinstance(var, Iterable):
-                    var = next(iter(var))
-                if (
-                    not callable(var)
-                    and not attr.startswith("__")
-                    and hasattr(var, "device")
-                    and var is not None
-                ):
-                    if task == "other":
-                        assert var.device.type == "cuda"
-                    else:
-                        if attr in copied_attrs:
-                            assert var.device.type == "cuda"
-                        else:
-                            assert var.device.type == "cpu"
-
-    if task == "extra_attrs":
-        extra_attrs = ["seed_nodes"]
-    else:
-        extra_attrs = None
-
-    # Invoke CopyTo via class constructor.
-    test_data_device(gb.CopyTo(datapipe, "cuda", extra_attrs))
-
-    # Invoke CopyTo via functional form.
-    test_data_device(datapipe.copy_to("cuda", extra_attrs))
-
-
 @pytest.mark.parametrize(
     "task",
     [
@@ -209,47 +83,20 @@ def test_CopyToWithMiniBatches(task):
             ["a"],
         )
 
-    if task == "node_classification":
-        copied_attrs = [
-            "node_features",
-            "edge_features",
-            "sampled_subgraphs",
-            "labels",
-            "blocks",
-            "seeds",
-        ]
-    elif task == "node_inference":
-        copied_attrs = [
-            "seeds",
-            "sampled_subgraphs",
-            "blocks",
-            "labels",
-        ]
-    elif task == "link_prediction" or task == "edge_classification":
-        copied_attrs = [
-            "labels",
-            "compacted_seeds",
-            "sampled_subgraphs",
-            "indexes",
-            "node_features",
-            "edge_features",
-            "blocks",
-            "seeds",
-        ]
-    elif task == "extra_attrs":
-        copied_attrs = [
-            "node_features",
-            "edge_features",
-            "sampled_subgraphs",
-            "labels",
-            "blocks",
-            "seed_nodes",
-            "seeds",
-        ]
+    copied_attrs = [
+        "labels",
+        "compacted_seeds",
+        "sampled_subgraphs",
+        "indexes",
+        "node_features",
+        "edge_features",
+        "blocks",
+        "seeds",
+        "input_nodes",
+    ]
 
     def test_data_device(datapipe):
         for data in datapipe:
-            print(data)
             for attr in dir(data):
                 var = getattr(data, attr)
                 if isinstance(var, Mapping):
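
For context on the minibatch.py change above: the simplified to() no longer branches on the task type; it enumerates every member attribute via get_attributes(self), moves each one to the target device, and relies on the AttributeError raised by setattr to skip read-only properties such as blocks. The standalone sketch below illustrates that pattern only; ToyBatch, _get_attributes, and _to are hypothetical stand-ins for graphbolt's MiniBatch, get_attributes, and recursive_apply, not the library code itself.

# Minimal sketch of the "copy everything, skip read-only attributes" pattern.
# ToyBatch and the helpers below are illustrative, not graphbolt APIs.
import torch


def _get_attributes(obj):
    # Collect public, non-callable member names (data attributes and
    # properties, but not methods), similar in spirit to get_attributes.
    return [
        name
        for name in dir(obj)
        if not name.startswith("_") and not callable(getattr(obj, name))
    ]


def _to(value, device):
    # Move tensors to the device; leave other values (None, ints, lists) as-is.
    return value.to(device) if isinstance(value, torch.Tensor) else value


class ToyBatch:
    def __init__(self, node_features, labels):
        self.node_features = node_features
        self.labels = labels

    @property
    def blocks(self):
        # Read-only derived view; assigning to it raises AttributeError.
        return [self.node_features]

    def to(self, device):
        # Copy every attribute; read-only properties are skipped when
        # setattr raises AttributeError, mirroring the patched MiniBatch.to().
        for attr in _get_attributes(self):
            try:
                setattr(self, attr, _to(getattr(self, attr), device))
            except AttributeError:
                continue
        return self


batch = ToyBatch(torch.randn(4, 2), torch.arange(4))
device = "cuda" if torch.cuda.is_available() else "cpu"
batch.to(device)
print(batch.node_features.device, batch.labels.device)

Because all settable attributes are now transferred unconditionally, the rewritten test uses a single copied_attrs list (including input_nodes) instead of one list per task, which is what the test_base.py hunks above reflect.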