Skip to content

Commit

Permalink
update dev_and_test
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Oct 29, 2024
1 parent 4af9ba1 commit e10f73d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
31 changes: 30 additions & 1 deletion docs/flex/interactive/development/dev_and_test.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,38 @@ We Compare the performance of `LocalWalWriter` and `KafkaWalWriter` on a host wi

##### Producing Wals

Both `LocalWalWriter` and `KafkaWalWriter` log data on 894GB NVMe disks. When deploying Kafka with two brokers, we use kraft mode, with each message being 600 bytes.

| Threads | Messages Per Thread | LocalWalWriter | KafkaWalWriter (One Broker) | KafkaWalWriter (Two Brokers) |
|---------|---------------------|----------------|-----------------------------|------------------------------|
| 1 | 100,000 | 2.93s | 8.04s | 8.63s |
| 2 | 100,000 | 3.14s | 8.15s | 9.58s |
| 4 | 100,000 | 3.69s | 9.3s | 13.8s |
| 8 | 100,000 | 4.82s | 11s | 16.9s |
| 16 | 100,000 | 7.8s | 17s | 40.8s |
| 32 | 100,000 | 14.0s | 28s | 79.2s |

##### Consuming Wals
To run the microbenchmark:

```bash
mkdir build && cd build && cmake .. -DBUILD_KAFKA_WAL_WRITER=ON && make -j
./tests/wal/wal_writer_test localhost:9092 kafka topic_1 1 10
mkdir /tmp/wal_dir/
./tests/wal/wal_writer_test localhost:9092 local /tmp/wal_dir/ 1 10
```

##### Consuming WALs

We compare the performance of parsing WALs from local disk versus Kafka. The WALs are generated with `wal_writer_test`, using 8 partitions with 100,000 messages each.

| Threads | LocalWalParser | KafkaWalParser |
|---------|----------------|----------------|
| 1 | 0.001s | 8.35s |

```bash
./tests/wal/wal_reader_test h10:9092 kafka topic_1 1
./tests/wal/wal_reader_test localhost:9092 local /tmp/wal_dir 1
```

## Testing

Expand Down
1 change: 0 additions & 1 deletion flex/bin/wal_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ int main(int argc, char** argv) {
LOG(INFO) << "No message polled, exit";
break;
}
LOG(INFO) << "Received message:" << msg.size();
if (sender) {
sender->send(msg);
}
Expand Down
4 changes: 1 addition & 3 deletions flex/engines/graph_db/database/wal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ KafkaWalConsumer::KafkaWalConsumer(cppkafka::Configuration config,
}
}

// TODO: make this effiicient
// TODO(zhanglei): make this more effiicient
std::string KafkaWalConsumer::poll() {
for (auto& consumer : consumers_) {
auto msg = consumer->poll();
Expand All @@ -402,8 +402,6 @@ std::string KafkaWalConsumer::poll() {
}
} else {
std::string payload = msg.get_payload();
LOG(INFO) << "receive from partition " << msg.get_partition()
<< "payload size: " << payload.size();
message_queue_.push(payload);
consumer->commit(msg);
}
Expand Down
13 changes: 5 additions & 8 deletions flex/tests/wal/wal_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,28 @@
#include "grape/grape.h"

int main(int argc, char** argv) {
if (argc != 5) {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< "<kafka brokers> <local/kafka> <topic/directory> <thread_num>"
<< std::endl;
<< "<kafka brokers> <local/kafka> <topic/directory>" << std::endl;
return 1;
}
std::string brokers = argv[1];
std::string type = argv[2];
std::string topic_name = argv[3];
int thread_num = std::stoi(argv[4]);

double t = -grape::GetCurrentTime();

if (type == "local") {
std::cout << "Consuming message from directory " << topic_name
<< " thread num " << thread_num;
std::cout << "Consuming message from directory " << topic_name;
std::vector<std::string> wals;
for (const auto& entry : std::filesystem::directory_iterator(topic_name)) {
wals.push_back(entry.path().string());
}
std::unique_ptr<gs::IWalsParser> parser =
std::make_unique<gs::LocalWalsParser>(wals);
} else {
std::cout << "Consuming message from topic " << topic_name << " thread num "
<< thread_num;
std::cout << "Consuming message from topic " << topic_name
<< " thread num ";
cppkafka::Configuration config = {{"metadata.broker.list", brokers},
{"group.id", "primary_group"},
// Disable auto commit
Expand Down

0 comments on commit e10f73d

Please sign in to comment.