Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for kyo-reactive-stream #1029

Merged
merged 1 commit into from
Jan 20, 2025
Merged

Conversation

HollandDM
Copy link
Contributor

/claim #1022
This is an update for post-ack era of Emit and Poll for kyo-reactive-stream

Copy link

algora-pbc bot commented Jan 18, 2025

💵 To receive payouts, sign up on Algora, link your Github account and connect with Stripe.

@HollandDM
Copy link
Contributor Author

HollandDM commented Jan 18, 2025

About the upstream confusion in the original PR #977, it was my bad to not clarify this at the moment.
StreamSubscriber, when subscribed to a flow.Publisher, will receive a flow.Subscription as the mechanism to back-pressure the publisher. When a subscriber need n elements, it would request(n) via subscription, and it is guaranteed to receive n onNext() message, or the finalize message onComplete/onError. It's also the job of Subscriber to call cancel on Subscription when it's not needed anymore. So the upstream I was talking about was the subscription, not the publisher. Publisher remain intact whether subscriber done/fail or not (there is a testcase for this).
Anw, I've re-added the removed testcase, and change it description to be clearer in it's meaning

}
case Result.Failure(_) => IO(Loop.done(StreamFinishState.StreamCanceled))
case Result.Panic(exception) => Abort.panic(exception).andThen(Loop.done(StreamFinishState.StreamCanceled))
case Result.Failure(_) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems these two cases could be simplified to:

case result => Abort.get(result)

case _ => IO.unit
case Result.Success(StreamComplete) => IO(subscriber.onComplete())
case Result.Panic(e) => IO(subscriber.onError(e))
case _ => IO.unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case catches Result.Faiure(StreamCanceled), is that the intent? Doesn't the subscriber need to be notified?

Copy link
Contributor Author

@HollandDM HollandDM Jan 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intended, cancel can only be called by subscribers, so they should know already. Notifying in here will actually fail a test in reactive-stream-tck

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this into a concrete branch to be clearer

@HollandDM HollandDM force-pushed the update-interop-stream branch from 161d78d to c61f5e6 Compare January 19, 2025 07:28
@HollandDM
Copy link
Contributor Author

Updated!

@HollandDM HollandDM requested a review from fwbrasil January 19, 2025 07:29
@fwbrasil fwbrasil merged commit f084290 into getkyo:main Jan 20, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants