Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Oct 29, 2024
2 parents e10f73d + 49b2643 commit f502047
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 251 deletions.
75 changes: 63 additions & 12 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,52 @@ concurrency:
cancel-in-progress: true

jobs:
build-interactive:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
steps:
- uses: actions/checkout@v4
- name: Install latest libgrape-lite
if: false
run: |
git clone --single-branch https://github.com/alibaba/libgrape-lite.git /tmp/libgrape-lite
cd /tmp/libgrape-lite
mkdir -p build && cd build
cmake ..
make -j$(nproc)
make install
- name: Setup tmate session
if: false
uses: mxschmitt/action-tmate@v3

- name: Build Interactive
env:
GIE_HOME: ${{ github.workspace }}/interactive_engine/
HOME: /home/graphscope/
run: |
. /home/graphscope/.graphscope_env
cd ${GITHUB_WORKSPACE}/
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope && sudo make -j$(nproc)
# package the build artifacts
cd .. && tar -zcf build.tar.gz build
- name: Upload Artifacts
uses: actions/upload-artifact@v4
with:
name: interactive_build-${{ github.sha }}
path: |
${{ github.workspace }}/flex/build.tar.gz
test-hqps-engine:
runs-on: ubuntu-20.04
needs: build-interactive
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
Expand Down Expand Up @@ -58,23 +102,28 @@ jobs:
cmake ..
make -j$(nproc)
make install
- name: Download Artifacts
uses: actions/download-artifact@v4
with:
name: interactive_build-${{ github.sha }}

- name: Setup tmate session
if: false
uses: mxschmitt/action-tmate@v3

- name: Extract build artifacts
run: |
cd ${GITHUB_WORKSPACE}
tar zxf build.tar.gz -C flex && rm build.tar.gz
- name: Build
env:
GIE_HOME: ${{ github.workspace }}/interactive_engine/
HOME: /home/graphscope/
run: |
. /home/graphscope/.graphscope_env
cd ${GITHUB_WORKSPACE}/
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
sudo make install
# cargo
. /home/graphscope/.cargo/env
Expand Down Expand Up @@ -401,6 +450,7 @@ jobs:
test-flex:
runs-on: ubuntu-20.04
needs: build-interactive
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64
Expand All @@ -417,14 +467,15 @@ jobs:
make -j$(nproc)
make install
- name: Build
env:
HOME: /home/graphscope/
- name: Download Artifacts
uses: actions/download-artifact@v4
with:
name: interactive_build-${{ github.sha }}

- name: Extract build artifacts
run: |
cd ${GITHUB_WORKSPACE}/flex
git submodule update --init
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
cd ${GITHUB_WORKSPACE}
tar zxf build.tar.gz -C flex && rm build.tar.gz
- name: Test GRIN on mutable csr
run: |
Expand Down
54 changes: 30 additions & 24 deletions flex/engines/graph_db/database/wal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ KafkaWalsParser::KafkaWalsParser(cppkafka::Configuration config,
LOG(INFO) << "No message are polled, the topic has been all consumed.";
break;
}
// message_vector_.emplace_back(std::move(msgs));
for (auto& msg : msgs) {
if (msg) {
if (msg.get_error()) {
Expand All @@ -443,33 +442,37 @@ KafkaWalsParser::KafkaWalsParser(cppkafka::Configuration config,
}
} else {
message_vector_.emplace_back(msg.get_payload());
const char* payload = message_vector_.back().data();
const WalHeader* header = reinterpret_cast<const WalHeader*>(payload);
uint32_t cur_ts = header->timestamp;
if (cur_ts == 0) {
LOG(WARNING) << "Invalid timestamp 0, skip";
continue;
}
int length = header->length;
if (header->type) {
UpdateWalUnit unit;
unit.timestamp = cur_ts;
unit.ptr = const_cast<char*>(payload + sizeof(WalHeader));
unit.size = length;
update_wal_list_.push_back(unit);
} else {
if (insert_wal_list_[cur_ts].ptr) {
LOG(WARNING) << "Duplicated timestamp " << cur_ts << ", skip";
}
insert_wal_list_[cur_ts].ptr =
const_cast<char*>(payload + sizeof(WalHeader));
insert_wal_list_[cur_ts].size = length;
}
last_ts_ = std::max(cur_ts, last_ts_);
}
}
}
}

for (auto& wal : message_vector_) {
const char* payload = wal.data();
const WalHeader* header = reinterpret_cast<const WalHeader*>(payload);
uint32_t cur_ts = header->timestamp;
if (cur_ts == 0) {
LOG(WARNING) << "Invalid timestamp 0, skip";
continue;
}
int length = header->length;
if (header->type) {
UpdateWalUnit unit;
unit.timestamp = cur_ts;
unit.ptr = const_cast<char*>(payload + sizeof(WalHeader));
unit.size = length;
update_wal_list_.push_back(unit);
} else {
if (insert_wal_list_[cur_ts].ptr) {
LOG(WARNING) << "Duplicated timestamp " << cur_ts << ", skip";
}
insert_wal_list_[cur_ts].ptr =
const_cast<char*>(payload + sizeof(WalHeader));
insert_wal_list_[cur_ts].size = length;
}
last_ts_ = std::max(cur_ts, last_ts_);
}

