Skip to content

Commit

Permalink
LiveGraph driver
Browse files Browse the repository at this point in the history
  • Loading branch information
Dean De Leo committed Oct 13, 2020
1 parent caeebca commit 50c78d5
Show file tree
Hide file tree
Showing 14 changed files with 2,058 additions and 38 deletions.
4 changes: 4 additions & 0 deletions Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ endif
ifneq (,$(findstring -DHAVE_TESEO,${ALL_CPPFLAGS}))
sources += $(addprefix library/teseo/, teseo_driver.cpp teseo_lcc.cpp teseo_real_vtx.cpp)
endif
# LiveGraph driver: compiled only when -DHAVE_LIVEGRAPH appears in ALL_CPPFLAGS
ifneq (,$(findstring -DHAVE_LIVEGRAPH,${ALL_CPPFLAGS}))
sources += $(addprefix library/livegraph/, livegraph_driver.cpp)
endif


#############################################################################
# Helper variables
Expand Down
35 changes: 35 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,41 @@ if ( test x"${with_graphone}" != x"" && test x"${with_graphone}" != "no" ); then
AS_VAR_APPEND([CPPFLAGS], [" -DHAVE_GRAPHONE"])
fi


#############################################################################
# Support for the LiveGraph library
# Usage:
# --with-livegraph=/path/to/livegraph/lib
#
AC_ARG_WITH([livegraph], [AS_HELP_STRING([--with-livegraph@<:@=ARG@:>@], [
Link to the LiveGraph library. The argument needs to be the path to the dynamic library.
])], [])

dnl Enable the LiveGraph driver only when --with-livegraph=/path was given. Both operands of
dnl every comparison carry the same x prefix, so that --without-livegraph and --with-livegraph=no,
dnl which set with_livegraph to no, skip this block instead of falling through to the path checks.
if ( test x"${with_livegraph}" != x"" && test x"${with_livegraph}" != x"no" ); then
    dnl LiveGraph requires the OpenMP runtime. The flag have_openmp is presumably set by an earlier check in this file.
    if ( test x"${have_openmp}" = x"no" ); then
        AC_MSG_FAILURE([LiveGraph: missing prerequisite header omp.h (OpenMP runtime library)])
    fi

    AC_MSG_NOTICE([LiveGraph: checking whether Intel TBB is present in the machine...])
    MY_INTEL_TBB()

    dnl A bare --with-livegraph yields the value yes, which is not a usable library path
    if ( test x"${with_livegraph}" = x"yes" ); then
        AC_MSG_ERROR(["the full path to the LiveGraph library needs to be set, e.g. --with-livegraph=/path/to/livegraph/lib"]);
    fi

    dnl Resolve the path given by the user and add it both to the linker search path and to the rpath
    path_livegraph_library=$(realpath "${with_livegraph}")
    AC_CHECK_FILE([${path_livegraph_library}],
        [ LIBS="-L${path_livegraph_library} -Wl,-rpath=${path_livegraph_library} ${LIBS}" ],
        [ AC_MSG_ERROR(["the path does not exist: ${path_livegraph_library}"]) ]
    )

    dnl Probe the library through the mangled name of lg::Graph::begin_transaction()
    AC_SEARCH_LIBS([_ZN2lg5Graph17begin_transactionEv], [livegraph], [], [ AC_MSG_ERROR([cannot link the library livegraph]) ])

    AC_MSG_NOTICE([LiveGraph support enabled])
    AS_VAR_APPEND([CPPFLAGS], [" -DHAVE_LIVEGRAPH"])
fi

#############################################################################
# Remove extra blanks from our variables
EXTRA_CPPFLAGS=$(echo ${EXTRA_CPPFLAGS} | xargs)
Expand Down
128 changes: 111 additions & 17 deletions library/baseline/csr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ void CSR::set_timeout(uint64_t seconds) {
m_timeout = seconds;
}

// Raw accessors to the internal CSR arrays. The pointers are handed out as they are,
// without any copy: the arrays remain owned by the CSR instance.

// Outgoing edges, vertex array
uint64_t* CSR::out_v() const {
    return m_out_v;
}

// Outgoing edges, edge array
uint64_t* CSR::out_e() const {
    return m_out_e;
}

