cpp: re-use allocations for message indexes in writer
We use a hash map { channel ID: message index vector } to
store message indexes while writing messages. For M messages across
N channels in a chunk, there will always be N separately allocated
vectors containing M/N elements on average.

Before this commit, we destroy and re-create these vectors for
every chunk. With a large number of channels, this can result
in many allocations and re-allocations as the vectors are rebuilt
for each new chunk.

After this commit, we clear the vectors after writing a chunk,
allowing us to re-use the allocated memory for the next chunk.
This should result in much less work for the allocator while writing.
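
As a minimal self-contained sketch of the pattern (the type and function
names here are placeholders for illustration, not the writer's actual API):

#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Placeholder stand-ins for the writer's per-channel message index state.
using Timestamp = std::uint64_t;
using ByteOffset = std::uint64_t;

struct MessageIndex {
  std::vector<std::pair<Timestamp, ByteOffset>> records;
};

std::unordered_map<std::uint16_t, MessageIndex> currentMessageIndex;

// Called once per message: append a (log time, offset) entry for its channel.
void onMessageWritten(std::uint16_t channelId, Timestamp logTime, ByteOffset offset) {
  currentMessageIndex[channelId].records.emplace_back(logTime, offset);
}

// Called once per chunk, after serializing the message index records.
void onChunkEnd() {
  for (auto& [channelId, messageIndex] : currentMessageIndex) {
    (void)channelId;
    // clear() destroys the entries but keeps the vector's buffer, so the
    // next chunk appends into memory that is already allocated.
    messageIndex.records.clear();
  }
  // By contrast, currentMessageIndex.clear() would also destroy the vectors,
  // forcing one fresh allocation per channel on every subsequent chunk.
}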

This commit also contains a new benchmark to demonstrate this behavior.
I still need to measure the results properly, but I've eyeballed a speedup
of around 10% for large numbers of channels.
james-rms committed Jul 13, 2023
1 parent dc0384d commit 5a91faa
Showing 2 changed files with 85 additions and 7 deletions.
cpp/bench/run.cpp: 64 additions & 0 deletions
@@ -539,6 +539,58 @@ static void BM_McapWriterFileWriterChunked(benchmark::State& state) {
   std::remove(filename.c_str());
 }
 
+static void BM_McapWriterFileWriterChunkedManyChannels(benchmark::State& state) {
+  // Create a message payload
+  std::array<std::byte, 4 + 13> payload;
+  const uint32_t length = 13;
+  std::memcpy(payload.data(), &length, 4);
+  std::memcpy(payload.data() + 4, "Hello, world!", 13);
+
+  // Create a chunked writer using the ros1 profile
+  mcap::McapWriter writer;
+  auto options = mcap::McapWriterOptions("ros1");
+  options.chunkSize = uint64_t(state.range(0));
+
+  // Open an output file stream and write the file header
+  const std::string filename = TempFilename();
+  assertOk(writer.open(filename, options));
+
+  // Register a Schema record
+  mcap::Schema stdMsgsString("std_msgs/String", "ros1msg", StringSchema);
+  writer.addSchema(stdMsgsString);
+
+  uint16_t channelCount = uint16_t(state.range(1));
+
+  mcap::Channel topic("/chatter", "ros1", stdMsgsString.id);
+  std::vector<uint16_t> channelIds;
+  for (uint16_t i = 0; i < channelCount; ++i) {
+    // Register a Channel record
+    writer.addChannel(topic);
+    channelIds.push_back(topic.id);
+  }
+
+  // Create a message
+  mcap::Message msg;
+  msg.channelId = topic.id;
+  msg.sequence = 0;
+  msg.publishTime = 0;
+  msg.logTime = msg.publishTime;
+  msg.data = payload.data();
+  msg.dataSize = payload.size();
+
+  while (state.KeepRunning()) {
+    for (size_t i = 0; i < WriteIterations; i++) {
+      msg.channelId = channelIds[i % channelCount];
+      (void)writer.write(msg);
+      benchmark::ClobberMemory();
+    }
+  }
+
+  // Finish writing the file and delete it
+  writer.close();
+  std::remove(filename.c_str());
+}
+
 int main(int argc, char* argv[]) {
   benchmark::RegisterBenchmark("BM_CRC32", BM_CRC32)->RangeMultiplier(10)->Range(1, 10000000);
   benchmark::RegisterBenchmark("BM_McapWriterBufferWriterUnchunkedUnindexed",
@@ -619,6 +671,18 @@ int main(int argc, char* argv[]) {
       ->Arg(100000)
       ->Arg(1000000)
       ->Arg(10000000);
+  benchmark::RegisterBenchmark("BM_McapWriterFileWriterChunkedManyChannels",
+                               BM_McapWriterFileWriterChunkedManyChannels)
+      ->Args({mcap::DefaultChunkSize, 1})
+      ->Args({mcap::DefaultChunkSize, 10})
+      ->Args({mcap::DefaultChunkSize, 100})
+      ->Args({mcap::DefaultChunkSize, 1000})
+      ->Args({mcap::DefaultChunkSize, 10000})
+      ->Args({mcap::DefaultChunkSize * 10, 1})
+      ->Args({mcap::DefaultChunkSize * 10, 10})
+      ->Args({mcap::DefaultChunkSize * 10, 100})
+      ->Args({mcap::DefaultChunkSize * 10, 1000})
+      ->Args({mcap::DefaultChunkSize * 10, 10000});
   benchmark::Initialize(&argc, argv);
   benchmark::RunSpecifiedBenchmarks();
 
cpp/mcap/include/mcap/writer.inl: 21 additions & 7 deletions
@@ -723,11 +723,18 @@ void McapWriter::writeChunk(IWritable& output, IChunkWriter& chunkData) {
   const uint64_t messageIndexOffset = output.size();
   if (!options_.noMessageIndex) {
     // Write the message index records
-    for (const auto& [channelId, messageIndex] : currentMessageIndex_) {
-      chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.size());
-      write(output, messageIndex);
+    for (auto& [channelId, messageIndex] : currentMessageIndex_) {
+      // currentMessageIndex_ contains entries for every channel ever seen, not just in this
+      // chunk. Only write message index records for channels with messages in this chunk.
+      if (messageIndex.records.size() > 0) {
+        chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.size());
+        write(output, messageIndex);
+        // reset this message index for the next chunk. This allows us to re-use
+        // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
+        // objects per chunk.
+        messageIndex.records.clear();
+      }
     }
-    currentMessageIndex_.clear();
   }
   const uint64_t messageIndexLength = output.size() - messageIndexOffset;

@@ -743,10 +750,17 @@ void McapWriter::writeChunk(IWritable& output, IChunkWriter& chunkData) {
     chunkIndexRecord.uncompressedSize = uncompressedSize;
   } else if (!options_.noMessageIndex) {
     // Write the message index records
-    for (const auto& [channelId, messageIndex] : currentMessageIndex_) {
-      write(output, messageIndex);
+    for (auto& [channelId, messageIndex] : currentMessageIndex_) {
+      // currentMessageIndex_ contains entries for every channel ever seen, not just in this
+      // chunk. Only write message index records for channels with messages in this chunk.
+      if (messageIndex.records.size() > 0) {
+        write(output, messageIndex);
+        // reset this message index for the next chunk. This allows us to re-use
+        // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
+        // objects per chunk.
+        messageIndex.records.clear();
+      }
     }
-    currentMessageIndex_.clear();
   }
 
   // Reset uncompressedSize and start/end times for the next chunk
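The change leans on a guarantee of std::vector: clear() destroys the elements
but leaves capacity() unchanged, so the buffer survives for the next chunk. A
small standalone check of that behavior (hypothetical test code, not part of
this commit):

#include <cassert>
#include <vector>

int main() {
  std::vector<int> v(1000, 42);
  const auto cap = v.capacity();

  v.clear();                    // size() drops to 0...
  assert(v.capacity() == cap);  // ...but the allocation is retained

  std::vector<int>().swap(v);   // actually releasing the buffer requires
  assert(v.capacity() == 0);    // swapping (or shrink_to_fit); capacity 0
                                // here on common implementations
  return 0;
}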