LOG(INFO) << "last_ts: " << last_ts_;
if (!update_wal_list_.empty()) {
std::sort(update_wal_list_.begin(), update_wal_list_.end(),
Expand All @@ -487,6 +490,9 @@ KafkaWalsParser::~KafkaWalsParser() {

uint32_t KafkaWalsParser::last_ts() const { return last_ts_; }
const WalContentUnit& KafkaWalsParser::get_insert_wal(uint32_t ts) const {
if (insert_wal_list_[ts].ptr == NULL) {
LOG(WARNING) << "No WAL for timestamp " << ts;
}
return insert_wal_list_[ts];
}
const std::vector<UpdateWalUnit>& KafkaWalsParser::update_wals() const {
Expand Down
5 changes: 5 additions & 0 deletions flex/engines/graph_db/grin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../..)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../storages/rt_mutable_graph)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../utils/property)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/../../../build/utils)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../../../build/utils)
else ()
message(FATAL_ERROR "Please build flex first")
endif ()

file(GLOB_RECURSE FILES_NEED_FORMAT "src/*.cc")

Expand Down
4 changes: 2 additions & 2 deletions flex/engines/http_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ if (Hiactor_FOUND)

hiactor_codegen (server_actor_autogen server_actor_autogen_files
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/
INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../)
INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_BINARY_DIR}/../../utils/)


# get all .cc files in current directory, except for generated/
Expand Down Expand Up @@ -32,7 +32,7 @@ if (Hiactor_FOUND)
endif ()
set_target_properties(Hiactor::seastar PROPERTIES INTERFACE_COMPILE_OPTIONS "${seastar_options}")

target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/)
target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../../utils/)
target_link_libraries(flex_server flex_plan_proto)
if (OPENTELEMETRY_CPP_FOUND)
target_link_libraries(flex_server otel)
Expand Down
90 changes: 12 additions & 78 deletions flex/engines/http_server/actor/executor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,10 @@ seastar::future<query_result> executor::run_graph_db_query(
}

seastar::future<admin_query_result> executor::create_vertex(
graph_management_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id)
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}
query_param&& param) {
rapidjson::Document input_json;
// Parse the input json
if (input_json.Parse(param.content.second.c_str()).HasParseError()) {
if (input_json.Parse(param.content.c_str()).HasParseError()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::INVALID_SCHEMA,
Expand All @@ -89,20 +79,9 @@ seastar::future<admin_query_result> executor::create_vertex(
gs::Result<seastar::sstring>(result.status()));
}

seastar::future<admin_query_result> executor::create_edge(
graph_management_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id)
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}
seastar::future<admin_query_result> executor::create_edge(query_param&& param) {
rapidjson::Document input_json;
if (input_json.Parse(param.content.second.c_str()).HasParseError()) {
if (input_json.Parse(param.content.c_str()).HasParseError()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::INVALID_SCHEMA,
Expand All @@ -120,21 +99,10 @@ seastar::future<admin_query_result> executor::create_edge(
}

seastar::future<admin_query_result> executor::update_vertex(
graph_management_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}
}
query_param&& param) {
rapidjson::Document input_json;
// Parse the input json
if (input_json.Parse(param.content.second.c_str()).HasParseError()) {
if (input_json.Parse(param.content.c_str()).HasParseError()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::INVALID_SCHEMA,
Expand All @@ -151,22 +119,10 @@ seastar::future<admin_query_result> executor::update_vertex(
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(result.status()));
}
seastar::future<admin_query_result> executor::update_edge(
graph_management_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}
}
seastar::future<admin_query_result> executor::update_edge(query_param&& param) {
rapidjson::Document input_json;
// Parse the input json
if (input_json.Parse(param.content.second.c_str()).HasParseError()) {
if (input_json.Parse(param.content.c_str()).HasParseError()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::INVALID_SCHEMA,
Expand All @@ -186,19 +142,8 @@ seastar::future<admin_query_result> executor::update_edge(

seastar::future<admin_query_result> executor::get_vertex(
graph_management_query_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id)
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}

std::unordered_map<std::string, std::string> params;
for (auto& [key, value] : param.content.second) {
for (auto& [key, value] : param.content) {
params[std::string(key)] = std::string(value);
}
auto result = gs::GraphDBOperations::GetVertex(
Expand All @@ -215,18 +160,8 @@ seastar::future<admin_query_result> executor::get_vertex(

seastar::future<admin_query_result> executor::get_edge(
graph_management_query_param&& param) {
std::string&& graph_id = std::move(param.content.first);
if (metadata_store_) {
auto running_graph_res = metadata_store_->GetRunningGraph();

if (!running_graph_res.ok() || running_graph_res.value() != graph_id)
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The queried graph is not running: " + graph_id)));
}
std::unordered_map<std::string, std::string> params;
for (auto& [key, value] : param.content.second) {
for (auto& [key, value] : param.content) {
params[std::string(key)] = std::string(value);
}
auto result = gs::GraphDBOperations::GetEdge(
Expand All @@ -242,14 +177,13 @@ seastar::future<admin_query_result> executor::get_edge(
}

seastar::future<admin_query_result> executor::delete_vertex(
graph_management_param&& param) {
query_param&& param) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::UNIMPLEMENTED, "delete_vertex is not implemented")));
}

seastar::future<admin_query_result> executor::delete_edge(
graph_management_param&& param) {
seastar::future<admin_query_result> executor::delete_edge(query_param&& param) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(
gs::StatusCode::UNIMPLEMENTED, "delete_edge is not implemented")));
Expand Down
Loading

0 comments on commit f502047

Please sign in to comment.