Skip to content

Commit

Permalink
implement wal ingestion and wal consuming
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Oct 25, 2024
1 parent ad47f00 commit fbe8941
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 175 deletions.
8 changes: 7 additions & 1 deletion flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ if (ENABLE_KAFKA)
message(STATUS "cppkafka found, build without third_party/cppkafka")
else ()
add_subdirectory(third_party/cppkafka)
# if cppkafka/CMakeLists.txt not exits, tell user to run git submodule update --init --recursive
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
endif ()
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
find_package (CppKafka CONFIG REQUIRED)
# find_package (CppKafka CONFIG REQUIRED)
# alias target
add_library(CppKafka::cppkafka ALIAS cppkafka)
endif ()
add_definitions(-DENABLE_KAFKA)
endif()
Expand Down
2 changes: 1 addition & 1 deletion flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils f
install_without_export_flex_target(stored_procedure_runner)

add_executable(wal_consumer wal_consumer.cc)
target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES})
target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
128 changes: 12 additions & 116 deletions flex/bin/wal_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <boost/program_options.hpp>
#include "cppkafka/cppkafka.h"
#include "flex/engines/graph_db/database/wal.h"
#include "flex/third_party/httplib.h"

namespace gs {
Expand Down Expand Up @@ -60,120 +61,6 @@ class WalSender {
std::string req_url_;
};

// WalConsumer consumes from multiple partitions of a topic, and can start from
// different offsets.
// It use a priority queue to store the messages, and the messages are ordered
// by the time_stamp.
class WalConsumer {
public:
struct CustomComparator {
bool operator()(const std::pair<uint32_t, std::string>& lhs,
const std::pair<uint32_t, std::string>& rhs) {
return lhs.first > rhs.first;
}
};
static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
std::chrono::milliseconds(1000);
// always track all partitions and from begining
WalConsumer(cppkafka::Configuration config, const std::string& topic_name,
WalSender&& sender)
: running(true), expect_timestamp_(1), sender_(std::move(sender)) {
auto topic_partitions = get_all_topic_partitions(config, topic_name);
consumers_.reserve(topic_partitions.size());
for (size_t i = 0; i < topic_partitions.size(); ++i) {
consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
consumers_.back()->assign({topic_partitions[i]});
}
}

void poll() {
while (running) {
for (auto& consumer : consumers_) {
auto msg = consumer->poll(POLL_TIMEOUT);
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
LOG(INFO) << "[+] Received error notification: "
<< msg.get_error();
}
} else {
uint32_t key =
atoi(static_cast<std::string>(msg.get_key()).c_str());
std::string payload = msg.get_payload();
LOG(INFO) << "receive from partition " << msg.get_partition()
<< ", key: " << key << ", payload: " << payload;
message_queue_.push(std::make_pair(key, payload));
}
}
}
// Check the message queue, if the top message is the expected message,
// send it to the engine. Otherwise, wait for the expected message.
if (!message_queue_.empty()) {
while (!message_queue_.empty() &&
message_queue_.top().first < expect_timestamp_) {
LOG(WARNING) << "Drop message: <" << message_queue_.top().first
<< " -> " << message_queue_.top().second << ">";
message_queue_.pop();
}
while (!message_queue_.empty() &&
message_queue_.top().first == expect_timestamp_) {
send_to_engine(message_queue_.top());
// send to engine
message_queue_.pop();
expect_timestamp_++;
}
while (!message_queue_.empty() &&
message_queue_.top().first < expect_timestamp_) {
LOG(WARNING) << "Drop message: <" << message_queue_.top().first
<< " -> " << message_queue_.top().second << ">";
message_queue_.pop();
}
LOG(INFO) << "Expect timestamp: " << expect_timestamp_
<< ", but got: " << message_queue_.top().first;
} else {
LOG(INFO) << "No message in the queue, wait for the next message...";
}
// sleep(1);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

void terminate() { running = false; }

private:
std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
const cppkafka::Configuration& config, const std::string& topic_name) {
std::vector<cppkafka::TopicPartition> partitions;
cppkafka::Consumer consumer(config); // tmp consumer
auto metadata = consumer.get_metadata()
.get_topics({topic_name})
.front()
.get_partitions();
LOG(INFO) << "metadata: " << metadata.size();
for (const auto& partition : metadata) {
partitions.push_back(cppkafka::TopicPartition(
topic_name, partition.get_id(), 0)); // from the beginning
}
return partitions;
}

void send_to_engine(const std::pair<uint32_t, std::string>& message) {
// send to engine
LOG(INFO) << "Send to engine: <" << message.first << " -> "
<< message.second << ">"
<< ", timestamp: " << expect_timestamp_;
sender_.send(message.second);
}

bool running;
uint32_t expect_timestamp_; // The expected timestamp of the next message
WalSender&& sender_;
std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
std::priority_queue<std::pair<uint32_t, std::string>,
std::vector<std::pair<uint32_t, std::string>>,
CustomComparator>
message_queue_;
};
} // namespace gs

