Skip to content

Commit

Permalink
implemented blob: store in WSO (write set object)
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Jan 22, 2025
1 parent 4bd8469 commit e89eb96
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 38 deletions.
12 changes: 10 additions & 2 deletions src/concurrency_control/include/local_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class write_set_obj { // NOLINT
// for update / upsert / insert
write_set_obj(Storage const storage, OP_TYPE const op,
Record* const rec_ptr, std::string_view const val,
bool const inc_tombstone)
bool const inc_tombstone, std::vector<blob_id_type>&& lobs)
: storage_(storage), op_(op), rec_ptr_(rec_ptr), val_(val),
inc_tombstone_(inc_tombstone) {
inc_tombstone_(inc_tombstone), lobs_(lobs) {
if (op == OP_TYPE::DELETE) {
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "unreachable path";
}
Expand Down Expand Up @@ -125,6 +125,8 @@ class write_set_obj { // NOLINT

[[nodiscard]] bool get_inc_tombstone() const { return inc_tombstone_; }

[[nodiscard]] const std::vector<blob_id_type>& get_lobs() const { return lobs_; }

void set_op(OP_TYPE op) { op_ = op; }

void set_rec_ptr(Record* rec_ptr) { rec_ptr_ = rec_ptr; }
Expand All @@ -137,6 +139,8 @@ class write_set_obj { // NOLINT

void set_inc_tombstone(bool tf) { inc_tombstone_ = tf; }

void set_lobs(std::vector<blob_id_type>&& lobs) { lobs_.swap(lobs); }

private:
/**
* @brief The target storage of this write.
Expand All @@ -162,6 +166,10 @@ class write_set_obj { // NOLINT
* we want to use std::atomic<bool> but some overload to reduce copy forbid it.
*/
bool inc_tombstone_{false};
/**
 * @brief Ids of the large objects (blobs) referenced by this write's value.
 * Captured at insert/update/upsert time and carried to commit processing.
 */
std::vector<blob_id_type> lobs_;
};

class local_write_set {
Expand Down
33 changes: 20 additions & 13 deletions src/concurrency_control/interface/insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void abort_insert(session* const ti) {
static inline Status insert_process(session* const ti, Storage st,
const std::string_view key,
const std::string_view val,
Record*& out_rec_ptr) {
Record*& out_rec_ptr,
std::vector<blob_id_type>& lobs) {
Record* rec_ptr{};
rec_ptr = new Record(key); // NOLINT
tid_word tid{rec_ptr->get_tidw()};
Expand Down Expand Up @@ -66,7 +67,7 @@ static inline Status insert_process(session* const ti, Storage st,
ti->push_to_read_set_for_stx({st, rec_ptr, tid});
}
}
ti->push_to_write_set({st, OP_TYPE::INSERT, rec_ptr, val, true});
ti->push_to_write_set({st, OP_TYPE::INSERT, rec_ptr, val, true, std::move(lobs)});
out_rec_ptr = rec_ptr;
return Status::OK;
}
Expand All @@ -84,7 +85,9 @@ static void register_read_if_ltx(session* const ti, Record* const rec_ptr) {

Status insert_body(Token const token, Storage const storage, // NOLINT
const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -99,6 +102,10 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::INSERT)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs{blobs_size};
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}
for (;;) {
// index access to check local write set
Record* rec_ptr{};
Expand All @@ -113,15 +120,15 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
if (in_ws->get_op() == OP_TYPE::DELETE) {
in_ws->set_op(OP_TYPE::UPDATE);
in_ws->set_val(val);
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}
}

tid_word found_tid{};
rc = try_deleted_to_inserting(storage, key, rec_ptr, found_tid);
if (rc == Status::OK) { // ok already count up tombstone count
ti->push_to_write_set(
{storage, OP_TYPE::INSERT, rec_ptr, val, true});
ti->push_to_write_set({storage, OP_TYPE::INSERT, rec_ptr, val, true, std::move(lobs)});
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
}
Expand Down Expand Up @@ -152,7 +159,7 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
}

INSERT_PROCESS: // NOLINT
auto rc{insert_process(ti, storage, key, val, rec_ptr)};
auto rc{insert_process(ti, storage, key, val, rec_ptr, lobs)};
if (rc == Status::OK) {
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
Expand All @@ -164,20 +171,20 @@ Status insert_body(Token const token, Storage const storage, // NOLINT
Status insert(Token const token, Storage const storage, // NOLINT
const std::string_view key,
const std::string_view val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "insert, token: " << token
<< ", storage: " << storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("insert, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};

// insert_body check warn not begin
ret = insert_body(token, storage, key, val);
ret = insert_body(token, storage, key, val, blobs_data, blobs_size);
}
ti->process_before_finish_step();
shirakami_log_exit << "insert, Status: " << ret;
Expand Down
26 changes: 17 additions & 9 deletions src/concurrency_control/interface/update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ static void process_before_return_not_found(session* const ti,

Status update_body(Token token, Storage storage,
const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -61,6 +63,11 @@ Status update_body(Token token, Storage storage,
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::UPDATE)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs{blobs_size};
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}

// index access to check local write set
Record* rec_ptr{};
if (Status::OK == get<Record>(storage, key, rec_ptr)) {
Expand All @@ -71,6 +78,7 @@ Status update_body(Token token, Storage storage,
return Status::WARN_NOT_FOUND;
}
in_ws->set_val(val);
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}

Expand All @@ -82,7 +90,7 @@ Status update_body(Token token, Storage storage,
}

// prepare write
ti->push_to_write_set({storage, OP_TYPE::UPDATE, rec_ptr, val, false});
ti->push_to_write_set({storage, OP_TYPE::UPDATE, rec_ptr, val, false, std::move(lobs)});
register_read_if_ltx(ti, rec_ptr);
return Status::OK;
}
Expand All @@ -93,20 +101,20 @@ Status update_body(Token token, Storage storage,
Status update(Token token, Storage storage,
std::string_view const key,
std::string_view const val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "update, token: " << token
<< ", storage: " << storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("update, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
Status ret{};
{ // for strand
std::shared_lock<std::shared_mutex> lock{ti->get_mtx_state_da_term()};

// update_body check termation by concurrent strand
ret = update_body(token, storage, key, val);
ret = update_body(token, storage, key, val, blobs_data, blobs_size);
}
ti->process_before_finish_step();
shirakami_log_exit << "update, Status: " << ret;
Expand Down
36 changes: 22 additions & 14 deletions src/concurrency_control/interface/upsert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ void abort_update(session* ti) {

static inline Status insert_process(session* const ti, Storage st,
const std::string_view key,
const std::string_view val) {
const std::string_view val,
std::vector<blob_id_type>& lobs) {
Record* rec_ptr{};
rec_ptr = new Record(key); // NOLINT
tid_word tid{rec_ptr->get_tidw()};
Expand Down Expand Up @@ -67,7 +68,7 @@ static inline Status insert_process(session* const ti, Storage st,
ti->push_to_read_set_for_stx({st, rec_ptr, tid});
}
}
ti->push_to_write_set({st, OP_TYPE::UPSERT, rec_ptr, val, true});
ti->push_to_write_set({st, OP_TYPE::UPSERT, rec_ptr, val, true, std::move(lobs)});
return Status::OK;
}
// fail insert rec_ptr
Expand All @@ -76,7 +77,9 @@ static inline Status insert_process(session* const ti, Storage st,
}

Status upsert_body(Token token, Storage storage, const std::string_view key,
const std::string_view val) {
const std::string_view val,
blob_id_type const* blobs_data,
std::size_t blobs_size) {
// check constraint: key
auto ret = check_constraint_key_length(key);
if (ret != Status::OK) { return ret; }
Expand All @@ -91,6 +94,11 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
auto rc{check_before_write_ops(ti, storage, key, OP_TYPE::UPSERT)};
if (rc != Status::OK) { return rc; }

std::vector<blob_id_type> lobs{blobs_size};
if (blobs_size != 0) {
lobs.assign(blobs_data, blobs_data + blobs_size); // NOLINT
}

for (;;) {
// index access to check local write set
Record* rec_ptr{};
Expand All @@ -104,6 +112,7 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
} else {
in_ws->set_val(val);
}
in_ws->set_lobs(std::move(lobs));
return Status::OK;
}

Expand All @@ -121,36 +130,35 @@ Status upsert_body(Token token, Storage storage, const std::string_view key,
}
if (rc == Status::OK) { // sharing tombstone
// prepare insert / upsert with tombstone count
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val,
true}); // NOLINT
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, true, std::move(lobs)});
return Status::OK;
}
if (rc == Status::WARN_ALREADY_EXISTS) {
// prepare update
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, false});
ti->push_to_write_set({storage, OP_TYPE::UPSERT, rec_ptr, val, false, std::move(lobs)});
return Status::OK;
}
if (rc == Status::WARN_CONCURRENT_INSERT) { continue; } // else
LOG_FIRST_N(ERROR, 1) << log_location_prefix << "unreachable path.";
}

INSERT_PROCESS:
rc = insert_process(ti, storage, key, val);
rc = insert_process(ti, storage, key, val, lobs);
if (rc == Status::ERR_CC) { return rc; }
}
}

