Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove redundant allocations for streamer #4225

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ void OutgoingMigration::SyncFb() {
bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and the flows are working correctly, we try to
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
if (attempt > 1) {
if (cntx_.GetError()) {
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {

const auto& flow = info->flows[shard->shard_id()];
if (flow.streamer)
streamer_bytes.fetch_add(flow.streamer->GetTotalBufferCapacities(), memory_order_relaxed);
streamer_bytes.fetch_add(flow.streamer->UsedBytes(), memory_order_relaxed);
if (flow.saver)
full_sync_bytes.fetch_add(flow.saver->GetTotalBuffersSize(), memory_order_relaxed);
}
Expand Down
70 changes: 70 additions & 0 deletions src/server/journal/journal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "base/gtest.h"
#include "base/logging.h"
#include "server/journal/pending_buf.h"
#include "server/journal/serializer.h"
#include "server/journal/types.h"
#include "server/serializer_commons.h"
Expand Down Expand Up @@ -125,5 +126,74 @@ TEST(Journal, WriteRead) {
}
}

// Exercises PendingBuf end to end: byte accounting (size()/empty()),
// batching of pushed strings, and buffer rotation via
// PrepareSendingBuf()/Pop(), including the per-buffer capacity cap.
TEST(Journal, PendingBuf) {
  PendingBuf pbuf;

  // A freshly constructed buffer holds no data.
  ASSERT_TRUE(pbuf.empty());
  ASSERT_EQ(pbuf.size(), 0);

  pbuf.push("one");
  pbuf.push(" small");
  pbuf.push(" test");

  // Total bytes of "one" + " small" + " test".
  constexpr size_t kSmallBatchBytes = 14;

  ASSERT_FALSE(pbuf.empty());
  ASSERT_EQ(pbuf.size(), kSmallBatchBytes);

  {
    auto& sending_buf = pbuf.PrepareSendingBuf();
    ASSERT_EQ(sending_buf.buf.size(), 3);
    ASSERT_EQ(sending_buf.mem_size, kSmallBatchBytes);

    ASSERT_EQ(sending_buf.buf[0], "one");
    ASSERT_EQ(sending_buf.buf[1], " small");
    ASSERT_EQ(sending_buf.buf[2], " test");
  }

  // Push enough strings to overflow a single send buffer.
  const std::string big_str = "big_test";
  const size_t string_num = 2000;
  for (size_t i = 0; i < string_num; ++i) {
    pbuf.push(big_str);
  }

  ASSERT_FALSE(pbuf.empty());
  ASSERT_EQ(pbuf.size(), kSmallBatchBytes + string_num * big_str.size());

  // Drop the first (small) batch, as if it was sent.
  pbuf.Pop();

  ASSERT_FALSE(pbuf.empty());
  ASSERT_EQ(pbuf.size(), string_num * big_str.size());

  // The next batch is capped by the per-buffer limit.
  const auto next_buf_size = std::min(PendingBuf::Buf::max_buf_size, string_num);
  {
    auto& sending_buf = pbuf.PrepareSendingBuf();
    ASSERT_EQ(sending_buf.buf.size(), next_buf_size);
    ASSERT_EQ(sending_buf.mem_size, next_buf_size * big_str.size());

    for (const auto& s : sending_buf.buf) {
      ASSERT_EQ(s, big_str);
    }
  }

  pbuf.Pop();

  // If the big batch did not fit into one buffer, drain the remainder.
  if (next_buf_size < string_num) {
    const auto last_buf_size = string_num - next_buf_size;
    ASSERT_FALSE(pbuf.empty());
    ASSERT_EQ(pbuf.size(), last_buf_size * big_str.size());

    auto& sending_buf = pbuf.PrepareSendingBuf();
    ASSERT_EQ(sending_buf.buf.size(), last_buf_size);
    ASSERT_EQ(sending_buf.mem_size, last_buf_size * big_str.size());

    for (const auto& s : sending_buf.buf) {
      ASSERT_EQ(s, big_str);
    }

    pbuf.Pop();
  }

  // Everything was drained.
  ASSERT_TRUE(pbuf.empty());
  ASSERT_EQ(pbuf.size(), 0);
}

} // namespace journal
} // namespace dfly
68 changes: 68 additions & 0 deletions src/server/journal/pending_buf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/container/inlined_vector.h>

#include <sys/uio.h>

#include <algorithm>
#include <deque>
#include <numeric>
#include <string>

namespace dfly {

// Accumulates pending journal data as a queue of string batches, so the
// streamer can send one batch with a single vectored write while new data
// keeps accumulating in the tail batch.
class PendingBuf {
 public:
  // One send batch: a list of strings plus the running total of their bytes.
  struct Buf {
    size_t mem_size = 0;
    absl::InlinedVector<std::string, 8> buf;

#ifdef UIO_MAXIOV
    // Cap the batch at the writev()/AsyncWrite iovec limit so a whole Buf
    // can always be submitted in one vectored write.
    static constexpr size_t max_buf_size = UIO_MAXIOV;
#else
    // Fallback when the platform does not define the iovec limit.
    // Note: was `#elif` with no expression, which is ill-formed whenever
    // UIO_MAXIOV is undefined.
    static constexpr size_t max_buf_size = 1024;
#endif
  };

  // Start with a single (empty) tail batch so push() always has a target.
  PendingBuf() : bufs_(1) {
  }

  bool empty() const {
    return std::all_of(bufs_.begin(), bufs_.end(), [](const auto& b) { return b.buf.empty(); });
  }

