Add API for writing wide-column entities (facebook#10242)
Summary:
The patch builds on facebook#9915 and adds
a new API called `PutEntity` that can be used to write a wide-column entity
to the database. The new API is added to both `DB` and `WriteBatch`. Note
that currently there is no way to retrieve these entities; more precisely, all
read APIs (`Get`, `MultiGet`, and iterator) return `NotSupported` when they
encounter a wide-column entity that is required to answer a query. Read-side
support (as well as other missing functionality like `Merge`, compaction filter,
and timestamp support) will be added in later PRs.
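
For illustration, a minimal sketch of how the new write path might be exercised once this patch is in place. The DB path, key, and column names/values below are made up for the example, and the `rocksdb/wide_columns.h` include is assumed from facebook#9915; only the `DB::PutEntity` and `WriteBatch::PutEntity` signatures are taken from this patch.

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/wide_columns.h"
#include "rocksdb/write_batch.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;

  DB* db = nullptr;
  // Hypothetical path chosen for the example.
  Status s = DB::Open(options, "/tmp/wide_column_demo", &db);
  assert(s.ok());

  // A wide-column entity is a set of named columns for a single key.
  const WideColumns columns{{"attr1", "foo"}, {"attr2", "bar"}};

  // Write an entity directly through the DB interface...
  s = db->PutEntity(WriteOptions(), db->DefaultColumnFamily(), "key1", columns);
  assert(s.ok());

  // ...or stage it in a WriteBatch alongside other updates.
  WriteBatch batch;
  s = batch.PutEntity(db->DefaultColumnFamily(), "key2", columns);
  assert(s.ok());
  s = db->Write(WriteOptions(), &batch);
  assert(s.ok());

  // Note: as of this patch there is no way to read the entities back;
  // Get/MultiGet/iterators return NotSupported when they encounter one.

  delete db;
  return 0;
}
```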

Pull Request resolved: facebook#10242

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D37369748

Pulled By: ltamasi

fbshipit-source-id: 7f5e412359ed7a400fd80b897dae5599dbcd685d
ltamasi authored and facebook-github-bot committed Jun 25, 2022
1 parent f322f27 commit c73d2a9
Showing 32 changed files with 714 additions and 56 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -1301,6 +1301,7 @@ if(WITH_TESTS)
db/version_set_test.cc
db/wal_manager_test.cc
db/wal_edit_test.cc
db/wide/db_wide_basic_test.cc
db/wide/wide_column_serialization_test.cc
db/write_batch_test.cc
db/write_callback_test.cc
3 changes: 3 additions & 0 deletions Makefile
@@ -1383,6 +1383,9 @@ db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIB
db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_wide_basic_test: $(OBJ_DIR)/db/wide/db_wide_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

6 changes: 6 additions & 0 deletions TARGETS
@@ -5240,6 +5240,12 @@ cpp_unittest_wrapper(name="db_wal_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_wide_basic_test",
srcs=["db/wide/db_wide_basic_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_with_timestamp_basic_test",
srcs=["db/db_with_timestamp_basic_test.cc"],
deps=[":rocksdb_test_lib"],
9 changes: 6 additions & 3 deletions db/compaction/compaction_iterator.cc
@@ -197,6 +197,7 @@ void CompactionIterator::Next() {

bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
// TODO: support compaction filter for wide-column entities
if (!compaction_filter_ ||
(ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
return true;
@@ -519,7 +520,8 @@ void CompactionIterator::NextFromInput() {
// In the previous iteration we encountered a single delete that we could
// not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.)
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) {
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
ikey_.DebugString(allow_data_in_errors_, true).c_str());
assert(false);
@@ -533,7 +535,7 @@ void CompactionIterator::NextFromInput() {
assert(false);
}

if (ikey_.type == kTypeBlobIndex) {
if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
@@ -689,7 +691,8 @@ void CompactionIterator::NextFromInput() {
// either way. We will maintain counts of how many mismatches
// happened
if (next_ikey.type != kTypeValue &&
next_ikey.type != kTypeBlobIndex) {
next_ikey.type != kTypeBlobIndex &&
next_ikey.type != kTypeWideColumnEntity) {
++iter_stats_.num_single_del_mismatch;
}

15 changes: 15 additions & 0 deletions db/compaction/compaction_iterator_test.cc
@@ -981,6 +981,21 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
2 /*earliest_write_conflict_snapshot*/);
}

// Same as above but with a wide-column entity. In addition to the value getting
// trimmed, the type of the KV is changed to kTypeValue.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking_WideColumnEntity) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeWideColumnEntity)},
{"", "fake_entity"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 /* last_committed_seq */, nullptr /* merge_operator */,
nullptr /* compaction_filter */, false /* bottommost_level */,
2 /* earliest_write_conflict_snapshot */);
}

// Compaction filter should keep uncommitted key as-is, and
// * Convert the latest value to deletion, and/or
// * if latest value is a merge, apply filter to all subsequent merges.
10 changes: 10 additions & 0 deletions db/db_impl/compacted_db_impl.h
@@ -54,12 +54,22 @@ class CompactedDBImpl : public DBImpl {
const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

using DBImpl::Merge;
virtual Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

using DBImpl::Delete;
virtual Status Delete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
@@ -217,6 +217,11 @@ class DBImpl : public DB {
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) override;

using DB::PutEntity;
Status PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override;

using DB::Merge;
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
9 changes: 9 additions & 0 deletions db/db_impl/db_impl_readonly.h
@@ -48,6 +48,15 @@ class DBImplReadOnly : public DBImpl {
const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DBImpl::Merge;
virtual Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
8 changes: 8 additions & 0 deletions db/db_impl/db_impl_secondary.h
@@ -119,6 +119,14 @@ class DBImplSecondary : public DBImpl {
return Status::NotSupported("Not supported operation in secondary mode.");
}

using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported operation in secondary mode.");
}

using DBImpl::Merge;
Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
32 changes: 32 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -37,6 +37,17 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
return DB::Put(o, column_family, key, ts, val);
}

Status DBImpl::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}

return DB::PutEntity(options, column_family, key, columns);
}

Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
const Status s = FailIfCfHasTs(column_family);
@@ -2236,6 +2247,27 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
return Write(opt, &batch);
}

Status DB::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
assert(default_cf);

const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);

WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0,
options.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());

const Status s = batch.PutEntity(column_family, key, columns);
if (!s.ok()) {
return s;
}

return Write(options, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
52 changes: 50 additions & 2 deletions db/db_iter.cc
@@ -207,6 +207,15 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
return true;
}

bool DBIter::SetWideColumnValueIfNeeded(const Slice& /* wide_columns_slice */) {
assert(!is_blob_);

// TODO: support wide-column entities
status_ = Status::NotSupported("Encountered unexpected wide-column entity");
valid_ = false;
return false;
}

// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge
@@ -341,13 +350,18 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
break;
case kTypeValue:
case kTypeBlobIndex:
case kTypeWideColumnEntity:
if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);

if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
}

valid_ = true;
@@ -369,6 +383,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
}

valid_ = true;
@@ -580,6 +598,12 @@ bool DBIter::MergeValuesNewToOld() {
return false;
}
return true;
} else if (kTypeWideColumnEntity == ikey.type) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else {
valid_ = false;
status_ = Status::Corruption(
@@ -783,7 +807,8 @@ bool DBIter::FindValueForCurrentKey() {
merge_context_.Clear();
current_entry_is_merged_ = false;
// last entry before merge (could be kTypeDeletion,
// kTypeDeletionWithTimestamp, kTypeSingleDeletion or kTypeValue)
// kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue,
// kTypeBlobIndex, or kTypeWideColumnEntity)
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;

@@ -831,6 +856,7 @@ bool DBIter::FindValueForCurrentKey() {
switch (last_key_entry_type) {
case kTypeValue:
case kTypeBlobIndex:
case kTypeWideColumnEntity:
if (range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal)) {
last_key_entry_type = kTypeRangeDeletion;
@@ -927,6 +953,12 @@ bool DBIter::FindValueForCurrentKey() {
}
is_blob_ = false;
return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else {
assert(last_not_merge_type == kTypeValue);
s = Merge(&pinned_value_, saved_key_.GetUserKey());
@@ -944,6 +976,11 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
break;
case kTypeWideColumnEntity:
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
return false;
}
break;
default:
valid_ = false;
status_ = Status::Corruption(
@@ -1034,13 +1071,18 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
saved_timestamp_.assign(ts.data(), ts.size());
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
ikey.type == kTypeWideColumnEntity) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
if (ikey.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
return false;
}
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
return false;
}
}

valid_ = true;
@@ -1109,6 +1151,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}
is_blob_ = false;
return true;
} else if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else {
valid_ = false;
status_ = Status::Corruption(
2 changes: 2 additions & 0 deletions db/db_iter.h
@@ -298,6 +298,8 @@ class DBIter final : public Iterator {
// index when using the integrated BlobDB implementation.
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);

bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice);

Status Merge(const Slice* val, const Slice& user_key);

const SliceTransform* prefix_extractor_;