diff --git a/cpp/bench/run.cpp b/cpp/bench/run.cpp
index 98ac18cb29..20291576a0 100644
--- a/cpp/bench/run.cpp
+++ b/cpp/bench/run.cpp
@@ -539,6 +539,58 @@ static void BM_McapWriterFileWriterChunked(benchmark::State& state) {
   std::remove(filename.c_str());
 }
 
+static void BM_McapWriterFileWriterChunkedManyChannels(benchmark::State& state) {
+  // Create a message payload
+  std::array<std::byte, 4 + 13> payload;
+  const uint32_t length = 13;
+  std::memcpy(payload.data(), &length, 4);
+  std::memcpy(payload.data() + 4, "Hello, world!", 13);
+
+  // Create a chunked writer using the ros1 profile
+  mcap::McapWriter writer;
+  auto options = mcap::McapWriterOptions("ros1");
+  options.chunkSize = uint64_t(state.range(0));
+
+  // Open an output file stream and write the file header
+  const std::string filename = TempFilename();
+  assertOk(writer.open(filename, options));
+
+  // Register a Schema record
+  mcap::Schema stdMsgsString("std_msgs/String", "ros1msg", StringSchema);
+  writer.addSchema(stdMsgsString);
+
+  uint16_t channelCount = uint16_t(state.range(1));
+
+  mcap::Channel topic("/chatter", "ros1", stdMsgsString.id);
+  std::vector<mcap::ChannelId> channelIds;
+  for (uint16_t i = 0; i < channelCount; ++i) {
+    // Register a Channel record
+    writer.addChannel(topic);
+    channelIds.push_back(topic.id);
+  }
+
+  // Create a message
+  mcap::Message msg;
+  msg.channelId = topic.id;
+  msg.sequence = 0;
+  msg.publishTime = 0;
+  msg.logTime = msg.publishTime;
+  msg.data = payload.data();
+  msg.dataSize = payload.size();
+
+  while (state.KeepRunning()) {
+    for (size_t i = 0; i < WriteIterations; i++) {
+      msg.channelId = channelIds[i % channelCount];
+      (void)writer.write(msg);
+      benchmark::ClobberMemory();
+    }
+  }
+
+  // Finish writing the file and delete it
+  writer.close();
+  std::remove(filename.c_str());
+}
+
 int main(int argc, char* argv[]) {
   benchmark::RegisterBenchmark("BM_CRC32", BM_CRC32)->RangeMultiplier(10)->Range(1, 10000000);
   benchmark::RegisterBenchmark("BM_McapWriterBufferWriterUnchunkedUnindexed",
@@ -619,6 +671,18 @@ int main(int argc, char* argv[]) {
     ->Arg(100000)
     ->Arg(1000000)
     ->Arg(10000000);
+  benchmark::RegisterBenchmark("BM_McapWriterFileWriterChunkedManyChannels",
+                               BM_McapWriterFileWriterChunkedManyChannels)
+    ->Args({mcap::DefaultChunkSize, 1})
+    ->Args({mcap::DefaultChunkSize, 10})
+    ->Args({mcap::DefaultChunkSize, 100})
+    ->Args({mcap::DefaultChunkSize, 1000})
+    ->Args({mcap::DefaultChunkSize, 10000})
+    ->Args({mcap::DefaultChunkSize * 10, 1})
+    ->Args({mcap::DefaultChunkSize * 10, 10})
+    ->Args({mcap::DefaultChunkSize * 10, 100})
+    ->Args({mcap::DefaultChunkSize * 10, 1000})
+    ->Args({mcap::DefaultChunkSize * 10, 10000});
 
   benchmark::Initialize(&argc, argv);
   benchmark::RunSpecifiedBenchmarks();
diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl
index abbdc1da56..e1eb26461f 100644
--- a/cpp/mcap/include/mcap/writer.inl
+++ b/cpp/mcap/include/mcap/writer.inl
@@ -723,11 +723,18 @@ void McapWriter::writeChunk(IWritable& output, IChunkWriter& chunkData) {
   const uint64_t messageIndexOffset = output.size();
   if (!options_.noMessageIndex) {
     // Write the message index records
-    for (const auto& [channelId, messageIndex] : currentMessageIndex_) {
-      chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.size());
-      write(output, messageIndex);
+    for (auto& [channelId, messageIndex] : currentMessageIndex_) {
+      // currentMessageIndex_ contains entries for every channel ever seen, not just in this
+      // chunk. Only write message index records for channels with messages in this chunk.
+      if (messageIndex.records.size() > 0) {
+        chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.size());
+        write(output, messageIndex);
+        // reset this message index for the next chunk. This allows us to re-use
+        // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
+        // objects per chunk.
+        messageIndex.records.clear();
+      }
     }
-    currentMessageIndex_.clear();
   }
 
   const uint64_t messageIndexLength = output.size() - messageIndexOffset;
@@ -743,10 +750,17 @@ void McapWriter::writeChunk(IWritable& output, IChunkWriter& chunkData) {
     chunkIndexRecord.uncompressedSize = uncompressedSize;
   } else if (!options_.noMessageIndex) {
     // Write the message index records
-    for (const auto& [channelId, messageIndex] : currentMessageIndex_) {
-      write(output, messageIndex);
+    for (auto& [channelId, messageIndex] : currentMessageIndex_) {
+      // currentMessageIndex_ contains entries for every channel ever seen, not just in this
+      // chunk. Only write message index records for channels with messages in this chunk.
+      if (messageIndex.records.size() > 0) {
+        write(output, messageIndex);
+        // reset this message index for the next chunk. This allows us to re-use
+        // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
+        // objects per chunk.
+        messageIndex.records.clear();
+      }
     }
-    currentMessageIndex_.clear();
  }
 
   // Reset uncompressedSize and start/end times for the next chunk