namespace bpo = boost::program_options;
Expand Down Expand Up @@ -228,7 +115,7 @@ int main(int argc, char** argv) {
// Create the consumer
gs::WalSender sender(vm["engine-url"].as<std::string>(),
vm["engine-port"].as<int32_t>(), graph_id);
gs::WalConsumer consumer(config, topic_name, std::move(sender));
gs::WalConsumer consumer(config, topic_name, 1);

// signal(SIGINT, [](int) { consumer.terminate(); });

Expand All @@ -237,7 +124,16 @@ int main(int argc, char** argv) {
return 0;
}

consumer.poll();
while (true) {
auto msg = consumer.poll();
if (msg.first == std::numeric_limits<uint32_t>::max()) {
continue;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
LOG(INFO) << "Received message: <" << msg.first << " -> " << msg.second
<< ">";
sender.send(msg.second);
}

LOG(INFO) << "Consuming messages from topic " << topic_name;

Expand Down
50 changes: 39 additions & 11 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,6 @@ const GraphDBSession& GraphDB::GetSession(int thread_id) const {

int GraphDB::SessionNum() const { return thread_num_; }

const std::string& GraphDB::GetKafkaBrokers() const { return kafka_brokers_; }

void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) {
last_compaction_ts_ = ts;
}
Expand Down Expand Up @@ -351,8 +349,9 @@ void GraphDB::GetAppInfo(Encoder& output) {

static void IngestWalRange(SessionLocalContext* contexts,
MutablePropertyFragment& graph,
const WalsParser& parser, uint32_t from, uint32_t to,
int thread_num) {
const WalContentUnit* insert_wal_list, uint32_t from,
uint32_t to, int thread_num) {
LOG(INFO) << "IngestWalRange, from " << from << ", to " << to;
std::atomic<uint32_t> cur_ts(from);
std::vector<std::thread> threads(thread_num);
for (int i = 0; i < thread_num; ++i) {
Expand All @@ -364,7 +363,7 @@ static void IngestWalRange(SessionLocalContext* contexts,
if (got_ts >= to) {
break;
}
const auto& unit = parser.get_insert_wal(got_ts);
const auto& unit = insert_wal_list[got_ts];
InsertTransaction::IngestWal(graph, got_ts, unit.ptr, unit.size,
alloc);
if (got_ts % 1000000 == 0) {
Expand Down Expand Up @@ -394,7 +393,8 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
Expand All @@ -406,15 +406,42 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
thread_num);
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
parser.last_ts() + 1, thread_num);
}
version_manager_.init_ts(parser.last_ts(), thread_num);
}

void GraphDB::ingestWalsFromKafka(const std::string& kafka_topic,
void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
const std::string& kafka_topic,
const std::string& work_dir, int thread_num) {
LOG(WARNING) << "Kafka ingestion is not supported yet";
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
{"group.id", "primary_group"},
// Disable auto commit
{"enable.auto.commit", false}};
KafkaWalParser parser(config, kafka_topic);
uint32_t from_ts = 1;
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
parser.last_ts() + 1, thread_num);
}

version_manager_.init_ts(parser.last_ts(), thread_num);
}

void GraphDB::initApps(
Expand Down Expand Up @@ -483,7 +510,8 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
std::make_unique<KafkaWalWriter>(config.kafka_brokers));
}
// ingest wals from kafka
ingestWalsFromKafka(config.kafka_topic, data_dir, thread_num_);
ingestWalsFromKafka(config.kafka_brokers, config.kafka_topic, data_dir,
thread_num_);

for (int i = 0; i < thread_num_; ++i) {
contexts_[i].logger->open(config.kafka_topic, i);
Expand Down
5 changes: 2 additions & 3 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class GraphDB {
void ingestWalsFromLocalFiles(const std::string& wal_dir,
const std::string& work_dir, int thread_num);

void ingestWalsFromKafka(const std::string& kafka_topic,
void ingestWalsFromKafka(const std::string& kafka_brokers,
const std::string& kafka_topic,
const std::string& work_dir, int thread_num);

void initApps(
Expand Down Expand Up @@ -204,8 +205,6 @@ class GraphDB {
timestamp_t last_compaction_ts_;
bool compact_thread_running_ = false;
std::thread compact_thread_;

std::string kafka_brokers_;
};

} // namespace gs
Expand Down
4 changes: 2 additions & 2 deletions flex/engines/graph_db/database/insert_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ void InsertTransaction::Commit() {
header->timestamp = timestamp_;

logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize());
// IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
// header->length, alloc_);
IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
header->length, alloc_);

vm_.release_insert_timestamp(timestamp_);
clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void SingleVertexInsertTransaction::Commit() {
header->timestamp = timestamp_;

logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize());
// ingestWal();
ingestWal();

vm_.release_insert_timestamp(timestamp_);
clear();
Expand Down
Loading

0 comments on commit fbe8941

Please sign in to comment.