Skip to content

Commit

Permalink
Revert "Add missing range conflict check between file ingestion and R…
Browse files Browse the repository at this point in the history
…efitLevel() (facebook#10988)"

This reverts commit 9502856.
  • Loading branch information
inikep committed Feb 18, 2025
1 parent e42e570 commit 3150b40
Show file tree
Hide file tree
Showing 19 changed files with 186 additions and 530 deletions.
1 change: 0 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ For Leveled Compaction users, `CompactRange()` with `bottommost_level_compaction
* Fixed a bug in LockWAL() leading to re-locking mutex (#11020).
* Fixed a heap use after free bug in async scan prefetching when the scan thread and another thread try to read and load the same seek block into cache.
* Fixed a heap use after free in async scan prefetching if dictionary compression is enabled, in which case sync read of the compression dictionary gets mixed with async prefetching
* Fixed a data race bug of `CompactRange()` under `change_level=true` acts on overlapping range with an ongoing file ingestion for level compaction. This will either result in overlapping file ranges corruption at a certain level caught by `force_consistency_checks=true` or protentially two same keys both with seqno 0 in two different levels (i.e, new data ends up in lower/older level). The latter will be caught by assertion in debug build but go silently and result in read returning wrong result in release build. This fix is general so it also replaced previous fixes to a similar problem for `CompactFiles()` (#4665), general `CompactRange()` and auto compaction (commit 5c64fb6 and 87dfc1d).
* Fixed a bug in compaction output cutting where small output files were produced due to TTL file cutting states were not being updated (#11075).

### New Features
Expand Down
1 change: 0 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,6 @@ Compaction* ColumnFamilyData::CompactRange(
if (result != nullptr) {
result->SetInputVersion(current_);
}
TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
return result;
}

Expand Down
22 changes: 4 additions & 18 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,12 @@ Compaction::Compaction(
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
grandparents_(std::move(_grandparents)),
score_(_score),
bottommost_level_(
// For simplicity, we don't support the concept of "bottommost level"
// with
// `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
(_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel)
? false
: IsBottommostLevel(output_level_, vstorage, inputs_)),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trim_ts_(_trim_ts),
is_trivial_move_(false),

compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false),
enable_blob_garbage_collection_(
Expand All @@ -264,15 +257,8 @@ Compaction::Compaction(
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)) {
penultimate_level_(EvaluatePenultimateLevel(
vstorage, immutable_options_, start_level_, output_level_)) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
Expand Down
2 changes: 0 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
return "ForcedBlobGC";
case CompactionReason::kRoundRobinTtl:
return "RoundRobinTtl";
case CompactionReason::kRefitLevel:
return "RefitLevel";
case CompactionReason::kNumOfReasons:
// fall through
default:
Expand Down
6 changes: 1 addition & 5 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1124,11 +1124,7 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
c->output_level() == 0 ||
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
c->GetPenultimateLevel()));
// CompactionReason::kExternalSstIngestion's start level is just a placeholder
// number without actual meaning as file ingestion technically does not have
// an input level like other compactions
if ((c->start_level() == 0 &&
c->compaction_reason() != CompactionReason::kExternalSstIngestion) ||
if (c->start_level() == 0 ||
ioptions_.compaction_style == kCompactionStyleUniversal) {
level0_compactions_in_progress_.insert(c);
}
Expand Down
28 changes: 14 additions & 14 deletions db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,21 +449,21 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
compaction_inputs_.push_back(output_level_inputs_);
}

// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(
compaction_inputs_, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
}
if (!is_l0_trivial_move_) {
// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(
compaction_inputs_, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
}
compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
output_level_inputs_, &grandparents_);
}
Expand Down
5 changes: 3 additions & 2 deletions db/db_bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,7 @@ TEST_P(ChargeFilterConstructionTestWithParam, Basic) {
*
* The test is designed in a way such that the reservation for (p1 - b')
* will trigger at least another dummy entry insertion
* (or equivalently to saying, creating another peak).
* (or equivelantly to saying, creating another peak).
*
* kStandard128Ribbon + FullFilter +
* detect_filter_construct_corruption
Expand Down Expand Up @@ -2612,7 +2612,8 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) {
BottommostLevelCompaction::kSkip;
compact_options.change_level = true;
compact_options.target_level = 7;
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsNotSupported());

ASSERT_GE(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);
Expand Down
225 changes: 0 additions & 225 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6600,231 +6600,6 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
Close();
}

class DBCompactionTestWithOngoingFileIngestionParam
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() {
compaction_path_to_test_ = GetParam();
}
void SetupOptions() {
options_ = CurrentOptions();
options_.create_if_missing = true;

if (compaction_path_to_test_ == "RefitLevelCompactRange") {
options_.num_levels = 7;
} else {
options_.num_levels = 3;
}
options_.compaction_style = CompactionStyle::kCompactionStyleLevel;
if (compaction_path_to_test_ == "AutoCompaction") {
options_.disable_auto_compactions = false;
options_.level0_file_num_compaction_trigger = 1;
} else {
options_.disable_auto_compactions = true;
}
}

void PauseCompactionThread() {
sleeping_task_.reset(new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::LOW);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task_.get(), Env::Priority::LOW);
sleeping_task_->WaitUntilSleeping();
}

