Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
Signed-off-by: subains <[email protected]>
  • Loading branch information
subains committed Dec 13, 2022
1 parent 8899a36 commit 3c8862b
Show file tree
Hide file tree
Showing 53 changed files with 5,101 additions and 187 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ else()
endif()

if( NOT DEFINED CMAKE_CXX_STANDARD )
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 20)
endif()

include(CMakeDependentOption)
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ MACHINE ?= $(shell uname -m)
ARFLAGS = ${EXTRA_ARFLAGS} rs
STRIPFLAGS = -S -x

CXXFLAGS += -fcoroutines

# Transform parallel LOG output into something more readable.
perl_command = perl -n \
-e '@a=split("\t",$$_,-1); $$t=$$a[8];' \
Expand All @@ -46,7 +48,7 @@ quoted_perl_command = $(subst ','\'',$(perl_command))
# `make install`

# Set the default DEBUG_LEVEL to 1
DEBUG_LEVEL?=1
DEBUG_LEVEL?=2

# LIB_MODE says whether or not to use/build "shared" or "static" libraries.
# Mode "static" means to link against static libraries (.a)
Expand All @@ -69,7 +71,7 @@ else ifneq ($(filter shared_lib install-shared, $(MAKECMDGOALS)),)
DEBUG_LEVEL=0
LIB_MODE=shared
else ifneq ($(filter static_lib install-static, $(MAKECMDGOALS)),)
DEBUG_LEVEL=0
DEBUG_LEVEL=2
LIB_MODE=static
else ifneq ($(filter jtest rocksdbjava%, $(MAKECMDGOALS)),)
OBJ_DIR=jl
Expand Down Expand Up @@ -145,15 +147,15 @@ ifeq ($(DEBUG_LEVEL),0)
OPT += -DNDEBUG

ifneq ($(USE_RTTI), 1)
CXXFLAGS += -fno-rtti
# CXXFLAGS += -fno-rtti
else
CXXFLAGS += -DROCKSDB_USE_RTTI
endif
else
ifneq ($(USE_RTTI), 0)
CXXFLAGS += -DROCKSDB_USE_RTTI
else
CXXFLAGS += -fno-rtti
# CXXFLAGS += -fno-rtti
endif

ifdef ASSERT_STATUS_CHECKED
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/async_future.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down Expand Up @@ -483,6 +484,7 @@ cpp_library(
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/async_future.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
Expand Down
4 changes: 2 additions & 2 deletions build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fi
if [ "$ROCKSDB_CXX_STANDARD" ]; then
PLATFORM_CXXFLAGS="-std=$ROCKSDB_CXX_STANDARD"
else
PLATFORM_CXXFLAGS="-std=c++11"
PLATFORM_CXXFLAGS="-std=c++20"
fi

# we currently depend on POSIX platform
Expand Down Expand Up @@ -246,7 +246,7 @@ EOF
Cygwin)
PLATFORM=CYGWIN
PLATFORM_SHARED_CFLAGS=""
PLATFORM_CXXFLAGS="-std=gnu++11"
PLATFORM_CXXFLAGS="-std=gnu++20"
COMMON_FLAGS="$COMMON_FLAGS -DCYGWIN"
if [ -z "$USE_CLANG" ]; then
COMMON_FLAGS="$COMMON_FLAGS -fno-builtin-memcmp"
Expand Down
2 changes: 1 addition & 1 deletion db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Status ArenaWrappedDBIter::Refresh() {
range_del_iter.reset(
sv->mem->NewRangeTombstoneIterator(read_options_, latest_seq));
range_del_agg->AddTombstones(std::move(range_del_iter));
cfd_->ReturnThreadLocalSuperVersion(sv);
cfd_->ReturnThreadLocalSuperVersion(db_impl_, sv);
}
// Refresh latest sequence number
db_iter_->set_sequence(latest_seq);
Expand Down
41 changes: 35 additions & 6 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ SuperVersion* SuperVersion::Ref() {
return this;
}

uint32_t SuperVersion::GetRef() const {
return refs.load();
}

bool SuperVersion::Unref() {
// fetch_sub returns the previous value of ref
uint32_t previous_refs = refs.fetch_sub(1);
Expand Down Expand Up @@ -1200,7 +1204,7 @@ Compaction* ColumnFamilyData::CompactRange(
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
SuperVersion* sv = GetThreadLocalSuperVersion(db);
sv->Ref();
if (!ReturnThreadLocalSuperVersion(sv)) {
if (!ReturnThreadLocalSuperVersion(db, sv)) {
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
// when the thread-local pointer was populated. So, the Ref() earlier in
// this function still prevents the returned SuperVersion* from being
Expand All @@ -1219,7 +1223,7 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value at when returning
// have swapped in kSVObsolete. We re-check the value when returning
// SuperVersion back to thread local, with an atomic compare and swap.
// The superversion will need to be released if detected to be stale.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
Expand All @@ -1228,7 +1232,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
assert(ptr != SuperVersion::kSVInUse);
if (ptr == SuperVersion::kSVInUse) {
// FIXME: Check version number too.
return super_version_->Ref();
}

SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
Expand Down Expand Up @@ -1259,22 +1267,42 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(DBImpl* db, SuperVersion* sv) {
assert(sv != nullptr);

// Put the SuperVersion back
void* expected = SuperVersion::kSVInUse;

if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happened. The
// SuperVersion is still current.
return true;
} else if (expected != nullptr) {
// A scrape has happened, we have to adjust the refs in both
// the super version and CFD that it refers to.
// FIXME: This assumes the SVs "release" are not interleaved.
auto ptr = static_cast<SuperVersion*>(local_sv_->Swap(sv));
if (sv != ptr) {
db->mutex()->Lock();
assert(ptr != super_version_);
bool last_ref __attribute__((__unused__));
last_ref = ptr->Unref();
assert(last_ref);
refs_.fetch_sub(1);
ptr->Cleanup();
db->mutex()->Unlock();
// delete ptr;
} else {
sv->Unref();
}
return true;
} else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
assert(expected == SuperVersion::kSVObsolete);
return false;
}
return false;
}

void ColumnFamilyData::InstallSuperVersion(
Expand Down Expand Up @@ -1329,6 +1357,7 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);

bool was_last_ref __attribute__((__unused__));
was_last_ref = sv->Unref();
// sv couldn't have been the last reference because
Expand Down
9 changes: 7 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ struct SuperVersion {
// should be called outside the mutex
SuperVersion() = default;
~SuperVersion();

uint32_t GetRef() const;

SuperVersion* Ref();
// If Unref() returns true, Cleanup() should be called with mutex held
// before deleting this SuperVersion.
Expand Down Expand Up @@ -274,7 +277,9 @@ class ColumnFamilyData {
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1); }
void Ref() {
refs_.fetch_add(1);
}

// UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only
Expand Down Expand Up @@ -440,7 +445,7 @@ class ColumnFamilyData {
// Try to return SuperVersion back to thread local storage. Return true on
// success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag.
bool ReturnThreadLocalSuperVersion(SuperVersion* sv);
bool ReturnThreadLocalSuperVersion(DBImpl* db, SuperVersion* sv);
// thread-safe
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
Expand Down
Loading

0 comments on commit 3c8862b

Please sign in to comment.