  // Appends str to the tail batch, opening a new batch once the current one
  // reaches max_buf_size entries.
  void push(std::string str) {
    DCHECK(!bufs_.empty());
    if (bufs_.back().buf.size() == Buf::max_buf_size) {
      bufs_.emplace_back();
    }
    auto& back_buf = bufs_.back();

    back_buf.mem_size += str.size();
    back_buf.buf.push_back(std::move(str));
  }

  // Should be called to get the next buffer for sending. Guarantees a spare
  // tail batch exists, so subsequent push() calls never mutate the batch
  // that is currently in flight.
  const Buf& PrepareSendingBuf() {
    if (bufs_.size() == 1) {
      bufs_.emplace_back();
    }
    return bufs_.front();
  }

  // Should be called when the buf from PrepareSendingBuf() was fully sent.
  void Pop() {
    // PrepareSendingBuf() must have been called first.
    DCHECK(bufs_.size() >= 2);
    bufs_.pop_front();
  }

  // Total number of pending bytes across all batches.
  size_t size() const {
    // size_t{0} init: a plain 0 would make std::accumulate use an int
    // accumulator and truncate/overflow large totals.
    return std::accumulate(bufs_.begin(), bufs_.end(), size_t{0},
                           [](size_t s, const auto& b) { return s + b.mem_size; });
  }

 private:
  std::deque<Buf> bufs_;
};

} // namespace dfly
75 changes: 33 additions & 42 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
io::StringSink sink;
JournalWriter writer(&sink);
writer.Write(Entry{journal::Op::LSN, item.lsn});
Write(sink.str());
Write(std::move(sink).str());
}
});
}
Expand All @@ -86,67 +86,58 @@ void JournalStreamer::Cancel() {
}
}

size_t JournalStreamer::GetTotalBufferCapacities() const {
return in_flight_bytes_ + pending_buf_.capacity();
size_t JournalStreamer::UsedBytes() const {
return pending_buf_.size();
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved
}

void JournalStreamer::Write(std::string_view str) {
DCHECK(!str.empty());
DVLOG(3) << "Writing " << str.size() << " bytes";

size_t total_pending = pending_buf_.size() + str.size();
void JournalStreamer::AsyncWrite() {
DCHECK(!pending_buf_.empty());

if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
return;
}

// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
// because of potential SOO with strings, we allocate explicitly on heap.
uint8_t* buf(new uint8_t[str.size()]);
const auto& cur_buf = pending_buf_.PrepareSendingBuf();

// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
total_sent_ += total_pending;
in_flight_bytes_ = cur_buf.mem_size;
total_sent_ += cur_buf.mem_size;

iovec v[2];
unsigned next_buf_id = 0;
const auto v_size = cur_buf.buf.size();
absl::InlinedVector<iovec, 8> v(v_size);

if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
for (size_t i = 0; i < v_size; ++i) {
const auto* uptr = reinterpret_cast<const uint8_t*>(cur_buf.buf[i].data());
v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) {
OnCompletion(std::move(ec), len);
});
}

void JournalStreamer::Write(std::string str) {
DCHECK(!str.empty());
DVLOG(2) << "Writing " << str.size() << " bytes";

pending_buf_.push(std::move(str));

AsyncWrite();
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
DCHECK_GE(in_flight_bytes_, len);

DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
in_flight_bytes_ -= len;
if (in_flight_bytes_ == 0) {
pending_buf_.Pop();
}
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
// If everything was sent but we have a pending buf, flush it.
io::Bytes src(pending_buf_);
in_flight_bytes_ += src.size();
dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
OnCompletion(ec, buf.size());
});
} else if (!pending_buf_.empty() && !IsStopped()) {
AsyncWrite();
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
Expand Down Expand Up @@ -186,7 +177,7 @@ void JournalStreamer::WaitForInflightToComplete() {
}

bool JournalStreamer::IsStalled() const {
return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
return pending_buf_.size() >= replication_stream_output_limit_cached;
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
Expand Down Expand Up @@ -245,7 +236,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(entry);
Write(sink.str());
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
Expand Down Expand Up @@ -329,7 +320,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
CmdSerializer serializer([&](std::string s) { Write(s); });
CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
serializer.SerializeEntry(key, pk, pv, expire_ms);
}

Expand Down
13 changes: 9 additions & 4 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

#pragma once

#include <deque>

#include "server/common.h"
#include "server/db_slice.h"
#include "server/journal/journal.h"
#include "server/journal/pending_buf.h"
#include "server/journal/serializer.h"
#include "server/rdb_save.h"

Expand All @@ -30,15 +33,15 @@ class JournalStreamer {
// and manual cleanup.
virtual void Cancel();

size_t GetTotalBufferCapacities() const;
size_t UsedBytes() const;

protected:
// TODO: we copy the string on each write because JournalItem may be passed to multiple
// streamers so we can not move it. However, if we would either wrap JournalItem in shared_ptr
// or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
// Also, for small strings it's more performant to copy to the intermediate buffer than
// to issue an io operation.
void Write(std::string_view str);
void Write(std::string str);

// Blocks if the consumer is not keeping up.
void ThrottleIfNeeded();
Expand All @@ -53,6 +56,7 @@ class JournalStreamer {
Context* cntx_;

private:
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
Expand All @@ -62,9 +66,10 @@ class JournalStreamer {
bool IsStalled() const;

journal::Journal* journal_;
std::vector<uint8_t> pending_buf_;
size_t in_flight_bytes_ = 0, total_sent_ = 0;

PendingBuf pending_buf_;

size_t in_flight_bytes_ = 0, total_sent_ = 0;
time_t last_lsn_time_ = 0;
util::fb2::EventCount waker_;
uint32_t journal_cb_id_{0};
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
)
for i in range(2)
]
Expand Down
Loading