diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 9abe9cf08bc5..b0d0ce794266 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -281,7 +281,8 @@ void OutgoingMigration::SyncFb() {
 bool OutgoingMigration::FinalizeMigration(long attempt) {
   // if it's not the 1st attempt and flows are work correctly we try to
   // reconnect and ACK one more time
-  VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
+  VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id
+          << " attempt " << attempt;
   if (attempt > 1) {
     if (cntx_.GetError()) {
       return true;
diff --git a/src/server/common.h b/src/server/common.h
index 8460b6e78c9b..6b63aafa37bb 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -140,8 +140,13 @@ extern unsigned kernel_version;
 
 const char* GlobalStateName(GlobalState gs);
 
-template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
+template <typename RandGen>
+std::string GetRandomHex(RandGen& gen, size_t len, size_t len_deviation = 0) {
   static_assert(std::is_same<uint64_t, decltype(gen())>::value);
+  if (len_deviation) {
+    len += (gen() % len_deviation);
+  }
+
   std::string res(len, '\0');
   size_t indx = 0;
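// ===========================================================================
// Illustrative sketch (not part of the patch): behaviour of the new
// `len_deviation` parameter of GetRandomHex(). With len = 10 and
// len_deviation = 90 -- the values the PendingBuf test below passes -- the
// produced length is 10 + (gen() % 90), i.e. anywhere in [10, 99]. The name
// RandomHexSketch and the use of std::mt19937_64 are assumptions of this
// sketch, not identifiers from the repository.
// ===========================================================================
#include <cstddef>
#include <iostream>
#include <random>
#include <string>

std::string RandomHexSketch(std::mt19937_64& gen, size_t len, size_t len_deviation = 0) {
  if (len_deviation) {
    len += gen() % len_deviation;  // final length lies in [len, len + len_deviation - 1]
  }
  static constexpr char kHex[] = "0123456789abcdef";
  std::string res(len, '\0');
  for (auto& c : res) {
    c = kHex[gen() % 16];
  }
  return res;
}

int main() {
  std::mt19937_64 gen(42);
  for (int i = 0; i < 3; ++i) {
    // Prints three lengths, each somewhere between 10 and 99.
    std::cout << RandomHexSketch(gen, 10, 90).size() << "\n";
  }
  return 0;
}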
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index c7eadc6c0071..b2b806c06cd2 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -778,7 +778,7 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
     const auto& flow = info->flows[shard->shard_id()];
 
     if (flow.streamer)
-      streamer_bytes.fetch_add(flow.streamer->GetTotalBufferCapacities(), memory_order_relaxed);
+      streamer_bytes.fetch_add(flow.streamer->UsedBytes(), memory_order_relaxed);
     if (flow.saver)
       full_sync_bytes.fetch_add(flow.saver->GetTotalBuffersSize(), memory_order_relaxed);
   }
diff --git a/src/server/journal/journal_test.cc b/src/server/journal/journal_test.cc
index b15f3f46a107..7461f7a59cb2 100644
--- a/src/server/journal/journal_test.cc
+++ b/src/server/journal/journal_test.cc
@@ -1,7 +1,9 @@
+#include <absl/random/random.h>
 #include <gmock/gmock.h>
 
 #include "base/gtest.h"
 #include "base/logging.h"
+#include "server/journal/pending_buf.h"
 #include "server/journal/serializer.h"
 #include "server/journal/types.h"
 #include "server/serializer_commons.h"
@@ -125,5 +127,95 @@ TEST(Journal, WriteRead) {
   }
 }
 
+TEST(Journal, PendingBuf) {
+  PendingBuf pbuf;
+
+  ASSERT_TRUE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 0);
+
+  pbuf.Push("one");
+  pbuf.Push(" smallllllllllllllllllllllllllllllll");
+  pbuf.Push(" test");
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 44);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+    ASSERT_EQ(sending_buf.buf.size(), 3);
+    ASSERT_EQ(sending_buf.mem_size, 44);
+
+    ASSERT_EQ(sending_buf.buf[0], "one");
+    ASSERT_EQ(sending_buf.buf[1], " smallllllllllllllllllllllllllllllll");
+    ASSERT_EQ(sending_buf.buf[2], " test");
+  }
+
+  const size_t string_num = PendingBuf::Buf::kMaxBufSize + 1000;
+  std::vector<std::string> test_data;
+  test_data.reserve(string_num);
+
+  absl::InsecureBitGen gen;
+
+  for (size_t i = 0; i < string_num; ++i) {
+    auto str = GetRandomHex(gen, 10, 90);
+    test_data.push_back(str);
+    pbuf.Push(std::move(str));
+  }
+
+  const size_t test_data_size =
+      std::accumulate(test_data.begin(), test_data.end(), 0,
+                      [](size_t size, const auto& s) { return s.size() + size; });
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 44 + test_data_size);
+
+  pbuf.Pop();
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), test_data_size);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+
+    const size_t send_buf_size =
+        std::accumulate(test_data.begin(), test_data.begin() + PendingBuf::Buf::kMaxBufSize, 0,
+                        [](size_t size, const auto& s) { return s.size() + size; });
+
+    ASSERT_EQ(sending_buf.buf.size(), PendingBuf::Buf::kMaxBufSize);
+    ASSERT_EQ(sending_buf.mem_size, send_buf_size);
+
+    for (size_t i = 0; i < sending_buf.buf.size(); ++i) {
+      ASSERT_EQ(sending_buf.buf[i], test_data[i]);
+    }
+  }
+
+  pbuf.Pop();
+
+  test_data.erase(test_data.begin(), test_data.begin() + PendingBuf::Buf::kMaxBufSize);
+
+  const size_t last_buf_size =
+      std::accumulate(test_data.begin(), test_data.end(), 0,
+                      [](size_t size, const auto& s) { return s.size() + size; });
+
+  ASSERT_FALSE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), last_buf_size);
+
+  {
+    auto& sending_buf = pbuf.PrepareSendingBuf();
+
+    ASSERT_EQ(sending_buf.buf.size(), 1000);
+    ASSERT_EQ(sending_buf.mem_size, last_buf_size);
+
+    for (size_t i = 0; i < sending_buf.buf.size(); ++i) {
+      ASSERT_EQ(sending_buf.buf[i], test_data[i]);
+    }
+  }
+
+  pbuf.Pop();
+
+  ASSERT_TRUE(pbuf.Empty());
+  ASSERT_EQ(pbuf.Size(), 0);
+}
+
 }  // namespace journal
 }  // namespace dfly
diff --git a/src/server/journal/pending_buf.h b/src/server/journal/pending_buf.h
new file mode 100644
index 000000000000..6049bd05d6b2
--- /dev/null
+++ b/src/server/journal/pending_buf.h
@@ -0,0 +1,69 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+#include <absl/container/inlined_vector.h>
+
+#include <deque>
+#include <numeric>
+
+namespace dfly {
+
+class PendingBuf {
+ public:
+  struct Buf {
+    size_t mem_size = 0;
+    absl::InlinedVector<std::string, 8> buf;
+
+#ifdef UIO_MAXIOV
+    static constexpr size_t kMaxBufSize = UIO_MAXIOV;
+#else
+    static constexpr size_t kMaxBufSize = 1024;
+#endif
+  };
+
+  PendingBuf() : bufs_(1) {
+  }
+
+  bool Empty() const {
+    return std::all_of(bufs_.begin(), bufs_.end(), [](const auto& b) { return b.buf.empty(); });
+  }
+
+  void Push(std::string str) {
+    DCHECK(!bufs_.empty());
+    if (bufs_.back().buf.size() == Buf::kMaxBufSize) {
+      bufs_.emplace_back();
+    }
+    auto& back_buf = bufs_.back();
+
+    back_buf.mem_size += str.size();
+    back_buf.buf.push_back(std::move(str));
+  }
+
+  // Should be called to get the next buffer for sending.
+  const Buf& PrepareSendingBuf() {
+    // Appending an empty buffer ensures that future Push()es will not modify the in-flight buffer.
+    if (bufs_.size() == 1) {
+      bufs_.emplace_back();
+    }
+    return bufs_.front();
+  }
+
+  // Should be called once the buffer returned by PrepareSendingBuf() has been sent.
+  void Pop() {
+    DCHECK(bufs_.size() >= 2);
+    bufs_.pop_front();
+  }
+
+  size_t Size() const {
+    return std::accumulate(bufs_.begin(), bufs_.end(), 0,
+                           [](size_t s, const auto& b) { return s + b.mem_size; });
+  }
+
+ private:
+  std::deque<Buf> bufs_;
+};
+
+}  // namespace dfly
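// ===========================================================================
// Illustrative sketch (not part of the patch): the intended PendingBuf
// lifecycle. Producers keep calling Push() while at most one batch is in
// flight; PrepareSendingBuf() freezes the front batch (later Push()es go into
// a fresh back buffer), and Pop() is called only after the frozen batch has
// been fully written. kMaxBufSize caps a batch at UIO_MAXIOV strings so a
// whole batch can be handed to a single vectored write. DrainSketch and the
// std::string sink are inventions of this sketch; the real consumer is
// JournalStreamer::AsyncWrite() in the streamer.cc diff below.
// ===========================================================================
#include <iostream>
#include <string>

#include "base/logging.h"
#include "server/journal/pending_buf.h"

// Drains `pending` into `out`, one frozen batch at a time.
void DrainSketch(dfly::PendingBuf* pending, std::string* out) {
  while (!pending->Empty()) {
    const auto& batch = pending->PrepareSendingBuf();  // frozen; new Push()es go to the next batch
    for (const auto& s : batch.buf) {
      out->append(s);  // stands in for the single vectored AsyncWrite in streamer.cc
    }
    pending->Pop();  // only after the frozen batch has been sent
  }
}

int main() {
  dfly::PendingBuf pending;
  pending.Push("one ");
  pending.Push("two ");
  pending.Push("three");

  std::string sink;
  DrainSketch(&pending, &sink);
  std::cout << sink << "\n";  // prints "one two three"
  return 0;
}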
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index c224d25dc95e..8f9fe8750198 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -72,7 +72,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
         io::StringSink sink;
         JournalWriter writer(&sink);
         writer.Write(Entry{journal::Op::LSN, item.lsn});
-        Write(sink.str());
+        Write(std::move(sink).str());
       }
     });
 }
@@ -86,51 +86,44 @@ void JournalStreamer::Cancel() {
   }
 }
 
-size_t JournalStreamer::GetTotalBufferCapacities() const {
-  return in_flight_bytes_ + pending_buf_.capacity();
+size_t JournalStreamer::UsedBytes() const {
+  return pending_buf_.Size();
 }
 
-void JournalStreamer::Write(std::string_view str) {
-  DCHECK(!str.empty());
-  DVLOG(3) << "Writing " << str.size() << " bytes";
-
-  size_t total_pending = pending_buf_.size() + str.size();
+void JournalStreamer::AsyncWrite() {
+  DCHECK(!pending_buf_.Empty());
 
   if (in_flight_bytes_ > 0) {
     // We can not flush data while there are in flight requests because AsyncWrite
     // is not atomic. Therefore, we just aggregate.
-    size_t tail = pending_buf_.size();
-    pending_buf_.resize(pending_buf_.size() + str.size());
-    memcpy(pending_buf_.data() + tail, str.data(), str.size());
     return;
   }
 
-  // If we do not have any in flight requests we send the string right a way.
-  // We can not aggregate it since we do not know when the next update will follow.
-  // because of potential SOO with strings, we allocate explicitly on heap.
-  uint8_t* buf(new uint8_t[str.size()]);
+  const auto& cur_buf = pending_buf_.PrepareSendingBuf();
 
-  // TODO: it is possible to remove these redundant copies if we adjust high level
-  // interfaces to pass reference-counted buffers.
-  memcpy(buf, str.data(), str.size());
-  in_flight_bytes_ += total_pending;
-  total_sent_ += total_pending;
+  in_flight_bytes_ = cur_buf.mem_size;
+  total_sent_ += cur_buf.mem_size;
 
-  iovec v[2];
-  unsigned next_buf_id = 0;
+  const auto v_size = cur_buf.buf.size();
+  absl::InlinedVector<iovec, 8> v(v_size);
 
-  if (!pending_buf_.empty()) {
-    v[0] = IoVec(pending_buf_);
-    ++next_buf_id;
+  for (size_t i = 0; i < v_size; ++i) {
+    const auto* uptr = reinterpret_cast<const uint8_t*>(cur_buf.buf[i].data());
+    v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
   }
-  v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));
 
-  dest_->AsyncWrite(
-      v, next_buf_id,
-      [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
-        delete[] buf;
-        OnCompletion(ec, len);
-      });
+  dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) {
+    OnCompletion(std::move(ec), len);
+  });
+}
+
+void JournalStreamer::Write(std::string str) {
+  DCHECK(!str.empty());
+  DVLOG(2) << "Writing " << str.size() << " bytes";
+
+  pending_buf_.Push(std::move(str));
+
+  AsyncWrite();
 }
 
 void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
@@ -138,15 +131,13 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
   DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
   in_flight_bytes_ -= len;
+  if (in_flight_bytes_ == 0) {
+    pending_buf_.Pop();
+  }
 
   if (ec && !IsStopped()) {
     cntx_->ReportError(ec);
-  } else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
-    // If everything was sent but we have a pending buf, flush it.
-    io::Bytes src(pending_buf_);
-    in_flight_bytes_ += src.size();
-    dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
-      OnCompletion(ec, buf.size());
-    });
+  } else if (!pending_buf_.Empty() && !IsStopped()) {
+    AsyncWrite();
   }
 
   // notify ThrottleIfNeeded or WaitForInflightToComplete that waits
@@ -186,7 +177,7 @@ void JournalStreamer::WaitForInflightToComplete() {
 }
 
 bool JournalStreamer::IsStalled() const {
-  return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
+  return pending_buf_.Size() >= replication_stream_output_limit_cached;
 }
 
 RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
@@ -245,7 +236,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
   io::StringSink sink;
   JournalWriter writer{&sink};
   writer.Write(entry);
-  Write(sink.str());
+  Write(std::move(sink).str());
 
   // TODO: is the intent here to flush everything?
   //
@@ -329,7 +320,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(s); });
+  CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
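// ===========================================================================
// Illustrative sketch (not part of the patch): a simplified model of the
// Write()/AsyncWrite()/OnCompletion() contract implemented above. At most one
// batch is on the wire; strings written while a batch is in flight are
// aggregated and go out as the next batch when the completion fires.
// MockStreamer and its members are invented for this sketch; the real code
// stores strings in PendingBuf and sends them with dest_->AsyncWrite().
// ===========================================================================
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

class MockStreamer {
 public:
  void Write(std::string str) {
    pending_.push_back(std::move(str));
    MaybeSend();
  }

  // Called by the "transport" once the in-flight batch has been written.
  void OnCompletion(size_t batch_bytes) {
    in_flight_ -= batch_bytes;
    if (in_flight_ == 0 && !pending_.empty()) {
      MaybeSend();  // chain the next batch; never more than one batch in flight
    }
  }

  std::vector<std::vector<std::string>> sent_batches;

 private:
  void MaybeSend() {
    if (in_flight_ > 0) {
      return;  // aggregate: the batch currently on the wire must not be touched
    }
    std::vector<std::string> batch(pending_.begin(), pending_.end());
    pending_.clear();
    for (const auto& s : batch) {
      in_flight_ += s.size();
    }
    sent_batches.push_back(std::move(batch));  // stands in for the vectored AsyncWrite
  }

  std::deque<std::string> pending_;
  size_t in_flight_ = 0;
};

int main() {
  MockStreamer s;
  s.Write("a");       // nothing in flight -> sent immediately as batch 0
  s.Write("bb");      // batch 0 still in flight -> queued
  s.Write("ccc");     // queued as well
  s.OnCompletion(1);  // batch 0 done -> "bb" and "ccc" go out together as batch 1
  s.OnCompletion(5);  // batch 1 done, nothing left

  for (size_t i = 0; i < s.sent_batches.size(); ++i) {
    std::cout << "batch " << i << ":";
    for (const auto& str : s.sent_batches[i]) {
      std::cout << " " << str;
    }
    std::cout << "\n";  // prints "batch 0: a" and "batch 1: bb ccc"
  }
  return 0;
}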
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index a5ef1ad978a2..a18615a053f1 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -4,9 +4,12 @@
 
 #pragma once
 
+#include <absl/container/inlined_vector.h>
+
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/journal/journal.h"
+#include "server/journal/pending_buf.h"
 #include "server/journal/serializer.h"
 #include "server/rdb_save.h"
@@ -30,7 +33,7 @@ class JournalStreamer {
   // and manual cleanup.
   virtual void Cancel();
 
-  size_t GetTotalBufferCapacities() const;
+  size_t UsedBytes() const;
 
  protected:
   // TODO: we copy the string on each write because JournalItem may be passed to multiple
@@ -38,7 +41,7 @@ class JournalStreamer {
   // or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
   // Also, for small strings it's more peformant to copy to the intermediate buffer than
   // to issue an io operation.
-  void Write(std::string_view str);
+  void Write(std::string str);
 
   // Blocks the if the consumer if not keeping up.
   void ThrottleIfNeeded();
@@ -53,6 +56,7 @@ class JournalStreamer {
   Context* cntx_;
 
  private:
+  void AsyncWrite();
   void OnCompletion(std::error_code ec, size_t len);
 
   bool IsStopped() const {
@@ -62,9 +66,10 @@ class JournalStreamer {
   bool IsStalled() const;
 
   journal::Journal* journal_;
-  std::vector<uint8_t> pending_buf_;
-  size_t in_flight_bytes_ = 0, total_sent_ = 0;
 
+  PendingBuf pending_buf_;
+
+  size_t in_flight_bytes_ = 0, total_sent_ = 0;
   time_t last_lsn_time_ = 0;
   util::fb2::EventCount waker_;
   uint32_t journal_cb_id_{0};
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 879aa291796c..7d4e7941db7c 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1300,7 +1300,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
         df_factory.create(
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
-            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
         )
         for i in range(2)
     ]
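// ===========================================================================
// Illustrative sketch (not part of the patch): the scatter-gather write
// pattern that JournalStreamer::AsyncWrite() relies on, and why
// PendingBuf::Buf::kMaxBufSize is tied to UIO_MAXIOV -- writev() accepts at
// most IOV_MAX iovec entries per call. WriteBatch is an invented helper;
// error and short-write handling are deliberately omitted.
// ===========================================================================
#include <limits.h>   // IOV_MAX
#include <sys/uio.h>  // iovec, writev
#include <unistd.h>   // STDOUT_FILENO

#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Writes up to IOV_MAX chunks with one vectored write; returns how many chunks
// were consumed so the caller can loop over the rest.
size_t WriteBatch(int fd, const std::vector<std::string>& chunks, size_t from) {
  size_t n = std::min<size_t>(chunks.size() - from, IOV_MAX);  // kernel rejects longer vectors
  std::vector<iovec> v(n);
  for (size_t i = 0; i < n; ++i) {
    v[i].iov_base = const_cast<char*>(chunks[from + i].data());
    v[i].iov_len = chunks[from + i].size();
  }
  ::writev(fd, v.data(), static_cast<int>(n));
  return n;
}

int main() {
  std::vector<std::string> chunks = {"journal ", "entries ", "as ", "separate ", "strings\n"};
  for (size_t sent = 0; sent < chunks.size();) {
    sent += WriteBatch(STDOUT_FILENO, chunks, sent);
  }
  return 0;
}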