Skip to content

Commit

Permalink
refactor: simplify journal and restore streamer cancelation
Browse files: browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Feb 4, 2025
1 parent 80e4012 commit fa338a6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

void Cancel() {
cntx_.SwitchErrorHandler([](auto ge) {});
// Close socket for clean disconnect.
CloseSocket();
streamer_.Cancel();
Expand Down
33 changes: 17 additions & 16 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
}

JournalStreamer::~JournalStreamer() {
if (!cntx_->IsCancelled()) {
if (!cntx_->GetError()) {
DCHECK_EQ(in_flight_bytes_, 0u);
}
VLOG(1) << "~JournalStreamer";
Expand Down Expand Up @@ -83,7 +83,7 @@ void JournalStreamer::Cancel() {
VLOG(1) << "JournalStreamer::Cancel";
waker_.notifyAll();
journal_->UnregisterOnChange(journal_cb_id_);
if (!cntx_->IsCancelled()) {
if (!cntx_->GetError()) {
WaitForInflightToComplete();
}
}
Expand Down Expand Up @@ -134,10 +134,12 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
DVLOG(3) << "Completing " << in_flight_bytes_;
in_flight_bytes_ = 0;
pending_buf_.Pop();
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty() && !IsStopped()) {
AsyncWrite();
if (!cntx_->IsCancelled()) {
if (ec) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty()) {
AsyncWrite();
}
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
Expand All @@ -149,7 +151,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
}

void JournalStreamer::ThrottleIfNeeded() {
if (IsStopped() || !IsStalled())
if (cntx_->IsCancelled() || !IsStalled())
return;

auto next =
Expand All @@ -158,7 +160,7 @@ void JournalStreamer::ThrottleIfNeeded() {
size_t sent_start = total_sent_;

std::cv_status status =
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
waker_.await_until([this]() { return !IsStalled() || cntx_->IsCancelled(); }, next);
if (status == std::cv_status::timeout) {
LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
<< sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
Expand Down Expand Up @@ -188,7 +190,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal
}

void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
if (fiber_cancelled_)
if (cntx_->IsCancelled())
return;

VLOG(1) << "RestoreStreamer start";
Expand All @@ -206,16 +208,16 @@ void RestoreStreamer::Run() {
PrimeTable* pt = &db_array_[0]->prime;

do {
if (fiber_cancelled_)
if (cntx_->IsCancelled())
return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt
if (cntx_->IsCancelled()) // Could be cancelled any time as Traverse may preempt
return;

db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);

if (fiber_cancelled_) // Could have been cancelled in above call too
if (cntx_->IsCancelled()) // Could have been cancelled in above call too
return;

std::lock_guard guard(big_value_mu_);
Expand All @@ -231,7 +233,7 @@ void RestoreStreamer::Run() {
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor && !fiber_cancelled_);
} while (cursor);

VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
Expand All @@ -252,8 +254,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
writer.Write(entry);
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
// DFLYMIGRATE ACK command has a timeout so we want to send it only when LSN is ready to be sent
ThrottleIfNeeded();
}

Expand All @@ -263,7 +264,7 @@ RestoreStreamer::~RestoreStreamer() {
void RestoreStreamer::Cancel() {
auto sver = snapshot_version_;
snapshot_version_ = 0; // to prevent double cancel in another fiber
fiber_cancelled_ = true;
cntx_->Cancel();
if (sver != 0) {
db_slice_->UnregisterOnChange(sver);
JournalStreamer::Cancel();
Expand Down
13 changes: 2 additions & 11 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class JournalStreamer {
void ThrottleIfNeeded();

virtual bool ShouldWrite(const journal::JournalItem& item) const {
return !IsStopped();
return !cntx_->IsCancelled();
}

void WaitForInflightToComplete();
Expand All @@ -59,10 +59,6 @@ class JournalStreamer {
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
return cntx_->IsCancelled();
}

bool IsStalled() const;

journal::Journal* journal_;
Expand Down Expand Up @@ -91,10 +87,6 @@ class RestoreStreamer : public JournalStreamer {

void SendFinalize(long attempt);

bool IsSnapshotFinished() const {
return snapshot_finished_;
}

private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
Expand All @@ -121,8 +113,7 @@ class RestoreStreamer : public JournalStreamer {
DbTableArray db_array_;
uint64_t snapshot_version_ = 0;
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ThreadLocalMutex big_value_mu_;
Stats stats_;
};
Expand Down

0 comments on commit fa338a6

Please sign in to comment.