Skip to content

Commit

Permalink
Add wide column serialization primitives (facebook#9915)
Browse files Browse the repository at this point in the history
Summary:
The patch adds some low-level logic that can be used to serialize/deserialize
a sorted vector of wide columns to/from a simple binary searchable string
representation. Currently, there is no user-facing API; this will be implemented in
subsequent stages.

Pull Request resolved: facebook#9915

Test Plan: `make check`

Reviewed By: siying

Differential Revision: D35978076

Pulled By: ltamasi

fbshipit-source-id: 33f5f6628ec3bcd8c8beab363b1978ac047a8788
  • Loading branch information
ltamasi authored and facebook-github-bot committed Jun 4, 2022
1 parent 3e02c6e commit e9c74bc
Show file tree
Hide file tree
Showing 9 changed files with 589 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ set(SOURCES
db/version_set.cc
db/wal_edit.cc
db/wal_manager.cc
db/wide/wide_column_serialization.cc
db/write_batch.cc
db/write_batch_base.cc
db/write_controller.cc
Expand Down Expand Up @@ -1294,6 +1295,7 @@ if(WITH_TESTS)
db/version_set_test.cc
db/wal_manager_test.cc
db/wal_edit_test.cc
db/wide/wide_column_serialization_test.cc
db/write_batch_test.cc
db/write_callback_test.cc
db/write_controller_test.cc
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,10 @@ db_basic_bench: $(OBJ_DIR)/microbench/db_basic_bench.o $(LIBRARY)

cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

#-------------------------------------------------
# make install related stuff
PREFIX ?= /usr/local
Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
Expand Down Expand Up @@ -419,6 +420,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
Expand Down Expand Up @@ -5814,6 +5816,12 @@ cpp_unittest_wrapper(name="wal_manager_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="wide_column_serialization_test",
srcs=["db/wide/wide_column_serialization_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="work_queue_test",
srcs=["util/work_queue_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
141 changes: 141 additions & 0 deletions db/wide/wide_column_serialization.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/wide/wide_column_serialization.h"

#include <algorithm>
#include <cassert>
#include <limits>

#include "rocksdb/slice.h"
#include "util/autovector.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

// Serializes `columns` and appends the encoded bytes to `output`; whatever
// `output` already contains is left in place in front of the new data.
//
// Encoding order: varint32 version, varint32 column count, then one index
// entry per column (length-prefixed name followed by the varint32 size of the
// value), and finally all column values concatenated in column order.
//
// Returns InvalidArgument if the column count, or the size of any column name
// or value, does not fit into 32 bits. Note that in the per-column failure
// cases `output` has already been appended to (version, count, and the index
// entries of the preceding columns), so callers should discard it on error.
Status WideColumnSerialization::Serialize(const WideColumns& columns,
                                          std::string& output) {
  // Column names should be strictly ascending. Deserialize() treats equal
  // adjacent names as corruption, so duplicates have to trip the assertion
  // here as well (hence ">= 0" and not "> 0"); otherwise we could emit data
  // that cannot be read back.
  assert(std::adjacent_find(columns.cbegin(), columns.cend(),
                            [](const WideColumn& lhs, const WideColumn& rhs) {
                              return lhs.name().compare(rhs.name()) >= 0;
                            }) == columns.cend());

  if (columns.size() >
      static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
    return Status::InvalidArgument("Too many wide columns");
  }

  PutVarint32(&output, kCurrentVersion);

  PutVarint32(&output, static_cast<uint32_t>(columns.size()));

  // First pass: the index (column names and value sizes).
  for (const auto& column : columns) {
    const Slice& name = column.name();
    if (name.size() >
        static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
      return Status::InvalidArgument("Wide column name too long");
    }

    const Slice& value = column.value();
    if (value.size() >
        static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
      return Status::InvalidArgument("Wide column value too long");
    }

    PutLengthPrefixedSlice(&output, name);
    PutVarint32(&output, static_cast<uint32_t>(value.size()));
  }

  // Second pass: the raw value bytes, in the same order as the index.
  for (const auto& column : columns) {
    const Slice& value = column.value();

    output.append(value.data(), value.size());
  }

  return Status::OK();
}

// Parses the serialized wide columns in `input` into `columns`, which must be
// empty on entry (asserted).
//
// The header and the index are decoded from the front of `input` through the
// coding helpers, which receive `&input`; what is left in `input` afterwards
// is treated as the concatenated column values. `input` is not advanced past
// those values. Every value Slice stored in `columns` points into the buffer
// behind `input`, so that buffer has to outlive `columns`.
// NOTE(review): the name Slices presumably alias the same buffer as well;
// confirm against GetLengthPrefixedSlice.
//
// Returns:
//  - Corruption if the version, the column count, a name, or a value size
//    cannot be decoded, if names are not strictly ascending, or if the value
//    payload is shorter than the index claims;
//  - NotSupported if the encoded version is greater than kCurrentVersion;
//  - OK otherwise (including the case of zero columns).
// On failure `columns` may be left partially populated.
Status WideColumnSerialization::Deserialize(Slice& input,
                                            WideColumns& columns) {
  assert(columns.empty());

  uint32_t version = 0;
  if (!GetVarint32(&input, &version)) {
    return Status::Corruption("Error decoding wide column version");
  }

  if (version > kCurrentVersion) {
    return Status::NotSupported("Unsupported wide column version");
  }

  uint32_t num_columns = 0;
  if (!GetVarint32(&input, &num_columns)) {
    return Status::Corruption("Error decoding number of wide columns");
  }

  if (!num_columns) {
    return Status::OK();
  }

  // num_columns comes straight from the (possibly corrupt) input, so do not
  // let it drive the allocation size unchecked. Every index entry occupies at
  // least two bytes (one for the name length, one for the value size), which
  // bounds the number of columns the remaining input can actually describe.
  // Well-formed data is never under-reserved by this cap; bogus counts are
  // still reported as corruption by the decoding loop below.
  const size_t reserve_hint =
      std::min<size_t>(num_columns, input.size() / 2);

  columns.reserve(reserve_hint);

  autovector<uint32_t, 16> column_value_sizes;
  column_value_sizes.reserve(reserve_hint);

  // Decode the index: (name, value size) for each column.
  for (uint32_t i = 0; i < num_columns; ++i) {
    Slice name;
    if (!GetLengthPrefixedSlice(&input, &name)) {
      return Status::Corruption("Error decoding wide column name");
    }

    if (!columns.empty() && columns.back().name().compare(name) >= 0) {
      return Status::Corruption("Wide columns out of order");
    }

    // The value is filled in below, once all value sizes are known.
    columns.emplace_back(name, Slice());

    uint32_t value_size = 0;
    if (!GetVarint32(&input, &value_size)) {
      return Status::Corruption("Error decoding wide column value size");
    }

    column_value_sizes.emplace_back(value_size);
  }

  // Whatever follows the index is the value payload.
  const Slice data(input);
  size_t pos = 0;

  for (uint32_t i = 0; i < num_columns; ++i) {
    const uint32_t value_size = column_value_sizes[i];

    // pos never exceeds data.size(), so the subtraction cannot wrap around.
    // Comparing this way (instead of "pos + value_size > data.size()") keeps
    // the check correct even where size_t is only 32 bits wide and the sum
    // could overflow.
    if (value_size > data.size() - pos) {
      return Status::Corruption("Error decoding wide column value payload");
    }

    columns[i].value() = Slice(data.data() + pos, value_size);

    pos += value_size;
  }

  return Status::OK();
}

// Binary-searches `columns` for the column called `column_name`. Returns an
// iterator to the matching column, or columns.cend() when no column has that
// name. The search relies on `columns` being ordered by name.
WideColumns::const_iterator WideColumnSerialization::Find(
    const WideColumns& columns, const Slice& column_name) {
  const auto first = columns.cbegin();
  const auto last = columns.cend();

  // Orders a column before a name when the column's own name sorts first.
  const auto name_precedes = [](const WideColumn& column, const Slice& name) {
    return column.name().compare(name) < 0;
  };

  // First column whose name is not less than the requested one.
  const auto candidate =
      std::lower_bound(first, last, column_name, name_precedes);

  const bool not_found =
      (candidate == last) || (candidate->name() != column_name);

  return not_found ? last : candidate;
}

} // namespace ROCKSDB_NAMESPACE
55 changes: 55 additions & 0 deletions db/wide/wide_column_serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cstdint>
#include <string>

#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "rocksdb/wide_columns.h"

namespace ROCKSDB_NAMESPACE {

class Slice;

// Wide-column serialization/deserialization primitives.
//
// The two main parts of the layout are 1) a sorted index containing the column
// names and column value sizes and 2) the column values themselves. Keeping the
// index and the values separate will enable selectively reading column values
// down the line. Note that currently the index has to be fully parsed in order
// to find out the offset of each column value.
//
// Legend: cn = column name, cv = column value, cns = column name size, cvs =
// column value size.
//
// +----------+--------------+----------+-------+----------+---...
// | version | # of columns | cns 1 | cn 1 | cvs 1 |
// +----------+--------------+----------+-------+----------+---...
// | varint32 | varint32 | varint32 | bytes | varint32 |
// +----------+--------------+----------+-------+----------+---...
//
// ... continued ...
//
// ...---+----------+-------+----------+-------+---...---+-------+
// | cns N | cn N | cvs N | cv 1 | | cv N |
// ...---+----------+-------+----------+-------+---...---+-------+
// | varint32 | bytes | varint32 | bytes | | bytes |
// ...---+----------+-------+----------+-------+---...---+-------+

// Stateless collection of static helpers for converting between a vector of
// wide columns and its serialized string form, plus a lookup helper.
class WideColumnSerialization {
public:
// Appends the serialized form of `columns` to `output`. Column names are
// expected in ascending order (checked with an assertion). Returns
// InvalidArgument if the number of columns, or the size of a column name or
// value, exceeds what fits in a uint32_t.
static Status Serialize(const WideColumns& columns, std::string& output);
// Decodes `input` into `columns`, which must be empty on entry (asserted).
// The decoded column values are Slices pointing into the data referenced by
// `input`. Returns Corruption for undecodable or out-of-order data and
// NotSupported for a version greater than kCurrentVersion.
static Status Deserialize(Slice& input, WideColumns& columns);

// Looks up `column_name` in `columns` using binary search; returns
// columns.cend() if there is no column with that name.
static WideColumns::const_iterator Find(const WideColumns& columns,
const Slice& column_name);

// Format version written by Serialize(); Deserialize() rejects anything
// greater than this.
static constexpr uint32_t kCurrentVersion = 1;
};

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit e9c74bc

Please sign in to comment.