From 91fcabe4333bad255b1b8cbaa9ccee6fd9514677 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Fri, 25 Oct 2024 17:54:38 +0800 Subject: [PATCH] fixing Committed-by: xiaolei.zl from Dev container --- .../interactive/development/dev_and_test.md | 22 +++ flex/CMakeLists.txt | 18 +- flex/bin/CMakeLists.txt | 7 +- flex/bin/wal_consumer.cc | 44 +++-- flex/engines/graph_db/CMakeLists.txt | 4 +- .../graph_db/database/compact_transaction.cc | 2 +- flex/engines/graph_db/database/graph_db.cc | 28 ++-- flex/engines/graph_db/database/graph_db.h | 2 + .../graph_db/database/graph_db_operations.cc | 2 +- .../graph_db/database/graph_db_session.cc | 19 +-- .../graph_db/database/graph_db_session.h | 2 +- .../graph_db/database/insert_transaction.cc | 2 +- .../single_edge_insert_transaction.cc | 2 +- .../single_vertex_insert_transaction.cc | 2 +- .../graph_db/database/update_transaction.cc | 4 +- flex/engines/graph_db/database/wal.cc | 157 ++++++------------ flex/engines/graph_db/database/wal.h | 103 ++++++------ .../http_server/actor/admin_actor.act.cc | 22 ++- .../engines/http_server/actor/executor.act.cc | 6 +- flex/engines/http_server/graph_db_service.cc | 5 + .../handler/graph_db_http_handler.cc | 9 - .../handler/graph_db_http_handler.h | 1 - flex/interactive/README.md | 28 +--- .../sdk/examples/python/basic_example.py | 84 +++++----- flex/tests/CMakeLists.txt | 6 +- flex/tests/hqps/interactive_config_test.yaml | 2 +- flex/tests/wal_writer/CMakeLists.txt | 2 +- flex/tests/wal_writer/wal_writer_test.cc | 2 +- flex/utils/CMakeLists.txt | 2 +- flex/utils/property/column.h | 3 +- 30 files changed, 268 insertions(+), 324 deletions(-) diff --git a/docs/flex/interactive/development/dev_and_test.md b/docs/flex/interactive/development/dev_and_test.md index 3a99b3035ce9..622c6c1fbffc 100644 --- a/docs/flex/interactive/development/dev_and_test.md +++ b/docs/flex/interactive/development/dev_and_test.md @@ -106,6 +106,28 @@ mvn clean package -DskipTests -Pexperimental - `USE_PTHASH`: Indicates whether to use a perfect hash when building the vertex map. - `OPTIMIZE_FOR_HOST`: Determines if Flex should be optimized for performance on the current machine. Note that enabling this option may result in a binary that does not run on different platforms or CPU architectures. +### WAL Writer + +Interactive uses WAL (Write-Ahead Logging) to ensure data integrity. Two WAL writers are provided, one for each storage backend: `LocalWalWriter` for writing WALs to local disk and `KafkaWalWriter` for persisting WALs on Kafka. + +You can switch the WAL writer type in the configuration. See [Configuration](./../configuration.md#service-configuration). + +#### Local WAL Writer + +This is the default WAL writer; no extra setup is needed, just make sure your disk has enough space. + +#### Kafka WAL Writer + +You need to deploy a Kafka cluster first. For details, please refer to the [Kafka Documentation](https://kafka.apache.org/documentation/). Follow the [QuickStart](https://kafka.apache.org/quickstart) to deploy a service. + +#### Performance + +##### Settings + +##### Producing WALs + +##### Consuming WALs + ## Testing Numerous test cases have been created for Interactive; they can be found in the GitHub workflow [interactive.yml](https://github.com/alibaba/GraphScope/blob/main/.github/workflows/interactive.yml).
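To make the Kafka setup concrete, here is a minimal sketch of building Flex with the Kafka WAL writer and standing up a single-node test broker. The Kafka version, paths, and topic name are illustrative (the commands mirror the Kafka quickstart), and `BUILD_KAFKA_WAL_WRITER` is the CMake option introduced in this patch:

```bash
# Build Flex with the Kafka WAL writer compiled in
cmake -DBUILD_KAFKA_WAL_WRITER=ON .. && make -j

# Stand up a single-node Kafka broker in KRaft mode (per the Kafka quickstart)
wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz && cd kafka_2.13-3.8.0
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties &

# Create a topic to hold one graph's WALs (the topic name is illustrative)
bin/kafka-topics.sh --create --topic graph-0-wal --bootstrap-server localhost:9092
```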
diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 53f77274ba19..595bb26d1d49 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -16,7 +16,7 @@ option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF) option(USE_PTHASH "Whether to use pthash" OFF) option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host option(USE_STATIC_ARROW "Whether to use static arrow" ON) # Whether to link arrow statically, default is ON -option(ENABLE_KAFKA "Whether to enable kafka" ON) +option(BUILD_KAFKA_WAL_WRITER "Whether to build kafka wal writer" ON) #print options message(STATUS "Build test: ${BUILD_TEST}") @@ -58,11 +58,10 @@ if(USE_PTHASH) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash) endif() -if (ENABLE_KAFKA) +if (BUILD_KAFKA_WAL_WRITER) find_package(CppKafka) - if (CppKafka_FOUND) - message(STATUS "cppkafka found, build without third_party/cppkafka") - else () + if (NOT CppKafka_FOUND) + message(STATUS "cppkafka not found, building with bundled third_party/cppkafka") add_subdirectory(third_party/cppkafka) # if cppkafka/CMakeLists.txt does not exist, tell the user to run git submodule update --init --recursive if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt) @@ -71,11 +70,12 @@ if (ENABLE_KAFKA) include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include) set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build) list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR}) - # find_package (CppKafka CONFIG REQUIRED) - # alias target - add_library(CppKafka::cppkafka ALIAS cppkafka) + set(CppKafka_LIBRARIES cppkafka) + else() + include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS}) + set(CppKafka_LIBRARIES CppKafka::cppkafka) endif () - add_definitions(-DENABLE_KAFKA) + add_definitions(-DBUILD_KAFKA_WAL_WRITER) endif() set(DEFAULT_BUILD_TYPE "Release") diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt index 88d7ae683863..1a9c62d79b7b 100644 --- a/flex/bin/CMakeLists.txt +++ b/flex/bin/CMakeLists.txt @@ -46,5 +46,8 @@ add_executable(stored_procedure_runner stored_procedure_runner.cc) target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES}) install_without_export_flex_target(stored_procedure_runner) -add_executable(wal_consumer wal_consumer.cc) -target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db) \ No newline at end of file +if (BUILD_KAFKA_WAL_WRITER) + add_executable(wal_consumer wal_consumer.cc) + target_link_libraries(wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db) + install_without_export_flex_target(wal_consumer) +endif() \ No newline at end of file diff --git a/flex/bin/wal_consumer.cc b/flex/bin/wal_consumer.cc index fa9e5b2d5dcb..774e350bd411 100644 --- a/flex/bin/wal_consumer.cc +++ b/flex/bin/wal_consumer.cc @@ -13,6 +13,8 @@ * limitations under the License.
*/ +#ifdef BUILD_KAFKA_WAL_WRITER + #include #include #include @@ -34,12 +36,11 @@ class WalSender { static constexpr int32_t CONNECTION_TIMEOUT = 10; static constexpr int32_t READ_TIMEOUT = 60; static constexpr int32_t WRITE_TIMEOUT = 60; - WalSender(const std::string& host, int port, const std::string& graph_id) - : host_(host), port_(port), client_(host_, port_), graph_id_(graph_id) { + WalSender(const std::string& host, int port, const std::string& dst_url) + : host_(host), port_(port), client_(host_, port_), req_url_(dst_url) { client_.set_connection_timeout(CONNECTION_TIMEOUT); client_.set_read_timeout(READ_TIMEOUT); client_.set_write_timeout(WRITE_TIMEOUT); - req_url_ = "/v1/graph/" + graph_id_ + "/wal"; } void send(const std::string& payload) { @@ -57,7 +58,6 @@ class WalSender { std::string host_; int port_; httplib::Client client_; - std::string graph_id_; std::string req_url_; }; @@ -68,23 +68,25 @@ namespace bpo = boost::program_options; int main(int argc, char** argv) { std::string brokers; std::string topic_name; - std::string graph_id; std::string group_id; - std::string engine_url; + std::string engine_host; + std::string req_url; int32_t engine_port; bpo::options_description desc("Usage:"); desc.add_options()("help", "Display help message")( "kafka-brokers,b", bpo::value<std::string>(&brokers)->required(), "Kafka brokers list")( - "graph-id,i", bpo::value<std::string>(&graph_id)->required(), "graph_id")( + "url,u", bpo::value<std::string>(&req_url)->required(), "req_url")( "group-id,g", bpo::value<std::string>(&group_id)->default_value("interactive_group"), - "Kafka group id")("engine-url,u", - bpo::value<std::string>(&engine_url)->required(), + "Kafka group id")("engine-host,h", + bpo::value<std::string>(&engine_host)->required(), "Engine URL")( "engine-port,p", bpo::value<int32_t>(&engine_port)->required(), - "Engine port"); + "Engine port")("topic,t", + bpo::value<std::string>(&topic_name)->required(), + "Kafka topic name"); google::InitGoogleLogging(argv[0]); FLAGS_logtostderr = true; @@ -110,14 +112,11 @@ int main(int argc, char** argv) { {"group.id", group_id}, // Disable auto commit {"enable.auto.commit", false}}; - - topic_name = "graph_" + graph_id + "_wal"; // Create the consumer - gs::WalSender sender(vm["engine-url"].as<std::string>(), - vm["engine-port"].as<int32_t>(), graph_id); - gs::WalConsumer consumer(config, topic_name, 1); - - // signal(SIGINT, [](int) { consumer.terminate(); }); + gs::WalSender sender(vm["engine-host"].as<std::string>(), + vm["engine-port"].as<int32_t>(), + vm["url"].as<std::string>()); + gs::KafkaWalConsumer consumer(config, topic_name, 1); if (vm.count("help")) { std::cout << desc << std::endl; @@ -126,16 +125,13 @@ int main(int argc, char** argv) { while (true) { auto msg = consumer.poll(); - if (msg.first == std::numeric_limits<uint32_t>::max()) { - continue; - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - LOG(INFO) << "Received message: <" << msg.first << " -> " << msg.second - << ">"; - sender.send(msg.second); + if (msg.empty()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; + } + LOG(INFO) << "Received message: <" << msg << ">"; + sender.send(msg); } LOG(INFO) << "Consuming messages from topic " << topic_name; return 0; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 516923605761..55aed368994a 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -9,8 +9,8 @@ target_include_directories(flex_graph_db PUBLIC $<…> … diff --git a/flex/engines/graph_db/database/compact_transaction.cc b/flex/engines/graph_db/database/compact_transaction.cc @@ … header->timestamp = timestamp_; header->type = 1; - logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize()); + logger_.append(arc_.GetBuffer(), arc_.GetSize()); arc_.Clear(); LOG(INFO) << "before compact - " << timestamp_;
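For reference, given the options registered above, an invocation of the consumer might look like the following; every value is hypothetical and the binary path depends on the build tree:

```bash
./bin/wal_consumer --kafka-brokers localhost:9092 --topic graph_1_wal \
  --group-id interactive_group --engine-host localhost --engine-port 7687 \
  --url /v1/graph/1/wal
```

Each WAL payload polled from Kafka is forwarded unchanged by `WalSender` to the engine's HTTP endpoint given by `--url`.
diff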
--git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index ac660ae715dd..f9642b113f41 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -348,7 +348,7 @@ void GraphDB::GetAppInfo(Encoder& output) { static void IngestWalRange(SessionLocalContext* contexts, MutablePropertyFragment& graph, - const WalContentUnit* insert_wal_list, uint32_t from, + const IWalsParser& parser, uint32_t from, uint32_t to, int thread_num) { LOG(INFO) << "IngestWalRange, from " << from << ", to " << to; std::atomic<uint32_t> cur_ts(from); @@ -362,7 +362,7 @@ static void IngestWalRange(SessionLocalContext* contexts, if (got_ts >= to) { break; } - const auto& unit = insert_wal_list[got_ts]; + const auto& unit = parser.get_insert_wal(got_ts); InsertTransaction::IngestWal(graph, got_ts, unit.ptr, unit.size, alloc); if (got_ts % 1000000 == 0) { @@ -387,13 +387,12 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) { wals.push_back(entry.path().string()); } - WalsParser parser(wals); + LocalWalsParser parser(wals); uint32_t from_ts = 1; for (auto& update_wal : parser.update_wals()) { uint32_t to_ts = update_wal.timestamp; if (from_ts < to_ts) { - IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts, - to_ts, thread_num); + IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num); } if (update_wal.size == 0) { graph_.Compact(update_wal.timestamp); @@ -405,12 +404,13 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path, from_ts = to_ts + 1; } if (from_ts <= parser.last_ts()) { - IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts, - parser.last_ts() + 1, thread_num); + IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1, + thread_num); } version_manager_.init_ts(parser.last_ts(), thread_num); } +#ifdef BUILD_KAFKA_WAL_WRITER void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers, const std::string& kafka_topic, const std::string& work_dir, int thread_num) { @@ -418,13 +418,12 @@ void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers, {"group.id", "primary_group"}, // Disable auto commit {"enable.auto.commit", false}}; - KafkaWalParser parser(config, kafka_topic); + KafkaWalsParser parser(config, kafka_topic); uint32_t from_ts = 1; for (auto& update_wal : parser.update_wals()) { uint32_t to_ts = update_wal.timestamp; if (from_ts < to_ts) { - IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts, - to_ts, thread_num); + IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num); } if (update_wal.size == 0) { graph_.Compact(update_wal.timestamp); @@ -436,12 +435,13 @@ void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers, from_ts = to_ts + 1; } if (from_ts <= parser.last_ts()) { - IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts, - parser.last_ts() + 1, thread_num); + IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1, + thread_num); } version_manager_.init_ts(parser.last_ts(), thread_num); } +#endif void GraphDB::initApps( const std::unordered_map>& @@ -508,6 +508,7 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir, contexts_[i].logger->open(wal_dir_path, i); } } else if (config.wal_writer_type == IWalWriter::WalWriterType::kKafka) { +#ifdef BUILD_KAFKA_WAL_WRITER for (int i = 0; i < thread_num_; ++i) { new (&contexts_[i])
SessionLocalContext( *this, data_dir, i, allocator_strategy, @@ -520,6 +521,9 @@ for (int i = 0; i < thread_num_; ++i) { contexts_[i].logger->open(config.kafka_topic, i); } +#else + LOG(FATAL) << "Kafka wal writer is not enabled in this build"; +#endif } else { LOG(FATAL) << "Unsupported wal writer type: " << static_cast<int>(config.wal_writer_type); diff --git a/flex/engines/graph_db/database/graph_db.h b/flex/engines/graph_db/database/graph_db.h index 9591600697bd..2910e9acb70f 100644 --- a/flex/engines/graph_db/database/graph_db.h +++ b/flex/engines/graph_db/database/graph_db.h @@ -170,9 +170,11 @@ class GraphDB { void ingestWalsFromLocalFiles(const std::string& wal_dir, const std::string& work_dir, int thread_num); +#ifdef BUILD_KAFKA_WAL_WRITER void ingestWalsFromKafka(const std::string& kafka_brokers, const std::string& kafka_topic, const std::string& work_dir, int thread_num); +#endif void initApps( const std::unordered_map>& diff --git a/flex/engines/graph_db/database/graph_db_operations.cc b/flex/engines/graph_db/database/graph_db_operations.cc index d90a450cc307..648d007da4d4 100644 --- a/flex/engines/graph_db/database/graph_db_operations.cc +++ b/flex/engines/graph_db/database/graph_db_operations.cc @@ -52,6 +52,7 @@ Result GraphDBOperations::CreateVertex( for (auto& edge_insert : input_json["edge_request"].GetArray()) { edge_data.push_back(inputEdge(edge_insert, schema, session)); } + LOG(INFO) << "CreateVertex edge_data: " << edge_data.size(); } catch (std::exception& e) { return Result( gs::Status(StatusCode::INVALID_SCHEMA, @@ -452,7 +453,6 @@ Status GraphDBOperations::singleInsertVertex( "Fail to create edge; All inserts are rollbacked"); } } - // LOG(INFO) << "Commit singleInsertVertex"; txnWrite.Commit(); } catch (std::exception& e) { return Status(StatusCode::INVALID_SCHEMA, e.what()); diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index d30dacb3339d..8e437dabf8cd 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -251,17 +251,17 @@ AppBase* GraphDBSession::GetApp(int type) { #undef likely // likely -Result GraphDBSession::IngestWals(const std::string_view& input) { - if (input.size() < sizeof(WalHeader)) { LOG(ERROR) << "Invalid wal content"; return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal content"); } - const WalHeader* header = reinterpret_cast<const WalHeader*>(input.data()); - if (header->length + sizeof(WalHeader) != input.size()) { +Result GraphDBSession::IngestWals(const char* input, size_t len) { + if (len < sizeof(WalHeader)) { LOG(ERROR) << "Invalid wal content"; return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal content"); } + const WalHeader* header = reinterpret_cast<const WalHeader*>(input); + if (header->length + sizeof(WalHeader) != len) { LOG(ERROR) << "Invalid wal content"; return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal content"); } - return deserialize_and_apply_wal(header, input.data(), header->length); + return deserialize_and_apply_wal(header, input, header->length); } // Quite similar to the InsertTransaction::Commit(); @@ -278,7 +278,7 @@ Result GraphDBSession::deserialize_and_apply_insert_wal( LOG(INFO) << "Applying insert wal with timestamp: " << ts << ", length: " << length << ", logger type: " << static_cast<int>(logger_.type()); - logger_.append(ts, data, length); + logger_.append(data, length); InsertTransaction::IngestWal(db_.graph(), ts, const_cast<char*>(data) + sizeof(WalHeader), @@ -296,7 +296,7 @@ Result GraphDBSession::deserialize_and_apply_update_wal(
db_.version_manager_.revert_update_timestamp(ts); return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal timestamp"); } - logger_.append(ts, data, length); + logger_.append(data, length); UpdateTransaction::IngestWal(db_.graph(), work_dir_, ts, const_cast(data), length, alloc_); db_.version_manager_.release_update_timestamp(ts); @@ -310,13 +310,8 @@ Result GraphDBSession::deserialize_and_apply_wal( return Status::OK(); } if (header->type == 0) { - // insert_transaction - // single_edge_insert_transaction - // single_vertex_insert_transaction return deserialize_and_apply_insert_wal(header, data, length); } else if (header->type == 1) { - // compact_transaction - // update_transaction return deserialize_and_apply_update_wal(header, data, length); } else { return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal type"); diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index c02007b20ac3..6a88db7d5a97 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -109,7 +109,7 @@ class GraphDBSession { // Ingest wals from a string, the input string is a serialized wal. // We will convert it to a transaction and apply it to the graph. - Result IngestWals(const std::string_view& input); + Result IngestWals(const char* input, size_t len); private: Result> diff --git a/flex/engines/graph_db/database/insert_transaction.cc b/flex/engines/graph_db/database/insert_transaction.cc index a454981df7af..60b93aab7db0 100644 --- a/flex/engines/graph_db/database/insert_transaction.cc +++ b/flex/engines/graph_db/database/insert_transaction.cc @@ -149,7 +149,7 @@ void InsertTransaction::Commit() { header->type = 0; header->timestamp = timestamp_; - logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize()); + logger_.append(arc_.GetBuffer(), arc_.GetSize()); IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader), header->length, alloc_); diff --git a/flex/engines/graph_db/database/single_edge_insert_transaction.cc b/flex/engines/graph_db/database/single_edge_insert_transaction.cc index 7567ea583f73..56330846c40c 100644 --- a/flex/engines/graph_db/database/single_edge_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_edge_insert_transaction.cc @@ -117,7 +117,7 @@ void SingleEdgeInsertTransaction::Commit() { header->length = arc_.GetSize() - sizeof(WalHeader); header->type = 0; header->timestamp = timestamp_; - logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize()); + logger_.append(arc_.GetBuffer(), arc_.GetSize()); grape::OutArchive arc; { diff --git a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc index c421ae0a731f..5bd313893fc4 100644 --- a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc +++ b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc @@ -164,7 +164,7 @@ void SingleVertexInsertTransaction::Commit() { header->type = 0; header->timestamp = timestamp_; - logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize()); + logger_.append(arc_.GetBuffer(), arc_.GetSize()); ingestWal(); vm_.release_insert_timestamp(timestamp_); diff --git a/flex/engines/graph_db/database/update_transaction.cc b/flex/engines/graph_db/database/update_transaction.cc index 779838417d21..d568e1794407 100644 --- a/flex/engines/graph_db/database/update_transaction.cc +++ b/flex/engines/graph_db/database/update_transaction.cc @@ -105,7 +105,7 @@ void 
UpdateTransaction::Commit() { header->length = arc_.GetSize() - sizeof(WalHeader); header->type = 1; header->timestamp = timestamp_; - logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize()); + logger_.append(arc_.GetBuffer(), arc_.GetSize()); applyVerticesUpdates(); applyEdgesUpdates(); @@ -748,7 +748,7 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) { header->length = arc.GetSize() - sizeof(WalHeader); header->type = 1; header->timestamp = timestamp_; - logger_.append(timestamp_, arc.GetBuffer(), arc.GetSize()); + logger_.append(arc.GetBuffer(), arc.GetSize()); } release(); diff --git a/flex/engines/graph_db/database/wal.cc b/flex/engines/graph_db/database/wal.cc index 825a268fda1d..34cf702b37bf 100644 --- a/flex/engines/graph_db/database/wal.cc +++ b/flex/engines/graph_db/database/wal.cc @@ -20,22 +20,11 @@ namespace gs { -int tmout_multip(int msecs) { - int r; - r = (int) (((double) (msecs)) * 1.0); - return r; -} - -void test_conf_init(rd_kafka_conf_t** conf, rd_kafka_topic_conf_t** topic_conf, - int timeout) { - if (conf) { - *conf = rd_kafka_conf_new(); - } - - if (topic_conf) - *topic_conf = rd_kafka_topic_conf_new(); -} - +/** + *The following implementation for creating a topic is derived from + *https://github.com/confluentinc/librdkafka/blob/master/tests/test.c#L4734. + */ +#ifdef BUILD_KAFKA_WAL_WRITER rd_kafka_t* test_create_handle(rd_kafka_type_t mode, rd_kafka_conf_t* conf) { rd_kafka_t* rk; char errstr[512]; @@ -51,17 +40,14 @@ rd_kafka_t* test_create_handle(rd_kafka_type_t mode, rd_kafka_conf_t* conf) { } rd_kafka_t* test_create_producer(const std::string& brokers) { - rd_kafka_conf_t* conf; + rd_kafka_conf_t* conf = rd_kafka_conf_new(); - test_conf_init(&conf, NULL, 0); - // set metadata.broker.list char errstr[512]; if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { LOG(ERROR) << "Failed to set metadata.broker.list: " << errstr; return NULL; } - return test_create_handle(RD_KAFKA_PRODUCER, conf); } std::string generate_graph_wal_topic(const std::string& kafka_brokers, @@ -77,7 +63,7 @@ std::string generate_graph_wal_topic(const std::string& kafka_brokers, rd_kafka_event_t* rkev; const rd_kafka_CreateTopics_result_t* res; const rd_kafka_topic_result_t** terr; - int timeout_ms = tmout_multip(10000); + int timeout_ms = 10000; size_t res_cnt; rd_kafka_resp_err_t err; char errstr[512]; @@ -148,18 +134,16 @@ std::string generate_graph_wal_topic(const std::string& kafka_brokers, } rd_kafka_event_destroy(rkev); - rd_kafka_queue_destroy(rkqu); - rd_kafka_AdminOptions_destroy(options); - rd_kafka_NewTopic_destroy(newt[0]); - rd_kafka_destroy(rk); LOG(INFO) << "Destroyed producer for topic " << topic_name; return topic_name; } +#endif // BUILD_KAFKA_WAL_WRITER + void LocalWalWriter::open(const std::string& prefix, int thread_id) { if (fd_ != -1 || thread_id_ != -1) { LOG(FATAL) << "LocalWalWriter has been opened"; @@ -198,7 +182,7 @@ void LocalWalWriter::close() { #define unlikely(x) __builtin_expect(!!(x), 0) -bool LocalWalWriter::append(uint32_t ts, const char* data, size_t length) { +bool LocalWalWriter::append(const char* data, size_t length) { if (unlikely(fd_ == -1)) { return false; } @@ -241,7 +225,7 @@ IWalWriter::WalWriterType LocalWalWriter::type() const { return IWalWriter::WalWriterType::kLocal; } -#ifdef ENABLE_KAFKA +#ifdef BUILD_KAFKA_WAL_WRITER void KafkaWalWriter::open(const std::string& topic, int thread_id) { if (thread_id_ != -1 || producer_) { @@ -271,12 +255,9 @@ void 
KafkaWalWriter::close() { } } -bool KafkaWalWriter::append(uint32_t ts, const char* data, size_t length) { +bool KafkaWalWriter::append(const char* data, size_t length) { try { - std::string key = std::to_string(ts); - // cppkafka::Buffer key(reinterpret_cast<const char*>(&ts), - // sizeof(uint32_t)); - producer_->sync_produce(builder_.key(key).payload({data, length})); + producer_->sync_produce(builder_.payload({data, length})); producer_->flush(true); } catch (const cppkafka::HandleException& e) { LOG(ERROR) << "Failed to send to kafka: " << e.what(); @@ -292,7 +273,7 @@ IWalWriter::WalWriterType KafkaWalWriter::type() const { } #endif -WalsParser::WalsParser(const std::vector<std::string>& paths) +LocalWalsParser::LocalWalsParser(const std::vector<std::string>& paths) : insert_wal_list_(NULL), insert_wal_list_size_(0) { for (auto path : paths) { LOG(INFO) << "Start to ingest WALs from file: " << path; @@ -354,7 +335,7 @@ WalsParser::WalsParser(const std::vector<std::string>& paths) } } -WalsParser::~WalsParser() { +LocalWalsParser::~LocalWalsParser() { if (insert_wal_list_ != NULL) { munmap(insert_wal_list_, insert_wal_list_size_ * sizeof(WalContentUnit)); } @@ -367,25 +348,36 @@ WalsParser::~WalsParser() { } } -uint32_t WalsParser::last_ts() const { return last_ts_; } +uint32_t LocalWalsParser::last_ts() const { return last_ts_; } -const WalContentUnit& WalsParser::get_insert_wal(uint32_t ts) const { +const WalContentUnit& LocalWalsParser::get_insert_wal(uint32_t ts) const { return insert_wal_list_[ts]; } -const std::vector<UpdateWalUnit>& WalsParser::update_wals() const { +const std::vector<UpdateWalUnit>& LocalWalsParser::update_wals() const { return update_wal_list_; } -const WalContentUnit* WalsParser::insert_wal_list() const { - return insert_wal_list_; +#ifdef BUILD_KAFKA_WAL_WRITER + +std::vector<cppkafka::TopicPartition> get_all_topic_partitions( + const cppkafka::Configuration& config, const std::string& topic_name) { + std::vector<cppkafka::TopicPartition> partitions; + cppkafka::Consumer consumer(config); // tmp consumer + auto metadata = + consumer.get_metadata().get_topics({topic_name}).front().get_partitions(); + for (const auto& partition : metadata) { + partitions.push_back(cppkafka::TopicPartition( + topic_name, partition.get_id(), 0)); // from the beginning + } + return partitions; } -// WalConsumer consumes from multiple partitions of a topic, and can start from -// the beginning or from the latest message. +// KafkaWalConsumer consumes from multiple partitions of a topic, and can start +// from the beginning or from the latest message.
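+// For illustration, a consumer could be constructed with the same
+// configuration keys used elsewhere in this patch (broker address and topic
+// name are hypothetical):
+//   cppkafka::Configuration config = {{"metadata.broker.list", "localhost:9092"},
+//                                     {"group.id", "interactive_group"},
+//                                     {"enable.auto.commit", false}};
+//   KafkaWalConsumer consumer(config, "graph_1_wal", /*thread_num=*/1);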
+KafkaWalConsumer::KafkaWalConsumer(cppkafka::Configuration config, + const std::string& topic_name, + int32_t thread_num) { auto topic_partitions = get_all_topic_partitions(config, topic_name); consumers_.reserve(topic_partitions.size()); for (size_t i = 0; i < topic_partitions.size(); ++i) { @@ -394,7 +386,8 @@ WalConsumer::WalConsumer(cppkafka::Configuration config, } } -std::pair<uint32_t, std::string> WalConsumer::poll() { +// TODO: make this efficient +std::string KafkaWalConsumer::poll() { for (auto& consumer : consumers_) { auto msg = consumer->poll(); if (msg) { @@ -403,67 +396,26 @@ std::pair<uint32_t, std::string> WalConsumer::poll() { LOG(INFO) << "[+] Received error notification: " << msg.get_error(); } } else { - uint32_t key = atoi(static_cast<std::string>(msg.get_key()).c_str()); std::string payload = msg.get_payload(); LOG(INFO) << "receive from partition " << msg.get_partition() - << ", key: " << key << ", payload: " << payload - << " size: " << payload.size(); - message_queue_.push(std::make_pair(key, payload)); - // consumer->commit(msg); + << ", payload: " << payload << " size: " << payload.size(); + message_queue_.push(payload); + consumer->commit(msg); } } } - // Check the message queue, if the top message is the expected message, - // send it to the engine. Otherwise, wait for the expected message. - if (!message_queue_.empty()) { - while (!message_queue_.empty() && - message_queue_.top().first < expect_timestamp_) { - LOG(WARNING) << "Drop message: <" << message_queue_.top().first << " -> " - << message_queue_.top().second << ">"; - message_queue_.pop(); - } - while (!message_queue_.empty() && - message_queue_.top().first == expect_timestamp_) { - expect_timestamp_++; - auto ret = message_queue_.top(); - message_queue_.pop(); - return ret; - } - while (!message_queue_.empty() && - message_queue_.top().first < expect_timestamp_) { - LOG(WARNING) << "Drop message: <" << message_queue_.top().first << " -> " - << message_queue_.top().second << ">"; - message_queue_.pop(); - } - LOG(INFO) << "Expect timestamp: " << expect_timestamp_ - << ", but got: " << message_queue_.top().first; + if (message_queue_.empty()) { + return ""; } - return std::make_pair(std::numeric_limits<uint32_t>::max(), ""); + std::string payload = message_queue_.top(); + message_queue_.pop(); + return payload; } -std::vector<cppkafka::TopicPartition> get_all_topic_partitions( - const cppkafka::Configuration& config, const std::string& topic_name) { - std::vector<cppkafka::TopicPartition> partitions; - cppkafka::Consumer consumer(config); // tmp consumer - auto metadata = - consumer.get_metadata().get_topics({topic_name}).front().get_partitions(); - LOG(INFO) << "metadata: " << metadata.size(); - for (const auto& partition : metadata) { - partitions.push_back(cppkafka::TopicPartition( - topic_name, partition.get_id(), 0)); // from the beginning - } - return partitions; -} - -KafkaWalParser::KafkaWalParser(cppkafka::Configuration config, - const std::string& topic_name) +KafkaWalsParser::KafkaWalsParser(cppkafka::Configuration config, + const std::string& topic_name) : insert_wal_list_(NULL), insert_wal_list_size_(0), last_ts_(0) { auto topic_partitions = get_all_topic_partitions(config, topic_name); - // consumers_.reserve(topic_partitions.size()); - // for (size_t i = 0; i < topic_partitions.size(); ++i) { - // consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config)); - // consumers_.back()->assign({topic_partitions[i]}); - // } consumer_ = std::make_unique<cppkafka::Consumer>(config); consumer_->assign(topic_partitions); @@ -479,7 +431,6 @@ KafkaWalParser::KafkaWalParser(cppkafka::Configuration config, LOG(INFO) << "No messages were polled; the
topic has been fully consumed."; break; } - LOG(INFO) << "got messages of size: " << msgs.size(); // message_vector_.emplace_back(std::move(msgs)); for (auto& msg : msgs) { if (msg) { @@ -525,22 +476,20 @@ KafkaWalParser::KafkaWalParser(cppkafka::Configuration config, } } -KafkaWalParser::~KafkaWalParser() { +KafkaWalsParser::~KafkaWalsParser() { if (insert_wal_list_ != NULL) { munmap(insert_wal_list_, insert_wal_list_size_ * sizeof(WalContentUnit)); } } -uint32_t KafkaWalParser::last_ts() const { return last_ts_; } -const WalContentUnit& KafkaWalParser::get_insert_wal(uint32_t ts) const { +uint32_t KafkaWalsParser::last_ts() const { return last_ts_; } +const WalContentUnit& KafkaWalsParser::get_insert_wal(uint32_t ts) const { return insert_wal_list_[ts]; } -const std::vector<UpdateWalUnit>& KafkaWalParser::update_wals() const { +const std::vector<UpdateWalUnit>& KafkaWalsParser::update_wals() const { return update_wal_list_; } -const WalContentUnit* KafkaWalParser::insert_wal_list() const { - return insert_wal_list_; -} +#endif // BUILD_KAFKA_WAL_WRITER } // namespace gs diff --git a/flex/engines/graph_db/database/wal.h b/flex/engines/graph_db/database/wal.h index 762dc516bd56..29ba548d3b55 100644 --- a/flex/engines/graph_db/database/wal.h +++ b/flex/engines/graph_db/database/wal.h @@ -28,9 +28,7 @@ #include "flex/utils/result.h" -#ifdef ENABLE_KAFKA -#include -#include +#ifdef BUILD_KAFKA_WAL_WRITER #include "cppkafka/cppkafka.h" #endif @@ -93,7 +91,7 @@ class IWalWriter { * @param length The length of the data * */ - virtual bool append(uint32_t ts, const char* data, size_t length) = 0; + virtual bool append(const char* data, size_t length) = 0; }; class LocalWalWriter : public IWalWriter { @@ -107,7 +105,7 @@ class LocalWalWriter : public IWalWriter { void close() override; - bool append(uint32_t ts, const char* data, size_t length) override; + bool append(const char* data, size_t length) override; IWalWriter::WalWriterType type() const override; @@ -118,7 +116,7 @@ class LocalWalWriter : public IWalWriter { size_t file_used_; }; -#ifdef ENABLE_KAFKA +#ifdef BUILD_KAFKA_WAL_WRITER class KafkaWalWriter : public IWalWriter { public: KafkaWalWriter(const std::string& kafka_brokers) @@ -132,7 +130,7 @@ class KafkaWalWriter : public IWalWriter { void close() override; - bool append(uint32_t ts, const char* data, size_t length) override; + bool append(const char* data, size_t length) override; IWalWriter::WalWriterType type() const override; @@ -152,18 +150,16 @@ class IWalsParser { virtual uint32_t last_ts() const = 0; virtual const WalContentUnit& get_insert_wal(uint32_t ts) const = 0; virtual const std::vector<UpdateWalUnit>& update_wals() const = 0; - virtual const WalContentUnit* insert_wal_list() const = 0; }; -class WalsParser : public IWalsParser { +class LocalWalsParser : public IWalsParser { public: - WalsParser(const std::vector<std::string>& paths); - ~WalsParser(); + LocalWalsParser(const std::vector<std::string>& paths); + ~LocalWalsParser(); uint32_t last_ts() const override; const WalContentUnit& get_insert_wal(uint32_t ts) const override; const std::vector<UpdateWalUnit>& update_wals() const override; - const WalContentUnit* insert_wal_list() const override; private: std::vector<int> fds_; @@ -176,72 +172,71 @@ class WalsParser : public IWalsParser { std::vector<UpdateWalUnit> update_wal_list_; }; +#ifdef BUILD_KAFKA_WAL_WRITER + /** - * Get all topic partitions for a given topic + * @brief Parse all the WALs from Kafka.
*/ -std::vector<cppkafka::TopicPartition> get_all_topic_partitions( - const cppkafka::Configuration& config, const std::string& topic_name); +class KafkaWalsParser : public IWalsParser { + public: + static constexpr const std::chrono::milliseconds POLL_TIMEOUT = + std::chrono::milliseconds(100); + static constexpr const size_t MAX_BATCH_SIZE = 1000; + + // always track all partitions and from the beginning + KafkaWalsParser(cppkafka::Configuration config, + const std::string& topic_name); + ~KafkaWalsParser(); + + uint32_t last_ts() const override; + const WalContentUnit& get_insert_wal(uint32_t ts) const override; + const std::vector<UpdateWalUnit>& update_wals() const override; + + private: + std::unique_ptr<cppkafka::Consumer> consumer_; + WalContentUnit* insert_wal_list_; + size_t insert_wal_list_size_; + uint32_t last_ts_; + + std::vector<UpdateWalUnit> update_wal_list_; + std::vector<cppkafka::Message> message_vector_; // used to hold the polled messages +}; /** - * @brief The WalConsumer class is used to consume the WAL from kafka. The topic - * could have multiple partitions, and we could use multiple thread to consume - * the WAL from different partitions.(Not necessary to use the same number of - * partitions) + * @brief The KafkaWalConsumer class is used to consume the WAL from Kafka. The + * topic could have multiple partitions, and we could use multiple threads to + * consume the WAL from different partitions. (The number of threads need not + * match the number of partitions.) * * We assume that the messages in each partition are ordered by the timestamp. */ -class WalConsumer { +class KafkaWalConsumer { public: struct CustomComparator { - inline bool operator()(const std::pair<uint32_t, std::string>& lhs, - const std::pair<uint32_t, std::string>& rhs) { - return lhs.first > rhs.first; + inline bool operator()(const std::string& lhs, const std::string& rhs) { + const WalHeader* header1 = reinterpret_cast<const WalHeader*>(lhs.data()); + const WalHeader* header2 = reinterpret_cast<const WalHeader*>(rhs.data()); + return header1->timestamp > header2->timestamp; } }; static constexpr const std::chrono::milliseconds POLL_TIMEOUT = std::chrono::milliseconds(100); // always track all partitions and from the beginning - WalConsumer(cppkafka::Configuration config, const std::string& topic_name, - int32_t thread_num); + KafkaWalConsumer(cppkafka::Configuration config, + const std::string& topic_name, int32_t thread_num); - std::pair<uint32_t, std::string> poll(); + std::string poll(); private: bool running; - uint32_t expect_timestamp_; // The expected timestamp of the next message std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_; - std::priority_queue<std::pair<uint32_t, std::string>, - std::vector<std::pair<uint32_t, std::string>>, - CustomComparator> + std::priority_queue<std::string, std::vector<std::string>, CustomComparator> message_queue_;
*/ -class KafkaWalParser : public IWalsParser { - public: - static constexpr const std::chrono::milliseconds POLL_TIMEOUT = - std::chrono::milliseconds(100); - static constexpr const size_t MAX_BATCH_SIZE = 1000; - - // always track all partitions and from begining - KafkaWalParser(cppkafka::Configuration config, const std::string& topic_name); - ~KafkaWalParser(); - - uint32_t last_ts() const override; - const WalContentUnit& get_insert_wal(uint32_t ts) const override; - const std::vector<UpdateWalUnit>& update_wals() const override; - const WalContentUnit* insert_wal_list() const override; - - private: - std::unique_ptr<cppkafka::Consumer> consumer_; - WalContentUnit* insert_wal_list_; - size_t insert_wal_list_size_; - uint32_t last_ts_; + +#endif // BUILD_KAFKA_WAL_WRITER - std::vector<UpdateWalUnit> update_wal_list_; - std::vector<cppkafka::Message> message_vector_; // used to hold the polled messages -}; } // namespace gs #endif // GRAPHSCOPE_DATABASE_WAL_H_ diff --git a/flex/engines/http_server/actor/admin_actor.act.cc b/flex/engines/http_server/actor/admin_actor.act.cc index cc21ade6bb4f..9133c08e92b2 100644 --- a/flex/engines/http_server/actor/admin_actor.act.cc +++ b/flex/engines/http_server/actor/admin_actor.act.cc @@ -978,10 +978,24 @@ seastar::future<admin_query_result> admin_actor::start_service( db.Close(); VLOG(10) << "Closed the previous graph db"; gs::GraphDBConfig config(schema_value, data_dir_value, thread_num); - config.kafka_brokers = - graph_db_service.get_service_config().kafka_brokers; - config.kafka_topic = gs::generate_graph_wal_topic( - config.kafka_brokers, graph_name, thread_num); + config.set_wal_writer_type( + graph_db_service.get_service_config().wal_writer_type); + if (graph_db_service.get_service_config().wal_writer_type == + gs::IWalWriter::WalWriterType::kKafka) { +#ifdef BUILD_KAFKA_WAL_WRITER + config.kafka_brokers = + graph_db_service.get_service_config().kafka_brokers; + config.kafka_topic = gs::generate_graph_wal_topic( + config.kafka_brokers, graph_name, thread_num); + } +#else + LOG(ERROR) << "Kafka wal writer is not supported in this build"; + return seastar::make_ready_future<admin_query_result>( + gs::Result<seastar::sstring>(gs::Status( + gs::StatusCode::INTERNAL_ERROR, + "Kafka wal writer is not supported in this build"))); + } +#endif if (!db.Open(config).ok()) { LOG(ERROR) << "Fail to load graph from data directory: " << data_dir_value; diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index 268c6b3c8867..312685693da5 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -257,13 +257,9 @@ seastar::future<query_result> executor::delete_edge( seastar::future<query_result> executor::ingest_wal(query_param&& param) { // Receive the WAL from the client, and write it to the WAL file.
- auto& content = param.content; - std::string_view wal_data(content.data(), content.size()); - LOG(INFO) << "Receive wal of size: " << content.size(); - auto& db = gs::GraphDB::get(); auto& sess = db.GetSession(hiactor::local_shard_id()); - auto result = sess.IngestWals({wal_data}); + auto result = sess.IngestWals(param.content.data(), param.content.size()); if (result.ok()) { LOG(INFO) << "Ingested WALs successfully"; return seastar::make_ready_future<query_result>( diff --git a/flex/engines/http_server/graph_db_service.cc b/flex/engines/http_server/graph_db_service.cc index 83eb85dc5f28..81611b6b1979 100644 --- a/flex/engines/http_server/graph_db_service.cc +++ b/flex/engines/http_server/graph_db_service.cc @@ -99,10 +99,15 @@ void openGraph(const gs::GraphId& graph_id, config.set_wal_writer_type(service_config.wal_writer_type); config.memory_level = service_config.memory_level; if (config.wal_writer_type == gs::IWalWriter::WalWriterType::kKafka) { +#ifdef BUILD_KAFKA_WAL_WRITER config.kafka_brokers = service_config.kafka_brokers; config.kafka_topic = gs::generate_graph_wal_topic( service_config.kafka_brokers, graph_id, service_config.shard_num); +#else + LOG(FATAL) << "Kafka wal writer is not enabled in this build"; +#endif } + if (config.memory_level >= 2) { config.enable_auto_compaction = true; } diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index f302060d81b3..3555cc5b6fb6 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -882,7 +882,6 @@ graph_db_http_handler::graph_db_http_handler(uint16_t http_port, running_(false), actors_running_(true) { current_graph_query_handlers_.resize(shard_num); - current_wal_handlers_.resize(shard_num); all_graph_query_handlers_.resize(shard_num); all_wal_handlers_.resize(shard_num); adhoc_query_handlers_.resize(shard_num); @@ -919,10 +918,6 @@ seastar::future<> graph_db_http_handler::stop_query_actors(size_t index) { } return current_graph_query_handlers_[index] ->stop() - .then([this, index] { - LOG(INFO) << "Stopped current query actors on shard id: " << index; - return current_wal_handlers_[index]->stop(); - }) .then([this, index] { LOG(INFO) << "Stopped current query actors on shard id: " << index; return all_graph_query_handlers_[index]->stop(); @@ -965,7 +960,6 @@ void graph_db_http_handler::start_query_actors() { // to start actors, call method on each handler for (size_t i = 0; i < current_graph_query_handlers_.size(); ++i) { current_graph_query_handlers_[i]->start(); - current_wal_handlers_[i]->start(); all_graph_query_handlers_[i]->start(); all_wal_handlers_[i]->start(); for (size_t j = 0; j < vertex_handlers_[i].size(); ++j) { @@ -1012,9 +1006,6 @@ seastar::future<> graph_db_http_handler::set_routes() { current_graph_query_handlers_[hiactor::local_shard_id()] = new stored_proc_handler(ic_query_group_id, max_group_id, group_inc_step, shard_query_concurrency); - current_wal_handlers_[hiactor::local_shard_id()] = - new stored_proc_handler(ic_query_group_id, max_group_id, group_inc_step, - shard_query_concurrency); r.put(seastar::httpd::operation_type::POST, "/v1/graph/current/query", current_graph_query_handlers_[hiactor::local_shard_id()]); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.h b/flex/engines/http_server/handler/graph_db_http_handler.h index 9ef1a4566e7c..2666a09a8374 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.h +++ 
b/flex/engines/http_server/handler/graph_db_http_handler.h @@ -145,7 +145,6 @@ class graph_db_http_handler { std::vector current_graph_query_handlers_; std::vector all_graph_query_handlers_; std::vector adhoc_query_handlers_; - std::vector current_wal_handlers_; std::vector all_wal_handlers_; // shard_num * operation time(PUT/GET/POST/DELETE) std::vector> vertex_handlers_; diff --git a/flex/interactive/README.md b/flex/interactive/README.md index 33b3b0762058..f8788e5c0844 100755 --- a/flex/interactive/README.md +++ b/flex/interactive/README.md @@ -1,30 +1,4 @@ # GraphScope Interactive GraphScope Interactive is a specialized construction of [GraphScope Flex](https://github.com/alibaba/GraphScope/tree/main/flex), designed to handle concurrent graph queries at an impressive speed. Its primary goal is to process as many queries as possible within a given timeframe, emphasizing a high query throughput rate. -For the full documentation of GraphScope Interactive, please refer to [GraphScope Interactive Documentation](https://graphscope.io/docs/latest/flex/interactive_intro). - - -## Master Slave Replication - -### Download kafka - -```bash -wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz -tar -xzf kafka_2.13-3.8.0.tgz -cd kafka_2.13-3.8.0 -``` - -### Start kafka with Kraft mode - -```bash -KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" -bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -``` - -### Create topics - -```bash -bin/kafka-topics.sh --create --topic graph-0-wal --bootstrap-server localhost:9092 -``` - -### Write events to topic in c++ \ No newline at end of file +For the full documentation of GraphScope Interactive, please refer to [GraphScope Interactive Documentation](https://graphscope.io/docs/latest/flex/interactive_intro). \ No newline at end of file diff --git a/flex/interactive/sdk/examples/python/basic_example.py b/flex/interactive/sdk/examples/python/basic_example.py index 1d7f1936fb71..5e52792c12e0 100644 --- a/flex/interactive/sdk/examples/python/basic_example.py +++ b/flex/interactive/sdk/examples/python/basic_example.py @@ -275,10 +275,10 @@ def addEdge(sess: Session, graph_id: str): driver = Driver() with driver.session() as sess: - # graph_id = createGraph(sess) - # job_id = bulkLoading(sess, graph_id) - # waitJobFinish(sess, job_id) - # print("bulk loading finished") + graph_id = createGraph(sess) + job_id = bulkLoading(sess, graph_id) + waitJobFinish(sess, job_id) + print("bulk loading finished") - graph_id = "3" # Now start service on the created graph.
@@ -289,44 +289,44 @@ def addEdge(sess: Session, graph_id: str): time.sleep(5) print("restart service on graph ", graph_id) - # running a simple cypher query - # query = "MATCH (n) RETURN COUNT(n);" - # with driver.getNeo4jSession() as session: - # resp = session.run(query) - # for record in resp: - # print(record) - - # # more advanced usage of procedure - # create_proc_request = CreateProcedureRequest( - # name="test_procedure", - # description="test procedure", - # query="MATCH (n) RETURN COUNT(n);", - # type="cypher", - # ) - # resp = sess.create_procedure(graph_id, create_proc_request) - # assert resp.is_ok() - - # get_proc_res = sess.get_procedure(graph_id, "test_procedure") - # assert get_proc_res.is_ok() - # # Check the description of the procedure - # assert get_proc_res.get_value().description == "test procedure" - - # # must start service on the current graph, to let the procedure take effect - # resp = sess.restart_service() - # assert resp.is_ok() - # print("restarted service on graph ", graph_id) - # time.sleep(5) - - # # Now call the procedure - # with driver.getNeo4jSession() as session: - # result = session.run("CALL test_procedure();") - # for record in result: - # print(record) + # running a simple cypher query + query = "MATCH (n) RETURN COUNT(n);" + with driver.getNeo4jSession() as session: + resp = session.run(query) + for record in resp: + print(record) + + # more advanced usage of procedure + create_proc_request = CreateProcedureRequest( + name="test_procedure", + description="test procedure", + query="MATCH (n) RETURN COUNT(n);", + type="cypher", + ) + resp = sess.create_procedure(graph_id, create_proc_request) + assert resp.is_ok() + + get_proc_res = sess.get_procedure(graph_id, "test_procedure") + assert get_proc_res.is_ok() + # Check the description of the procedure + assert get_proc_res.get_value().description == "test procedure" + + # must start service on the current graph, to let the procedure take effect + resp = sess.restart_service() + assert resp.is_ok() + print("restarted service on graph ", graph_id) + time.sleep(5) + + # Now call the procedure + with driver.getNeo4jSession() as session: + result = session.run("CALL test_procedure();") + for record in result: + print(record) addVertex(sess, graph_id) - # getVertex(sess, graph_id) - # updateVertex(sess, graph_id) + getVertex(sess, graph_id) + updateVertex(sess, graph_id) - # addEdge(sess, graph_id) - # getEdge(sess, graph_id) - # updateEdge(sess, graph_id) + addEdge(sess, graph_id) + getEdge(sess, graph_id) + updateEdge(sess, graph_id) diff --git a/flex/tests/CMakeLists.txt b/flex/tests/CMakeLists.txt index 4a5c0dbf3f33..d486539a1534 100644 --- a/flex/tests/CMakeLists.txt +++ b/flex/tests/CMakeLists.txt @@ -1,5 +1,5 @@ -# add_subdirectory(hqps) -# add_subdirectory(rt_mutable_graph) -if (ENABLE_KAFKA) +add_subdirectory(hqps) +add_subdirectory(rt_mutable_graph) +if (BUILD_KAFKA_WAL_WRITER) add_subdirectory(wal_writer) endif() \ No newline at end of file diff --git a/flex/tests/hqps/interactive_config_test.yaml b/flex/tests/hqps/interactive_config_test.yaml index 87371fa6d9d8..0e1f9f15ed21 100644 --- a/flex/tests/hqps/interactive_config_test.yaml +++ b/flex/tests/hqps/interactive_config_test.yaml @@ -5,7 +5,7 @@ compute_engine: type: hiactor workers: - localhost:10000 - thread_num_per_worker: 4 + thread_num_per_worker: 1 store: type: cpp-mcsr metadata_store: diff --git a/flex/tests/wal_writer/CMakeLists.txt b/flex/tests/wal_writer/CMakeLists.txt index 0f8d5be25fd0..2a3a629d83e8 100 --- 
a/flex/tests/wal_writer/CMakeLists.txt +++ b/flex/tests/wal_writer/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable(wal_writer_test ${CMAKE_CURRENT_SOURCE_DIR}/wal_writer_test.cc) -target_link_libraries(wal_writer_test PUBLIC CppKafka::cppkafka ${LIBGRAPELITE_LIBRARIES} flex_graph_db) \ No newline at end of file +target_link_libraries(wal_writer_test PUBLIC ${CppKafka_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} flex_graph_db) \ No newline at end of file diff --git a/flex/tests/wal_writer/wal_writer_test.cc b/flex/tests/wal_writer/wal_writer_test.cc index 8664d0269955..304eab8f5127 100644 --- a/flex/tests/wal_writer/wal_writer_test.cc +++ b/flex/tests/wal_writer/wal_writer_test.cc @@ -17,7 +17,7 @@ void run(std::vector& writers, const std::string& payload) { for (size_t i = 0; i < writers.size(); ++i) { threads.emplace_back([&writers, i, payload]() { for (int j = 0; j < times; ++j) { - if (!writers[i]->append(j, payload.c_str(), payload.size())) { + if (!writers[i]->append(payload.c_str(), payload.size())) { std::cerr << "Failed to append message to kafka" << std::endl; } if (j % 10000 == 0) { diff --git a/flex/utils/CMakeLists.txt b/flex/utils/CMakeLists.txt index 990130a807fc..4da3916c18b5 100644 --- a/flex/utils/CMakeLists.txt +++ b/flex/utils/CMakeLists.txt @@ -69,7 +69,7 @@ target_include_directories(flex_utils PUBLIC $ $) # Link the static library of arrow, to save the trouble of linking the shared library of arrow -target_link_libraries(flex_utils ${ARROW_LIB} ${YAML_CPP_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} CppKafka::cppkafka) +target_link_libraries(flex_utils ${ARROW_LIB} ${YAML_CPP_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES}) install_flex_target(flex_utils) install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index 3142c99fd84d..a556857e77cc 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -189,8 +189,7 @@ class TypedColumn : public ColumnBase { } else if (index < basic_size_) { basic_buffer_.set(index, val); } else { - LOG(FATAL) << "Index out of range: " << index << " vs " << size() - << ": value: " << val; + throw std::runtime_error("Index out of range"); } }
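For reference, the WAL framing that this patch relies on throughout (in the `append` calls, in `GraphDBSession::IngestWals`, and in the consumer's comparator) can be summarized with a small standalone sketch. The field layout below is inferred from usage in the diffs (`header->length = arc_.GetSize() - sizeof(WalHeader)`, `header->type`, `header->timestamp`) and is illustrative; the real `WalHeader` definition is not shown in this patch and may differ in field order and packing:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Illustrative header: timestamp, type (0 = insert, 1 = update), payload length.
struct WalHeader {
  uint32_t timestamp;
  uint8_t type;
  uint32_t length;
};

// Frame a payload the way the transactions do before calling logger_.append().
std::string frame_wal(uint32_t ts, uint8_t type, const std::string& payload) {
  std::string buf(sizeof(WalHeader) + payload.size(), '\0');
  WalHeader header{ts, type, static_cast<uint32_t>(payload.size())};
  std::memcpy(buf.data(), &header, sizeof(WalHeader));
  std::memcpy(buf.data() + sizeof(WalHeader), payload.data(), payload.size());
  return buf;
}

// Validate a frame the way GraphDBSession::IngestWals does: the declared
// payload length plus the header size must match the total buffer size.
bool is_valid_wal(const char* data, size_t len) {
  if (len < sizeof(WalHeader)) return false;
  WalHeader header;
  std::memcpy(&header, data, sizeof(WalHeader));
  return header.length + sizeof(WalHeader) == len;
}

int main() {
  std::string wal = frame_wal(42, /*type=*/0, "some-serialized-operation");
  assert(is_valid_wal(wal.data(), wal.size()));
  return 0;
}
```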