Add EndpointAPI.wait_until_connected_to
#86
Conversation
I've got some PR stacking going on here. This contains the diff from 3 PRs; only the latest commit is relevant.

Don't know whether you intentionally didn't use this, but if you push the first two commits to this repo under a branch you can then set the base branch of this PR so that the diff only shows the last commit. Just have to remember to set the base back to master once the preceding PRs have been merged!

@lithp since this stretches across forks, that results in the PR being only open in my fork, which lacks visibility. Maybe I should just open these as drafts so that there's less confusion and they can be reviewed sequentially as they get merged and rebased.
Force-pushed from 45159e5 to 840ff41
@pytest.mark.asyncio
async def test_connect_to_endpoint(ipc_base_path):
    config = ConnectionConfig.from_name("server", base_path=ipc_base_path)
    async with AsyncioEndpoint.serve(config) as server:
Love this API 😍
lahja/asyncio/endpoint.py (Outdated)
finally:
    self._half_connections.remove(remote)
    if remote in self._half_connections:
        self._half_connections.remove(remote)
It might be a little cleaner to replace this `finally` block with another `task.add_done_callback(self._half_connections.remove(remote))` inside `_accept_conn`.
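For illustration, a self-contained sketch of that callback-based cleanup pattern (not code from the PR; note that `add_done_callback` hands the finished task to the callback, so the removal needs a small wrapper):

```python
import asyncio

# Track an item in a set and let the task's completion callback remove it,
# instead of a try/finally inside the task body.
async def handle(remote: str) -> None:
    await asyncio.sleep(0.01)

async def main() -> None:
    half_connections: set = set()
    remote = "remote-endpoint"
    half_connections.add(remote)
    task = asyncio.ensure_future(handle(remote))
    # add_done_callback passes the finished task to the callback, so wrap the
    # removal in a lambda; discard() tolerates an already-removed entry.
    task.add_done_callback(lambda _task: half_connections.discard(remote))
    await task
    await asyncio.sleep(0)  # give the done-callback a chance to run
    assert remote not in half_connections

asyncio.run(main())
```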
lahja/asyncio/endpoint.py
Outdated
try:
    await remote.wait_stopped()
except asyncio.CancelledError:
    return
Why does this suppress `CancelledError`? I think that in this situation it's okay but letting it propagate up is safer.

From the docs:

> cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.

Suppressing the error prevents callers of `_handle_client` which didn't use `ensure_future` from knowing they've been cancelled, and it also prevents other callers of `_handle_client` from determining whether the coro stopped because of cancellation. Neither kind of caller exists right now, but this still seems like unexpected behavior.
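As a self-contained illustration of the point quoted from the docs (hypothetical code, not from the PR), a task whose coroutine swallows `CancelledError` no longer reports itself as cancelled:

```python
import asyncio

async def suppressing() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        return  # swallow the cancellation, as in the snippet above

async def main() -> None:
    task = asyncio.ensure_future(suppressing())
    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    # Because the coroutine suppressed CancelledError, the task looks like it
    # completed normally, so callers can no longer tell it was cancelled.
    print(task.cancelled())  # False

asyncio.run(main())
```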
Left some comments, esp. one about suppressing `CancelledError`, but no major complaints here (only looking at the last commit). One potential source of errors here is that the code which adds to `_half_connections` is fairly removed from the code which removes from it (and same with `_full_connections`). It'd be nice if there was some way of bringing them closer together.
Force-pushed from 4133f0f to 4469d20
@lithp I had to make some structural changes to fix a race condition where subscription updates could end up not transmitting if they occurred at a specific timing during the connection. Would like one more pass if you will.
return f"RemoteEndpoint[{self.name if self.name is not None else id(self)}]" | ||
|
||
def __repr__(self) -> str: | ||
return f"<{self}>" |
can/should extract to standalone.
lahja/asyncio/endpoint.py (Outdated)
self._received_subscription.notify_all()
if message.response_expected:
    await self.send_message(SubscriptionsAck())
if message.response_expected:
@lithp can you confirm this should be how this is structured, or do you think it's ok to send the `Ack` after we've released the lock...
I think it's fine to send the `Ack` from outside the lock's block, and maybe even preferable! The Condition is there to coordinate the local process; since the remote process doesn't even know about this Condition, I don't think that releasing the lock first could create any race conditions.
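For illustration, a hypothetical fragment showing the Ack sent after the Condition block is exited (only `SubscriptionsAck`, `_received_subscription`, and `send_message` appear in the diff above; the handler name and the message attributes are assumptions):

```python
# Hypothetical RemoteEndpoint fragment, not the actual lahja implementation.
async def _handle_subscriptions_updated(self, message) -> None:
    async with self._received_subscription:
        self.subscribed_messages = message.subscriptions  # assumed attribute
        self._received_subscription.notify_all()
    # The Ack only matters to the remote process, which never sees this local
    # Condition, so sending it after the lock is released cannot race locally.
    if message.response_expected:
        await self.send_message(SubscriptionsAck())
```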
lahja/asyncio/endpoint.py (Outdated)
while event not in self.subscribed_messages:
    await self.wait_until_subscription_received()
async with self._received_subscription:
    while True:
@lithp can you confirm this is ok according to your understanding of conditions. Rather than release and then re-acquire at the entrance to each loop, we eliminate this and only switch on the release/acquire that happens as part of the `wait` call.
This looks good to me!
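As a quick, self-contained check of that pattern (not code from the PR): holding the lock across the loop works because `Condition.wait()` releases it while waiting and re-acquires it before returning.

```python
import asyncio

async def wait_for_item(condition: asyncio.Condition, items: set, wanted: str) -> None:
    # Acquire once; wait() handles the release/re-acquire on every iteration.
    async with condition:
        while wanted not in items:
            await condition.wait()

async def main() -> None:
    condition = asyncio.Condition()
    items: set = set()
    waiter = asyncio.ensure_future(wait_for_item(condition, items, "subscription"))
    await asyncio.sleep(0)  # let the waiter block on the condition
    async with condition:
        items.add("subscription")
        condition.notify_all()
    await waiter

asyncio.run(main())
```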
lahja/asyncio/endpoint.py (Outdated)
)
)

async def _connect_receiving_queue(self) -> None:
Didn't intend to relocate this but I think I like it here better. Need to add some comments to delineate the body of this class more clearly, but I'll do that as a cleanup PR since re-organizing makes code hard to audit.

TODO: revert this move.
lahja/asyncio/endpoint.py (Outdated)
def _notify_subscriptions_changed_nowait(self) -> None:
    self._subscription_updates_queue.put_nowait(None)

async def _do_notify_subscriptions_changed(self) -> None:
TODO: remove this method and inline it into `_process_subscription_updates` since we never want anyone to call this directly.
async def _process_subscription_updates(self) -> None:
    self._subscription_updates_running.set()
    async with self._subscription_updates_condition:
@lithp Needed this to ensure we don't end up with concurrent subscription updates being sent out, since it's possible that they arrive out of order in that case and then the other side will have potentially inaccurate subscription records.

TODO: add a code comment explaining this.
How would they arrive out of order? Domain sockets always deliver messages in order, and `_notify_lock` ensures there's only one update per socket happening at any time.
I started undoing this but then stopped as I'm inclined to leave it this way. Here's my reasoning.

- We end up needing to do subscription updates from synchronous methods with the addition of `subscribe_nowait`, which seems to be a necessary API.
- I'm much more comfortable with that method doing a `Queue.put_nowait` than I am adding an additional call to `asyncio.ensure_future(self._notify_subscription_updates())`.
- I think that the `asyncio.Condition` based approach is more appropriate, and under that approach we still need a background process to listen for changes. The various branches that I have in progress have made good use of that `Condition` API.

I'm fine unrolling this if you're still not convinced at the point where the `trio` based endpoint is being added to the library.
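To make the first two bullets concrete, a hypothetical fragment of what the queue-based `subscribe_nowait` could look like (`_subscription_updates_queue` and the background `_process_subscription_updates` coroutine come from the diff above; the handler bookkeeping is invented for illustration):

```python
from typing import Callable, Type

# Hypothetical endpoint fragment, not the actual lahja implementation.
def subscribe_nowait(self, event_type: Type["BaseEvent"],
                     handler: Callable[["BaseEvent"], None]) -> None:
    self._sync_handlers[event_type].append(handler)  # invented bookkeeping
    # A synchronous method cannot await, so it just enqueues a marker; the
    # background _process_subscription_updates() coroutine drains the queue
    # and notifies remotes. The alternative discussed above would be calling
    # asyncio.ensure_future(self._notify_subscription_updates()) here.
    self._subscription_updates_queue.put_nowait(None)
```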
if self._process.pid is not None:
    os.kill(self._process.pid, signal.SIGINT)
else:
    self._process.terminate()
TODO: extract to standalone.
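A possible shape for that standalone helper (the function name is hypothetical; the body is taken from the snippet above):

```python
import os
import signal
from multiprocessing import Process

def request_shutdown(process: Process) -> None:
    """Hypothetical helper extracted per the TODO above: prefer SIGINT so the
    child can run its cleanup handlers, falling back to terminate()."""
    if process.pid is not None:
        os.kill(process.pid, signal.SIGINT)
    else:
        process.terminate()
```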
Force-pushed from 4469d20 to 3f30e62
@@ -216,9 +222,13 @@ def __init__(
    endpoint has acknowledged the new subscription set. If ``block`` is ``False`` then this
    function will return immediately after the send finishes.
    """
The comment just above this line isn't quite right anymore: "Alert the Endpoint which has connected to us that our subscription set has changed." Since `RemoteEndpoint` now handles both inbound and outbound, this should be something like "Alert the remote that this endpoint's subscription set has changed".
self._receiving_loop_running = asyncio.Event()
self._receiving_queue = asyncio.Queue()

self._subscription_updates_running = asyncio.Event()
self._subscription_updates_condition = asyncio.Condition()
self._subscription_updates_queue = asyncio.Queue()
These could be initialized in `__init__`, which would save you from needing to declare them at the beginning of this class and would also make it easier to reason about the potential for `None` dereferences.

The `await *.running()` pattern sounds like it could be pulled out into a helper which removes the potentially-`None` instance attribute entirely. Something like `await self.start_coro(self._process_subscription_updates)` which doesn't return until the coro has started.
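A rough, self-contained sketch of the kind of helper being suggested (the name `start_coro` comes from the comment above; having the wrapped coroutine signal readiness through an `asyncio.Event` it receives is an assumption about how it could work):

```python
import asyncio
from typing import Awaitable, Callable

# Start a background coroutine and return only once it has signalled that it
# is running, so no "running" Event needs to live as an endpoint attribute.
async def start_coro(coro_fn: Callable[[asyncio.Event], Awaitable[None]]) -> asyncio.Future:
    running = asyncio.Event()
    task = asyncio.ensure_future(coro_fn(running))
    await running.wait()
    return task

async def process_subscription_updates(running: asyncio.Event) -> None:
    running.set()  # signal readiness instead of setting an endpoint attribute
    while True:
        await asyncio.sleep(1)  # stand-in for draining the updates queue

async def main() -> None:
    task = await start_coro(process_subscription_updates)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
```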
I'm going to leave this for now and open an issue to track it, as I've seen this issue arise as well, but I don't want to conflate this PR with an extra fix that I'm not sure I know the right solution for.
lahja/asyncio/endpoint.py (Outdated)
while True:
    if event in self.subscribed_messages:
        return
    await self._received_subscription.wait()

@asynccontextmanager  # type: ignore
As of this change `_handle_client` and `_handle_server` are the same method and together they constitute the only callers of `run_remote_endpoint`. Maybe they could all be merged into one method (untested):

async def run_remote_endpoint(remote: RemoteEndpoint) -> None:
    await remote.start()
    try:
        await remote.wait_stopped()
    finally:
        await remote.stop()
good catch.
I can't figure out from these comments what the problem with missed updates was, so it's possible that this is the best solution and I'm willing to be talked into approving, but this solution is very heavyweight!

This method with `_subscription_updates_condition`, which uses a single coro to do the notifications, has the effect of serializing all updates across all remote endpoints. It's not only a lot of code but it's also much slower than it needs to be! I think that in most cases the endpoint doesn't need to block at all; it's okay if we take a while to inform the remotes of the messages we're expecting, because endpoints already drop all messages which occurred before the endpoint started listening for them.

Blocking before returning from `connect_to_endpoint` and `stream` was added to make it easier to write tests. Once those methods return the caller is able to call `broadcast()` on other endpoints and be sure the message will be sent.

I'm not sure what was causing the dropped updates but I'm hoping there's a different way to fix it, and adding `sleep()` calls to the tests sounds preferable to the complication of putting this condition everywhere the endpoint might update a subscription, and adding a queue and a coro to allow asynchronously updating subscriptions. I think this change tips this class over into the territory where it's too scary to touch, which will slow down the API improvements I think we plan on adding in the future.
  self._server_tasks.add(task)
- # the Endpoint on the other end blocks until it receives this message
- await remote.notify_subscriptions_updated(self.subscribed_events)
+ await remote.wait_started()
+
+ # we **must** ensure that the subscription updates are locked between
+ # the time that we manually update this individual connection and that
+ # we place it within the set of tracked connections, otherwise, a
+ # subscription update from elsewhere can occur between the time these
+ # two statements execute resulting in the remote missing a new
+ # subscription update. Note that inverting these statements should
+ # also mitigate this, but it has the downside of the manual update
+ # potentially being redundant.
+ async with self._subscription_updates_condition:
+     await remote.notify_subscriptions_updated(self.subscribed_events)
+     await self._add_half_connection(remote)
Ahh, okay, I see the problem now: `_add_half_connection` blocks when it tries to acquire `connections_changed`, so there's a period in time where the remote isn't receiving subscription updates. Double-sending the same set of subscriptions has a very small cost though! And I think that trying not to send it introduced greater inefficiencies elsewhere; serializing all updates isn't cheap!
(I thought I responded to this somewhere but I don't see it)

I'm inclined to leave it this way because it correctly manages the appropriate locks on the resource, whereas the other mechanism solves the problem through indirection. First, I think the overall cost here is low because our usage patterns for the library don't involve adding new connections at a high frequency. Second, doing it the alternate way of inverting these statements seems like a solution that could easily be undone incidentally in a refactor since it's an implicit solution.

I've been futzing with this code for days so it's a bit muddled in my head, but one thing that makes me prefer this approach is that it is "correct" in the sense that it properly maintains the necessary resource locks. The alternative of double sending is likely also adequate in fixing the bug, but it does so through some indirection, which I think means more mental overhead since it's no longer trivial to do things like grep the code for all of the places the resource is modified by looking for the places it gets locked.
@lithp let's do a call later today when we're both online and see if we can find something that works for both of us.
Force-pushed from 3988c00 to 4a631bc
What was wrong?
Needed a non-polling mechanism to wait for a connection.
How was it fixed?
Additive changes to our connections are now guarded with an `asyncio.Condition`. This allows implementation of `wait_until_connected_to` in a non-polling manner. After the initial connection check, the API waits for an update to the connection before checking again.

Cute Animal Picture
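To summarize the mechanism described under "How was it fixed?", here is a condensed, illustrative sketch (not the actual lahja code; `wait_until_connected_to` and `is_connected_to` follow the PR, the rest is simplified):

```python
import asyncio

# Connection changes notify an asyncio.Condition, and wait_until_connected_to
# waits on that Condition instead of polling.
class EndpointSketch:
    def __init__(self) -> None:
        self._connected: set = set()
        self._connections_changed = asyncio.Condition()

    async def _add_connection(self, name: str) -> None:
        # Additive changes are guarded by the Condition and wake any waiters.
        async with self._connections_changed:
            self._connected.add(name)
            self._connections_changed.notify_all()

    def is_connected_to(self, name: str) -> bool:
        return name in self._connected

    async def wait_until_connected_to(self, name: str) -> None:
        # Check once under the lock, then wait for connection updates.
        async with self._connections_changed:
            while not self.is_connected_to(name):
                await self._connections_changed.wait()

async def main() -> None:
    endpoint = EndpointSketch()
    waiter = asyncio.ensure_future(endpoint.wait_until_connected_to("server"))
    await asyncio.sleep(0)  # let the waiter block on the Condition
    await endpoint._add_connection("server")
    await waiter

asyncio.run(main())
```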