From bdc92e06aff87d92dbf4fee360b31903c0805720 Mon Sep 17 00:00:00 2001 From: Borelset Date: Thu, 24 Mar 2022 19:14:54 +0800 Subject: [PATCH] Fix a problem in TitanTableBuilder::Add() (#233) * Fix a problem in TitanTableBuilder::Add() When flushing memtable, which includes kTypeBlobIndex or kTypeMerge generated by GC, processing kTypeBlobIndex or kTypeMerge must consider whether blob_builder_ is in the buffered state. Signed-off-by: Borelset * Update the "DictCompressDisorder" test to cover the new-found corner case. Signed-off-by: Borelset * Extract duplicate code and replace them with TitanTableBuilder::AddSmallToTableAdaptively(). Signed-off-by: Borelset * Rename function. Signed-off-by: Borelset * Reformat Signed-off-by: Borelset * Rename function to AddSmall Signed-off-by: Borelset * Update comments in AddSmall() Co-authored-by: Connor Signed-off-by: Borelset * Reformat comments Signed-off-by: Borelset * Rename functions for clarity. Signed-off-by: Borelset * fix a mistake in comments. Signed-off-by: Borelset * add a comment. Signed-off-by: Borelset * Reformat Signed-off-by: Borelset Co-authored-by: Connor --- src/table_builder.cc | 48 +++++++++++++++++++++------------------ src/table_builder.h | 5 +++- src/table_builder_test.cc | 46 +++++++++++++++++++++++++++++++++---- 3 files changed, 72 insertions(+), 27 deletions(-) diff --git a/src/table_builder.cc b/src/table_builder.cc index cc3493330..9b11ae3cc 100644 --- a/src/table_builder.cc +++ b/src/table_builder.cc @@ -70,16 +70,7 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) { cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) { bool is_small_kv = value.size() < cf_options_.min_blob_size; if (is_small_kv) { - if (builder_unbuffered()) { - // We can append this into SST safely, without disorder issue. - base_builder_->Add(key, value); - } else { - // We have to let builder to cache this KV pair, and it will be returned - // when state changed - std::unique_ptr ctx = - NewCachedRecordContext(ikey, value); - blob_builder_->AddSmall(std::move(ctx)); - } + AddBase(key, ikey, value); return; } else { // We write to blob file and insert index @@ -115,16 +106,28 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) { index.file_number, get_status.ToString().c_str()); } } - if (builder_unbuffered()) { - base_builder_->Add(key, value); - } else { - std::unique_ptr ctx = - NewCachedRecordContext(ikey, value); - blob_builder_->AddSmall(std::move(ctx)); - } + AddBase(key, ikey, value); } else { - assert(builder_unbuffered()); + // Mainly processing kTypeMerge and kTypeBlobIndex in both flushing and + // compaction. + AddBase(key, ikey, value); + } +} + +void TitanTableBuilder::AddBase(const Slice& key, + const ParsedInternalKey& parsedKey, + const Slice& value) { + // "parsedKey" was parsed from "key" (i.e., an internal key). + if (builder_unbuffered()) { + // We can directly append this into SST safely, without disorder issue. + // Only when base_builder_ is in unbuffered state base_builder_->Add(key, value); + } else { + // We have to let builder to cache this KV pair, and it will be flushed to + // base table when the state changes to unbuffered + std::unique_ptr ctx = + NewCachedRecordContext(parsedKey, value); + blob_builder_->AddSmall(std::move(ctx)); } } @@ -180,14 +183,15 @@ void TitanTableBuilder::AddBlob(const ParsedInternalKey& ikey, cf_options_.blob_file_target_size) { // if blob file hit the size limit, we have to finish it // in this case, when calling `BlobFileBuilder::Finish`, builder will be in - // unbuffered state, so it will not trigger another `AddToBaseTable` call + // unbuffered state, so it will not trigger another `AddBlobResultsToBase` + // call FinishBlobFile(); } - AddToBaseTable(contexts); + AddBlobResultsToBase(contexts); } -void TitanTableBuilder::AddToBaseTable( +void TitanTableBuilder::AddBlobResultsToBase( const BlobFileBuilder::OutContexts& contexts) { if (contexts.empty()) return; for (const std::unique_ptr& ctx : @@ -227,7 +231,7 @@ void TitanTableBuilder::FinishBlobFile() { s = blob_builder_->Finish(&contexts); UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_, &io_bytes_written_); - AddToBaseTable(contexts); + AddBlobResultsToBase(contexts); if (s.ok() && ok()) { TITAN_LOG_INFO(db_options_.info_log, diff --git a/src/table_builder.h b/src/table_builder.h index 6763bac29..c69206304 100644 --- a/src/table_builder.h +++ b/src/table_builder.h @@ -60,7 +60,7 @@ class TitanTableBuilder : public TableBuilder { void AddBlob(const ParsedInternalKey& ikey, const Slice& value); - void AddToBaseTable(const BlobFileBuilder::OutContexts& contexts); + void AddBlobResultsToBase(const BlobFileBuilder::OutContexts& contexts); bool ShouldMerge(const std::shared_ptr& file); @@ -71,6 +71,9 @@ class TitanTableBuilder : public TableBuilder { Status GetBlobRecord(const BlobIndex& index, BlobRecord* record, PinnableSlice* buffer); + void AddBase(const Slice& key, const ParsedInternalKey& parsedKey, + const Slice& value); + Status status_; uint32_t cf_id_; TitanDBOptions db_options_; diff --git a/src/table_builder_test.cc b/src/table_builder_test.cc index 76f2ab2cc..b85f9fdf0 100644 --- a/src/table_builder_test.cc +++ b/src/table_builder_test.cc @@ -495,10 +495,28 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { std::string key(1, i); InternalKey ikey(key, 1, kTypeValue); std::string value; - if (i % 2 == 0) { + if (i % 4 == 0) { value = std::string(1, i); - } else { + } else if (i % 4 == 1) { value = std::string(kMinBlobSize, i); + } else if (i % 4 == 2) { + ikey.Set(key, 1, kTypeBlobIndex); + BlobIndex blobIndex; + // set different values in different fields + blobIndex.file_number = i; + blobIndex.blob_handle.size = i * 2 + 1; + blobIndex.blob_handle.offset = i * 3 + 2; + blobIndex.EncodeTo(&value); + } else { + ikey.Set(key, 1, kTypeMerge); + MergeBlobIndex mergeIndex; + // set different values in different fields + mergeIndex.file_number = i; + mergeIndex.blob_handle.size = i * 2 + 1; + mergeIndex.blob_handle.offset = i * 3 + 2; + mergeIndex.source_file_number = i * 4 + 3; + mergeIndex.source_file_offset = i * 5 + 4; + mergeIndex.EncodeTo(&value); } table_builder->Add(ikey.Encode(), value); } @@ -523,10 +541,10 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); // check order ASSERT_EQ(ikey.user_key, key); - if (i % 2 == 0) { + if (i % 4 == 0) { ASSERT_EQ(ikey.type, kTypeValue); ASSERT_EQ(iter->value(), std::string(1, i)); - } else { + } else if (i % 4 == 1) { ASSERT_EQ(ikey.type, kTypeBlobIndex); BlobIndex index; ASSERT_OK(DecodeInto(iter->value(), &index)); @@ -536,6 +554,26 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer)); ASSERT_EQ(record.key, key); ASSERT_EQ(record.value, std::string(kMinBlobSize, i)); + } else if (i % 4 == 2) { + ASSERT_EQ(ikey.type, kTypeBlobIndex); + BlobIndex index; + // We do not have corresponding blob file in this test, so we only check + // BlobIndex. + ASSERT_OK(DecodeInto(iter->value(), &index)); + ASSERT_EQ(index.file_number, i); + ASSERT_EQ(index.blob_handle.size, i * 2 + 1); + ASSERT_EQ(index.blob_handle.offset, i * 3 + 2); + } else { + ASSERT_EQ(ikey.type, kTypeMerge); + MergeBlobIndex mergeIndex; + // We do not have corresponding blob file in this test, so we only check + // MergeBlobIndex. + ASSERT_OK(DecodeInto(iter->value(), &mergeIndex)); + ASSERT_EQ(mergeIndex.file_number, i); + ASSERT_EQ(mergeIndex.blob_handle.size, i * 2 + 1); + ASSERT_EQ(mergeIndex.blob_handle.offset, i * 3 + 2); + ASSERT_EQ(mergeIndex.source_file_number, i * 4 + 3); + ASSERT_EQ(mergeIndex.source_file_offset, i * 5 + 4); } iter->Next(); }