From fbe89414feb888a208ac9657fc8b16efc241dbf3 Mon Sep 17 00:00:00 2001
From: "xiaolei.zl"
Date: Fri, 25 Oct 2024 16:15:04 +0800
Subject: [PATCH] implement wal ingestion and wal consuming

Committed-by: xiaolei.zl from Dev container
---
 flex/CMakeLists.txt                                |   8 +-
 flex/bin/CMakeLists.txt                            |   2 +-
 flex/bin/wal_consumer.cc                           | 128 ++----------
 flex/engines/graph_db/database/graph_db.cc         |  50 ++++-
 flex/engines/graph_db/database/graph_db.h          |   5 +-
 .../graph_db/database/insert_transaction.cc        |   4 +-
 .../single_vertex_insert_transaction.cc            |   2 +-
 flex/engines/graph_db/database/wal.cc              | 190 ++++++++++++++++--
 flex/engines/graph_db/database/wal.h               |  94 ++++++++-
 .../mutable_property_fragment.cc                   |   2 +-
 flex/tests/wal_writer/CMakeLists.txt               |   2 +-
 flex/tests/wal_writer/wal_writer_test.cc           |  92 +++++++--
 12 files changed, 404 insertions(+), 175 deletions(-)

diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt
index e67501132d5b..95020ae1c6f6 100644
--- a/flex/CMakeLists.txt
+++ b/flex/CMakeLists.txt
@@ -64,10 +64,16 @@ if (ENABLE_KAFKA)
     message(STATUS "cppkafka found, build without third_party/cppkafka")
   else ()
+    # if cppkafka/CMakeLists.txt does not exist, tell the user to run git submodule update --init --recursive
+    if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
+      message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
+    endif ()
     add_subdirectory(third_party/cppkafka)
     include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
     set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
     list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
-    find_package (CppKafka CONFIG REQUIRED)
+    # find_package (CppKafka CONFIG REQUIRED)
+    # alias target
+    add_library(CppKafka::cppkafka ALIAS cppkafka)
   endif ()
   add_definitions(-DENABLE_KAFKA)
 endif()
diff --git a/flex/bin/CMakeLists.txt b/flex/bin/CMakeLists.txt
index 76f0da563bcb..88d7ae683863 100644
--- a/flex/bin/CMakeLists.txt
+++ b/flex/bin/CMakeLists.txt
@@ -47,4 +47,4 @@ target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils f
 install_without_export_flex_target(stored_procedure_runner)
 
 add_executable(wal_consumer wal_consumer.cc)
-target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES})
\ No newline at end of file
+target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
\ No newline at end of file
diff --git a/flex/bin/wal_consumer.cc b/flex/bin/wal_consumer.cc
index d4f7f012679a..fa9e5b2d5dcb 100644
--- a/flex/bin/wal_consumer.cc
+++ b/flex/bin/wal_consumer.cc
@@ -22,6 +22,7 @@
 #include
 #include "cppkafka/cppkafka.h"
+#include "flex/engines/graph_db/database/wal.h"
 #include "flex/third_party/httplib.h"
 
 namespace gs {
@@ -60,120 +61,6 @@ class WalSender {
   std::string req_url_;
 };
 
-// WalConsumer consumes from multiple partitions of a topic, and can start from
-// different offsets.
-// It use a priority queue to store the messages, and the messages are ordered
-// by the time_stamp.
-class WalConsumer {
- public:
-  struct CustomComparator {
-    bool operator()(const std::pair<uint32_t, std::string>& lhs,
-                    const std::pair<uint32_t, std::string>& rhs) {
-      return lhs.first > rhs.first;
-    }
-  };
-  static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
-      std::chrono::milliseconds(1000);
-  // always track all partitions and from begining
-  WalConsumer(cppkafka::Configuration config, const std::string& topic_name,
-              WalSender&& sender)
-      : running(true), expect_timestamp_(1), sender_(std::move(sender)) {
-    auto topic_partitions = get_all_topic_partitions(config, topic_name);
-    consumers_.reserve(topic_partitions.size());
-    for (size_t i = 0; i < topic_partitions.size(); ++i) {
-      consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
-      consumers_.back()->assign({topic_partitions[i]});
-    }
-  }
-
-  void poll() {
-    while (running) {
-      for (auto& consumer : consumers_) {
-        auto msg = consumer->poll(POLL_TIMEOUT);
-        if (msg) {
-          if (msg.get_error()) {
-            if (!msg.is_eof()) {
-              LOG(INFO) << "[+] Received error notification: "
-                        << msg.get_error();
-            }
-          } else {
-            uint32_t key =
-                atoi(static_cast<std::string>(msg.get_key()).c_str());
-            std::string payload = msg.get_payload();
-            LOG(INFO) << "receive from partition " << msg.get_partition()
-                      << ", key: " << key << ", payload: " << payload;
-            message_queue_.push(std::make_pair(key, payload));
-          }
-        }
-      }
-      // Check the message queue, if the top message is the expected message,
-      // send it to the engine. Otherwise, wait for the expected message.
-      if (!message_queue_.empty()) {
-        while (!message_queue_.empty() &&
-               message_queue_.top().first < expect_timestamp_) {
-          LOG(WARNING) << "Drop message: <" << message_queue_.top().first
-                       << " -> " << message_queue_.top().second << ">";
-          message_queue_.pop();
-        }
-        while (!message_queue_.empty() &&
-               message_queue_.top().first == expect_timestamp_) {
-          send_to_engine(message_queue_.top());
-          // send to engine
-          message_queue_.pop();
-          expect_timestamp_++;
-        }
-        while (!message_queue_.empty() &&
-               message_queue_.top().first < expect_timestamp_) {
-          LOG(WARNING) << "Drop message: <" << message_queue_.top().first
-                       << " -> " << message_queue_.top().second << ">";
-          message_queue_.pop();
-        }
-        LOG(INFO) << "Expect timestamp: " << expect_timestamp_
-                  << ", but got: " << message_queue_.top().first;
-      } else {
-        LOG(INFO) << "No message in the queue, wait for the next message...";
-      }
-      // sleep(1);
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-    }
-  }
-
-  void terminate() { running = false; }
-
- private:
-  std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
-      const cppkafka::Configuration& config, const std::string& topic_name) {
-    std::vector<cppkafka::TopicPartition> partitions;
-    cppkafka::Consumer consumer(config);  // tmp consumer
-    auto metadata = consumer.get_metadata()
-                        .get_topics({topic_name})
-                        .front()
-                        .get_partitions();
-    LOG(INFO) << "metadata: " << metadata.size();
-    for (const auto& partition : metadata) {
-      partitions.push_back(cppkafka::TopicPartition(
-          topic_name, partition.get_id(), 0));  // from the beginning
-    }
-    return partitions;
-  }
-
-  void send_to_engine(const std::pair<uint32_t, std::string>& message) {
-    // send to engine
-    LOG(INFO) << "Send to engine: <" << message.first << " -> "
-              << message.second << ">"
-              << ", timestamp: " << expect_timestamp_;
-    sender_.send(message.second);
-  }
-
-  bool running;
-  uint32_t expect_timestamp_;  // The expected timestamp of the next message
-  WalSender&& sender_;
-  std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
-  std::priority_queue<std::pair<uint32_t, std::string>,
-                      std::vector<std::pair<uint32_t, std::string>>,
-                      CustomComparator>
-      message_queue_;
-};
 }  // namespace gs
 
 namespace bpo = boost::program_options;
@@ -228,7 +115,7 @@ int main(int argc, char** argv) {
   // Create the consumer
   gs::WalSender sender(vm["engine-url"].as<std::string>(),
                        vm["engine-port"].as<int>(), graph_id);
-  gs::WalConsumer consumer(config, topic_name, std::move(sender));
+  gs::WalConsumer consumer(config, topic_name, 1);
 
   // signal(SIGINT, [](int) { consumer.terminate(); });
 
@@ -237,7 +124,16 @@ int main(int argc, char** argv) {
     return 0;
   }
 
-  consumer.poll();
+  while (true) {
+    auto msg = consumer.poll();
+    if (msg.first == std::numeric_limits<uint32_t>::max()) {
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      continue;
+    }
+    LOG(INFO) << "Received message: <" << msg.first << " -> " << msg.second
+              << ">";
+    sender.send(msg.second);
+  }
 
   LOG(INFO) << "Consuming messages from topic " << topic_name;
 
diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc
index 13baa14a4500..51de0a8b70a2 100644
--- a/flex/engines/graph_db/database/graph_db.cc
+++ b/flex/engines/graph_db/database/graph_db.cc
@@ -295,8 +295,6 @@ const GraphDBSession& GraphDB::GetSession(int thread_id) const {
 
 int GraphDB::SessionNum() const { return thread_num_; }
 
-const std::string& GraphDB::GetKafkaBrokers() const { return kafka_brokers_; }
-
 void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) {
   last_compaction_ts_ = ts;
 }
@@ -351,8 +349,9 @@ void GraphDB::GetAppInfo(Encoder& output) {
 
 static void IngestWalRange(SessionLocalContext* contexts,
                            MutablePropertyFragment& graph,
-                           const WalsParser& parser, uint32_t from, uint32_t to,
-                           int thread_num) {
+                           const WalContentUnit* insert_wal_list, uint32_t from,
+                           uint32_t to, int thread_num) {
+  LOG(INFO) << "IngestWalRange, from " << from << ", to " << to;
   std::atomic<uint32_t> cur_ts(from);
   std::vector<std::thread> threads(thread_num);
   for (int i = 0; i < thread_num; ++i) {
@@ -364,7 +363,7 @@ static void IngestWalRange(SessionLocalContext* contexts,
           if (got_ts >= to) {
             break;
           }
-          const auto& unit = parser.get_insert_wal(got_ts);
+          const auto& unit = insert_wal_list[got_ts];
           InsertTransaction::IngestWal(graph, got_ts, unit.ptr, unit.size,
                                        alloc);
           if (got_ts % 1000000 == 0) {
@@ -394,7 +393,8 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
   for (auto& update_wal : parser.update_wals()) {
     uint32_t to_ts = update_wal.timestamp;
     if (from_ts < to_ts) {
-      IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
+      IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
+                     to_ts, thread_num);
     }
     if (update_wal.size == 0) {
       graph_.Compact(update_wal.timestamp);
@@ -406,15 +406,42 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
     from_ts = to_ts + 1;
   }
   if (from_ts <= parser.last_ts()) {
-    IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
-                   thread_num);
+    IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
+                   parser.last_ts() + 1, thread_num);
   }
   version_manager_.init_ts(parser.last_ts(), thread_num);
 }
 
-void GraphDB::ingestWalsFromKafka(const std::string& kafka_topic,
+void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
+                                  const std::string& kafka_topic,
                                   const std::string& work_dir, int thread_num) {
-  LOG(WARNING) << "Kafka ingestion is not supported yet";
+  cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
+                                    {"group.id", "primary_group"},
+                                    // Disable auto commit
+                                    {"enable.auto.commit", false}};
+  KafkaWalParser parser(config, kafka_topic);
+  uint32_t from_ts = 1;
+  for (auto& update_wal : parser.update_wals()) {
+    uint32_t to_ts = update_wal.timestamp;
+    if (from_ts < to_ts) {
+      IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
+                     to_ts, thread_num);
+    }
+    if (update_wal.size == 0) {
+      graph_.Compact(update_wal.timestamp);
+      last_compaction_ts_ = update_wal.timestamp;
+    } else {
+      UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
+                                   update_wal.size, contexts_[0].allocator);
+    }
+    from_ts = to_ts + 1;
+  }
+  if (from_ts <= parser.last_ts()) {
+    IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
+                   parser.last_ts() + 1, thread_num);
+  }
+
+  version_manager_.init_ts(parser.last_ts(), thread_num);
 }
 
 void GraphDB::initApps(
@@ -483,7 +510,8 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
         std::make_unique<KafkaWalWriter>(config.kafka_brokers));
   }
   // ingest wals from kafka
-  ingestWalsFromKafka(config.kafka_topic, data_dir, thread_num_);
+  ingestWalsFromKafka(config.kafka_brokers, config.kafka_topic, data_dir,
+                      thread_num_);
 
   for (int i = 0; i < thread_num_; ++i) {
     contexts_[i].logger->open(config.kafka_topic, i);
diff --git a/flex/engines/graph_db/database/graph_db.h b/flex/engines/graph_db/database/graph_db.h
index 42f176887e6f..dd646be6c495 100644
--- a/flex/engines/graph_db/database/graph_db.h
+++ b/flex/engines/graph_db/database/graph_db.h
@@ -170,7 +170,8 @@ class GraphDB {
   void ingestWalsFromLocalFiles(const std::string& wal_dir,
                                 const std::string& work_dir, int thread_num);
 
-  void ingestWalsFromKafka(const std::string& kafka_topic,
+  void ingestWalsFromKafka(const std::string& kafka_brokers,
+                           const std::string& kafka_topic,
                            const std::string& work_dir, int thread_num);
 
   void initApps(
@@ -204,8 +205,6 @@ class GraphDB {
   timestamp_t last_compaction_ts_;
   bool compact_thread_running_ = false;
   std::thread compact_thread_;
-
-  std::string kafka_brokers_;
 };
 
 }  // namespace gs
diff --git a/flex/engines/graph_db/database/insert_transaction.cc b/flex/engines/graph_db/database/insert_transaction.cc
index 7e43f9bd7d48..a454981df7af 100644
--- a/flex/engines/graph_db/database/insert_transaction.cc
+++ b/flex/engines/graph_db/database/insert_transaction.cc
@@ -150,8 +150,8 @@ void InsertTransaction::Commit() {
     header->timestamp = timestamp_;
 
     logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize());
-    // IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
-    //           header->length, alloc_);
+    IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
+              header->length, alloc_);
 
     vm_.release_insert_timestamp(timestamp_);
     clear();
diff --git a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc
index f1b690ecb06a..c421ae0a731f 100644
--- a/flex/engines/graph_db/database/single_vertex_insert_transaction.cc
+++ b/flex/engines/graph_db/database/single_vertex_insert_transaction.cc
@@ -165,7 +165,7 @@ void SingleVertexInsertTransaction::Commit() {
     header->timestamp = timestamp_;
 
     logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize());
-    // ingestWal();
+    ingestWal();
 
     vm_.release_insert_timestamp(timestamp_);
     clear();
diff --git a/flex/engines/graph_db/database/wal.cc b/flex/engines/graph_db/database/wal.cc
index 00a92062282c..825a268fda1d 100644
--- a/flex/engines/graph_db/database/wal.cc
+++ b/flex/engines/graph_db/database/wal.cc
@@ -254,7 +254,9 @@ void KafkaWalWriter::open(const std::string& topic, int thread_id) {
     }
     kafka_topic_ = topic;
     cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers_}};
-    producer_ = std::make_shared<cppkafka::Producer>(config);
+    producer_ =
+        std::make_shared<cppkafka::BufferedProducer<std::string>>(config);
+    builder_.topic(kafka_topic_).partition(thread_id_);
   } else {
     LOG(FATAL) << "Kafka brokers is empty";
   }
@@ -270,15 +272,12 @@ void KafkaWalWriter::close() {
 }
 
 bool KafkaWalWriter::append(uint32_t ts, const char* data, size_t length) {
-  // send to kafka
-  std::string message(data, length);
   try {
     std::string key = std::to_string(ts);
-    producer_->produce(cppkafka::MessageBuilder(kafka_topic_)
-                           .partition(thread_id_)
-                           .key(key)
-                           .payload(message));
-    producer_->flush();
+    // cppkafka::Buffer key(reinterpret_cast<const char*>(&ts),
+    //                      sizeof(uint32_t));
+    producer_->sync_produce(builder_.key(key).payload({data, length}));
+    producer_->flush(true);
   } catch (const cppkafka::HandleException& e) {
     LOG(ERROR) << "Failed to send to kafka: " << e.what();
     return false;
@@ -293,8 +292,6 @@ IWalWriter::WalWriterType KafkaWalWriter::type() const {
 }
 #endif
 
-static constexpr size_t MAX_WALS_NUM = 134217728;
-
 WalsParser::WalsParser(const std::vector<std::string>& paths)
     : insert_wal_list_(NULL), insert_wal_list_size_(0) {
   for (auto path : paths) {
@@ -320,9 +317,10 @@ WalsParser::WalsParser(const std::vector<std::string>& paths)
     insert_wal_list_size_ = 0;
   }
   insert_wal_list_ = static_cast<WalContentUnit*>(
-      mmap(NULL, MAX_WALS_NUM * sizeof(WalContentUnit), PROT_READ | PROT_WRITE,
-           MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0));
-  insert_wal_list_size_ = MAX_WALS_NUM;
+      mmap(NULL, IWalWriter::MAX_WALS_NUM * sizeof(WalContentUnit),
+           PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE,
+           -1, 0));
+  insert_wal_list_size_ = IWalWriter::MAX_WALS_NUM;
   for (size_t i = 0; i < mmapped_ptrs_.size(); ++i) {
     char* ptr = static_cast<char*>(mmapped_ptrs_[i]);
     while (true) {
@@ -379,4 +377,170 @@ const std::vector<UpdateWalUnit>& WalsParser::update_wals() const {
   return update_wal_list_;
 }
 
+const WalContentUnit* WalsParser::insert_wal_list() const {
+  return insert_wal_list_;
+}
+
+// WalConsumer consumes from multiple partitions of a topic, and can start from
+// the beginning or from the latest message.
+WalConsumer::WalConsumer(cppkafka::Configuration config,
+                         const std::string& topic_name, int32_t thread_num)
+    : running(true), expect_timestamp_(1) {
+  auto topic_partitions = get_all_topic_partitions(config, topic_name);
+  consumers_.reserve(topic_partitions.size());
+  for (size_t i = 0; i < topic_partitions.size(); ++i) {
+    consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
+    consumers_.back()->assign({topic_partitions[i]});
+  }
+}
+
+std::pair<uint32_t, std::string> WalConsumer::poll() {
+  for (auto& consumer : consumers_) {
+    auto msg = consumer->poll();
+    if (msg) {
+      if (msg.get_error()) {
+        if (!msg.is_eof()) {
+          LOG(INFO) << "[+] Received error notification: " << msg.get_error();
+        }
+      } else {
+        uint32_t key = atoi(static_cast<std::string>(msg.get_key()).c_str());
+        std::string payload = msg.get_payload();
+        LOG(INFO) << "receive from partition " << msg.get_partition()
+                  << ", key: " << key << ", payload: " << payload
+                  << " size: " << payload.size();
+        message_queue_.push(std::make_pair(key, payload));
+        // consumer->commit(msg);
+      }
+    }
+  }
+  // Check the message queue: if the top message has the expected timestamp,
+  // return it to the caller. Otherwise, wait for the expected message.
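+  // Messages with a timestamp below the expected one are duplicates of
+  // messages that were already delivered, so they are dropped; a message
+  // matching the expected timestamp is returned to the caller; anything newer
+  // stays queued until the gap in the sequence is filled.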
+  if (!message_queue_.empty()) {
+    while (!message_queue_.empty() &&
+           message_queue_.top().first < expect_timestamp_) {
+      LOG(WARNING) << "Drop message: <" << message_queue_.top().first << " -> "
+                   << message_queue_.top().second << ">";
+      message_queue_.pop();
+    }
+    if (!message_queue_.empty() &&
+        message_queue_.top().first == expect_timestamp_) {
+      expect_timestamp_++;
+      auto ret = message_queue_.top();
+      message_queue_.pop();
+      return ret;
+    }
+    if (!message_queue_.empty()) {
+      LOG(INFO) << "Expect timestamp: " << expect_timestamp_
+                << ", but got: " << message_queue_.top().first;
+    }
+  }
+  return std::make_pair(std::numeric_limits<uint32_t>::max(), "");
+}
+
+std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
+    const cppkafka::Configuration& config, const std::string& topic_name) {
+  std::vector<cppkafka::TopicPartition> partitions;
+  cppkafka::Consumer consumer(config);  // tmp consumer
+  auto metadata =
+      consumer.get_metadata().get_topics({topic_name}).front().get_partitions();
+  LOG(INFO) << "metadata: " << metadata.size();
+  for (const auto& partition : metadata) {
+    partitions.push_back(cppkafka::TopicPartition(
+        topic_name, partition.get_id(), 0));  // from the beginning
+  }
+  return partitions;
+}
+
+KafkaWalParser::KafkaWalParser(cppkafka::Configuration config,
+                               const std::string& topic_name)
+    : insert_wal_list_(NULL), insert_wal_list_size_(0), last_ts_(0) {
+  auto topic_partitions = get_all_topic_partitions(config, topic_name);
+  // consumers_.reserve(topic_partitions.size());
+  // for (size_t i = 0; i < topic_partitions.size(); ++i) {
+  //   consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
+  //   consumers_.back()->assign({topic_partitions[i]});
+  // }
+  consumer_ = std::make_unique<cppkafka::Consumer>(config);
+  consumer_->assign(topic_partitions);
+
+  insert_wal_list_ = static_cast<WalContentUnit*>(
+      mmap(NULL, IWalWriter::MAX_WALS_NUM * sizeof(WalContentUnit),
+           PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE,
+           -1, 0));
+  insert_wal_list_size_ = IWalWriter::MAX_WALS_NUM;
+
+  while (true) {
+    auto msgs = consumer_->poll_batch(MAX_BATCH_SIZE);
+    if (msgs.empty()) {
+      LOG(INFO) << "No messages polled; the topic has been fully consumed.";
+      break;
+    }
+    LOG(INFO) << "got messages of size: " << msgs.size();
+    // message_vector_.emplace_back(std::move(msgs));
+    for (auto& msg : msgs) {
+      if (msg) {
+        if (msg.get_error()) {
+          if (!msg.is_eof()) {
+            LOG(INFO) << "[+] Received error notification: " << msg.get_error();
+          }
+        } else {
+          message_vector_.emplace_back(msg.get_payload());
+          const char* payload = message_vector_.back().data();
+          const WalHeader* header = reinterpret_cast<const WalHeader*>(payload);
+          uint32_t cur_ts = header->timestamp;
+          if (cur_ts == 0) {
+            LOG(WARNING) << "Invalid timestamp 0, skip";
+            continue;
+          }
+          int length = header->length;
+          if (header->type) {
+            UpdateWalUnit unit;
+            unit.timestamp = cur_ts;
+            unit.ptr = const_cast<char*>(payload + sizeof(WalHeader));
+            unit.size = length;
+            update_wal_list_.push_back(unit);
+          } else {
+            if (insert_wal_list_[cur_ts].ptr) {
+              LOG(WARNING) << "Duplicated timestamp " << cur_ts
+                           << ", overwriting";
+            }
+            insert_wal_list_[cur_ts].ptr =
+                const_cast<char*>(payload + sizeof(WalHeader));
+            insert_wal_list_[cur_ts].size = length;
+          }
+          last_ts_ = std::max(cur_ts, last_ts_);
+        }
+      }
+    }
+  }
+  LOG(INFO) << "last_ts: " << last_ts_;
+  if (!update_wal_list_.empty()) {
+    std::sort(update_wal_list_.begin(), update_wal_list_.end(),
+              [](const UpdateWalUnit& lhs, const UpdateWalUnit& rhs) {
+                return lhs.timestamp < rhs.timestamp;
+              });
+  }
+}
+
+KafkaWalParser::~KafkaWalParser() {
+  if (insert_wal_list_ != NULL) {
+    munmap(insert_wal_list_, insert_wal_list_size_ * sizeof(WalContentUnit));
+  }
+}
+
+uint32_t KafkaWalParser::last_ts() const { return last_ts_; }
+
+const WalContentUnit& KafkaWalParser::get_insert_wal(uint32_t ts) const {
+  return insert_wal_list_[ts];
+}
+
+const std::vector<UpdateWalUnit>& KafkaWalParser::update_wals() const {
+  return update_wal_list_;
+}
+
+const WalContentUnit* KafkaWalParser::insert_wal_list() const {
+  return insert_wal_list_;
+}
+
 }  // namespace gs
diff --git a/flex/engines/graph_db/database/wal.h b/flex/engines/graph_db/database/wal.h
index 4498de994e27..762dc516bd56 100644
--- a/flex/engines/graph_db/database/wal.h
+++ b/flex/engines/graph_db/database/wal.h
@@ -66,6 +66,7 @@ class IWalWriter {
     kLocal = 0,
     kKafka = 1,
   };
+  static constexpr size_t MAX_WALS_NUM = 134217728;
   static inline WalWriterType parseWalWriterType(const std::string& type_str) {
     if (type_str == "local") {
       return WalWriterType::kLocal;
@@ -121,7 +122,10 @@ class LocalWalWriter : public IWalWriter {
 class KafkaWalWriter : public IWalWriter {
  public:
   KafkaWalWriter(const std::string& kafka_brokers)
-      : thread_id_(-1), kafka_brokers_(kafka_brokers), kafka_topic_("") {}
+      : thread_id_(-1),
+        kafka_brokers_(kafka_brokers),
+        kafka_topic_(""),
+        builder_("") {}
   ~KafkaWalWriter() { close(); }
 
   void open(const std::string& kafka_topic, int thread_id) override;
@@ -136,18 +140,30 @@ class KafkaWalWriter : public IWalWriter {
   int thread_id_;
   std::string kafka_brokers_;
   std::string kafka_topic_;
-  std::shared_ptr<cppkafka::Producer> producer_;
+  std::shared_ptr<cppkafka::BufferedProducer<std::string>> producer_;
+  cppkafka::MessageBuilder builder_;
 };
 #endif
 
-class WalsParser {
+class IWalsParser {
+ public:
+  virtual ~IWalsParser() {}
+
+  virtual uint32_t last_ts() const = 0;
+  virtual const WalContentUnit& get_insert_wal(uint32_t ts) const = 0;
+  virtual const std::vector<UpdateWalUnit>& update_wals() const = 0;
+  virtual const WalContentUnit* insert_wal_list() const = 0;
+};
+
+class WalsParser : public IWalsParser {
  public:
   WalsParser(const std::vector<std::string>& paths);
   ~WalsParser();
 
-  uint32_t last_ts() const;
-  const WalContentUnit& get_insert_wal(uint32_t ts) const;
-  const std::vector<UpdateWalUnit>& update_wals() const;
+  uint32_t last_ts() const override;
+  const WalContentUnit& get_insert_wal(uint32_t ts) const override;
+  const std::vector<UpdateWalUnit>& update_wals() const override;
+  const WalContentUnit* insert_wal_list() const override;
 
  private:
   std::vector<int> fds_;
@@ -160,6 +176,72 @@ class WalsParser {
   std::vector<UpdateWalUnit> update_wal_list_;
 };
 
+/**
+ * Get all topic partitions for a given topic
+ */
+std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
+    const cppkafka::Configuration& config, const std::string& topic_name);
+
+/**
+ * @brief The WalConsumer class is used to consume the WAL from kafka. The
+ * topic could have multiple partitions, and we could use multiple threads to
+ * consume the WAL from different partitions. (It is not necessary to use the
+ * same number of threads as partitions.)
+ *
+ * We assume that the messages in each partition are ordered by the timestamp.
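+ *
+ * poll() hands out messages strictly in timestamp order: it returns the next
+ * expected message as a <timestamp, payload> pair, or a pair whose first
+ * element is std::numeric_limits<uint32_t>::max() if that message has not
+ * arrived yet.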
+ */
+class WalConsumer {
+ public:
+  struct CustomComparator {
+    inline bool operator()(const std::pair<uint32_t, std::string>& lhs,
+                           const std::pair<uint32_t, std::string>& rhs) {
+      return lhs.first > rhs.first;
+    }
+  };
+  static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
+      std::chrono::milliseconds(100);
+
+  // always track all partitions and from beginning
+  WalConsumer(cppkafka::Configuration config, const std::string& topic_name,
+              int32_t thread_num);
+
+  std::pair<uint32_t, std::string> poll();
+
+ private:
+  bool running;
+  uint32_t expect_timestamp_;  // The expected timestamp of the next message
+  std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
+  std::priority_queue<std::pair<uint32_t, std::string>,
+                      std::vector<std::pair<uint32_t, std::string>>,
+                      CustomComparator>
+      message_queue_;
+};
+
+/** Consumes offline data in batch. */
+class KafkaWalParser : public IWalsParser {
+ public:
+  static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
+      std::chrono::milliseconds(100);
+  static constexpr const size_t MAX_BATCH_SIZE = 1000;
+
+  // always track all partitions and from beginning
+  KafkaWalParser(cppkafka::Configuration config,
+                 const std::string& topic_name);
+  ~KafkaWalParser();
+
+  uint32_t last_ts() const override;
+  const WalContentUnit& get_insert_wal(uint32_t ts) const override;
+  const std::vector<UpdateWalUnit>& update_wals() const override;
+  const WalContentUnit* insert_wal_list() const override;
+
+ private:
+  std::unique_ptr<cppkafka::Consumer> consumer_;
+  WalContentUnit* insert_wal_list_;
+  size_t insert_wal_list_size_;
+  uint32_t last_ts_;
+
+  std::vector<UpdateWalUnit> update_wal_list_;
+  std::vector<std::string> message_vector_;  // used to hold the polled messages
+};
 
 }  // namespace gs
 
 #endif  // GRAPHSCOPE_DATABASE_WAL_H_
diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc
index 53c14cf6ea49..1cb2c329efd5 100644
--- a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc
+++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc
@@ -193,7 +193,7 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
       // We will reserve the at least 4096 slots for each vertex label
       size_t vertex_capacity =
-          std::max(schema_.get_max_vnum(v_label_name), (size_t) 4096);
+          std::max(lf_indexers_[i].capacity(), (size_t) 4096);
       if (vertex_capacity >= lf_indexers_[i].size()) {
         lf_indexers_[i].reserve(vertex_capacity);
       }
diff --git a/flex/tests/wal_writer/CMakeLists.txt b/flex/tests/wal_writer/CMakeLists.txt
index 01f437cbf881..0f8d5be25fd0 100644
--- a/flex/tests/wal_writer/CMakeLists.txt
+++ b/flex/tests/wal_writer/CMakeLists.txt
@@ -1,2 +1,2 @@
 add_executable(wal_writer_test ${CMAKE_CURRENT_SOURCE_DIR}/wal_writer_test.cc)
-target_link_libraries(wal_writer_test PUBLIC CppKafka::cppkafka)
\ No newline at end of file
+target_link_libraries(wal_writer_test PUBLIC CppKafka::cppkafka ${LIBGRAPELITE_LIBRARIES} flex_graph_db)
\ No newline at end of file
diff --git a/flex/tests/wal_writer/wal_writer_test.cc b/flex/tests/wal_writer/wal_writer_test.cc
index 8cb53fa76970..8664d0269955 100644
--- a/flex/tests/wal_writer/wal_writer_test.cc
+++ b/flex/tests/wal_writer/wal_writer_test.cc
@@ -3,32 +3,86 @@
 #include
 #include
+#include "flex/engines/graph_db/database/wal.h"
+#include "grape/grape.h"
 
 using namespace std;
 using namespace cppkafka;
 
-int main(int argc, char** argv) {
-  Configuration config = {{"metadata.broker.list", "127.0.0.1:9092"}};
+const int times = 100000;
+
+void run(std::vector<gs::IWalWriter*>& writers, const std::string& payload) {
+  std::vector<std::thread> threads;
+  double t = -grape::GetCurrentTime();
+  for (size_t i = 0; i < writers.size(); ++i) {
+    threads.emplace_back([&writers, i, payload]() {
+      for (int j = 0; j < times; ++j) {
+        if (!writers[i]->append(j, payload.c_str(), payload.size())) {
+          std::cerr << "Failed to append message to kafka" << std::endl;
+        }
+        if (j % 10000 == 0) {
+          std::cout << "Producing " << j << " messages" << std::endl;
+        }
+      }
+    });
+  }
+  for (auto& thrd : threads) {
+    thrd.join();
+  }
+  t += grape::GetCurrentTime();
+  std::cout << "Producing " << writers.size() * times
+            << " messages to kafka took " << t << " seconds" << std::endl;
+}
+
+void test_local_wal_writer(const std::string& topic_name, int thread_num,
+                           const std::string& payload) {
+  std::vector<gs::IWalWriter*> kafka_writers;
+  for (int i = 0; i < thread_num; ++i) {
+    kafka_writers.emplace_back(new gs::LocalWalWriter());
+  }
+  for (int i = 0; i < thread_num; ++i) {
+    std::string dst_path = "/tmp/";
+    // check whether files exist
+    kafka_writers[i]->open(dst_path, i);
+  }
+  run(kafka_writers, payload);
+}
 
-  if (argc != 5) {
-    cerr << "Usage: " << argv[0] << " <topic_name> <partition> <key> <value>"
+void test_kafka_wal_writer(const std::string& topic_name, int thread_num,
+                           const std::string& brokers,
+                           const std::string& payload) {
+  // auto kafka_writer = std::make_unique(config);
+  std::vector<gs::IWalWriter*> kafka_writers;
+  for (int i = 0; i < thread_num; ++i) {
+    kafka_writers.emplace_back(new gs::KafkaWalWriter(brokers));
+  }
+  for (int i = 0; i < thread_num; ++i) {
+    kafka_writers[i]->open(topic_name, i);
+  }
+  run(kafka_writers, payload);
+}
+
+int main(int argc, char** argv) {
+  if (argc != 4) {
+    cerr << "Usage: " << argv[0] << " <local|kafka> <topic_name> <thread_num>"
          << endl;
     return 1;
   }
-  std::string topic_name = argv[1];
-  int partition = std::stoi(argv[2]);
-  std::string key = argv[3];
-  std::string value = argv[4];
-  std::cout << "Producing message to topic " << topic_name << " partition "
-            << partition << " key " << key << " value " << value << std::endl;
-
-  // Create the producer
-  Producer producer(config);
-
-  // Produce a message!
-  producer.produce(
-      MessageBuilder(topic_name).partition(partition).key(key).payload(value));
-  producer.flush();
-  std::cout << "Message produced" << std::endl;
+  std::string type = argv[1];
+  std::string topic_name = argv[2];
+  int thread_num = std::stoi(argv[3]);
+  std::cout << "Producing message to topic " << topic_name << " thread num "
+            << thread_num << std::endl;
+  std::stringstream ss;
+  for (size_t i = 0; i < 50; ++i) {
+    ss << "hello world " << i << std::endl;
+  }
+  std::string payload = ss.str();
+  if (type == "local") {
+    test_local_wal_writer(topic_name, thread_num, payload);
+  } else {
+    test_kafka_wal_writer(topic_name, thread_num, "localhost:9092", payload);
+  }
+  return 0;
 }
\ No newline at end of file
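
A minimal sketch of how the writer and parser introduced here are meant to fit together, assuming a build with ENABLE_KAFKA, a broker at localhost:9092, and a hypothetical pre-created topic named graph_wal; real callers append the transaction's arc buffer, which already begins with a WalHeader, so the header below is fabricated only so the parser can decode the record:

#include <cstdint>
#include <string>
#include "cppkafka/cppkafka.h"
#include "flex/engines/graph_db/database/wal.h"

int main() {
  const std::string brokers = "localhost:9092";  // assumed local broker
  const std::string topic = "graph_wal";         // hypothetical topic name

  // Writer side: one KafkaWalWriter per worker thread; the thread id doubles
  // as the partition id, mirroring how GraphDB opens one logger per session.
  gs::KafkaWalWriter writer(brokers);
  writer.open(topic, /* thread_id= */ 0);
  gs::WalHeader header;
  header.timestamp = 1;
  header.type = 0;    // insert WAL
  header.length = 0;  // no body in this toy record
  std::string record(reinterpret_cast<const char*>(&header), sizeof(header));
  writer.append(header.timestamp, record.data(), record.size());
  writer.close();

  // Reader side: KafkaWalParser drains the whole topic in its constructor,
  // after which the WALs can be replayed in timestamp order.
  cppkafka::Configuration config = {{"metadata.broker.list", brokers},
                                    {"group.id", "primary_group"},
                                    {"enable.auto.commit", false}};
  gs::KafkaWalParser parser(config, topic);
  for (uint32_t ts = 1; ts <= parser.last_ts(); ++ts) {
    const gs::WalContentUnit& unit = parser.get_insert_wal(ts);
    // unit.ptr / unit.size would be handed to InsertTransaction::IngestWal,
    // exactly as GraphDB::ingestWalsFromKafka does.
    (void) unit;
  }
  return 0;
}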