Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate race conditions and fix tests #3

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/Cro/ZeroMQ/Internal.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,19 @@ role Cro::ZeroMQ::Source::Impure does Cro::Source does Cro::ZeroMQ::Component::I
$!tapped = True;
my $closer = False;
my $messages = Supplier.new;
my $poller-ready = Promise.new;
start {
$poller-ready.keep;
loop {
last if $closer;
CATCH { default { .rethrow unless $closer } }
my $event = poll_one(self!socket, 100, :in);
if $event > 0 {
$messages.emit: Cro::ZeroMQ::Message.new(parts => self!socket.receivemore);
}
}
}
await $poller-ready;
whenever $messages { emit $_ }
CLOSE {
$!tapped = False;
Expand All @@ -130,15 +134,19 @@ role Cro::ZeroMQ::Source::Pure does Cro::Source does Cro::ZeroMQ::Component::Pur
my ($ictx, $isocket) = $socket ?? ($ctx, $socket) !! self!initial;
my $closer = False;
my $messages = Supplier.new;
my $poller-ready = Promise.new;
start {
$poller-ready.keep;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sure I forgot a lot of this codebase, but shouldn't the keep call be closer to the poll_one call? This PR makes the race gap smaller, but it can be even smaller, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would. But here is how I see it. The closest point is right before the call itself. But it would have to be like this:

unless ($already-set) { $poller-ready.keep; $already-set = True; }

I don't feel really good about sticking in a piece of code which is basically nop 99.99999% of time. Sure, moar can optimize it away, but it's not guaranteed.

Another way is to

FIRST $poller-ready.keep;

But the only difference this makes with the PR code is that it would be done right after the loop which makes too little difference to make sense.

The last option is once. It could be placed right before poll_one. But there is risk of immediate closing of supply which would result in last activated before once is ever reached – and therefore await will stuck.

Ultimately, I wonder why such a complex approach is used? Why not simple supply { loop { emit poll_one } }?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately, I wonder why such a complex approach is used? Why not simple supply { loop { emit poll_one } }?

I am quite sure it's just that this code was not written with best practices on mind. If you have time and mind to rewrite this piece in a better way (not just patch) with something simpler and more robust - you are welcome.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do it straightforward way, but the thing I missed is that when poll_one blocks – it blocks the whole supply chain. Combined with non-guaranteed message delivery by zmq in certain scenarios, I currently don't see a completely risk-free solution.

Unfortunately, I only study zmq for I possibly will have tasks for it in the future. It's unlikely I will be able to find a better solution until then. And even then it feels to me that Net::ZMQ would need to be changed too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I have no more comments on this one, maybe @jnthn has.

loop {
last if $closer;
CATCH { default { .rethrow unless $closer } }
my $event = poll_one($isocket, 100, :in);
if $event > 0 {
$messages.emit: Cro::ZeroMQ::Message.new(parts => $isocket.receivemore);
}
}
}
await $poller-ready;
whenever $messages { emit $_ }
CLOSE {
$closer = True;
Expand Down