Skip to content

Commit

Permalink
Add utils to use for handling user defined timestamp size record in W…
Browse files Browse the repository at this point in the history
…AL (facebook#11451)

Summary:
Add a util method `HandleWriteBatchTimestampSizeDifference` to handle a `WriteBatch` read from WAL log when user-defined timestamp size record is written and read. Two check modes are added: `kVerifyConsistency` that just verifies the recorded timestamp size are consistent with the running ones. This mode is to be used by `db_impl_secondary` for opening a DB as secondary instance. It will also be used by `db_impl_open` before the user comparator switch support is added to make a column switch between enabling/disable UDT feature. The other mode `kReconcileInconsistency` will be used by `db_impl_open` later when user comparator can be changed.

Another change is to extract a method `CollectColumnFamilyIdsFromWriteBatch` in db_secondary_impl.h into its standalone util file so it can be shared.

Pull Request resolved: facebook#11451

Test Plan:
```
make check
./udt_util_test
```

Reviewed By: ltamasi

Differential Revision: D45894386

Pulled By: jowlyzhang

fbshipit-source-id: b96790777f154cddab6d45d9ba2e5d20ebc6fe9d
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed May 22, 2023
1 parent ffb5f1f commit 11ebddb
Show file tree
Hide file tree
Showing 11 changed files with 854 additions and 80 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,8 @@ set(SOURCES
util/string_util.cc
util/thread_local.cc
util/threadpool_imp.cc
util/udt_util.cc
util/write_batch_util.cc
util/xxhash.cc
utilities/agg_merge/agg_merge.cc
utilities/backup/backup_engine.cc
Expand Down Expand Up @@ -1421,6 +1423,7 @@ if(WITH_TESTS)
util/timer_test.cc
util/thread_list_test.cc
util/thread_local_test.cc
util/udt_util_test.cc
util/work_queue_test.cc
utilities/agg_merge/agg_merge_test.cc
utilities/backup/backup_engine_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,9 @@ thread_local_test: $(OBJ_DIR)/util/thread_local_test.o $(TEST_LIBRARY) $(LIBRARY
work_queue_test: $(OBJ_DIR)/util/work_queue_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

udt_util_test: $(OBJ_DIR)/util/udt_util_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

corruption_test: $(OBJ_DIR)/db/corruption_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/string_util.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/udt_util.cc",
"util/write_batch_util.cc",
"util/xxhash.cc",
"utilities/agg_merge/agg_merge.cc",
"utilities/backup/backup_engine.cc",
Expand Down Expand Up @@ -5508,6 +5510,12 @@ cpp_unittest_wrapper(name="ttl_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="udt_util_test",
srcs=["util/udt_util_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="util_merge_operators_test",
srcs=["utilities/util_merge_operators_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
79 changes: 0 additions & 79 deletions db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,85 +273,6 @@ class DBImplSecondary : public DBImpl {
return Status::OK();
}

// ColumnFamilyCollector is a write batch handler which does nothing
// except recording unique column family IDs
class ColumnFamilyCollector : public WriteBatch::Handler {
std::unordered_set<uint32_t> column_family_ids_;

Status AddColumnFamilyId(uint32_t column_family_id) {
if (column_family_ids_.find(column_family_id) ==
column_family_ids_.end()) {
column_family_ids_.insert(column_family_id);
}
return Status::OK();
}

public:
explicit ColumnFamilyCollector() {}

~ColumnFamilyCollector() override {}

Status PutCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status DeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status MergeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}

Status MarkBeginPrepare(bool) override { return Status::OK(); }

Status MarkEndPrepare(const Slice&) override { return Status::OK(); }

Status MarkRollback(const Slice&) override { return Status::OK(); }

Status MarkCommit(const Slice&) override { return Status::OK(); }

Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}

Status MarkNoop(bool) override { return Status::OK(); }

const std::unordered_set<uint32_t>& column_families() const {
return column_family_ids_;
}
};

Status CollectColumnFamilyIdsFromWriteBatch(
const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
assert(column_family_ids != nullptr);
column_family_ids->clear();
ColumnFamilyCollector handler;
Status s = batch.Iterate(&handler);
if (s.ok()) {
for (const auto& cf : handler.column_families()) {
column_family_ids->push_back(cf);
}
}
return s;
}

bool OwnTablesAndLogs() const override {
// Currently, the secondary instance does not own the database files. It
// simply opens the files of the primary instance and tracks their file
Expand Down
3 changes: 3 additions & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ LIB_SOURCES = \
util/string_util.cc \
util/thread_local.cc \
util/threadpool_imp.cc \
util/udt_util.cc \
util/write_batch_util.cc \
util/xxhash.cc \
utilities/agg_merge/agg_merge.cc \
utilities/backup/backup_engine.cc \
Expand Down Expand Up @@ -593,6 +595,7 @@ TEST_MAIN_SOURCES = \
util/timer_test.cc \
util/thread_list_test.cc \
util/thread_local_test.cc \
util/udt_util_test.cc \
util/work_queue_test.cc \
utilities/agg_merge/agg_merge_test.cc \
utilities/backup/backup_engine_test.cc \
Expand Down
Loading

0 comments on commit 11ebddb

Please sign in to comment.