Skip to content

Commit

Permalink
Introduce a dedicated class to represent blob values (facebook#10571)
Browse files Browse the repository at this point in the history
Summary:
The patch introduces a new class called `BlobContents`, which represents
a single uncompressed blob value. We currently use `std::string` for this
purpose; `BlobContents` is somewhat smaller but the primary reason for a
dedicated class is that it enables certain improvements and optimizations
like eliding a copy when inserting a blob into the cache, using custom
allocators, or more control over and better accounting of the memory usage
of cached blobs (see facebook#10484).
(We plan to implement these in subsequent PRs.)

Pull Request resolved: facebook#10571

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D39000965

Pulled By: ltamasi

fbshipit-source-id: f296eddf9dec4fc3e11cad525b462bdf63c78f96
  • Loading branch information
ltamasi authored and facebook-github-bot committed Aug 25, 2022
1 parent 418b36a commit 3f57d84
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 88 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ set(SOURCES
cache/lru_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_contents.cc
db/blob/blob_fetcher.cc
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down Expand Up @@ -357,6 +358,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down
63 changes: 63 additions & 0 deletions db/blob/blob_contents.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_contents.h"

#include <cassert>

#include "cache/cache_helpers.h"

namespace ROCKSDB_NAMESPACE {

std::unique_ptr<BlobContents> BlobContents::Create(
CacheAllocationPtr&& allocation, size_t size) {
return std::unique_ptr<BlobContents>(
new BlobContents(std::move(allocation), size));
}

size_t BlobContents::SizeCallback(void* obj) {
assert(obj);

return static_cast<const BlobContents*>(obj)->size();
}

Status BlobContents::SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj);

const BlobContents* buf = static_cast<const BlobContents*>(from_obj);
assert(buf->size() >= from_offset + length);

memcpy(out, buf->data().data() + from_offset, length);

return Status::OK();
}

void BlobContents::DeleteCallback(const Slice& key, void* value) {
DeleteCacheEntry<BlobContents>(key, value);
}

Cache::CacheItemHelper* BlobContents::GetCacheItemHelper() {
static Cache::CacheItemHelper cache_helper(&SizeCallback, &SaveToCallback,
&DeleteCallback);

return &cache_helper;
}

Status BlobContents::CreateCallback(const void* buf, size_t size,
void** out_obj, size_t* charge) {
CacheAllocationPtr allocation(new char[size]);
memcpy(allocation.get(), buf, size);

std::unique_ptr<BlobContents> obj = Create(std::move(allocation), size);
BlobContents* const contents = obj.release();

*out_obj = contents;
*charge = contents->size();

return Status::OK();
}

} // namespace ROCKSDB_NAMESPACE
56 changes: 56 additions & 0 deletions db/blob/blob_contents.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <memory>

#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

// A class representing a single uncompressed value read from a blob file.
class BlobContents {
public:
static std::unique_ptr<BlobContents> Create(CacheAllocationPtr&& allocation,
size_t size);

BlobContents(const BlobContents&) = delete;
BlobContents& operator=(const BlobContents&) = delete;

BlobContents(BlobContents&&) = default;
BlobContents& operator=(BlobContents&&) = default;

~BlobContents() = default;

const Slice& data() const { return data_; }
size_t size() const { return data_.size(); }

// Callbacks for secondary cache
static size_t SizeCallback(void* obj);

static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out);

static void DeleteCallback(const Slice& key, void* value);

static Cache::CacheItemHelper* GetCacheItemHelper();

static Status CreateCallback(const void* buf, size_t size, void** out_obj,
size_t* charge);

private:
BlobContents(CacheAllocationPtr&& allocation, size_t size)
: allocation_(std::move(allocation)), data_(allocation_.get(), size) {}

CacheAllocationPtr allocation_;
Slice data_;
};

} // namespace ROCKSDB_NAMESPACE
12 changes: 7 additions & 5 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <cassert>

#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_completion_callback.h"
#include "db/blob/blob_index.h"
Expand Down Expand Up @@ -408,15 +409,16 @@ Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,

// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to
// take unique ownership of them. Therefore, we copy the blob into a
// string directly, and insert that into the cache.
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob.data(), blob.size());
// take unique ownership of them.
CacheAllocationPtr allocation(new char[blob.size()]);
memcpy(allocation.get(), blob.data(), blob.size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob.size());

// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
s = blob_cache->Insert(key, buf.get(), buf->size(),
&DeleteCacheEntry<std::string>,
&BlobContents::DeleteCallback,
nullptr /* cache_handle */, priority);
if (s.ok()) {
RecordTick(statistics, BLOB_DB_CACHE_ADD);
Expand Down
76 changes: 24 additions & 52 deletions db/blob/blob_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
Expand Down Expand Up @@ -43,8 +44,8 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,

BlobSource::~BlobSource() = default;

Status BlobSource::GetBlobFromCache(const Slice& cache_key,
CacheHandleGuard<std::string>* blob) const {
Status BlobSource::GetBlobFromCache(
const Slice& cache_key, CacheHandleGuard<BlobContents>* blob) const {
assert(blob);
assert(blob->IsEmpty());
assert(blob_cache_);
Expand All @@ -53,7 +54,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key,
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(cache_key);
if (cache_handle != nullptr) {
*blob = CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
*blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
return Status::OK();
}

Expand All @@ -63,7 +64,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key,
}

Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<std::string>* cached_blob,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const {
assert(blob);
assert(!cache_key.empty());
Expand All @@ -74,10 +75,11 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,

// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them. Therefore, we copy the blob into a string
// directly, and insert that into the cache.
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob->data(), blob->size());
// unique ownership of them.
CacheAllocationPtr allocation(new char[blob->size()]);
memcpy(allocation.get(), blob->data(), blob->size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob->size());

// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
Expand All @@ -88,7 +90,7 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
buf.release();
assert(cache_handle != nullptr);
*cached_blob =
CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
}

return s;
Expand All @@ -98,18 +100,9 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
Cache::Handle* cache_handle = nullptr;

if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
Cache::CreateCallback create_cb = [&](const void* buf, size_t size,
void** out_obj,
size_t* charge) -> Status {
std::string* blob = new std::string();
blob->assign(static_cast<const char*>(buf), size);
*out_obj = blob;
*charge = size;
return Status::OK();
};
cache_handle = blob_cache_->Lookup(key, GetCacheItemHelper(), create_cb,
Cache::Priority::BOTTOM,
true /* wait_for_cache */, statistics_);
cache_handle = blob_cache_->Lookup(
key, BlobContents::GetCacheItemHelper(), &BlobContents::CreateCallback,
Cache::Priority::BOTTOM, true /* wait_for_cache */, statistics_);
} else {
cache_handle = blob_cache_->Lookup(key, statistics_);
}
Expand All @@ -125,17 +118,17 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
return cache_handle;
}

Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value,
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
size_t charge,
Cache::Handle** cache_handle,
Cache::Priority priority) const {
Status s;

if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
s = blob_cache_->Insert(key, value, GetCacheItemHelper(), charge,
cache_handle, priority);
s = blob_cache_->Insert(key, value, BlobContents::GetCacheItemHelper(),
charge, cache_handle, priority);
} else {
s = blob_cache_->Insert(key, value, charge, &DeleteCacheEntry<std::string>,
s = blob_cache_->Insert(key, value, charge, &BlobContents::DeleteCallback,
cache_handle, priority);
}

Expand Down Expand Up @@ -164,7 +157,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,

const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;

// First, try to get the blob from the cache
//
Expand All @@ -180,7 +173,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
// the target PinnableSlice. This has the potential to save a lot of
// CPU, especially with large blob values.
value->PinSlice(
*blob_handle.GetValue(),
blob_handle.GetValue()->data(),
[](void* arg1, void* arg2) {
Cache* const cache = static_cast<Cache*>(arg1);
Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
Expand Down Expand Up @@ -310,7 +303,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = blob_reqs[i];

CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
const Slice key = cache_key.AsSlice();

Expand All @@ -327,7 +320,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
// to the target PinnableSlice. This has the potential to save a lot
// of CPU, especially with large blob values.
req.result->PinSlice(
*blob_handle.GetValue(),
blob_handle.GetValue()->data(),
[](void* arg1, void* arg2) {
Cache* const cache = static_cast<Cache*>(arg1);
Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
Expand Down Expand Up @@ -407,7 +400,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
// the blob(s) to the cache.
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
if (_blob_reqs[i]->status->ok()) {
CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key =
base_cache_key.WithOffset(_blob_reqs[i]->offset);
const Slice key = cache_key.AsSlice();
Expand All @@ -431,7 +424,7 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
const Slice key = cache_key.AsSlice();

CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const Status s = GetBlobFromCache(key, &blob_handle);

if (s.ok() && blob_handle.GetValue() != nullptr) {
Expand All @@ -441,25 +434,4 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
return false;
}

// Callbacks for secondary blob cache
size_t BlobSource::SizeCallback(void* obj) {
assert(obj != nullptr);
return static_cast<const std::string*>(obj)->size();
}

Status BlobSource::SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
const std::string* buf = static_cast<const std::string*>(from_obj);
assert(buf->size() >= from_offset + length);
memcpy(out, buf->data() + from_offset, length);
return Status::OK();
}

Cache::CacheItemHelper* BlobSource::GetCacheItemHelper() {
static Cache::CacheItemHelper cache_helper(SizeCallback, SaveToCallback,
&DeleteCacheEntry<std::string>);
return &cache_helper;
}

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 3f57d84

Please sign in to comment.