Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify journal and restore streamer cancellation NOT READY FOR REVIEW #4549

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

BorysTheDev
Copy link
Contributor

@BorysTheDev BorysTheDev commented Feb 3, 2025

fixes: #4284

@BorysTheDev BorysTheDev requested a review from adiholden February 3, 2025 13:00
@@ -42,7 +42,7 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
}

JournalStreamer::~JournalStreamer() {
if (!cntx_->IsCancelled()) {
if (!cntx_->GetError()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why GetError() here and not IsCancelled()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because if we cancel the streamer, we should wait for WaitForInflightToComplete() and in_flight_bytes_ should be 0
otherwise not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's the exact same thing? We can cancel a context in two ways:

  1. Calling Contenxt::Cancel which submits the error code operation_cancel to the underline error handler via the ReportError
  2. By calling ReportError which submits the error code passed as an argument to the underline error handler

Both (1) and (2) call Cancellation::Cancel so IsCancelled() will always be true for both paths.

Why do we need to fetch the error here via GetError and why does not !cntx_->IsCancelled() is not enough in this context ?

I could be missing something -- I just want to make sure I understand this

Copy link
Contributor Author

@BorysTheDev BorysTheDev Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @kostasrim you are right. Honestly, such context interface doesn't look logical to me and I thought the functions are different. your comment helps me

cntx_->ReportError(ec);
} else if (!pending_buf_.Empty() && !IsStopped()) {
AsyncWrite();
if (!cntx_->IsCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care loosing the error code here on cancellation ? Could it be that the AsyncWrite fails for some other reason ? Would it be useful to store this error or log it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't care regarding error if we cancel streamer

@@ -59,10 +59,6 @@ class JournalStreamer {
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
return cntx_->IsCancelled();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@@ -91,10 +87,6 @@ class RestoreStreamer : public JournalStreamer {

void SendFinalize(long attempt);

bool IsSnapshotFinished() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead function! nice!

@BorysTheDev BorysTheDev force-pushed the streamer_refactoring branch 2 times, most recently from 4a3cabc to 3735d47 Compare February 4, 2025 10:39
@@ -36,7 +36,9 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
cntx_.SwitchErrorHandler([om](auto ge) {
om->Finish(ge != errc::operation_canceled ? std::move(ge) : GenericError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we purging operation_cancelled and replacing it with an empty GenericError ?

@kostasrim
Copy link
Contributor

kostasrim commented Feb 4, 2025

@BorysTheDev plz do not force push once a PR has comments. It's impossible to keep track of the logical changes between different commits.

  1. If nobody reviewed -> squash and force push as much as you want
  2. If there are reviews -> squash unpushed local commits but leave the previous ones unchanged

@BorysTheDev BorysTheDev changed the title refactor: simplify journal and restore streamer cancellation refactor: simplify journal and restore streamer cancellation NOT READY FOR REVIEW Feb 4, 2025
@BorysTheDev BorysTheDev removed the request for review from adiholden February 4, 2025 11:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

refactor Journal and Restore streamers
2 participants