Skip to content

Commit

Permalink
[GraphBolt] Remove old version of `to` in MiniBatch. (#7309)
Browse files Browse the repository at this point in the history
Co-authored-by: Ubuntu <[email protected]>
  • Loading branch information
yxy235 and Ubuntu authored Apr 16, 2024
1 parent bc3da37 commit 41a3848
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 203 deletions.
43 changes: 4 additions & 39 deletions python/dgl/graphbolt/minibatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,49 +562,14 @@ def _to(x, device):
def apply_to(x, device):
return recursive_apply(x, lambda x: _to(x, device))

if self.seed_nodes is not None and self.compacted_node_pairs is None:
# Node related tasks.
transfer_attrs = [
"labels",
"sampled_subgraphs",
"node_features",
"edge_features",
]
if self.labels is None:
# Layerwise inference
transfer_attrs.append("seed_nodes")
elif self.seed_nodes is None and self.compacted_node_pairs is not None:
# Link/edge related tasks.
transfer_attrs = [
"labels",
"compacted_node_pairs",
"compacted_negative_srcs",
"compacted_negative_dsts",
"sampled_subgraphs",
"node_features",
"edge_features",
]
elif self.seeds is not None:
# Node/link/edge related tasks.
transfer_attrs = [
"labels",
"sampled_subgraphs",
"node_features",
"edge_features",
"compacted_seeds",
"indexes",
"seeds",
]
else:
# Otherwise copy all the attributes to the device.
transfer_attrs = get_attributes(self)
transfer_attrs = get_attributes(self)

for attr in transfer_attrs:
# Only copy member variables.
try:
# For read-only attributes such as blocks and
# node_pairs_with_labels, setattr will throw an AttributeError.
# We catch these exceptions and skip those attributes.
# For read-only attributes such as blocks , setattr will throw
# an AttributeError. We catch these exceptions and skip those
# attributes.
setattr(self, attr, apply_to(getattr(self, attr), device))
except AttributeError:
continue
Expand Down
175 changes: 11 additions & 164 deletions tests/python/pytorch/graphbolt/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,132 +29,6 @@ def test_CopyTo():
assert data.seeds.device.type == "cuda"


@pytest.mark.parametrize(
    "task",
    [
        "node_classification",
        "node_inference",
        "link_prediction",
        "edge_classification",
        "extra_attrs",
    ],
)
@unittest.skipIf(F._default_context_str == "cpu", "CopyTo needs GPU to test")
def test_CopyToWithMiniBatches_original(task):
    """Verify that CopyTo moves exactly the task-relevant MiniBatch
    attributes to the GPU while every other attribute stays on the CPU,
    for both the class-constructor and functional invocations of CopyTo.
    """
    num_items = 16
    batch_size = 2

    # Build an item set whose field names match the given task.
    if task in ("node_classification", "extra_attrs"):
        itemset = gb.ItemSet(
            (torch.arange(num_items), torch.arange(num_items)),
            names=("seed_nodes", "labels"),
        )
    elif task == "node_inference":
        itemset = gb.ItemSet(torch.arange(num_items), names="seed_nodes")
    elif task == "link_prediction":
        itemset = gb.ItemSet(
            (
                torch.arange(2 * num_items).reshape(-1, 2),
                torch.arange(3 * num_items).reshape(-1, 3),
            ),
            names=("node_pairs", "negative_dsts"),
        )
    elif task == "edge_classification":
        itemset = gb.ItemSet(
            (
                torch.arange(2 * num_items).reshape(-1, 2),
                torch.arange(num_items),
            ),
            names=("node_pairs", "labels"),
        )
    graph = gb_test_utils.rand_csc_graph(100, 0.15, bidirection_edge=True)

    # Two node-feature tensors served through a basic feature store.
    keys = [("node", None, "a"), ("node", None, "b")]
    features = {
        keys[0]: gb.TorchBasedFeature(torch.randn(200, 4)),
        keys[1]: gb.TorchBasedFeature(torch.randn(200, 4)),
    }
    feature_store = gb.BasicFeatureStore(features)

    datapipe = gb.ItemSampler(itemset, batch_size=batch_size)
    datapipe = gb.NeighborSampler(
        datapipe,
        graph,
        fanouts=[torch.LongTensor([2]) for _ in range(2)],
    )
    # Node inference has no labels, so nothing to fetch features for.
    if task != "node_inference":
        datapipe = gb.FeatureFetcher(
            datapipe,
            feature_store,
            ["a"],
        )

    # Attributes expected to end up on the GPU, per task.
    edge_task_attrs = [
        "labels",
        "compacted_seeds",
        "sampled_subgraphs",
        "indexes",
        "node_features",
        "edge_features",
        "blocks",
        "seeds",
    ]
    copied_attrs = {
        "node_classification": [
            "node_features",
            "edge_features",
            "sampled_subgraphs",
            "labels",
            "blocks",
            "seeds",
        ],
        "node_inference": [
            "seeds",
            "sampled_subgraphs",
            "blocks",
            "labels",
        ],
        "link_prediction": edge_task_attrs,
        "edge_classification": edge_task_attrs,
        "extra_attrs": [
            "node_features",
            "edge_features",
            "sampled_subgraphs",
            "labels",
            "blocks",
            "seed_nodes",
            "seeds",
        ],
    }[task]

    def assert_devices(pipe):
        # Walk every public attribute of each minibatch; anything with a
        # `device` must be on the GPU iff it is in `copied_attrs`.
        for minibatch in pipe:
            for attr in dir(minibatch):
                value = getattr(minibatch, attr)
                # Probe the first element of containers (order matters:
                # a Mapping is also Iterable).
                if isinstance(value, Mapping):
                    value = value[next(iter(value))]
                elif isinstance(value, Iterable):
                    value = next(iter(value))
                if (
                    not callable(value)
                    and not attr.startswith("__")
                    and hasattr(value, "device")
                    and value is not None
                ):
                    # The "other" branch is unreachable under the current
                    # parametrization; kept for direct calls.
                    if task == "other":
                        assert value.device.type == "cuda"
                    elif attr in copied_attrs:
                        assert value.device.type == "cuda"
                    else:
                        assert value.device.type == "cpu"

    extra_attrs = ["seed_nodes"] if task == "extra_attrs" else None

    # Invoke CopyTo via class constructor.
    assert_devices(gb.CopyTo(datapipe, "cuda", extra_attrs))

    # Invoke CopyTo via functional form.
    assert_devices(datapipe.copy_to("cuda", extra_attrs))


@pytest.mark.parametrize(
"task",
[
Expand Down Expand Up @@ -209,47 +83,20 @@ def test_CopyToWithMiniBatches(task):
["a"],
)

if task == "node_classification":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seeds",
]
elif task == "node_inference":
copied_attrs = [
"seeds",
"sampled_subgraphs",
"blocks",
"labels",
]
elif task == "link_prediction" or task == "edge_classification":
copied_attrs = [
"labels",
"compacted_seeds",
"sampled_subgraphs",
"indexes",
"node_features",
"edge_features",
"blocks",
"seeds",
]
elif task == "extra_attrs":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seed_nodes",
"seeds",
]
copied_attrs = [
"labels",
"compacted_seeds",
"sampled_subgraphs",
"indexes",
"node_features",
"edge_features",
"blocks",
"seeds",
"input_nodes",
]

def test_data_device(datapipe):
for data in datapipe:
print(data)
for attr in dir(data):
var = getattr(data, attr)
if isinstance(var, Mapping):
Expand Down

0 comments on commit 41a3848

Please sign in to comment.