Skip to content

Commit

Permalink
refactor: remove redundant allocations for streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Nov 29, 2024
1 parent 3ad5b38 commit 351de74
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 45 deletions.
3 changes: 2 additions & 1 deletion src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ void OutgoingMigration::SyncFb() {
bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and the flows are working correctly, we try to
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
if (attempt > 1) {
if (cntx_.GetError()) {
return true;
Expand Down
77 changes: 36 additions & 41 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
io::StringSink sink;
JournalWriter writer(&sink);
writer.Write(Entry{journal::Op::LSN, item.lsn});
Write(sink.str());
Write(std::move(sink).str());
}
});
}
Expand All @@ -83,50 +83,50 @@ void JournalStreamer::Cancel() {
}

size_t JournalStreamer::GetTotalBufferCapacities() const {
return in_flight_bytes_ + pending_buf_.capacity();
return in_flight_bytes_ + pending_buf_mem_size;
}

void JournalStreamer::Write(std::string_view str) {
DCHECK(!str.empty());
DVLOG(3) << "Writing " << str.size() << " bytes";

size_t total_pending = pending_buf_.size() + str.size();
void JournalStreamer::AsyncWrite() {
DCHECK(pending_buf_mem_size != 0);
DCHECK(!pending_buf_.empty());

if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
return;
}

// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
// because of potential SOO with strings, we allocate explicitly on heap.
uint8_t* buf(new uint8_t[str.size()]);
const auto len = pending_buf_mem_size;
pending_buf_mem_size = 0;
auto tmp_pbuf = std::move(pending_buf_);
pending_buf_ = {};

// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
total_sent_ += total_pending;
in_flight_bytes_ += len;
total_sent_ += len;

iovec v[2];
unsigned next_buf_id = 0;
const auto v_size = tmp_pbuf.size();
absl::InlinedVector<iovec, 4> v(
v_size); // consider making this InlinedVector a member of JournalStreamer so it can be reused

if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
for (size_t i = 0; i < v_size; ++i) {
const auto* uptr = reinterpret_cast<const uint8_t*>(tmp_pbuf[i].data());
v[i] = (IoVec(io::Bytes(uptr, tmp_pbuf[i].size())));
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
dest_->AsyncWrite(v.data(), v.size(),
[buf0 = std::move(tmp_pbuf), this, len](std::error_code ec) {
OnCompletion(std::move(ec), len);
});
}

// Queues str for asynchronous sending to the destination socket.
// Takes the string by value so callers can move their buffer in without an
// extra copy; the bytes are appended to pending_buf_ and flushed via
// AsyncWrite(). Precondition: str must be non-empty (DCHECK'd below).
void JournalStreamer::Write(std::string str) {
DCHECK(!str.empty());
DVLOG(2) << "Writing " << str.size() << " bytes";

// Account for the buffered bytes before handing the string over; this
// counter feeds GetTotalBufferCapacities() and the IsStalled() backpressure
// check.
pending_buf_mem_size += str.size();
pending_buf_.push_back(std::move(str));

// NOTE(review): called unconditionally — AsyncWrite() is presumably
// responsible for deferring the actual send while writes are in flight;
// confirm against its in_flight_bytes_ handling.
AsyncWrite();
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
Expand All @@ -136,13 +136,8 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
in_flight_bytes_ -= len;
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
// If everything was sent but we have a pending buf, flush it.
io::Bytes src(pending_buf_);
in_flight_bytes_ += src.size();
dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
OnCompletion(ec, buf.size());
});
} else if (!pending_buf_.empty() && !IsStopped()) {
AsyncWrite();
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
Expand Down Expand Up @@ -182,7 +177,7 @@ void JournalStreamer::WaitForInflightToComplete() {
}

bool JournalStreamer::IsStalled() const {
return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
return in_flight_bytes_ + pending_buf_mem_size >= replication_stream_output_limit_cached;
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
Expand Down Expand Up @@ -234,7 +229,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(entry);
Write(sink.str());
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
Expand Down Expand Up @@ -318,7 +313,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
CmdSerializer serializer([&](std::string s) { Write(s); });
CmdSerializer serializer([&](std::string s) { Write(std::move(s)); });
serializer.SerializeEntry(key, pk, pv, expire_ms);
}

Expand Down
7 changes: 5 additions & 2 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JournalStreamer {
// or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
// Also, for small strings it's more performant to copy to the intermediate buffer than
// to issue an io operation.
void Write(std::string_view str);
void Write(std::string str);

// Blocks the caller if the consumer is not keeping up.
void ThrottleIfNeeded();
Expand All @@ -53,6 +53,7 @@ class JournalStreamer {
Context* cntx_;

private:
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
Expand All @@ -62,7 +63,9 @@ class JournalStreamer {
bool IsStalled() const;

journal::Journal* journal_;
std::vector<uint8_t> pending_buf_;

size_t pending_buf_mem_size = 0;
std::vector<std::string> pending_buf_;
size_t in_flight_bytes_ = 0, total_sent_ = 0;

time_t last_lsn_time_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
)
for i in range(2)
]
Expand Down

0 comments on commit 351de74

Please sign in to comment.