Skip to content

Commit

Permalink
Add support for wide-column point lookups (facebook#10540)
Browse files Browse the repository at this point in the history
Summary:
The patch adds a new API `GetEntity` that can be used to perform
wide-column point lookups. It also extends the `Get` code path and
the `MemTable` / `MemTableList` and `Version` / `GetContext` logic
accordingly so that wide-column entities can be served from both
memtables and SSTs. If the result of a lookup is a wide-column entity
(`kTypeWideColumnEntity`), it is passed to the application in deserialized
form; if it is a plain old key-value (`kTypeValue`), it is presented as a
wide-column entity with a single default (anonymous) column.
(In contrast, regular `Get` returns plain old key-values as-is, and
returns the value of the default column for wide-column entities, see
facebook#10483 .)

The result of `GetEntity` is a self-contained `PinnableWideColumns` object.
`PinnableWideColumns` contains a `PinnableSlice`, which either stores the
underlying data in its own buffer or holds on to a cache handle. It also contains
a `WideColumns` instance, which indexes the contents of the `PinnableSlice`,
so applications can access the values of columns efficiently.

There are several pieces of functionality which are currently not supported
for wide-column entities: there is currently no `MultiGetEntity` or wide-column
iterator; also, `Merge` and `GetMergeOperands` are not supported, and there
is no `GetEntity` implementation for read-only and secondary instances.
We plan to implement these in future PRs.

Pull Request resolved: facebook#10540

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D38847474

Pulled By: ltamasi

fbshipit-source-id: 42311a34ccdfe88b3775e847a5e2a5296e002b5b
  • Loading branch information
ltamasi authored and facebook-github-bot committed Aug 19, 2022
1 parent 2553d1e commit 81388b3
Show file tree
Hide file tree
Showing 29 changed files with 512 additions and 241 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ set(SOURCES
db/wal_edit.cc
db/wal_manager.cc
db/wide/wide_column_serialization.cc
db/wide/wide_columns.cc
db/write_batch.cc
db/write_batch_base.cc
db/write_controller.cc
Expand Down Expand Up @@ -1510,7 +1511,7 @@ if(WITH_BENCHMARK_TOOLS)
endif()

option(WITH_TRACE_TOOLS "build with trace tools" ON)
if(WITH_TRACE_TOOLS)
if(WITH_TRACE_TOOLS)
add_executable(block_cache_trace_analyzer_tool${ARTIFACT_SUFFIX}
tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc)
target_link_libraries(block_cache_trace_analyzer_tool${ARTIFACT_SUFFIX}
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/wide/wide_columns.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
Expand Down Expand Up @@ -435,6 +436,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/wide/wide_columns.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, lkey.user_key(), value, ts,
nullptr, nullptr, true, nullptr, nullptr, nullptr,
nullptr, &read_cb);
GetContext::kNotFound, lkey.user_key(), value,
/*columns=*/nullptr, ts, nullptr, nullptr, true,
nullptr, nullptr, nullptr, nullptr, &read_cb);

const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
Expand Down Expand Up @@ -159,7 +159,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(
std::string* timestamp = timestamps ? &(*timestamps)[idx] : nullptr;
GetContext get_context(
user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
lkey.user_key(), &pinnable_val,
lkey.user_key(), &pinnable_val, /*columns=*/nullptr,
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
Expand Down
118 changes: 81 additions & 37 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,28 @@ Status DBImpl::Get(const ReadOptions& read_options,
return s;
}

Status DBImpl::GetEntity(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableWideColumns* columns) {
if (!column_family) {
return Status::InvalidArgument(
"Cannot call GetEntity without a column family handle");
}

if (!columns) {
return Status::InvalidArgument(
"Cannot call GetEntity without a PinnableWideColumns object");
}

columns->Reset();

GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.columns = columns;

return GetImpl(read_options, key, get_impl_options);
}

bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
// If both thresholds are reached, a function returning merge operands as
// `PinnableSlice`s should reference the `SuperVersion` to avoid large and/or
Expand Down Expand Up @@ -1853,7 +1875,8 @@ bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
get_impl_options.merge_operands != nullptr ||
get_impl_options.columns != nullptr);

assert(get_impl_options.column_family);

Expand Down Expand Up @@ -1980,31 +2003,46 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
if (!skip_memtable) {
// Get value associated with key
if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, false /* immutable_memtable */,
get_impl_options.callback,
get_impl_options.is_blob_index)) {
if (sv->mem->Get(
lkey,
get_impl_options.value ? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();

if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}

RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
get_impl_options.callback,
sv->imm->Get(lkey,
get_impl_options.value
? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();

if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}

RecordTick(stats_, MEMTABLE_HIT);
}
} else {
// Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, false /* immutable_memtable */, nullptr,
nullptr, false)) {
if (sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, nullptr, nullptr,
false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
Expand All @@ -2026,8 +2064,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(
read_options, lkey, get_impl_options.value, timestamp, &s,
&merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
read_options, lkey, get_impl_options.value, get_impl_options.columns,
timestamp, &s, &merge_context, &max_covering_tombstone_seq,
&pinned_iters_mgr,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
Expand All @@ -2043,7 +2082,11 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
size_t size = 0;
if (s.ok()) {
if (get_impl_options.get_value) {
size = get_impl_options.value->size();
if (get_impl_options.value) {
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
}
} else {
// Return all merge operands for get_impl_options.key
*get_impl_options.number_of_operands =
Expand Down Expand Up @@ -2252,14 +2295,14 @@ std::vector<Status> DBImpl::MultiGet(
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */,
read_callback)) {
if (super_version->mem->Get(
lkey, value, /*columns=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, read_callback)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if (super_version->imm->Get(lkey, value, timestamp, &s,
&merge_context,
} else if (super_version->imm->Get(lkey, value, /*columns=*/nullptr,
timestamp, &s, &merge_context,
&max_covering_tombstone_seq,
read_options, read_callback)) {
done = true;
Expand All @@ -2270,9 +2313,9 @@ std::vector<Status> DBImpl::MultiGet(
PinnableSlice pinnable_val;
PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr;
super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
&s, &merge_context,
&max_covering_tombstone_seq,
super_version->current->Get(read_options, lkey, &pinnable_val,
/*columns=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
&pinned_iters_mgr, /*value_found=*/nullptr,
/*key_exists=*/nullptr,
/*seq=*/nullptr, read_callback);
Expand Down Expand Up @@ -4861,8 +4904,8 @@ Status DBImpl::GetLatestSequenceForKey(
*found_record_for_key = false;

// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq, seq, read_options,
false /* immutable_memtable */, nullptr /*read_callback*/,
is_blob_index);

Expand Down Expand Up @@ -4895,8 +4938,8 @@ Status DBImpl::GetLatestSequenceForKey(
}

// Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
sv->imm->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);

if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
Expand Down Expand Up @@ -4927,9 +4970,10 @@ Status DBImpl::GetLatestSequenceForKey(
}

// Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, /*value=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq, seq,
read_options, is_blob_index);
sv->imm->GetFromHistory(lkey, /*value=*/nullptr, /*columns=*/nullptr,
timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
is_blob_index);

if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
Expand Down Expand Up @@ -4962,8 +5006,8 @@ Status DBImpl::GetLatestSequenceForKey(
if (!cache_only) {
// Check tables
PinnedIteratorsManager pinned_iters_mgr;
sv->current->Get(read_options, lkey, /*value=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
sv->current->Get(read_options, lkey, /*value=*/nullptr, /*columns=*/nullptr,
timestamp, &s, &merge_context, &max_covering_tombstone_seq,
&pinned_iters_mgr, nullptr /* value_found */,
found_record_for_key, seq, nullptr /*read_callback*/,
is_blob_index);
Expand Down
6 changes: 6 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;

using DB::GetEntity;
Status GetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableWideColumns* columns) override;

using DB::GetMergeOperands;
Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -592,6 +597,7 @@ class DBImpl : public DB {
struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
PinnableSlice* value = nullptr;
PinnableWideColumns* columns = nullptr;
std::string* timestamp = nullptr;
bool* value_found = nullptr;
ReadCallback* callback = nullptr;
Expand Down
12 changes: 6 additions & 6 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, false /* immutable_memtable */,
&read_cb)) {
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
/*columns=*/nullptr, ts, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, &read_cb)) {
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr;
super_version->current->Get(
read_options, lkey, pinnable_val, ts, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr,
read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
&merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
/*value_found*/ nullptr,
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
/*is_blob*/ nullptr,
Expand Down
18 changes: 10 additions & 8 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,18 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
const Comparator* ucmp = column_family->GetComparator();
assert(ucmp);
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, false /* immutable_memtable */,
&read_cb)) {
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
/*columns=*/nullptr, ts, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, &read_cb)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
super_version->imm->Get(
lkey, pinnable_val->GetSelf(), ts, &s, &merge_context,
&max_covering_tombstone_seq, read_options, &read_cb)) {
lkey, pinnable_val->GetSelf(), /*columns=*/nullptr, ts, &s,
&merge_context, &max_covering_tombstone_seq, read_options,
&read_cb)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
Expand All @@ -413,8 +414,9 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
PERF_TIMER_GUARD(get_from_output_files_time);
PinnedIteratorsManager pinned_iters_mgr;
super_version->current->Get(
read_options, lkey, pinnable_val, ts, &s, &merge_context,
&max_covering_tombstone_seq, &pinned_iters_mgr, /*value_found*/ nullptr,
read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
&merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
/*value_found*/ nullptr,
/*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
/*do_merge*/ true);
RecordTick(stats_, MEMTABLE_MISS);
Expand Down
6 changes: 3 additions & 3 deletions db/db_memtable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
ReadOptions roptions;
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey("key", kMaxSequenceNumber);
bool res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status,
&merge_context, &max_covering_tombstone_seq, roptions,
false /* immutable_memtable */);
bool res = mem->Get(lkey, &value, /*columns=*/nullptr, /*timestamp=*/nullptr,
&status, &merge_context, &max_covering_tombstone_seq,
roptions, false /* immutable_memtable */);
ASSERT_OK(status);
ASSERT_TRUE(res);
uint64_t ivalue = DecodeFixed64(Slice(value).data());
Expand Down
12 changes: 6 additions & 6 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,9 @@ bool FlushJob::MemPurgeDecider(double threshold) {
min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;

// Estimate if the sample entry is valid or not.
get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro,
true /* immutable_memtable */);
get_res = mt->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
&mget_s, &merge_context, &max_covering_tombstone_seq,
&sqno, ro, true /* immutable_memtable */);
if (!get_res) {
ROCKS_LOG_WARN(
db_options_.info_log,
Expand Down Expand Up @@ -776,9 +776,9 @@ bool FlushJob::MemPurgeDecider(double threshold) {
for (auto next_mem_iter = mem_iter + 1;
next_mem_iter != std::end(mems_); next_mem_iter++) {
if ((*next_mem_iter)
->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro,
true /* immutable_memtable */)) {
->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
&mget_s, &merge_context, &max_covering_tombstone_seq,
&sqno, ro, true /* immutable_memtable */)) {
not_in_next_mems = false;
break;
}
Expand Down
Loading

0 comments on commit 81388b3

Please sign in to comment.