Skip to content

Commit

Permalink
Add timestamp support to DBImplReadOnly (facebook#10004)
Browse files Browse the repository at this point in the history
Summary:
This PR adds timestamp support to a read-only DB instance opened as `DBImplReadOnly`. A follow-up PR will add the same support to `CompactedDBImpl`.

With this, a read-only database supports these timestamp-related APIs:

`ReadOptions.timestamp`: a read returns the latest data visible at the specified timestamp
`Iterator::timestamp()`: returns the timestamp associated with the current key/value
`DB::Get(..., std::string* timestamp)`: returns the timestamp associated with the key/value in `timestamp`

Test plan (on devserver):

```
$ COMPILE_WITH_ASAN=1 make -j24 all
$ ./db_with_timestamp_basic_test --gtest_filter=DBBasicTestWithTimestamp.ReadOnlyDB*
```

Pull Request resolved: facebook#10004

Reviewed By: riversand963

Differential Revision: D36434422

Pulled By: jowlyzhang

fbshipit-source-id: 5d949e65b1ffb845758000e2b310fdd4aae71cfb
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed May 20, 2022
1 parent 57997dd commit 16bdb1f
Show file tree
Hide file tree
Showing 13 changed files with 636 additions and 206 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ manifest_dump
sst_dump
blob_dump
block_cache_trace_analyzer
db_readonly_with_timestamp_test
db_with_timestamp_basic_test
tools/block_cache_analyzer/*.pyc
column_aware_encoding_exp
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,7 @@ if(WITH_TESTS)
db/comparator_db_test.cc
db/corruption_test.cc
db/cuckoo_table_db_test.cc
db/db_readonly_with_timestamp_test.cc
db/db_with_timestamp_basic_test.cc
db/db_block_cache_test.cc
db/db_bloom_filter_test.cc
Expand Down Expand Up @@ -1385,6 +1386,7 @@ if(WITH_TESTS)

set(TESTUTIL_SOURCE
db/db_test_util.cc
db/db_with_timestamp_test_util.cc
monitoring/thread_status_updater_debug.cc
table/mock_table.cc
utilities/agg_merge/test_agg_merge.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,9 @@ db_blob_basic_test: $(OBJ_DIR)/db/blob/db_blob_basic_test.o $(TEST_LIBRARY) $(LI
db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[

cpp_library_wrapper(name="rocksdb_test_lib", srcs=[
"db/db_test_util.cc",
"db/db_with_timestamp_test_util.cc",
"table/mock_table.cc",
"test_util/mock_time_env.cc",
"test_util/testharness.cc",
Expand Down Expand Up @@ -5175,6 +5176,12 @@ cpp_unittest_wrapper(name="db_rate_limiter_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_readonly_with_timestamp_test",
srcs=["db/db_readonly_with_timestamp_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_secondary_test",
srcs=["db/db_secondary_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
11 changes: 0 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1723,17 +1723,6 @@ Status DBImpl::Get(const ReadOptions& read_options,
return s;
}

namespace {
// ReadCallback whose full visibility check accepts a sequence number when it
// does not exceed max_visible_seq_.
class GetWithTimestampReadCallback : public ReadCallback {
public:
// Forwards `seq` to the ReadCallback base constructor.
// NOTE(review): presumably the base stores it as max_visible_seq_ -- the
// base class is not visible here; confirm.
explicit GetWithTimestampReadCallback(SequenceNumber seq)
: ReadCallback(seq) {}
// Returns true iff `seq` is less than or equal to max_visible_seq_.
bool IsVisibleFullCheck(SequenceNumber seq) override {
return seq <= max_visible_seq_;
}
};
} // namespace

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2400,6 +2400,15 @@ class DBImpl : public DB {
std::unique_ptr<StallInterface> wbm_stall_;
};

// ReadCallback whose full visibility check accepts a sequence number when it
// does not exceed max_visible_seq_. DBImplReadOnly::Get constructs one from
// versions_->LastSequence() and hands it to the memtable and version lookups.
class GetWithTimestampReadCallback : public ReadCallback {
public:
// Forwards `seq` to the ReadCallback base constructor.
// NOTE(review): presumably the base stores it as max_visible_seq_ -- the
// base class is not visible here; confirm.
explicit GetWithTimestampReadCallback(SequenceNumber seq)
: ReadCallback(seq) {}
// Returns true iff `seq` is less than or equal to max_visible_seq_.
bool IsVisibleFullCheck(SequenceNumber seq) override {
return seq <= max_visible_seq_;
}
};

extern Options SanitizeOptions(const std::string& db, const Options& src,
bool read_only = false);

Expand Down
77 changes: 54 additions & 23 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,38 @@ DBImplReadOnly::~DBImplReadOnly() {}
// Get() overload without a timestamp output. Delegates to the five-argument
// overload, passing nullptr for `timestamp`, and returns its Status unchanged.
Status DBImplReadOnly::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val) {
return Get(read_options, column_family, key, pinnable_val,
/*timestamp*/ nullptr);
}

Status DBImplReadOnly::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val,
std::string* timestamp) {
assert(pinnable_val != nullptr);
// TODO: stopwatch DB_GET needed?, perf timer needed?
PERF_TIMER_GUARD(get_snapshot_time);

assert(column_family);
if (read_options.timestamp) {
const Status s =
FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
} else {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
}
const Comparator* ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() || read_options.timestamp) {
// TODO: support timestamp
return Status::NotSupported();
}
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;

Status s;
SequenceNumber snapshot = versions_->LastSequence();
GetWithTimestampReadCallback read_cb(snapshot);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
if (tracer_) {
Expand All @@ -58,19 +76,23 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot);
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, &read_cb)) {
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr;
super_version->current->Get(read_options, lkey, pinnable_val,
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr);
super_version->current->Get(
read_options, lkey, pinnable_val, ts, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr,
/*value_found*/ nullptr,
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
/*is_blob*/ nullptr,
/*do_merge*/ true);
RecordTick(stats_, MEMTABLE_MISS);
}
RecordTick(stats_, NUMBER_KEYS_READ);
Expand All @@ -84,11 +106,17 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
assert(column_family);
const Comparator* ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() || read_options.timestamp) {
// TODO: support timestamp
return NewErrorIterator(Status::NotSupported());
if (read_options.timestamp) {
const Status s =
FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
if (!s.ok()) {
return NewErrorIterator(s);
}
} else {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return NewErrorIterator(s);
}
}
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
Expand Down Expand Up @@ -118,16 +146,19 @@ Status DBImplReadOnly::NewIterators(
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (read_options.timestamp) {
// TODO: support timestamp
return Status::NotSupported();
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else {
for (auto* cf : column_families) {
assert(cf);
const Comparator* ucmp = cf->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size()) {
// TODO: support timestamp
return Status::NotSupported();
const Status s = FailIfCfHasTs(cf);
if (!s.ok()) {
return s;
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class DBImplReadOnly : public DBImpl {
// Get() without a timestamp output. The implementation forwards to the
// overload taking `std::string* timestamp`, passing nullptr for it.
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
// Get() with an additional `timestamp` output parameter. `timestamp` may be
// nullptr. The implementation only forwards it to the underlying lookups when
// the column family's comparator reports a non-zero timestamp_size();
// otherwise a null pointer is passed down and `timestamp` is not written by
// those lookups.
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;

// TODO: Implement ReadOnly MultiGet?

Expand Down
Loading

0 comments on commit 16bdb1f

Please sign in to comment.