Distributed support for v6d GraphScope #116

Merged · 24 commits · Feb 28, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -15,3 +15,4 @@ detect_cuda_*
.idea/
cmake-build-release/
cmake-build-debug/
/graphlearn_torch/python/*.so
2 changes: 1 addition & 1 deletion examples/igbh/dist_train_rgnn.py
@@ -78,7 +78,7 @@ def evaluate(model, dataloader, current_device, use_fp16, with_gpu,
key=mllog_constants.EVAL_STOP,
metadata={mllog_constants.EPOCH_NUM: epoch_num},
)
return acc.item(), global_acc
return acc, global_acc

def run_training_proc(local_proc_rank, num_nodes, node_rank, num_training_procs,
split_training_sampling, hidden_channels, num_classes, num_layers,
2 changes: 1 addition & 1 deletion examples/igbh/train_rgnn_multi_gpu.py
@@ -75,7 +75,7 @@ def evaluate(model, dataloader, current_device, rank, world_size, epoch_num):
key=mllog_constants.EVAL_STOP,
metadata={mllog_constants.EPOCH_NUM: epoch_num},
)
return acc.item(), global_acc
return acc, global_acc

def run_training_proc(rank, world_size,
hidden_channels, num_classes, num_layers, model_type, num_heads, fan_out,
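For context on the `acc.item()` → `acc` change made in both training scripts above: a minimal sketch (hypothetical caller, not part of this PR) of why returning the accuracy as a tensor can be preferable — the tensor form can still take part in a cross-rank reduction before being converted to a Python number.

import torch
import torch.distributed as dist

def aggregate_accuracy(acc: torch.Tensor) -> float:
  """Hypothetical caller-side helper: average per-rank accuracy tensors."""
  if dist.is_available() and dist.is_initialized():
    dist.all_reduce(acc, op=dist.ReduceOp.SUM)  # sum the per-rank accuracies
    acc = acc / dist.get_world_size()           # average across ranks
  return acc.item()  # convert to a plain float only after the collective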
49 changes: 36 additions & 13 deletions graphlearn_torch/python/data/dataset.py
@@ -16,7 +16,7 @@
import logging
from multiprocessing.reduction import ForkingPickler
from typing import Dict, List, Optional, Union, Literal, Tuple
from enum import Enum
from collections.abc import Sequence

import torch

@@ -165,40 +165,48 @@ def load_vineyard(
# TODO(hongyi): GPU support
is_homo = len(edges) == 1 and edges[0][0] == edges[0][2]
from .vineyard_utils import \
vineyard_to_csr, load_vertex_feature_from_vineyard, load_edge_feature_from_vineyard
vineyard_to_csr, load_vertex_feature_from_vineyard, \
load_edge_feature_from_vineyard, VineyardGid2Lid

_edge_index = {}
_edge_ids = {}
_edge_weights = {}
layout = {}
for etype in edges:
src_ntype = etype[0] if self.edge_dir == "out" else etype[2]
indptr, indices, edge_id = vineyard_to_csr(vineyard_socket, vineyard_id, src_ntype, etype[1], self.edge_dir, True)
_edge_index[etype] = (indptr, indices) if self.edge_dir == "out" else (indices, indptr)
indptr, indices, edge_id = vineyard_to_csr(vineyard_socket, \
vineyard_id, src_ntype, etype[1], self.edge_dir, True)
_edge_index[etype] = (indptr, indices) if self.edge_dir == \
"out" else (indices, indptr)
_edge_ids[etype] = edge_id
layout[etype] = "CSR" if self.edge_dir == "out" else "CSC"
if edge_weights:
etype_edge_weights_label_name = edge_weights.get(etype)
if etype_edge_weights_label_name:
_edge_weights[etype] = torch.squeeze(
load_edge_feature_from_vineyard(vineyard_socket, vineyard_id, [etype_edge_weights_label_name], etype[1]))
load_edge_feature_from_vineyard(vineyard_socket, vineyard_id, \
[etype_edge_weights_label_name], etype[1]))
if is_homo:
ntype = edges[0]
_edge_index = _edge_index[ntype]
_edge_ids = _edge_ids[ntype]
_edge_weights = _edge_weights.get(ntype)
layout = "CSR" if self.edge_dir == "out" else "CSC"
self.init_graph(edge_index=_edge_index, edge_ids=_edge_ids, layout=layout, graph_mode='CPU', edge_weights=_edge_weights)
self.init_graph(edge_index=_edge_index, edge_ids=_edge_ids, \
layout=layout, graph_mode='CPU', edge_weights=_edge_weights)

# load node features
if node_features:
node_feature_data = {}
id2idx = {}
for ntype, property_names in node_features.items():
node_feature_data[ntype] = \
load_vertex_feature_from_vineyard(vineyard_socket, vineyard_id, property_names, ntype)
id2idx[ntype] = VineyardGid2Lid(vineyard_socket, vineyard_id, ntype)
if is_homo:
node_feature_data = node_feature_data[edges[0][0]]
self.init_node_features(node_feature_data=node_feature_data, with_gpu=False)
id2idx = VineyardGid2Lid(vineyard_socket, vineyard_id, edges[0][0])
self.init_node_features(node_feature_data=node_feature_data, id2idx=id2idx, with_gpu=False)

# load edge features
if edge_features:
@@ -215,18 +223,21 @@ def load_vineyard(
# load node labels
if node_labels:
node_label_data = {}
id2idx = {}
for ntype, label_property_name in node_labels.items():
node_label_data[ntype] = \
load_vertex_feature_from_vineyard(vineyard_socket, vineyard_id, [label_property_name], ntype)

id2idx[ntype] = VineyardGid2Lid(vineyard_socket, vineyard_id, ntype)
if is_homo:
node_label_data = node_label_data[edges[0][0]]
self.init_node_labels(node_label_data=node_label_data)
id2idx = VineyardGid2Lid(vineyard_socket, vineyard_id, edges[0][0])
self.init_node_labels(node_label_data=node_label_data, id2idx=id2idx)

def init_node_features(
self,
node_feature_data: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None,
id2idx: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None,
id2idx: Union[TensorDataType, Dict[NodeType, TensorDataType],
Sequence, Dict[NodeType, Sequence]] = None,
sort_func = None,
split_ratio: Union[float, Dict[NodeType, float]] = 0.0,
device_group_list: Optional[List[DeviceGroup]] = None,
@@ -331,17 +342,29 @@ def init_edge_features(

def init_node_labels(
self,
node_label_data: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None
node_label_data: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None,
id2idx: Union[TensorDataType, Dict[NodeType, TensorDataType], \
Sequence, Dict[NodeType, Sequence]] = None
):
r""" Initialize the node label storage.

Args:
node_label_data (torch.Tensor or numpy.ndarray): A tensor of the raw
node label data; it should be a dict for heterogeneous graph nodes.
(default: ``None``)
id2idx (torch.Tensor or numpy.ndarray): A tensor that maps global node ids
to local indices; it should be ``None`` for a GLT (non-v6d) graph. (default: ``None``)
"""
if node_label_data is not None:
self.node_labels = squeeze(convert_to_tensor(node_label_data))
# For a v6d graph, label data is partitioned across fragments and handled
# in the same way as distributed features.
if id2idx is not None:
node_label_data = convert_to_tensor(node_label_data, dtype=torch.int64)
id2idx = convert_to_tensor(id2idx)
self.node_labels = _build_features(node_label_data, id2idx, 0.0, \
None, None, False, None)
else:
self.node_labels = squeeze(convert_to_tensor(node_label_data))

def init_node_split(
self,
@@ -413,7 +436,7 @@ def get_edge_feature(self, etype: Optional[EdgeType] = None):
return None

def get_node_label(self, ntype: Optional[NodeType] = None):
if isinstance(self.node_labels, torch.Tensor):
if isinstance(self.node_labels, Feature) or isinstance(self.node_labels, torch.Tensor):
return self.node_labels
if isinstance(self.node_labels, dict):
assert ntype is not None
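For orientation, a minimal self-contained sketch (plain tensors and illustrative names, not the vineyard-backed class used above) of the id2idx mechanism that `load_vineyard` now wires into `init_node_features` and `init_node_labels`: a Sequence-like mapping that turns a fragment's global vertex ids into local row indices by subtracting the fragment's vertex offset, so per-fragment label and feature shards can be indexed with global ids.

from collections.abc import Sequence
import torch

class OffsetGid2Lid(Sequence):
  """Illustrative gid->lid mapping: local index = global id - fragment offset."""
  def __init__(self, offset: int, vnum: int):
    self._offset = offset  # first global id owned by this fragment (assumed layout)
    self._vnum = vnum      # number of vertices held by this fragment

  def __getitem__(self, gids):
    return gids - self._offset  # element-wise when gids is a torch.Tensor

  def __len__(self):
    return self._vnum

# Usage: index a per-fragment label shard with global ids.
labels = torch.tensor([3, 1, 4, 1])         # labels for local vertices 0..3
id2idx = OffsetGid2Lid(offset=100, vnum=4)  # this fragment owns gids 100..103
gids = torch.tensor([101, 103])
print(labels[id2idx[gids]])                 # tensor([1, 1])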
7 changes: 4 additions & 3 deletions graphlearn_torch/python/data/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

import threading
from multiprocessing.reduction import ForkingPickler
from typing import List, Optional
from typing import List, Optional, Union
from collections.abc import Sequence

import torch

@@ -100,7 +101,7 @@ class Feature(object):
"""
def __init__(self,
feature_tensor: TensorDataType,
id2index: Optional[torch.Tensor] = None,
id2index: Optional[Union[torch.Tensor, Sequence]] = None,
split_ratio: float = 0.0,
device_group_list: Optional[List[DeviceGroup]] = None,
device: Optional[int] = None,
@@ -210,7 +211,7 @@ def share_ipc(self):
if self._ipc_handle is not None:
return self._ipc_handle

if self.id2index is not None:
if self.id2index is not None and isinstance(self.id2index, torch.Tensor):
self.id2index = self.id2index.cpu()
self.id2index.share_memory_()

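A small sketch (illustrative helper, not the library's API) of the guard added to `share_ipc` above: only a `torch.Tensor` id2index is moved into shared memory, while a Sequence-style mapping such as `VineyardGid2Lid` is small and picklable and is passed through unchanged.

import torch

def prepare_id2index_for_ipc(id2index):
  # Shared-memory placement only applies to tensors; other mappings (e.g. an
  # offset-based Sequence) are left as-is and pickled normally by ForkingPickler.
  if id2index is not None and isinstance(id2index, torch.Tensor):
    id2index = id2index.cpu()
    id2index.share_memory_()
  return id2index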
98 changes: 97 additions & 1 deletion graphlearn_torch/python/data/vineyard_utils.py
@@ -14,10 +14,19 @@
# ==============================================================================

try:
from .. import py_graphlearn_torch_vineyard as pywrap
import torch
from typing import Dict
from collections.abc import Sequence

from .. import py_graphlearn_torch_vineyard as pywrap


except ImportError:
pass

from ..partition import PartitionBook


def vineyard_to_csr(sock, fid, v_label_name, e_label_name, edge_dir, haseid=0):
'''
Wrap to_csr function to read graph from vineyard
@@ -42,3 +51,90 @@ def load_edge_feature_from_vineyard(sock, fid, ecols, e_label_name):
return edge_feature(torch.Tensor)
'''
return pywrap.load_edge_feature_from_vineyard(sock, fid, e_label_name, ecols)


def get_fid_from_gid(gid):
'''
Wrap get_fid_from_gid function to get fid from gid
'''
return pywrap.get_fid_from_gid(gid)


def get_frag_vertex_offset(sock, fid, v_label_name):
'''
Wrap GetFragVertexOffset function to get vertex offset of a fragment.
'''
return pywrap.get_frag_vertex_offset(sock, fid, v_label_name)


def get_frag_vertex_num(sock, fid, v_label_name):
'''
Wrap GetFragVertexNum function to get vertex number of a fragment.
'''
return pywrap.get_frag_vertex_num(sock, fid, v_label_name)


class VineyardPartitionBook(PartitionBook):
def __init__(self, sock, obj_id, v_label_name, fid2pid: Dict=None):
self._sock = sock
self._obj_id = obj_id
self._v_label_name = v_label_name
self._frag = None
self._offset = get_frag_vertex_offset(sock, obj_id, v_label_name)
# TODO: optimise this query process if too slow
self._fid2pid = fid2pid

def __getitem__(self, gids) -> torch.Tensor:
fids = self.gid2fid(gids)
if self._fid2pid is not None:
pids = torch.tensor([self._fid2pid[fid] for fid in fids])
return pids.to(torch.int32)
return fids.to(torch.int32)

@property
def device(self):
return torch.device('cpu')

@property
def offset(self):
return self._offset

def gid2fid(self, gids):
'''
Parse gid to get fid
'''
if self._frag is None:
self._frag = pywrap.VineyardFragHandle(self._sock, self._obj_id)

fids = self._frag.get_fid_from_gid(gids.tolist())

return fids


class VineyardGid2Lid(Sequence):
def __init__(self, sock, fid, v_label_name):
self._offset = get_frag_vertex_offset(sock, fid, v_label_name)
self._vnum = get_frag_vertex_num(sock, fid, v_label_name)

def __getitem__(self, gids):
return gids - self._offset

def __len__(self):
return self._vnum

def v6d_id_select(srcs, p_mask, node_pb: PartitionBook):
'''
Select the inner vertices in `srcs` that belong to a specific partition,
and return their local offsets in the partition.
'''
gids = torch.masked_select(srcs, p_mask)
offsets = gids - node_pb.offset
return offsets

def v6d_id_filter(node_pb: VineyardPartitionBook, partition_idx):
'''
Select the inner vertices that belong to a specific partition
'''
frag = pywrap.VineyardFragHandle(node_pb._sock, node_pb._obj_id)
inner_vertices = frag.get_inner_vertices(node_pb._v_label_name)
return inner_vertices
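To illustrate the partition-resolution path added in this file, a self-contained sketch (plain tensors, no vineyard; names are illustrative) of what `VineyardPartitionBook` and `v6d_id_select` accomplish together: decide which partition owns each global id, keep the locally owned ones, and translate them to local offsets.

import torch

def select_local_offsets(srcs: torch.Tensor, owner: torch.Tensor,
                         this_partition: int, offset: int) -> torch.Tensor:
  # owner[i] is the partition id owning global id srcs[i] (what the partition
  # book's __getitem__ would return); offset is this fragment's vertex offset.
  p_mask = owner == this_partition           # vertices owned by this partition
  gids = torch.masked_select(srcs, p_mask)   # keep only the locally owned gids
  return gids - offset                       # local offsets, as in v6d_id_select

srcs = torch.tensor([100, 205, 102, 301])
owner = torch.tensor([0, 1, 0, 2])
print(select_local_offsets(srcs, owner, this_partition=0, offset=100))  # tensor([0, 2])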