void ResumeCompactionThread() {
if (sleeping_task_) {
sleeping_task_->WakeUp();
sleeping_task_->WaitUntilDone();
}
}

void SetupFilesToForceFutureFilesIngestedToCertainLevel() {
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string dummy = dbname_ + "/dummy.sst";
ASSERT_OK(sst_file_writer.Open(dummy));
ASSERT_OK(sst_file_writer.Put("k2", "dummy"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({dummy}, IngestExternalFileOptions()));
// L2 is made to contain a file overlapped with files to be ingested in
// later steps on key "k2". This will force future files ingested to L1 or
// above.
ASSERT_EQ("0,0,1", FilesPerLevel(0));
}

void SetupSyncPoints() {
if (compaction_path_to_test_ == "AutoCompaction") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCompaction():AfterPickCompaction",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyData::CompactRange:Return",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:PostRefitLevel",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "CompactFiles") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else {
assert(false);
}
SyncPoint::GetInstance()->LoadDependency(
{{"ExternalSstFileIngestionJob::Run", "PreCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();
}

void RunCompactionOverlappedWithFileIngestion() {
if (compaction_path_to_test_ == "AutoCompaction") {
TEST_SYNC_POINT("PreCompaction");
ResumeCompactionThread();
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
Status s = dbfull()->TEST_WaitForCompact();
EXPECT_OK(s);
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
CompactRangeOptions cro;
cro.change_level = false;
std::string start_key = "k1";
Slice start(start_key);
std::string end_key = "k4";
Slice end(end_key);
TEST_SYNC_POINT("PreCompaction");
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
Status s = dbfull()->CompactRange(cro, &start, &end);
EXPECT_OK(s);
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 5;
std::string start_key = "k1";
Slice start(start_key);
std::string end_key = "k4";
Slice end(end_key);
TEST_SYNC_POINT("PreCompaction");
Status s = dbfull()->CompactRange(cro, &start, &end);
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
// To see this, remove the fix AND replace
// `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with
// `DBImpl::ReFitLevel:PostRegisterCompaction`
EXPECT_TRUE(s.IsNotSupported());
EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") !=
std::string::npos);
} else if (compaction_path_to_test_ == "CompactFiles") {
ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(&cf_meta_data);
ASSERT_EQ(cf_meta_data.levels[0].files.size(), 1);
std::vector<std::string> input_files;
for (const auto& file : cf_meta_data.levels[0].files) {
input_files.push_back(file.name);
}
TEST_SYNC_POINT("PreCompaction");
Status s = db_->CompactFiles(CompactionOptions(), input_files, 1);
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
EXPECT_TRUE(s.IsAborted());
EXPECT_TRUE(
s.ToString().find(
"A running compaction is writing to the same output level") !=
std::string::npos);
} else {
assert(false);
}
}

void DisableSyncPoints() {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

protected:
std::string compaction_path_to_test_;
Options options_;
std::shared_ptr<test::SleepingBackgroundTask> sleeping_task_;
};

INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam,
DBCompactionTestWithOngoingFileIngestionParam,
::testing::Values("AutoCompaction",
"NonRefitLevelCompactRange",
"RefitLevelCompactRange",
"CompactFiles"));

TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) {
SetupOptions();
DestroyAndReopen(options_);

if (compaction_path_to_test_ == "AutoCompaction") {
PauseCompactionThread();
}

if (compaction_path_to_test_ != "RefitLevelCompactRange") {
SetupFilesToForceFutureFilesIngestedToCertainLevel();
}

// Create s1
ASSERT_OK(Put("k1", "v"));
ASSERT_OK(Put("k4", "v"));
ASSERT_OK(Flush());
if (compaction_path_to_test_ == "RefitLevelCompactRange") {
MoveFilesToLevel(6 /* level */);
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0));
} else {
ASSERT_EQ("1,0,1", FilesPerLevel(0));
}

// To coerce following sequence of events
// Timeline Thread 1 (Ingest s2) Thread 2 (Compact s1)
// t0 | Decide to output to Lk
// t1 | Release lock in LogAndApply()
// t2 | Acquire lock
// t3 | Decides to compact to Lk
// | Expected to fail due to range
// | conflict check with file
// | ingestion
// t4 | Release lock in LogAndApply()
// t5 | Acquire lock again and finish
// t6 | Acquire lock again and finish
SetupSyncPoints();

// Ingest s2
port::Thread thread1([&] {
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string s2 = dbname_ + "/ingested_s2.sst";
ASSERT_OK(sst_file_writer.Open(s2));
ASSERT_OK(sst_file_writer.Put("k2", "v2"));
ASSERT_OK(sst_file_writer.Put("k3", "v2"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({s2}, IngestExternalFileOptions()));
});

// Compact s1. Without proper range conflict check,
// this will encounter overlapping file corruption.
port::Thread thread2([&] { RunCompactionOverlappedWithFileIngestion(); });

thread1.join();
thread2.join();
DisableSyncPoints();
}

TEST_F(DBCompactionTest, ConsistencyFailTest) {
Options options = CurrentOptions();
options.force_consistency_checks = true;
Expand Down
Loading

0 comments on commit 3150b40

Please sign in to comment.