// Outgoing edges, weight array
double* CSR::out_w() const {
    return m_out_w;
}

// Incoming edges, vertex array
uint64_t* CSR::in_v() const {
    return m_in_v;
}

// Incoming edges, edge array
uint64_t* CSR::in_e() const {
    return m_in_e;
}

// Incoming edges, weight array
double* CSR::in_w() const {
    return m_in_w;
}

/*****************************************************************************
* *
* Load *
Expand All @@ -174,15 +181,21 @@ void CSR::set_timeout(uint64_t seconds) {
void CSR::load(const std::string& path){
if(m_out_v != nullptr) ERROR("Already initialised & loaded");

::gfe::graph::WeightedEdgeStream stream { path };
load(stream);
}

void CSR::load(gfe::graph::WeightedEdgeStream& stream){
if(m_out_v != nullptr) ERROR("Already initialised & loaded");

if(m_is_directed){
load_directed(path);
load_directed(stream);
} else {
load_undirected(path);
load_undirected(stream);
}
}

void CSR::load_directed(const std::string& path){
::gfe::graph::WeightedEdgeStream stream { path };
void CSR::load_directed(gfe::graph::WeightedEdgeStream& stream){
m_num_edges = stream.num_edges();

{ // init the mapping of vertices
Expand Down Expand Up @@ -263,9 +276,9 @@ void CSR::load_directed(const std::string& path){
}
}

void CSR::load_undirected(const std::string& path){
void CSR::load_undirected(gfe::graph::WeightedEdgeStream& stream){
// We rely on load_directed to build a directed graph first
load_directed(path);
load_directed(stream);

// init the final vectors
unique_ptr<uint64_t[]> ptr_out_v_undirected { new uint64_t[m_num_vertices]() };
Expand Down Expand Up @@ -426,22 +439,35 @@ them in parent array as negative numbers. Thus the encoding of parent is:
November 2012.
*/

//#define DEBUG_BFS
#if defined(DEBUG_BFS)
#define COUT_DEBUG_BFS(msg) COUT_DEBUG(msg)
#else
#define COUT_DEBUG_BFS(msg)
#endif


