-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge provides all elements from the subsequences on cancellation #276
base: main
Are you sure you want to change the base?
Merge provides all elements from the subsequences on cancellation #276
Conversation
We should apply the same to zip, debounce and co |
let (stream2, continuation2) = AsyncStream.makeStream(of: Int.self) | ||
continuation1.onTermination = { reason in | ||
XCTAssertEqual(reason, .cancelled) | ||
continuation1.yield(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems a bit off; it is yielding a value AFTER it has transitioned to a terminal state? It would be perfectly reasonable for the stream to no longer produce values once it has moved to a terminal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was actually surprised by this behaviour as well and I don't know if this is intentional in AsyncStream
. I expected the onTermination
to be called after we returned nil
the consumer. This allows to inject new values before cancellation finishes :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that is perhaps the cooperative nature of cancellation - it eventually happens but may not happen immediately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do agree that cancellation can be cooperative but having users rely on this behaviour seems rather brittle. Nothing in the API nor in the proposal specifies this behaviour. I will include this in the new back pressured interface proposal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is reasonable, that users have the option to yield something before cancellation. If it fits with current naming 🤷, but I think the feature in itself is very important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree on the premise that user's should be able to yield
something before cancellation and my new proposed APIs for AsyncStream
actually disallow this but it has a future direction that lays out how we can enable this again. The reason why I think the current APIs are a bit wonky here is that you can yield something in onTermination
but you cannot "stop" the cancellation. So we are somewhere in the middle where we allow users to produce something but do not give them full control like withTaskCancellationHandler
does.
Overall, not important to this PR but a general discussion for the forums.
@@ -41,7 +41,8 @@ struct MergeStateMachine< | |||
buffer: Deque<Element>, | |||
upstreamContinuations: [UnsafeContinuation<Void, Error>], | |||
upstreamsFinished: Int, | |||
downstreamContinuation: UnsafeContinuation<Element?, Error>? | |||
downstreamContinuation: UnsafeContinuation<Element?, Error>?, | |||
cancelled: Bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this just be another state case?
if so it wouldn't need to hold onto the Task
which means that the closure (which captures the bases) would no longer be held on to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we make this another state case, we will have a lot of duplicated code that is shared between merging
and cancelled
. In the interest in keeping code duplication minimal, IMO we should not create a separate case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I think we should just leave it with the current case
@swift-ci please test |
@phausler, @FranzBusch is there anything outstanding? Can we merge this? |
d67e0b8
to
284e934
Compare
@swift-ci please test |
This looks good from my side. The CI failure is due to a broken nightly toolchain. @phausler If you are happy please merge it. |
On cancellation, merge currently does not yield all elements. This leads to situations in which the final elements of AsyncStreams are not forwarded to the user. This patch ensures, that only the underlying Task is cancelled and all subsequences' elements are forwarded to the user.
284e934
to
a9a02a4
Compare
On cancellation, merge currently does not yield all elements. This leads to situations in which the final elements of AsyncStreams are not forwarded to the user. This patch ensures, that only the underlying Task is cancelled and all subsequences' elements are forwarded to the user.