Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify journal and restore streamer cancellation NOT READY FOR REVIEW #4549

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

void Cancel() {
cntx_.SwitchErrorHandler([](auto ge) {});
// Close socket for clean disconnect.
CloseSocket();
streamer_.Cancel();
Expand Down
33 changes: 17 additions & 16 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
}

JournalStreamer::~JournalStreamer() {
if (!cntx_->IsCancelled()) {
if (!cntx_->GetError()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why GetError() here and not IsCancelled()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because if we cancel the streamer, we should wait for WaitForInflightToComplete() and in_flight_bytes_ should be 0
otherwise not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's the exact same thing? We can cancel a context in two ways:

  1. Calling Contenxt::Cancel which submits the error code operation_cancel to the underline error handler via the ReportError
  2. By calling ReportError which submits the error code passed as an argument to the underline error handler

Both (1) and (2) call Cancellation::Cancel so IsCancelled() will always be true for both paths.

Why do we need to fetch the error here via GetError and why does not !cntx_->IsCancelled() is not enough in this context ?

I could be missing something -- I just want to make sure I understand this

Copy link
Contributor Author

@BorysTheDev BorysTheDev Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @kostasrim you are right. Honestly, such context interface doesn't look logical to me and I thought the functions are different. your comment helps me

DCHECK_EQ(in_flight_bytes_, 0u);
}
VLOG(1) << "~JournalStreamer";
Expand Down Expand Up @@ -83,7 +83,7 @@ void JournalStreamer::Cancel() {
VLOG(1) << "JournalStreamer::Cancel";
waker_.notifyAll();
journal_->UnregisterOnChange(journal_cb_id_);
if (!cntx_->IsCancelled()) {
if (!cntx_->GetError()) {
WaitForInflightToComplete();
}
}
Expand Down Expand Up @@ -134,10 +134,12 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
DVLOG(3) << "Completing " << in_flight_bytes_;
in_flight_bytes_ = 0;
pending_buf_.Pop();
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty() && !IsStopped()) {
AsyncWrite();
if (!cntx_->IsCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care loosing the error code here on cancellation ? Could it be that the AsyncWrite fails for some other reason ? Would it be useful to store this error or log it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't care regarding error if we cancel streamer

if (ec) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty()) {
AsyncWrite();
}
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
Expand All @@ -149,7 +151,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
}

void JournalStreamer::ThrottleIfNeeded() {
if (IsStopped() || !IsStalled())
if (cntx_->IsCancelled() || !IsStalled())
return;

auto next =
Expand All @@ -158,7 +160,7 @@ void JournalStreamer::ThrottleIfNeeded() {
size_t sent_start = total_sent_;

std::cv_status status =
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
waker_.await_until([this]() { return !IsStalled() || cntx_->IsCancelled(); }, next);
if (status == std::cv_status::timeout) {
LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
<< sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
Expand Down Expand Up @@ -188,7 +190,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal
}

void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
if (fiber_cancelled_)
if (cntx_->IsCancelled())
return;

VLOG(1) << "RestoreStreamer start";
Expand All @@ -206,16 +208,16 @@ void RestoreStreamer::Run() {
PrimeTable* pt = &db_array_[0]->prime;

do {
if (fiber_cancelled_)
if (cntx_->IsCancelled())
return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt
if (cntx_->IsCancelled()) // Could be cancelled any time as Traverse may preempt
return;

db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);

if (fiber_cancelled_) // Could have been cancelled in above call too
if (cntx_->IsCancelled()) // Could have been cancelled in above call too
return;

std::lock_guard guard(big_value_mu_);
Expand All @@ -231,7 +233,7 @@ void RestoreStreamer::Run() {
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor && !fiber_cancelled_);
} while (cursor);

VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
Expand All @@ -252,8 +254,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
writer.Write(entry);
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
// DFLYMIGRATE ACK command has a timeout so we want to send it only when LSN is ready to be sent
ThrottleIfNeeded();
}

Expand All @@ -263,7 +264,7 @@ RestoreStreamer::~RestoreStreamer() {
void RestoreStreamer::Cancel() {
auto sver = snapshot_version_;
snapshot_version_ = 0; // to prevent double cancel in another fiber
fiber_cancelled_ = true;
cntx_->Cancel();
if (sver != 0) {
db_slice_->UnregisterOnChange(sver);
JournalStreamer::Cancel();
Expand Down
13 changes: 2 additions & 11 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class JournalStreamer {
void ThrottleIfNeeded();

virtual bool ShouldWrite(const journal::JournalItem& item) const {
return !IsStopped();
return !cntx_->IsCancelled();
}

void WaitForInflightToComplete();
Expand All @@ -59,10 +59,6 @@ class JournalStreamer {
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
return cntx_->IsCancelled();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

}

bool IsStalled() const;

journal::Journal* journal_;
Expand Down Expand Up @@ -91,10 +87,6 @@ class RestoreStreamer : public JournalStreamer {

void SendFinalize(long attempt);

bool IsSnapshotFinished() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead function! nice!

return snapshot_finished_;
}

private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
Expand All @@ -121,8 +113,7 @@ class RestoreStreamer : public JournalStreamer {
DbTableArray db_array_;
uint64_t snapshot_version_ = 0;
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ThreadLocalMutex big_value_mu_;
Stats stats_;
};
Expand Down
Loading