int64_t CSR::do_bfs_BUStep(int64_t* distances, int64_t distance, gapbs::Bitmap &front, gapbs::Bitmap &next) const {
int64_t awake_count = 0;
next.reset();
uint64_t* __restrict in_e = m_in_e;

#pragma omp parallel for schedule(dynamic, 1024) reduction(+ : awake_count)
for (uint64_t u = 0; u < m_num_vertices; u++) {
COUT_DEBUG_BFS("explore " << u << " [external vertex = " << m_log2ext[u] << "], distance: " << distances[u]);

if (distances[u] < 0){ // the node has not been visited yet
auto in_interval = get_in_interval(u);
uint64_t degree = in_interval.second - in_interval.first;
if(degree == 0) continue;

for(uint64_t i = in_interval.first; i < in_interval.second; i++){
uint64_t dst = in_e[i];
COUT_DEBUG_BFS("\tincoming edge: " << dst << " [external vertex = " << m_log2ext[dst] << "]");

if(front.get_bit(dst)) {
COUT_DEBUG_BFS("\t-> distance updated to " << distance << " via vertex #" << dst << " [external vertex = " << m_log2ext[dst] << "]");
distances[u] = distance; // on each BUStep, all nodes will have the same distance
awake_count++;
next.set_bit(u);
Expand All @@ -465,15 +491,18 @@ int64_t CSR::do_bfs_TDStep(int64_t* distances, int64_t distance, gapbs::SlidingQ
#pragma omp for schedule(dynamic, 64)
for (auto q_iter = queue.begin(); q_iter < queue.end(); q_iter++) {
int64_t u = *q_iter;
COUT_DEBUG_BFS("explore: " << u << " [external vertex = " << m_log2ext[u] << "]");
auto out_interval = get_out_interval(u);
uint64_t degree = out_interval.second - out_interval.first;
if(degree == 0) continue;

for(uint64_t i = out_interval.first; i < out_interval.second; i++){
uint64_t dst = out_e[i];
COUT_DEBUG_BFS("\toutgoing edge: " << dst << " [external vertex = " << m_log2ext[dst] << "]");

int64_t curr_val = distances[dst];
if (curr_val < 0 && gapbs::compare_and_swap(distances[dst], curr_val, distance)) {
COUT_DEBUG_BFS("\t-> distance updated to " << distance << " via vertex #" << dst << " [external vertex = " << m_log2ext[dst] << "]");
lqueue.push_back(dst);
scout_count += -curr_val;
}
Expand Down Expand Up @@ -565,6 +594,7 @@ void CSR::bfs(uint64_t external_source_id, const char* dump2file) {
utility::TimeoutService timeout { m_timeout };
Timer timer; timer.start();
uint64_t root = m_ext2log.at(external_source_id);
COUT_DEBUG_BFS("root: " << root << " [external vertex: " << external_source_id << "]");

// Run the BFS algorithm
unique_ptr<int64_t[]> ptr_result = do_bfs(root, timeout);
Expand Down Expand Up @@ -1010,8 +1040,18 @@ void CSR::cdlp(uint64_t max_iterations, const char* dump2file) {
#define COUT_DEBUG_LCC(msg)
#endif

// loosely based on the impl~ made for GraphOne
// Entry point for the LCC kernel: dispatch to the specialised implementation
// for directed or undirected graphs, depending on how this instance was created.
unique_ptr<double[]> CSR::do_lcc(utility::TimeoutService& timer) const {
    return m_is_directed ? do_lcc_directed(timer) : do_lcc_undirected(timer);
}

// loosely based on the impl~ made for GraphOne
unique_ptr<double[]> CSR::do_lcc_directed(utility::TimeoutService& timer) const {
assert(m_is_directed && "Implementation for directed graphs");

unique_ptr<double[]> ptr_lcc { new double[m_num_vertices] };
double* lcc = ptr_lcc.get();
uint64_t* __restrict out_e = m_out_e;
Expand All @@ -1030,7 +1070,7 @@ unique_ptr<double[]> CSR::do_lcc(utility::TimeoutService& timer) const {

// Cfr. Spec v.0.9.0 pp. 15: "If the number of neighbors of a vertex is less than two, its coefficient is defined as zero"
uint64_t v_degree_out = get_out_degree(v);
uint64_t v_degree_in = m_is_directed ? get_in_degree(v) : 0;
uint64_t v_degree_in = get_in_degree(v);
uint64_t v_degree_ub = v_degree_in + v_degree_out; // upper bound for directed graphs, exact degree for those undirected
if(v_degree_ub < 2) continue;

Expand All @@ -1048,20 +1088,18 @@ unique_ptr<double[]> CSR::do_lcc(utility::TimeoutService& timer) const {
}

// Incoming edges (only directed graphs)
if(m_is_directed){
auto in_interval = get_in_interval(v);
for(uint64_t i = in_interval.first; i < in_interval.second; i++){
uint64_t u = in_e[i];
auto result = neighbours.insert(u);
if(result.second){ // the element was actually inserted
edges.push_back(u);
}
auto in_interval = get_in_interval(v);
for(uint64_t i = in_interval.first; i < in_interval.second; i++){
uint64_t u = in_e[i];
auto result = neighbours.insert(u);
if(result.second){ // the element was actually inserted
edges.push_back(u);
}
}
const uint64_t v_degree = edges.size();

// Now that we know the actual degree of v, perform the proper check for directed graphs
if(m_is_directed && v_degree < 2) continue;
if(v_degree < 2) continue;

// again, visit all neighbours of v
// for directed graphs, edges1 contains the intersection of both the incoming and the outgoing edges
Expand Down Expand Up @@ -1094,6 +1132,62 @@ unique_ptr<double[]> CSR::do_lcc(utility::TimeoutService& timer) const {
return ptr_lcc;
}

/**
 * Compute the local clustering coefficient (LCC) of every vertex, specialised for undirected graphs.
 *
 * For each vertex v with at least two neighbours, the score is the number of pairs (u, w) such that
 * u is a neighbour of v, w is a neighbour of u and w is also a neighbour of v, divided by
 * degree(v) * (degree(v) - 1). Vertices with fewer than two neighbours get a score of 0.
 * NOTE(review): the denominator counts ordered pairs, which presumes that every undirected edge is
 * stored in the adjacency list of both its endpoints -- confirm against load_undirected.
 *
 * @param timer the timeout service polled once per vertex. When the time budget is exhausted, the
 *        remaining vertices are skipped and their score is left at 0.
 * @return an array of m_num_vertices scores, indexed by the logical vertex id.
 */
unique_ptr<double[]> CSR::do_lcc_undirected(utility::TimeoutService& timer) const {
    assert(!m_is_directed && "Implementation for undirected graphs");

    unique_ptr<double[]> ptr_lcc { new double[m_num_vertices] };
    double* lcc = ptr_lcc.get();
    uint64_t* __restrict out_e = m_out_e;

    #pragma omp parallel for schedule(dynamic, 64)
    for(uint64_t v = 0; v < m_num_vertices; v++){
        COUT_DEBUG_LCC("> Node " << v);

        // The array is allocated uninitialised: store the default score before any early exit,
        // so that every entry holds a defined value even when the computation times out.
        lcc[v] = 0.0;
        if(timer.is_timeout()) continue; // exhausted the budget of available time

        // Cfr. Spec v.0.9.0 pp. 15: "If the number of neighbors of a vertex is less than two, its coefficient is defined as zero"
        uint64_t v_degree_out = get_out_degree(v);
        if(v_degree_out < 2) continue;

        // Build the set of neighbours of v, used for the membership tests below
        auto out_interval = get_out_interval(v);
        unordered_set<uint64_t> neighbours;
        neighbours.reserve(v_degree_out); // avoid rehashing while the set is being filled
        for(uint64_t i = out_interval.first; i < out_interval.second; i++){
            neighbours.insert(out_e[i]);
        }

        uint64_t num_triangles = 0; // number of triangles found so far for the node v

        // again, visit all neighbours of v
        for(uint64_t i = out_interval.first; i < out_interval.second; i++){
            uint64_t u = out_e[i];
            COUT_DEBUG_LCC("[" << (i - out_interval.first) << "/" << v_degree_out << "] neighbour: " << u);
            assert(neighbours.count(u) == 1 && "The set `neighbours' should contain all neighbours of v");

            // For the Graphalytics spec v 0.9.0, only consider the outgoing edges for the neighbours u
            auto u_out_interval = get_out_interval(u);

            for(uint64_t j = u_out_interval.first; j < u_out_interval.second; j++){
                uint64_t w = out_e[j];
                COUT_DEBUG_LCC("---> [" << j << "/" << /* degree */ (u_out_interval.second - u_out_interval.first) << "] neighbour: " << w);
                // check whether it's also a neighbour of v
                if(neighbours.count(w) == 1){
                    COUT_DEBUG_LCC("Triangle found " << v << " - " << u << " - " << w);
                    num_triangles++;
                }
            }
        }

        // register the final score. The divisor cannot be zero because v_degree_out >= 2 here
        uint64_t max_num_edges = v_degree_out * (v_degree_out -1);
        lcc[v] = static_cast<double>(num_triangles) / max_num_edges;
        COUT_DEBUG_LCC("Score computed: " << (num_triangles) << "/" << max_num_edges << " = " << lcc[v]);
    }

    return ptr_lcc;
}

void CSR::lcc(const char* dump2file) {
utility::TimeoutService timeout { m_timeout };
Timer timer; timer.start();
Expand Down
19 changes: 16 additions & 3 deletions library/baseline/csr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
#include "common/error.hpp"
#include "library/interface.hpp"


// Forward declarations
namespace gapbs { class Bitmap; }
namespace gapbs { template <typename T> class SlidingQueue; }
namespace gapbs { template <typename T> class pvector; }
namespace gfe::graph { class WeightedEdgeStream; }
namespace gfe::utility { class TimeoutService; }
void _bm_run_csr(); // bm experiment

Expand Down Expand Up @@ -72,8 +72,8 @@ class CSR : public virtual LoaderInterface, public virtual RandomVertexInterface

private:
// Load an undirected graph
void load_undirected(const std::string& path);
void load_directed(const std::string& path);
void load_undirected(gfe::graph::WeightedEdgeStream& stream);
void load_directed(gfe::graph::WeightedEdgeStream& stream);

// BFS implementation
std::unique_ptr<int64_t[]> do_bfs(uint64_t root, utility::TimeoutService& timer, int alpha = 15, int beta = 18) const;
Expand All @@ -94,6 +94,8 @@ class CSR : public virtual LoaderInterface, public virtual RandomVertexInterface

// LCC implementation
std::unique_ptr<double[]> do_lcc(utility::TimeoutService& timer) const;
std::unique_ptr<double[]> do_lcc_directed(utility::TimeoutService& timer) const;
std::unique_ptr<double[]> do_lcc_undirected(utility::TimeoutService& timer) const;

// SSSP implementation
gapbs::pvector<double> do_sssp(uint64_t source, double delta, utility::TimeoutService& timer) const;
Expand Down Expand Up @@ -139,6 +141,7 @@ class CSR : public virtual LoaderInterface, public virtual RandomVertexInterface
* Load the whole graph representation from the given path
*/
void load(const std::string& path);
void load(gfe::graph::WeightedEdgeStream& stream); // it modifies the stream

/**
* Set the timeout for the Graphalytics kernels
Expand All @@ -150,6 +153,16 @@ class CSR : public virtual LoaderInterface, public virtual RandomVertexInterface
*/
uint64_t get_random_vertex_id() const;

/**
* Retrieve the internal pointers to the CSR arrays. For Debug & Testing only.
* The methods are const, but the returned pointers are not pointers-to-const: callers are
* able to alter the content of the arrays through them.
*/
uint64_t* out_v() const; // outgoing edges, vertex array of size |V|
uint64_t* out_e() const; // outgoing edges, edge array of size |E|
double* out_w() const; // outgoing edges, weight array of size |E|
uint64_t* in_v() const; // incoming edges (only directed graphs), vertex array
uint64_t* in_e() const; // incoming edges (only directed graphs), edge array
double* in_w() const; // incoming edges (only directed graphs), weight array

/**
* Perform a BFS from source_vertex_id to all the other vertices in the graph.
* @param source_vertex_id the vertex where to start the search
Expand Down
17 changes: 17 additions & 0 deletions library/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
#if defined(HAVE_GRAPHONE)
#include "graphone/graphone.hpp"
#endif
#if defined(HAVE_LIVEGRAPH)
#include "livegraph/livegraph_driver.hpp"
#endif
#if defined(HAVE_TESEO)
#include "teseo/teseo_driver.hpp"
#include "teseo/teseo_lcc.hpp"
Expand Down Expand Up @@ -172,6 +175,15 @@ std::unique_ptr<Interface> generate_graphone_ref_ignore_build(bool directed_grap
}
#endif

#if defined(HAVE_LIVEGRAPH)
std::unique_ptr<Interface> generate_livegraph_ro(bool directed_graph){ // read only transactions for Graphalytics
return unique_ptr<Interface>( new LiveGraphDriver(directed_graph, /*read_only ? */ true));
}
std::unique_ptr<Interface> generate_livegraph_rw(bool directed_graph){ // read-write transactions for Graphalytics
return unique_ptr<Interface>( new LiveGraphDriver(directed_graph, /*read_only ? */ false));
}
#endif

#if defined(HAVE_TESEO)
std::unique_ptr<Interface> generate_teseo(bool directed_graph){
return unique_ptr<Interface>{ new TeseoDriver(directed_graph) };
Expand Down Expand Up @@ -236,6 +248,11 @@ vector<ImplementationManifest> implementations() {
result.emplace_back("g1_v4-ref-ignore-build", "GraphOne, reference GAP BS for the Graphalytics algorithms", &generate_graphone_ref_ignore_build);
#endif

#if defined(HAVE_LIVEGRAPH)
result.emplace_back("livegraph_ro", "LiveGraph, use read-only transactions for the Graphalytics kernels", &generate_livegraph_ro);
result.emplace_back("livegraph_rw", "LiveGraph, use read-write transactions for the Graphalytics kernels", &generate_livegraph_rw);
#endif

#if defined(HAVE_TESEO)
// v1 05/04/2020: initial version for evaluation
// v2 28/04/2020: big rewrite: dense file, delayed rebalances, new leaf layout, new rebalancer logic. All experiments should be repeated, that is, ignore v1.
Expand Down
Loading

0 comments on commit 50c78d5

Please sign in to comment.