diff --git a/.gitignore b/.gitignore index 199458901ec..edd39bcf2cf 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ make_config.mk *.gcda *.gcno *.o +*.o.tmp *.so *.so.* *_test diff --git a/CMakeLists.txt b/CMakeLists.txt index b66f0dfaee9..68892e1e515 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -576,6 +576,8 @@ set(SOURCES memtable/hash_linklist_rep.cc memtable/hash_skiplist_rep.cc memtable/skiplistrep.cc + memtable/artrep.cc + memtable/art.cc memtable/vectorrep.cc memtable/write_buffer_manager.cc monitoring/histogram.cc @@ -973,6 +975,7 @@ if(WITH_TESTS) memory/arena_test.cc memtable/inlineskiplist_test.cc memtable/skiplist_test.cc + memtable/art_test.cc memtable/write_buffer_manager_test.cc monitoring/histogram_test.cc monitoring/iostats_context_test.cc diff --git a/Makefile b/Makefile index e935bc14be6..b1d7848ac0d 100644 --- a/Makefile +++ b/Makefile @@ -430,6 +430,7 @@ TESTS = \ crc32c_test \ coding_test \ inlineskiplist_test \ + art_test \ encryption_test \ env_basic_test \ env_test \ @@ -1423,6 +1424,9 @@ data_block_hash_index_test: table/block_based/data_block_hash_index_test.o $(LIB inlineskiplist_test: memtable/inlineskiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +art_test: memtable/art_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + skiplist_test: memtable/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index d59e145c6d8..ca8cc8ea678 100644 --- a/TARGETS +++ b/TARGETS @@ -186,6 +186,8 @@ cpp_library( "memory/concurrent_arena.cc", "memory/jemalloc_nodump_allocator.cc", "memtable/alloc_tracker.cc", + "memtable/art.cc", + "memtable/artrep.cc", "memtable/hash_linklist_rep.cc", "memtable/hash_skiplist_rep.cc", "memtable/skiplistrep.cc", @@ -1255,6 +1257,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "art_test", + "memtable/art_test.cc", + "serial", + [], + [], + ], [ "slice_transform_test", "util/slice_transform_test.cc", diff --git a/db/db_test.cc b/db/db_test.cc index 16ac9f79173..4058f5de61b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1021,18 +1021,15 @@ TEST_F(DBTest, FailMoreDbPaths) { void CheckColumnFamilyMeta(const ColumnFamilyMetaData& cf_meta) { uint64_t cf_size = 0; - uint64_t cf_csize = 0; size_t file_count = 0; for (auto level_meta : cf_meta.levels) { uint64_t level_size = 0; - uint64_t level_csize = 0; file_count += level_meta.files.size(); for (auto file_meta : level_meta.files) { level_size += file_meta.size; } ASSERT_EQ(level_meta.size, level_size); cf_size += level_size; - cf_csize += level_csize; } ASSERT_EQ(cf_meta.file_count, file_count); ASSERT_EQ(cf_meta.size, cf_size); diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 3cd82078267..2c542e1788d 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -944,10 +944,8 @@ TEST_F(PerfContextTest, BitMapControl) { auto db = OpenDb(); WriteOptions write_options; SetPerfLevel(PerfLevel::kDisable); - SetPerfFlags(NewPerfFlags({ - PerfFlag::user_key_comparison_count, - PerfFlag::write_wal_time - })); + SetPerfFlags(NewPerfFlags( + {PerfFlag::user_key_comparison_count, PerfFlag::write_wal_time})); for (int i = 0; i < FLAGS_total_keys; ++i) { std::string i_str = ToString(i); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index dca9650d8a3..3f6c5744b6b 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -323,6 +323,24 @@ class DoublySkipListFactory : public MemTableRepFactory { const size_t lookahead_; }; +// This uses an adaptive radix tree to store keys, which is 
similar to a trie,
+// but optimized for memory usage.
+class AdaptiveRadixTreeFactory : public MemTableRepFactory {
+ public:
+  explicit AdaptiveRadixTreeFactory() {}
+  virtual ~AdaptiveRadixTreeFactory() {}
+
+  using MemTableRepFactory::CreateMemTableRep;
+  MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, Allocator*,
+                                 const SliceTransform*,
+                                 Logger* logger) override;
+  const char* Name() const override { return "AdaptiveRadixTreeFactory"; }
+
+  bool IsInsertConcurrentlySupported() const override { return false; }
+
+  bool CanHandleDuplicatedKey() const override { return false; }
+};
+
 #ifndef ROCKSDB_LITE
 // This creates MemTableReps that are backed by an std::vector. On iteration,
 // the vector is sorted. This is useful for workloads where iteration is very
diff --git a/memtable/art.cc b/memtable/art.cc
new file mode 100644
index 00000000000..98d5f7fd28d
--- /dev/null
+++ b/memtable/art.cc
@@ -0,0 +1,499 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved. Use of
+// this source code is governed by a BSD-style license that can be found
+// in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "memtable/art.h"
+
+#include <assert.h>
+
+#include "memtable/art_inner_node.h"
+#include "memtable/art_node.h"
+#include "memtable/art_node_4.h"
+
+namespace rocksdb {
+
+const char* AdaptiveRadixTree::Get(const char* key, int key_len) const {
+  Node* cur = root_.load(std::memory_order_acquire);
+  std::atomic<Node*>* child = nullptr;
+  int depth = 0;
+  while (cur != nullptr) {
+    int prefix_match_len = cur->check_prefix(key, depth, key_len);
+    if (cur->prefix_len != prefix_match_len) {
+      /* prefix mismatch */
+      return nullptr;
+    }
+    if (cur->prefix_len == key_len - depth) {
+      /* exact match */
+      return cur->get_value();
+    }
+
+    if (cur->inner == nullptr) {
+      return nullptr;
+    }
+    child = cur->inner->find_child(key[depth + cur->prefix_len]);
+    depth += cur->prefix_len + 1;
+    cur = child != nullptr ?
child->load(std::memory_order_acquire) : nullptr; + } + return nullptr; +} + +Node* AdaptiveRadixTree::AllocateNode(InnerNode* inner, int prefix_size) { + char* addr = allocator_->AllocateAligned(sizeof(Node)); + Node* node = new (addr) Node; + node->inner = inner; + node->value.store(nullptr, std::memory_order_relaxed); + node->prefix = nullptr; + node->prefix_len = prefix_size; + return node; +} + +const char* AdaptiveRadixTree::Insert(const char* key, int key_len, + const char* leaf) { + int depth = 0, prefix_match_len; + + std::atomic* cur_address = &root_; + Node* cur = root_.load(std::memory_order_relaxed); + if (cur == nullptr) { + Node* root = AllocateNode(nullptr, key_len); + root->set_value(leaf); + root->prefix = key; + root_.store(root, std::memory_order_release); + return nullptr; + } + + char child_partial_key; + + while (true) { + /* number of bytes of the current node's prefix that match the key */ + assert(cur != nullptr); + prefix_match_len = cur->check_prefix(key, depth, key_len); + /* true if the current node's prefix matches with a part of the key */ + bool is_prefix_match = cur->prefix_len == prefix_match_len; + const char* current_value = cur->get_value(); + + if (is_prefix_match && cur->prefix_len == key_len - depth) { + /* exact match: + * => "replace" + * => replace value of current node. + * => return old value to caller to handle. + * _ _ + * | | + * (aa) (aa) + * a / \ b +[aaaaa,v3] a / \ b + * / \ ==========> / \ + * *(aa)->v1 ()->v2 *(aa)->v3 ()->v2 + * + */ + + /* cur must be a leaf */ + cur->set_value(leaf); + return current_value; + } else if (!is_prefix_match) { + /* prefix mismatch: + * => new parent node with common prefix and no associated value. + * => new node with value to insert. + * => current and new node become children of new parent node. + * + * | | + * *(aa) +(a)->Ø + * a / \ b +[ab,v3] a / \ b + * / \ =======> / \ + * (aa)->v1 ()->v2 *()->Ø +()->v3 + * a / \ b + * / \ + * (aa)->v1 ()->v2 + * /|\ /|\ + */ + + InnerNode* inner = + new (allocator_->AllocateAligned(sizeof(Node4))) Node4(); + Node* new_parent = AllocateNode(inner, prefix_match_len); + new_parent->prefix = cur->prefix; + int old_prefix_len = cur->prefix_len; + int new_prefix_len = old_prefix_len - prefix_match_len - 1; + assert(new_prefix_len >= 0); + + Node* new_cur = AllocateNode(cur->inner, new_prefix_len); + new_cur->set_value(current_value); + if (new_prefix_len > 0) { + new_cur->prefix = cur->prefix + prefix_match_len + 1; + } else { + new_cur->prefix = nullptr; + } + inner->set_child(cur->prefix[prefix_match_len], new_cur); + if (depth + prefix_match_len < key_len) { + int leaf_prefix_len = key_len - depth - prefix_match_len - 1; + Node* new_node = AllocateNode(nullptr, leaf_prefix_len); + new_node->set_value(leaf); + new_node->prefix = key + depth + prefix_match_len + 1; + inner->set_child(key[depth + prefix_match_len], new_node); + } else { + new_parent->set_value(leaf); + } + cur_address->store(new_parent, std::memory_order_release); + return nullptr; + } + + assert(depth + cur->prefix_len < key_len); + /* must be inner node */ + child_partial_key = key[depth + cur->prefix_len]; + if (cur->inner == nullptr) { + InnerNode* new_inner = + new (allocator_->AllocateAligned(sizeof(Node4))) Node4(); + cur->inner = new_inner; + } + std::atomic* child = cur->inner->find_child(child_partial_key); + + if (child == nullptr || child->load(std::memory_order_relaxed) == nullptr) { + /* + * no child associated with the next partial key. + * => create new node with value to insert. 
+ * => new node becomes current node's child. + * + * *(aa)->Ø *(aa)->Ø + * a / +[aab,v2] a / \ b + * / ========> / \ + * (a)->v1 (a)->v1 +()->v2 + */ + + if (cur->inner->is_full()) { + Node* old = cur; + cur = AllocateNode(cur->inner->grow(allocator_), old->prefix_len); + cur->prefix = old->prefix; + cur_address->store(cur, std::memory_order_release); + } + int leaf_prefix_len = key_len - depth - cur->prefix_len - 1; + Node* new_node = AllocateNode(nullptr, leaf_prefix_len); + new_node->set_value(leaf); + new_node->prefix = key + depth + cur->prefix_len + 1; + assert(leaf_prefix_len >= 0); + cur->inner->set_child(child_partial_key, new_node); + return nullptr; + } + + /* propagate down and repeat: + * + * *(aa)->Ø (aa)->Ø + * a / \ b +[aaba,v3] a / \ b repeat + * / \ =========> / \ ========> ... + * (a)->v1 ()->v2 (a)->v1 *()->v2 + */ + + depth += cur->prefix_len + 1; + cur_address = child; + cur = child->load(std::memory_order_relaxed); + } +} + +void AdaptiveRadixTree::NodeIterator::SeekToLast() { + cur_partial_key_ = 255; + cur_partial_key_ = node_->inner->prev_partial_key(cur_partial_key_); + auto next = node_->inner->find_child(cur_partial_key_); + if (next != nullptr) { + child = next->load(std::memory_order_acquire); + } else { + child = nullptr; + } +} + +void AdaptiveRadixTree::NodeIterator::SeekToFirst() { + cur_partial_key_ = node_->inner->next_partial_key(0); + auto next = node_->inner->find_child(cur_partial_key_); + if (next != nullptr) { + child = next->load(std::memory_order_acquire); + } else { + child = nullptr; + } +} + +void AdaptiveRadixTree::NodeIterator::Next() { + if (cur_partial_key_ == 255) { + child = nullptr; + return; + } + cur_partial_key_ = node_->inner->next_partial_key(cur_partial_key_ + 1); + auto next = node_->inner->find_child(cur_partial_key_); + if (next != nullptr) { + child = next->load(std::memory_order_acquire); + } else { + child = nullptr; + } +} +void AdaptiveRadixTree::NodeIterator::Prev() { + if (cur_partial_key_ == 0) { + child = nullptr; + return; + } + cur_partial_key_ = node_->inner->prev_partial_key(cur_partial_key_ - 1); + auto next = node_->inner->find_child(cur_partial_key_); + if (next != nullptr) { + child = next->load(std::memory_order_acquire); + } else { + child = nullptr; + } +} + +bool AdaptiveRadixTree::NodeIterator::Valid() { return child != nullptr; } + +void AdaptiveRadixTree::Iterator::Seek(const char* key, int l) { + SeekImpl(key, l); + if (!traversal_stack_.empty()) { + SeekLeftLeaf(); + } +} + +bool AdaptiveRadixTree::Iterator::Valid() const { + return !traversal_stack_.empty(); +} + +void AdaptiveRadixTree::Iterator::Next() { + NodeIterator& step = traversal_stack_.back(); + if (step.node_->inner == nullptr) { + SeekForward(); + if (!traversal_stack_.empty()) { + SeekLeftLeaf(); + } + } else { + step.SeekToFirst(); + traversal_stack_.emplace_back(step.child, + step.depth_ + step.node_->prefix_len + 1); + SeekLeftLeaf(); + } +} + +void AdaptiveRadixTree::Iterator::Prev() { + SeekBack(); + if (!traversal_stack_.empty()) { + SeekRightLeaf(); + } +} + +void AdaptiveRadixTree::Iterator::SeekLeftLeaf() { + if (traversal_stack_.empty()) { + return; + } + while (!traversal_stack_.back().node_->is_leaf()) { + NodeIterator& cur_step = traversal_stack_.back(); + cur_step.SeekToFirst(); + traversal_stack_.emplace_back( + cur_step.child, cur_step.depth_ + cur_step.node_->prefix_len + 1); + } +} + +void AdaptiveRadixTree::Iterator::SeekRightLeaf() { + if (traversal_stack_.empty()) { + return; + } + while 
(traversal_stack_.back().node_->inner != nullptr) { + NodeIterator& cur_step = traversal_stack_.back(); + cur_step.SeekToLast(); + traversal_stack_.emplace_back( + cur_step.child, cur_step.depth_ + cur_step.node_->prefix_len + 1); + } +} + +void AdaptiveRadixTree::Iterator::SeekToFirst() { + traversal_stack_.clear(); + Node* root = root_->load(std::memory_order_acquire); + if (root != nullptr) { + traversal_stack_.emplace_back(root, 0); + SeekLeftLeaf(); + } +} + +void AdaptiveRadixTree::Iterator::SeekToLast() { + traversal_stack_.clear(); + Node* root = root_->load(std::memory_order_acquire); + if (root != nullptr) { + traversal_stack_.emplace_back(root_->load(std::memory_order_acquire), 0); + SeekRightLeaf(); + } +} + +void AdaptiveRadixTree::Iterator::SeekForPrev(const char* key, int key_len) { + SeekForPrevImpl(key, key_len); + SeekRightLeaf(); +} + +void AdaptiveRadixTree::Iterator::SeekForPrevImpl(const char* key, + int key_len) { + Node* cur = root_->load(std::memory_order_acquire); + if (cur == nullptr) { + return; + } + // sentinel child iterator for root + traversal_stack_.clear(); + traversal_stack_.push_back(NodeIterator(cur, 0)); + + while (!traversal_stack_.empty()) { + NodeIterator& cur_step = traversal_stack_.back(); + Node* cur_node = cur_step.node_; + int cur_depth = cur_step.depth_; + int prefix_match_len = cur_node->check_prefix(key, cur_depth, key_len); + // if search key "equals" the prefix + if (key_len == cur_depth + prefix_match_len) { + // if search key is "equal" or "less" than the prefix, + // we only need to seek to left leaf in this tree. + return; + } else if (prefix_match_len < cur_node->prefix_len) { + uint8_t k1 = key[cur_depth + prefix_match_len]; + uint8_t k2 = cur_node->prefix[prefix_match_len]; + if (k1 > k2) { + // if search key is "less than" the prefix, + // we only need to seek to left leaf in this tree. + return; + } else { + // this prefix is less than target key, it means that no key in this + // subtree is less than the target key, try seek forward. 
+ SeekBack(); + return; + } + } else { + assert(prefix_match_len == cur_node->prefix_len && + key_len > cur_depth + prefix_match_len); + // seek subtree where search key is "lesser than or equal" the subtree + // partial key + if (cur_node->is_leaf() && cur_node->inner == nullptr) { + return; + } + std::atomic* child = + cur_node->inner->find_child(key[cur_depth + cur_node->prefix_len]); + uint8_t current_c = key[cur_depth + cur_node->prefix_len]; + if (child != nullptr) { + Node* next = child->load(std::memory_order_acquire); + if (next != nullptr) { + cur_step.child = next; + cur_step.cur_partial_key_ = current_c; + traversal_stack_.emplace_back(next, + cur_depth + cur_node->prefix_len + 1); + continue; + } + } + cur_step.SeekToLast(); + for (; cur_step.Valid(); cur_step.Prev()) { + if (current_c > cur_step.cur_partial_key_) { + break; + } + } + if (cur_step.Valid()) { + traversal_stack_.emplace_back(cur_step.child, + cur_depth + cur_node->prefix_len + 1); + } else { + if (!cur_node->is_leaf()) { + SeekBack(); + } + } + return; + } + } +} + +void AdaptiveRadixTree::Iterator::SeekImpl(const char* key, int key_len) { + Node* cur = root_->load(std::memory_order_acquire); + if (cur == nullptr) { + return; + } + + // sentinel child iterator for root + traversal_stack_.clear(); + traversal_stack_.push_back(NodeIterator(cur, 0)); + + while (!traversal_stack_.empty()) { + NodeIterator& cur_step = traversal_stack_.back(); + Node* cur_node = cur_step.node_; + int cur_depth = cur_step.depth_; + int prefix_match_len = std::min( + cur_node->check_prefix(key, cur_depth, key_len), key_len - cur_depth); + // if search key "equals" the prefix + if (key_len == cur_depth + prefix_match_len) { + // if search key is "equal" or "less" than the prefix, + // we only need to seek to left leaf in this tree. + return; + } else if (prefix_match_len < cur_node->prefix_len) { + uint8_t k1 = key[cur_depth + prefix_match_len]; + uint8_t k2 = cur_node->prefix[prefix_match_len]; + if (k1 < k2) { + // if search key is "less than" the prefix, + // we only need to seek to left leaf in this tree. + return; + } else { + // this prefix is less than target key, it means that no key in this + // tree is greater than the target key. 
+ SeekForward(); + return; + } + } else { + assert(prefix_match_len == cur_node->prefix_len && + key_len > cur_depth + prefix_match_len); + // seek subtree where search key is "lesser than or equal" the subtree + // partial key + if (cur_node->is_leaf() && cur_node->inner == nullptr) { + SeekForward(); + return; + } + uint8_t current_c = key[cur_depth + cur_node->prefix_len]; + std::atomic* child = + cur_node->inner->find_child(key[cur_depth + cur_node->prefix_len]); + if (child != nullptr) { + Node* next = child->load(std::memory_order_acquire); + if (next != nullptr) { + cur_step.child = next; + cur_step.cur_partial_key_ = current_c; + traversal_stack_.emplace_back(next, + cur_depth + cur_node->prefix_len + 1); + continue; + } + } + cur_step.SeekToFirst(); + for (; cur_step.Valid(); cur_step.Next()) { + if (current_c < cur_step.cur_partial_key_) { + break; + } + } + if (cur_step.Valid()) { + traversal_stack_.emplace_back(cur_step.child, + cur_depth + cur_node->prefix_len + 1); + } else { + SeekForward(); + } + return; + } + } +} + +void AdaptiveRadixTree::Iterator::SeekForward() { + traversal_stack_.pop_back(); + while (!traversal_stack_.empty()) { + NodeIterator& cur_step = traversal_stack_.back(); + cur_step.Next(); + if (cur_step.Valid()) { + traversal_stack_.emplace_back( + cur_step.child, cur_step.depth_ + cur_step.node_->prefix_len + 1); + break; + } + traversal_stack_.pop_back(); + } +} + +void AdaptiveRadixTree::Iterator::SeekBack() { + traversal_stack_.pop_back(); + while (!traversal_stack_.empty()) { + NodeIterator& step = traversal_stack_.back(); + step.Prev(); + if (step.Valid()) { + traversal_stack_.emplace_back(step.child, + step.depth_ + step.node_->prefix_len + 1); + break; + } else if (step.node_->is_leaf()) { + break; + } + traversal_stack_.pop_back(); + } +} + +} // namespace rocksdb diff --git a/memtable/art.h b/memtable/art.h new file mode 100644 index 00000000000..4454d086af8 --- /dev/null +++ b/memtable/art.h @@ -0,0 +1,79 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. Use of +// this source code is governed by a BSD-style license that can be found +// in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once + +#include "memtable/art_node.h" +#include "memory/allocator.h" +#include +#include +#include + +namespace rocksdb { + +class AdaptiveRadixTree { + struct NodeIterator { + Node* node_; + Node* child = nullptr; + int depth_; + uint8_t cur_partial_key_ = 0; + + NodeIterator(Node* node, int depth) : node_(node), depth_(depth) {} + + void Next(); + void Prev(); + bool Valid(); + void SeekToFirst(); + void SeekToLast(); + }; + + public: + struct Iterator { + public: + std::atomic* root_; + std::vector traversal_stack_; + explicit Iterator(AdaptiveRadixTree* tree) : root_(&tree->root_) {} + void Seek(const char* key, int l); + void SeekToFirst(); + void SeekToLast(); + void SeekForPrev(const char* key, int l); + void Next(); + void Prev(); + bool Valid() const; + const char* Value() const { + return traversal_stack_.back().node_->get_value(); + } + + private: + void SeekForPrevImpl(const char* key, int l); + void SeekImpl(const char* key, int key_len); + void SeekLeftLeaf(); + void SeekRightLeaf(); + void SeekBack(); + void SeekForward(); + }; + +public: + AdaptiveRadixTree(Allocator* allocator) + : root_(nullptr), allocator_(allocator) {} + ~AdaptiveRadixTree() {} + + const char* Get(const char* key, int key_len) const; + + const char* Insert(const char* key, int key_len, const char* v); + + Node* AllocateNode(InnerNode* inner, int prefix_size); + char* AllocateKey(size_t l) { return allocator_->AllocateAligned(l); } + +private: + std::atomic root_; + Allocator* allocator_; +}; + +} // namespace rocksdb diff --git a/memtable/art_inner_node.h b/memtable/art_inner_node.h new file mode 100644 index 00000000000..6bfadce66f2 --- /dev/null +++ b/memtable/art_inner_node.h @@ -0,0 +1,67 @@ +/** + * @file InnerNode header + * @author Rafael Kallis + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "memory/allocator.h" + +namespace rocksdb { + +struct Node; + +class InnerNode { +public: + virtual ~InnerNode() {} + + /** + * Finds and returns the child Node identified by the given partial key. + * + * @param partial_key - The partial key associated with the child. + * @return Child Node identified by the given partial key or + * a null pointer of no child Node is associated with the partial key. + */ + virtual std::atomic* find_child(uint8_t partial_key) = 0; + + /** + * Adds the given Node to the Node's children. + * No bounds checking is done. + * If a child already exists under the given partial key, the child + * is overwritten without deleting it. + * + * @pre Node should not be full. + * @param partial_key - The partial key associated with the child. + * @param child - The child Node. + */ + virtual void set_child(uint8_t partial_key, Node* child) = 0; + virtual const char* node_type() const = 0; + + /** + * Creates and returns a new Node with bigger children capacity. + * The current Node gets deleted. + * + * @return Node with bigger capacity + */ + virtual InnerNode *grow(Allocator* allocator) = 0; + + /** + * Determines if the Node is full, i.e. can carry no more child Nodes. + */ + virtual bool is_full() const = 0; + + virtual uint8_t next_partial_key(uint8_t partial_key) const = 0; + + virtual uint8_t prev_partial_key(uint8_t partial_key) const = 0; +}; + + +} diff --git a/memtable/art_node.h b/memtable/art_node.h new file mode 100644 index 00000000000..382aa088a3f --- /dev/null +++ b/memtable/art_node.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. Use of +// this source code is governed by a BSD-style license that can be found +// in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +class InnerNode; + +struct Node { + Node() {} + + bool is_leaf() const { + return value.load(std::memory_order_acquire) != nullptr; + } + + int check_prefix(const char* key, int depth, int key_len) const { + int l = std::min(prefix_len, key_len - depth); + for (int i = 0; i < l; i++) { + if (key[i + depth] != prefix[i]) { + return i; + } + } + return l; + } + + void set_value(const char* leaf) { + value.store(leaf, std::memory_order_release); + } + + const char* get_value() const { + return value.load(std::memory_order_acquire); + } + + InnerNode* inner; + std::atomic value; + int prefix_len; + const char* prefix; +}; + +} // namespace rocksdb + diff --git a/memtable/art_node_16.h b/memtable/art_node_16.h new file mode 100644 index 00000000000..ecb07666e32 --- /dev/null +++ b/memtable/art_node_16.h @@ -0,0 +1,147 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "memtable/art_inner_node.h" +#include "memtable/art_node_48.h" +#include +#include +#include +#include + +namespace rocksdb { + +class Node12 : public InnerNode { + const static uint8_t MAX_CHILDREN_NUM = 12; + struct ChildrenNode { + ChildrenNode() {} + uint8_t c; + uint8_t idx; + std::atomic next; + std::atomic child; + }; + + public: + Node12() : n_children_(0), first_(nullptr) {} + ~Node12() {} + std::atomic* find_child(uint8_t partial_key) override; + void set_child(uint8_t partial_key, Node* child) override; + const char* node_type() const override { return "Node16"; } + InnerNode *grow(Allocator* allocator) override; + bool is_full() const override; + + uint8_t next_partial_key(uint8_t partial_key) const override; + + uint8_t prev_partial_key(uint8_t partial_key) const override; + + private: + std::atomic n_children_; + std::atomic first_; + ChildrenNode children_[MAX_CHILDREN_NUM]; +}; + +std::atomic* Node12::find_child(uint8_t partial_key) { + ChildrenNode* next = first_.load(std::memory_order_acquire); + while (next != nullptr) { + if (next->c == partial_key) { + return &next->child; + } + uint8_t idx = next->next.load(std::memory_order_acquire); + if (idx > 0) { + next = &children_[idx - 1]; + } else { + break; + } + } + return nullptr; +} + +void Node12::set_child(uint8_t partial_key, Node* child) { + /* determine index for child */ + uint8_t child_i = n_children_.fetch_add(1, std::memory_order_relaxed); + ChildrenNode* new_child = &children_[child_i]; + new_child->idx = child_i + 1; + new_child->c = partial_key; + new_child->next.store(0, std::memory_order_relaxed); + new_child->child.store(child, std::memory_order_release); + ChildrenNode* prev = nullptr; + ChildrenNode* cur = first_.load(std::memory_order_relaxed); + while (cur != nullptr) { + if (cur->c > partial_key) { + new_child->next.store(cur->idx, std::memory_order_relaxed); + if (prev == 
nullptr) { + first_.store(new_child, std::memory_order_release); + } else { + prev->next.store(new_child->idx, std::memory_order_release); + } + return; + } + prev = cur; + uint8_t idx = cur->next.load(std::memory_order_relaxed); + if (idx > 0) { + cur = &children_[idx - 1]; + } else { + break; + } + } + if (prev == nullptr) { + first_.store(new_child, std::memory_order_release); + } else { + prev->next.store(new_child->idx, std::memory_order_release); + } +} + +InnerNode* Node12::grow(Allocator* allocator) { + auto new_node = new (allocator->AllocateAligned(sizeof(Node48)))Node48(); + ChildrenNode* cur = first_.load(std::memory_order_acquire); + while (cur != nullptr) { + new_node->set_child(cur->c, cur->child.load(std::memory_order_relaxed)); + uint8_t idx = cur->next.load(std::memory_order_acquire); + if (idx > 0) { + cur = &children_[idx - 1]; + } else { + break; + } + } + return new_node; +} + +bool Node12::is_full() const { return n_children_ == MAX_CHILDREN_NUM; } + +uint8_t Node12::next_partial_key(uint8_t partial_key) const { + const ChildrenNode* cur = first_.load(std::memory_order_acquire); + while (cur != nullptr) { + if (cur->c >= partial_key) { + return cur->c; + } + uint8_t idx = cur->next.load(std::memory_order_acquire); + if (idx == 0) { + break; + } + cur = &children_[idx - 1]; + } + return 255; +} + +uint8_t Node12::prev_partial_key(uint8_t partial_key) const { + uint8_t ret = 0; + const ChildrenNode* cur = first_.load(std::memory_order_acquire); + while (cur != nullptr) { + if (cur->c <= partial_key) { + ret = cur->c; + } + uint8_t idx = cur->next.load(std::memory_order_acquire); + if (idx > 0) { + cur = &children_[idx - 1]; + } else { + break; + } + } + return ret; +} + +} // namespace rocksdb + diff --git a/memtable/art_node_256.h b/memtable/art_node_256.h new file mode 100644 index 00000000000..6e0e0a7d540 --- /dev/null +++ b/memtable/art_node_256.h @@ -0,0 +1,76 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include "memtable/art_inner_node.h" +#include +#include + +namespace rocksdb { + +class Node256 : public InnerNode { + public: + Node256() { + n_children_.store(0, std::memory_order_relaxed); + for (int i = 0; i < 256; i++) { + children_[i].store(nullptr, std::memory_order_relaxed); + } + } + virtual ~Node256() {} + + std::atomic *find_child(uint8_t partial_key) override; + void set_child(uint8_t partial_key, Node *child) override; + const char *node_type() const override { return "Node256"; } + InnerNode *grow(Allocator* allocator) override; + bool is_full() const override; + + uint8_t next_partial_key(uint8_t partial_key) const override; + + uint8_t prev_partial_key(uint8_t partial_key) const override; + + private: + std::atomic n_children_; + std::atomic children_[256]; +}; + +std::atomic *Node256::find_child(uint8_t key) { + return &children_[key]; +} + +void Node256::set_child(uint8_t partial_key, Node *child) { + uint8_t key = partial_key; + children_[key].store(child, std::memory_order_release); + ++n_children_; +} + +InnerNode *Node256::grow(Allocator *allocator) { + (void)allocator; + throw std::runtime_error("Node256 cannot grow"); +} + +bool Node256::is_full() const { return false; } + +uint8_t Node256::next_partial_key(uint8_t key) const { + while (key < 255) { + if (children_[key].load(std::memory_order_acquire) != nullptr) { + break; + } + ++key; + } + return key; +} + +uint8_t Node256::prev_partial_key(uint8_t key) const { + while (key > 0) { + if (children_[key].load(std::memory_order_acquire) != nullptr) { + break; + } + --key; + } + return key; +} + +} // namespace rocksdb diff --git a/memtable/art_node_4.h b/memtable/art_node_4.h new file mode 100644 index 00000000000..a71eef2476f --- /dev/null +++ b/memtable/art_node_4.h @@ -0,0 +1,128 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include "memtable/art_inner_node.h" +#include "memtable/art_node_16.h" +#include +#include +#include +#include +#include + +namespace rocksdb { + +class Node4 : public InnerNode { +public: + ~Node4() {} + Node4() : n_children_(0), keys_(0) { + for (int i = 0; i < 4; i++) { + children_[i].store(nullptr, std::memory_order_relaxed); + } + } + std::atomic *find_child(uint8_t partial_key) override; + void set_child(uint8_t partial_key, Node *child) override; + const char *node_type() const override { return "Node4"; } + InnerNode *grow(Allocator *allocator) override; + bool is_full() const override; + + uint8_t next_partial_key(uint8_t partial_key) const override; + + uint8_t prev_partial_key(uint8_t partial_key) const override; + +private: + std::atomic n_children_; + std::atomic keys_; + std::atomic children_[4]; +}; + +std::atomic *Node4::find_child(uint8_t key) { + uint64_t keys = keys_.load(std::memory_order_acquire); + while (keys > 0) { + uint8_t c = keys & 255; + uint8_t idx = ((keys >> 8) & 255) - 1; + if (c == key) { + assert(idx < 4); + return &children_[idx]; + } + keys >>= 16; + } + return nullptr; +} + +void Node4::set_child(uint8_t partial_key, Node *child) { + /* determine index for child */ + uint8_t n_children = n_children_.load(std::memory_order_relaxed); + assert(n_children < 4); + children_[n_children].store(child, std::memory_order_release); + uint64_t keys = keys_.load(std::memory_order_relaxed); + uint8_t c_i = partial_key; + uint64_t c_value = ((uint64_t)n_children + 1) << 8 | c_i; + uint64_t new_keys = 0; + bool found = false; + uint8_t base_value = 0; + while (keys > 0) { + uint8_t c = keys & 255; + if (c > c_i && !found) { + new_keys |= c_value << base_value; + base_value += 16; + found = true; + } + new_keys |= (keys & 65535) << base_value; + keys >>= 16; + base_value += 16; + } + if (!found) { + new_keys |= c_value << base_value; + } + keys_.store(new_keys, std::memory_order_release); + n_children_.store(n_children + 1, std::memory_order_release); +} + +InnerNode *Node4::grow(Allocator *allocator) { + Node12 *new_node = new (allocator->AllocateAligned(sizeof(Node12))) Node12(); + uint64_t keys = keys_.load(std::memory_order_relaxed); + while (keys > 0) { + uint8_t c = keys & 255; + uint8_t idx = ((keys >> 8) & 255) - 1; + new_node->set_child(c, children_[idx].load(std::memory_order_relaxed)); + keys >>= 16; + } + return new_node; +} + +bool Node4::is_full() const { return n_children_ == 4; } + +uint8_t Node4::next_partial_key(uint8_t partial_key) const { + uint8_t key = partial_key; + uint64_t keys = keys_.load(std::memory_order_acquire); + uint8_t ret = 255; + while (keys > 0) { + uint8_t c = keys & 255; + if (c >= key) { + ret = c; + break; + } + keys >>= 16; + } + return ret; +} + +uint8_t Node4::prev_partial_key(uint8_t partial_key) const { + uint8_t ret = 0; + uint8_t key = partial_key; + uint64_t keys = keys_.load(std::memory_order_acquire); + while (keys > 0) { + uint8_t c = keys & 255; + if (c <= key) { + ret = c; + } + keys >>= 16; + } + return ret; +} + +} // namespace rocksdb diff --git a/memtable/art_node_48.h b/memtable/art_node_48.h new file mode 100644 index 00000000000..42e5cc3ec46 --- /dev/null +++ b/memtable/art_node_48.h @@ -0,0 +1,116 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include "memtable/art_node_256.h" +#include +#include +#include +#include + +namespace rocksdb { + +class Node48 : public InnerNode { + public: + Node48(); + virtual ~Node48() {} + + std::atomic *find_child(uint8_t partial_key) override; + void set_child(uint8_t partial_key, Node *child) override; + const char *node_type() const override { return "Node48"; } + InnerNode *grow(Allocator* allocator) override; + bool is_full() const override; + + uint8_t next_partial_key(uint8_t partial_key) const override; + uint8_t prev_partial_key(uint8_t partial_key) const override; + uint8_t get_index(uint8_t key) const; + void set_index(uint8_t key, uint8_t index); + +private: + static const uint8_t EMPTY; + + std::atomic n_children_; + std::atomic indexes_[32]; + std::atomic children_[48]; +}; + +Node48::Node48() { + for (int i = 0; i < 32; i++) { + indexes_[i].store(0, std::memory_order_relaxed); + } + for (int i = 0; i < 48; i++) { + children_[i].store(nullptr, std::memory_order_relaxed); + } + n_children_.store(0, std::memory_order_relaxed); +} + +std::atomic *Node48::find_child(uint8_t partial_key) { + // TODO(rafaelkallis): direct lookup instead of temp save? + uint8_t index = get_index(partial_key); + return Node48::EMPTY != index ? &children_[index] : nullptr; +} + +uint8_t Node48::get_index(uint8_t key) const { + uint64_t index_value = indexes_[key >> 3].load(std::memory_order_acquire); + uint8_t index = (index_value >> ((key & 7) << 3) & 255); + return index - 1; +} + +void Node48::set_index(uint8_t key, uint8_t index) { + uint64_t old_index = indexes_[key >> 3].load(std::memory_order_acquire); + indexes_[key >> 3].store(old_index | ((uint64_t)index + 1) + << ((key & 7) << 3), + std::memory_order_release); +} + +void Node48::set_child(uint8_t partial_key, Node *child) { + uint8_t n_children = n_children_.load(std::memory_order_relaxed); + set_index(partial_key, n_children); + children_[n_children].store(child, std::memory_order_release); + n_children_.store(n_children + 1, std::memory_order_release); +} + +InnerNode *Node48::grow(Allocator *allocator) { + auto new_node = new (allocator->AllocateAligned(sizeof(Node256)))Node256(); + uint8_t index; + for (int partial_key = 0; partial_key <= 255; ++partial_key) { + index = get_index(partial_key); + if (index != Node48::EMPTY) { + new_node->set_child(partial_key, children_[index]); + } + } + return new_node; +} + +bool Node48::is_full() const { return n_children_ == 48; } + +const uint8_t Node48::EMPTY = 255; + +uint8_t Node48::next_partial_key(uint8_t partial_key) const { + uint8_t key = partial_key; + while (key < 255) { + uint8_t index = get_index(key); + if (index != Node48::EMPTY) { + break; + } + ++key; + } + return key; +} + +uint8_t Node48::prev_partial_key(uint8_t key) const { + while (key > 0) { + uint8_t index = get_index(key); + if (index != Node48::EMPTY) { + break; + } + --key; + } + return key; +} + +} // namespace rocksdb + diff --git a/memtable/art_test.cc b/memtable/art_test.cc new file mode 100644 index 00000000000..4fb9ecc5a20 --- /dev/null +++ b/memtable/art_test.cc @@ -0,0 +1,504 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "memtable/art.h" + +#include + +#include "memory/arena.h" +#include "memory/concurrent_arena.h" +#include "rocksdb/env.h" +#include "test_util/testharness.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/random.h" + +namespace rocksdb { + +typedef uint64_t Key; + + +static const char* Encode(const uint64_t value) { + thread_local char tmp_buf[9]; + tmp_buf[7] = (uint8_t)(value & 0xff); + tmp_buf[6] = (uint8_t)((value >> 8) & 0xff); + tmp_buf[5] = (uint8_t)((value >> 16) & 0xff); + tmp_buf[4] = (uint8_t)((value >> 24) & 0xff); + tmp_buf[3] = (uint8_t)((value >> 32) & 0xff); + tmp_buf[2] = (uint8_t)((value >> 40) & 0xff); + tmp_buf[1] = (uint8_t)((value >> 48) & 0xff); + tmp_buf[0] = (uint8_t)((value >> 56) & 0xff); + return tmp_buf; +} + +static uint32_t Decode32(const char* ptr) { + return ((static_cast(static_cast(ptr[3]))) | + (static_cast(static_cast(ptr[2])) << 8) | + (static_cast(static_cast(ptr[1])) << 16) | + (static_cast(static_cast(ptr[0])) << 24)); +} + +static Key Decode(const char* ptr) { + uint64_t hi = Decode32(ptr); + uint64_t lo = Decode32(ptr + 4); + return (hi << 32) | lo; +} + +class ArtTest : public testing::Test { + public: + void Insert(AdaptiveRadixTree* list, Key key) { + char* buf = list->AllocateKey(sizeof(Key)); + memcpy(buf, &key, sizeof(Key)); + list->Insert(buf, 8, buf); + keys_.insert(key); + } + + void Validate(AdaptiveRadixTree* list) { + // Check keys exist. + for (Key key : keys_) { + ASSERT_TRUE(list->Get(Encode(key), 8) != nullptr); + } + // Iterate over the list, make sure keys appears in order and no extra + // keys exist. + AdaptiveRadixTree::Iterator iter(list); + ASSERT_FALSE(iter.Valid()); + Key zero = 0; + iter.Seek(Encode(zero), 8); + for (Key key : keys_) { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(key, Decode(iter.Value())); + iter.Next(); + } + ASSERT_FALSE(iter.Valid()); + } + + private: + std::set keys_; +}; + +TEST_F(ArtTest, Empty) { + Arena arena; + AdaptiveRadixTree list(&arena); + + AdaptiveRadixTree::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToFirst(); + ASSERT_TRUE(!iter.Valid()); + iter.Seek("ancd", 4); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToLast(); + ASSERT_TRUE(!iter.Valid()); +} + +TEST_F(ArtTest, InsertAndLookup) { + const int N = 2000; + const int R = 5000; + Random rnd(1000); + std::set keys; + Arena arena; + AdaptiveRadixTree list(&arena); + for (int i = 0; i < N; i++) { + Key key = rnd.Next() % R; + if (keys.insert(key).second) { + char* buf = arena.AllocateAligned(sizeof(Key)); + const char* d = Encode(key); + memcpy(buf, d, sizeof(Key)); + list.Insert(buf, sizeof(key), buf); + } + } + + for (Key i = 0; i < R; i++) { + if (list.Get(Encode(i), 8) != nullptr) { + ASSERT_EQ(keys.count(i), 1U); + } else { + ASSERT_EQ(keys.count(i), 0U); + } + } + + // Simple iterator tests + { + AdaptiveRadixTree::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + + uint64_t zero = 0; + iter.Seek(Encode(zero), 8); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), Decode(iter.Value())); + + uint64_t max_key = R - 1; + iter.SeekForPrev(Encode(max_key), 8); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.rbegin()), Decode(iter.Value())); + + iter.SeekToFirst(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), Decode(iter.Value())); + + iter.SeekToLast(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.rbegin()), 
Decode(iter.Value()));
+  }
+
+  // Forward iteration test
+  for (Key i = 0; i < R; i++) {
+    AdaptiveRadixTree::Iterator iter(&list);
+    iter.Seek(Encode(i), 8);
+
+    // Compare against model iterator
+    std::set<Key>::iterator model_iter = keys.lower_bound(i);
+    for (int j = 0; j < 3; j++) {
+      if (model_iter == keys.end()) {
+        ASSERT_TRUE(!iter.Valid());
+        break;
+      } else {
+        ASSERT_TRUE(iter.Valid());
+        ASSERT_EQ(*model_iter, Decode(iter.Value()));
+        ++model_iter;
+        iter.Next();
+      }
+    }
+  }
+
+  // Backward iteration test
+  for (Key i = 0; i < R; i++) {
+    AdaptiveRadixTree::Iterator iter(&list);
+    iter.SeekForPrev(Encode(i), 8);
+
+    // Compare against model iterator
+    std::set<Key>::iterator model_iter = keys.upper_bound(i);
+    for (int j = 0; j < 3; j++) {
+      if (model_iter == keys.begin()) {
+        ASSERT_TRUE(!iter.Valid());
+        break;
+      } else {
+        ASSERT_TRUE(iter.Valid());
+        ASSERT_EQ(*--model_iter, Decode(iter.Value()));
+        iter.Prev();
+      }
+    }
+  }
+}
+
+#ifndef ROCKSDB_VALGRIND_RUN
+// We want to make sure that with a single writer and multiple
+// concurrent readers (with no synchronization other than when a
+// reader's iterator is created), the reader always observes all the
+// data that was present in the tree when the iterator was
+// constructed. Because insertions are happening concurrently, we may
+// also observe new values that were inserted since the iterator was
+// constructed, but we should never miss any values that were present
+// at iterator construction time.
+//
+// We generate multi-part keys:
+//     <key, gen, hash>
+// where:
+//     key is in range [0..K-1]
+//     gen is a generation number for key
+//     hash is hash(key,gen)
+//
+// The insertion code picks a random key, sets gen to be 1 + the last
+// generation number inserted for that key, and sets hash to Hash(key,gen).
+//
+// At the beginning of a read, we snapshot the last inserted
+// generation number for each key. We then iterate, including random
+// calls to Next() and Seek(). For every key we encounter, we
+// check that it is either expected given the initial snapshot or has
+// been concurrently added since the iterator started.
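The packed test-key layout described in the comment above can be sketched as a small standalone program. This is an editor's illustration, not part of the patch: `HashNumbers` here is a `std::hash` stand-in for the `rocksdb::Hash` call that the `ConcurrentTest` class below actually uses, and the helper names mirror (but are independent of) the class's private methods.

```cpp
// Sketch of the <key, gen, hash> packing: bits 63..40 hold the key,
// bits 39..8 the generation, bits 7..0 a hash of (key, gen).
#include <cassert>
#include <cstdint>
#include <functional>

using Key = uint64_t;

// Illustrative stand-in; the test uses rocksdb::Hash from util/hash.h.
static uint64_t HashNumbers(uint64_t k, uint64_t g) {
  return std::hash<uint64_t>{}((k * 0x9E3779B97F4A7C15ull) ^ g);
}

static Key MakeKey(uint64_t k, uint64_t g) {
  return (k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff);
}

static uint64_t KeyPart(Key key) { return key >> 40; }
static uint64_t GenPart(Key key) { return (key >> 8) & 0xffffffffu; }
static uint64_t HashPart(Key key) { return key & 0xff; }

int main() {
  Key packed = MakeKey(/*k=*/3, /*g=*/7);
  assert(KeyPart(packed) == 3 && GenPart(packed) == 7);
  assert(HashPart(packed) == (HashNumbers(3, 7) & 0xff));
  return 0;
}
```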
+class ConcurrentTest { + public: + static const uint32_t K = 8; + + private: + static uint64_t key(Key key) { return (key >> 40); } + static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } + static uint64_t hash(Key key) { return key & 0xff; } + + static uint64_t HashNumbers(uint64_t k, uint64_t g) { + uint64_t data[2] = {k, g}; + return Hash(reinterpret_cast(data), sizeof(data), 0); + } + + static Key MakeKey(uint64_t k, uint64_t g) { + assert(sizeof(Key) == sizeof(uint64_t)); + assert(k <= K); // We sometimes pass K to seek to the end of the skiplist + assert(g <= 0xffffffffu); + return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff)); + } + + static bool IsValidKey(Key k) { + return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff); + } + + static Key RandomTarget(Random* rnd) { + switch (rnd->Next() % 10) { + case 0: + // Seek to beginning + return MakeKey(0, 0); + case 1: + // Seek to end + return MakeKey(K, 0); + default: + // Seek to middle + return MakeKey(rnd->Next() % K, 0); + } + } + + // Per-key generation + struct State { + std::atomic generation[K]; + void Set(int k, int v) { + generation[k].store(v, std::memory_order_release); + } + int Get(int k) { return generation[k].load(std::memory_order_acquire); } + + State() { + for (unsigned int k = 0; k < K; k++) { + Set(k, 0); + } + } + }; + + // Current state of the test + State current_; + + Arena arena_; + + // InlineSkipList is not protected by mu_. We just use a single writer + // thread to modify it. + AdaptiveRadixTree list_; + + public: + ConcurrentTest() : arena_(4096 * 16), list_(&arena_) {} + + void DEBUG_print() { + printf("memory allocated: %lu\n", arena_.MemoryAllocatedBytes()); + printf("memory usage: %lu\n", arena_.ApproximateMemoryUsage()); + } + + // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep + void WriteStep(Random* rnd) { + const uint32_t k = rnd->Next() % K; + const int g = current_.Get(k) + 1; + const Key new_key = MakeKey(k, g); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, Encode(new_key), sizeof(Key)); + list_.Insert(buf, 8, buf); + current_.Set(k, g); + } + + void ReadStep(Random* rnd) { + // Remember the initial committed state of the skiplist. + State initial_state; + for (unsigned int k = 0; k < K; k++) { + initial_state.Set(k, current_.Get(k)); + } + + Key pos = RandomTarget(rnd); + typename AdaptiveRadixTree::Iterator iter(&list_); + iter.Seek(Encode(pos), 8); + while (true) { + Key current; + if (!iter.Valid()) { + current = MakeKey(K, 0); + } else { + current = Decode(iter.Value()); + ASSERT_TRUE(IsValidKey(current)) << current; + } + ASSERT_LE(pos, current) << "should not go backwards"; + + // Verify that everything in [pos,current) was not present in + // initial_state. + while (pos < current) { + ASSERT_LT(key(pos), K) << pos; + + // Note that generation 0 is never inserted, so it is ok if + // <*,0,*> is missing. 
+ ASSERT_TRUE((gen(pos) == 0U) || + (gen(pos) > static_cast(initial_state.Get( + static_cast(key(pos)))))) + << "key: " << key(pos) << "; gen: " << gen(pos) + << "; initgen: " << initial_state.Get(static_cast(key(pos))); + + // Advance to next key in the valid key space + if (key(pos) < key(current)) { + pos = MakeKey(key(pos) + 1, 0); + } else { + pos = MakeKey(key(pos), gen(pos) + 1); + } + } + + if (!iter.Valid()) { + break; + } + + if (rnd->Next() % 2) { + iter.Next(); + pos = MakeKey(key(pos), gen(pos) + 1); + } else { + Key new_target = RandomTarget(rnd); + if (new_target > pos) { + pos = new_target; + iter.Seek(Encode(new_target), 8); + } + } + } + } +}; + +const uint32_t ConcurrentTest::K; + +// Simple test that does single-threaded testing of the ConcurrentTest +// scaffolding. +TEST_F(ArtTest, ConcurrentReadWithoutThreads) { + { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 20000; i++) { + test.ReadStep(&rnd); + test.WriteStep(&rnd); + } + } + { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 20000; i++) { + test.ReadStep(&rnd); + test.WriteStep(&rnd); + } + } +} + +class TestState { + public: + TestState(int s) : seed_(s), quit_flag_(false) {} + + enum ReaderState { STARTING, RUNNING, DONE }; + virtual ~TestState() {} + virtual void Wait(ReaderState s) = 0; + virtual void Change(ReaderState s) = 0; + virtual void AdjustPendingWriters(int delta) = 0; + virtual void WaitForPendingWriters() = 0; + // REQUIRES: No concurrent calls for the same k + virtual void ReadStep(Random* rnd) = 0; + + public: + int seed_; + std::atomic quit_flag_; + std::atomic next_writer_; +}; + +class TestStateImpl : public TestState { + public: + ConcurrentTest t_; + + explicit TestStateImpl(int s) + : TestState(s), state_(STARTING), pending_writers_(0), state_cv_(&mu_) {} + + void Wait(ReaderState s) override { + mu_.Lock(); + while (state_ != s) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + + void Change(ReaderState s) override { + mu_.Lock(); + state_ = s; + state_cv_.Signal(); + mu_.Unlock(); + } + + void AdjustPendingWriters(int delta) override { + mu_.Lock(); + pending_writers_ += delta; + if (pending_writers_ == 0) { + state_cv_.Signal(); + } + mu_.Unlock(); + } + + void WaitForPendingWriters() override { + mu_.Lock(); + while (pending_writers_ != 0) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + + void ReadStep(Random* rnd) override { t_.ReadStep(rnd); } + + private: + port::Mutex mu_; + ReaderState state_; + int pending_writers_; + port::CondVar state_cv_; +}; + +static void ConcurrentReader(void* arg) { + TestState* state = reinterpret_cast(arg); + Random rnd(state->seed_); + int64_t reads = 0; + state->Change(TestState::RUNNING); + while (!state->quit_flag_.load(std::memory_order_acquire)) { + state->ReadStep(&rnd); + ++reads; + } + state->Change(TestState::DONE); +} + +static void RunConcurrentRead(int run) { + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestStateImpl state(seed + 1); + Env::Default()->SetBackgroundThreads(1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int k = 0; k < kSize; ++k) { + state.t_.WriteStep(&rnd); + } + state.quit_flag_.store(true, std::memory_order_release); + state.Wait(TestState::DONE); + } +} + +TEST_F(ArtTest, ConcurrentRead1) { + RunConcurrentRead(1); + 
RunConcurrentRead(1); +} +TEST_F(ArtTest, ConcurrentRead2) { + RunConcurrentRead(2); + RunConcurrentRead(2); +} +TEST_F(ArtTest, ConcurrentRead3) { + RunConcurrentRead(3); + RunConcurrentRead(3); +} +TEST_F(ArtTest, ConcurrentRead4) { + RunConcurrentRead(4); + RunConcurrentRead(4); +} +TEST_F(ArtTest, ConcurrentRead5) { + RunConcurrentRead(5); + RunConcurrentRead(5); +} + +#endif // ROCKSDB_VALGRIND_RUN +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/memtable/artrep.cc b/memtable/artrep.cc new file mode 100644 index 00000000000..961efe34378 --- /dev/null +++ b/memtable/artrep.cc @@ -0,0 +1,143 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/memtable.h" +#include "memory/arena.h" +#include "memtable/art.h" +#include "rocksdb/memtablerep.h" + +namespace rocksdb { +namespace { + +class AdaptiveRadixTreeRep : public MemTableRep { + AdaptiveRadixTree skip_list_; + +public: + explicit AdaptiveRadixTreeRep(Allocator* allocator) + : MemTableRep(allocator), skip_list_(allocator) {} + + KeyHandle Allocate(const size_t len, char** buf) override { + *buf = allocator_->Allocate(len); + return static_cast(*buf); + } + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + void Insert(KeyHandle handle) override { + char* buf = static_cast(handle); + uint32_t len = 0; + // +5: we assume "data" is not corrupted + // unsigned char is 7 bits, uint32_t is 32 bits, need 5 unsigned char + auto p = GetVarint32Ptr(buf, buf + 5 /* limit */, &len); + skip_list_.Insert(p, len - 8, buf); + } + + bool InsertKey(KeyHandle handle) override { + Insert(handle); + return true; + } + + // Returns true iff an entry that compares equal to key is in the list. + bool Contains(const char* key) const override { + return skip_list_.Get(key, static_cast(strlen(key))) != nullptr; + } + + size_t ApproximateMemoryUsage() override { + // All memory is allocated through allocator_; nothing to report here + return 0; + } + + void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) override { + const char* value = skip_list_.Get(k.user_key().data(), + static_cast(k.user_key().size())); + if (value != nullptr) { + callback_func(callback_args, value); + } + } + + uint64_t ApproximateNumEntries(const Slice& /* start_ikey */, + const Slice& /* end_ikey */) override { + return 0; + } + + ~AdaptiveRadixTreeRep() override {} + + // Iteration over the contents of a skip list + class Iterator : public MemTableRep::Iterator { + public: + // Initialize an iterator over the specified list. + // The returned iterator is not valid. + explicit Iterator(AdaptiveRadixTree* list) : iter_(list) {} + + ~Iterator() override {} + + // Returns true iff the iterator is positioned at a valid node. + bool Valid() const override { return iter_.Valid(); } + + // Returns the key at the current position. + // REQUIRES: Valid() + const char* key() const override { return iter_.Value(); } + + // Advances to the next position. + // REQUIRES: Valid() + void Next() override { iter_.Next(); } + + // Advances to the previous position. 
+ // REQUIRES: Valid() + void Prev() override { assert(false); } + + // Advance to the first entry with a key >= target + void Seek(const Slice& user_key, const char* memtable_key) override { + if (memtable_key != nullptr) { + uint32_t l = 0; + const char* k = GetVarint32Ptr(memtable_key, memtable_key + 5, &l); + iter_.Seek(k, static_cast(l) - 8); + } else { + iter_.Seek(user_key.data(), static_cast(user_key.size()) - 8); + } + } + + // Retreat to the last entry with a key <= target + void SeekForPrev(const Slice& user_key, const char* memtable_key) override { + if (memtable_key != nullptr) { + uint32_t l = 0; + const char* k = GetVarint32Ptr(memtable_key, memtable_key + 5, &l); + iter_.SeekForPrev(k, l - 8); + } else { + iter_.SeekForPrev(user_key.data(), + static_cast(user_key.size()) - 8); + } + } + + // Position at the first entry in list. + // Final state of iterator is Valid() iff list is not empty. + void SeekToFirst() override { iter_.SeekToFirst(); } + + // Position at the last entry in list. + // Final state of iterator is Valid() iff list is not empty. + void SeekToLast() override { iter_.SeekToLast(); } + + protected: + std::string tmp_; // For passing to EncodeKey + AdaptiveRadixTree::Iterator iter_; + }; + + MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { + void* mem = + arena ? arena->AllocateAligned(sizeof(AdaptiveRadixTreeRep::Iterator)) : + operator new(sizeof(AdaptiveRadixTreeRep::Iterator)); + return new (mem) AdaptiveRadixTreeRep::Iterator(&skip_list_); + } +}; +} + +MemTableRep* AdaptiveRadixTreeFactory::CreateMemTableRep( + const MemTableRep::KeyComparator& /* compare */, Allocator* allocator, + const SliceTransform* /* transform */, Logger* /*logger*/) { + return new AdaptiveRadixTreeRep(allocator); +} + +} // namespace rocksdb diff --git a/src.mk b/src.mk index 13923612f6d..9b9b9377d42 100644 --- a/src.mk +++ b/src.mk @@ -85,6 +85,8 @@ LIB_SOURCES = \ memtable/hash_linklist_rep.cc \ memtable/hash_skiplist_rep.cc \ memtable/skiplistrep.cc \ + memtable/artrep.cc \ + memtable/art.cc \ memtable/vectorrep.cc \ memtable/write_buffer_manager.cc \ monitoring/histogram.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 68bc400b4f3..0fed01a5482 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1166,6 +1166,7 @@ enum RepFactory { kPrefixHash, kVectorRep, kHashLinkedList, + kAdaptiveRadixTree, }; static enum RepFactory StringToRepFactory(const char* ctype) { @@ -1179,6 +1180,9 @@ static enum RepFactory StringToRepFactory(const char* ctype) { return kVectorRep; else if (!strcasecmp(ctype, "hash_linkedlist")) return kHashLinkedList; + else if (!strcasecmp(ctype, "art")) { + return kAdaptiveRadixTree; + } fprintf(stdout, "Cannot parse memreptable %s\n", ctype); return kSkipList; @@ -2265,6 +2269,8 @@ class Benchmark { case kHashLinkedList: fprintf(stdout, "Memtablerep: hash_linkedlist\n"); break; + case kAdaptiveRadixTree: + fprintf(stdout, "Memtablerep: adaptive_radix_tree\n"); } fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level); @@ -3465,6 +3471,12 @@ class Benchmark { new VectorRepFactory ); break; + case kAdaptiveRadixTree: + options.memtable_factory.reset(new AdaptiveRadixTreeFactory()); + options.enable_pipelined_write = true; + options.enable_pipelined_commit = false; + options.allow_concurrent_memtable_write = false; + break; #else default: fprintf(stderr, "Only skip list is supported in lite mode\n");
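To try the new memtable rep outside of db_bench, the wiring is the same as the `kAdaptiveRadixTree` case above: install the factory and disable concurrent memtable writes, since the rep reports `IsInsertConcurrentlySupported() == false`. A minimal sketch, assuming this patch is applied; the database path and the Put/Get round-trip are illustrative only.

```cpp
// Open a DB that uses the ART-based memtable added by this patch.
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Factory declared in include/rocksdb/memtablerep.h by this patch.
  options.memtable_factory.reset(new rocksdb::AdaptiveRadixTreeFactory());
  // Required: the ART rep does not support concurrent inserts.
  options.allow_concurrent_memtable_write = false;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/art_memtable_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);
  assert(s.ok() && value == "value");

  delete db;
  return 0;
}
```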