Status upsert(Token token, Storage storage, std::string_view const key,
std::string_view const val,
[[maybe_unused]] blob_id_type const* blobs_data,
[[maybe_unused]] std::size_t blobs_size) {
//TODO implement blobs
shirakami_log_entry << "upsert, token: " << token << ", storage; "
<< storage << shirakami_binstring(key)
<< shirakami_binstring(val);
blob_id_type const* blobs_data,
std::size_t blobs_size) {
shirakami_log_entry_lazy("upsert, token: " << token << ", storage: " << storage
<< shirakami_binstring(key) << shirakami_binstring(val)
<< ", blobs_data: " << blobs_data << ", blobs_size: " << blobs_size
<< " " << span_printer(blobs_data, blobs_size));
auto* ti = static_cast<session*>(token);
ti->process_before_start_step();
auto ret = upsert_body(token, storage, key, val);
auto ret = upsert_body(token, storage, key, val, blobs_data, blobs_size);
ti->process_before_finish_step();
shirakami_log_exit << "upsert, Status: " << ret;
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <mutex>
#include <string>

#include "concurrency_control/include/session.h"

#include "gtest/gtest.h"
#include "shirakami/interface.h"
#include "glog/logging.h"
Expand Down Expand Up @@ -59,6 +61,40 @@ TEST_F(insert_update_test, insert_update) { // NOLINT
ASSERT_EQ(Status::OK, leave(s));
}

// Verify that blob ids passed to insert()/update() are stored in the session's
// write set object, and that an update replaces the previously stored list.
TEST_F(insert_update_test, insert_update_blob) { // NOLINT
    // prepare
    Storage st{};
    create_storage("", st);
    Token s{};
    ASSERT_EQ(Status::OK, enter(s));

    // helper: the OCC write set container of this session
    auto occ_ws = [&s]() -> decltype(auto) {
        return static_cast<session*>(s)->get_write_set().get_ref_cont_for_occ();
    };

    // test and verify
    ASSERT_EQ(Status::OK,
              tx_begin({s, transaction_options::transaction_type::SHORT}));
    // insert with blobs: the ids must be captured in the write set entry
    const blob_id_type b1[2] = {11, 22};
    ASSERT_EQ(Status::OK, insert(s, st, "", "v", b1, 2));
    {
        ASSERT_EQ(1, occ_ws().size());
        const auto& lobs = occ_ws().at(0).get_lobs();
        // ASSERT (not EXPECT) on size: at() below would throw on a mismatch
        ASSERT_EQ(2, lobs.size());
        EXPECT_EQ(b1[0], lobs.at(0));
        EXPECT_EQ(b1[1], lobs.at(1));
    }
    // update with a different blob list: it must replace the previous one
    const blob_id_type b2[3] = {99, 88, 77};
    ASSERT_EQ(Status::OK, update(s, st, "", "v1", b2, 3));
    {
        ASSERT_EQ(1, occ_ws().size());
        const auto& lobs = occ_ws().at(0).get_lobs();
        ASSERT_EQ(3, lobs.size());
        EXPECT_EQ(b2[0], lobs.at(0));
        EXPECT_EQ(b2[1], lobs.at(1));
        EXPECT_EQ(b2[2], lobs.at(2));
    }
    ASSERT_EQ(Status::OK, commit(s));

    // cleanup
    ASSERT_EQ(Status::OK, leave(s));
}

TEST_F(insert_update_test, update_insert) { // NOLINT
// prepare
Storage st{};
Expand Down

0 comments on commit e89eb96

Please sign in to comment.