From 1cde56f05711cce352e09c175faca50f74530996 Mon Sep 17 00:00:00 2001 From: Andrey Zelenchuk Date: Thu, 19 Mar 2020 21:41:03 +0700 Subject: [PATCH 1/5] Do not ignore exceptions from `Serializer.deserialize`. --- channels_graphql_ws/graphql_ws_consumer.py | 46 +++++++++++++--------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/channels_graphql_ws/graphql_ws_consumer.py b/channels_graphql_ws/graphql_ws_consumer.py index 575ad58..db5ddc7 100644 --- a/channels_graphql_ws/graphql_ws_consumer.py +++ b/channels_graphql_ws/graphql_ws_consumer.py @@ -717,15 +717,23 @@ async def notifier(): # Assert we run in a proper thread. self._assert_thread() while True: - payload = await notification_queue.get() + serialized_payload = await notification_queue.get() + # Run a subscription's `publish` method (invoked by the # `trigger.on_next` function) within the threadpool used # for processing other GraphQL resolver functions. - # NOTE: `lambda` is important to run the deserialization + # NOTE: it is important to run the deserialization # in the worker thread as well. - await self._run_in_worker( - lambda: trigger.on_next(Serializer.deserialize(payload)) - ) + def workload(): + try: + payload = Serializer.deserialize(serialized_payload) + except Exception as ex: # pylint: disable=broad-except + trigger.on_error(f"Cannot deserialize payload. {ex}") + else: + trigger.on_next(payload) + + await self._run_in_worker(workload) + # Message processed. This allows `Queue.join` to work. notification_queue.task_done() @@ -735,23 +743,23 @@ async def notifier(): lambda publish_returned: publish_returned is not self.SKIP ) - # Start listening for broadcasts (subscribe to the Channels - # groups), spawn the notification processing task and put - # subscription information into the registry. - # NOTE: Update of `_sids_by_group` & `_subscriptions` must be - # atomic i.e. without `awaits` in between. 
+ # Start listening for broadcasts (subscribe to the Channels + # groups), spawn the notification processing task and put + # subscription information into the registry. + # NOTE: Update of `_sids_by_group` & `_subscriptions` must be + # atomic i.e. without `awaits` in between. waitlist = [] - for group in groups: - self._sids_by_group.setdefault(group, []).append(operation_id) + for group in groups: + self._sids_by_group.setdefault(group, []).append(operation_id) waitlist.append(self._channel_layer.group_add(group, self.channel_name)) notifier_task = self._spawn_background_task(notifier()) - self._subscriptions[operation_id] = self._SubInf( - groups=groups, - sid=operation_id, - unsubscribed_callback=unsubscribed_callback, - notification_queue=notification_queue, - notifier_task=notifier_task, - ) + self._subscriptions[operation_id] = self._SubInf( + groups=groups, + sid=operation_id, + unsubscribed_callback=unsubscribed_callback, + notification_queue=notification_queue, + notifier_task=notifier_task, + ) await asyncio.wait(waitlist) From 2ece5a955c9c606dde7e1fc6b8cd557b680e6850 Mon Sep 17 00:00:00 2001 From: Andrey Zelenchuk Date: Thu, 19 Mar 2020 21:42:18 +0700 Subject: [PATCH 2/5] Introduce per-operation contexts. --- channels_graphql_ws/graphql_ws_consumer.py | 4 +-- channels_graphql_ws/operation_context.py | 33 ++++++++++++++++++++++ channels_graphql_ws/scope_as_context.py | 2 +- 3 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 channels_graphql_ws/operation_context.py diff --git a/channels_graphql_ws/graphql_ws_consumer.py b/channels_graphql_ws/graphql_ws_consumer.py index db5ddc7..e6f843e 100644 --- a/channels_graphql_ws/graphql_ws_consumer.py +++ b/channels_graphql_ws/graphql_ws_consumer.py @@ -58,7 +58,7 @@ import promise import rx -from .scope_as_context import ScopeAsContext +from .operation_context import OperationContext from .serializer import Serializer # Module logger. 
@@ -547,7 +547,7 @@ async def _on_gql_start(self, operation_id, payload): # Create object-like context (like in `Query` or `Mutation`) # from the dict-like one provided by the Channels. - context = ScopeAsContext(self.scope) + context = OperationContext(self.scope) # Adding channel name to the context because it seems to be # useful for some use cases, take a loot at the issue from diff --git a/channels_graphql_ws/operation_context.py b/channels_graphql_ws/operation_context.py new file mode 100644 index 0000000..f4e253a --- /dev/null +++ b/channels_graphql_ws/operation_context.py @@ -0,0 +1,33 @@ +"""Just `OperationContext` class.""" + +from channels_graphql_ws.scope_as_context import ScopeAsContext + + +class OperationContext(ScopeAsContext): + """ + The context intended to be used in methods of Graphene classes as `info.context`. + + This class provides two public properties: + 1. `scope` - per-connection context. This is the `scope` of Django Channels. + 2. `operation_context` - per-operation context. Empty. Feel free to store your + data here. + + For backward compatibility: + - Method `_asdict` returns the `scope`. + - Other attributes are routed to the `scope`.
+ """ + + def __init__(self, scope: dict): + """Nothing interesting here.""" + super().__init__(scope) + self._operation_context: dict = {} + + @property + def scope(self) -> dict: + """Return the scope.""" + return self._scope + + @property + def operation_context(self) -> dict: + """Return the per-operation context.""" + return self._operation_context diff --git a/channels_graphql_ws/scope_as_context.py b/channels_graphql_ws/scope_as_context.py index 202a1dc..036bc60 100644 --- a/channels_graphql_ws/scope_as_context.py +++ b/channels_graphql_ws/scope_as_context.py @@ -25,7 +25,7 @@ class ScopeAsContext: """Wrapper to make Channels `scope` appear as an `info.context`.""" - def __init__(self, scope): + def __init__(self, scope: dict): """Remember given `scope`.""" self._scope = scope From b6a9d53c9c859d809601f6edb818324db27ad2bd Mon Sep 17 00:00:00 2001 From: Andrey Zelenchuk Date: Thu, 19 Mar 2020 21:57:03 +0700 Subject: [PATCH 3/5] Introduce initial payload. --- channels_graphql_ws/graphql_ws_consumer.py | 19 ++++++++++++++++++- channels_graphql_ws/subscription.py | 5 ++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/channels_graphql_ws/graphql_ws_consumer.py b/channels_graphql_ws/graphql_ws_consumer.py index e6f843e..c34645b 100644 --- a/channels_graphql_ws/graphql_ws_consumer.py +++ b/channels_graphql_ws/graphql_ws_consumer.py @@ -671,7 +671,12 @@ def register_middleware(next_middleware, root, info, *args, **kwds): await self._send_gql_complete(operation_id) async def _register_subscription( - self, operation_id, groups, publish_callback, unsubscribed_callback + self, + operation_id, + groups, + publish_callback, + unsubscribed_callback, + initial_payload, ): """Register a new subscription when client subscribes. @@ -709,6 +714,10 @@ async def _register_subscription( maxsize=self.subscription_notification_queue_limit ) + # Enqueue the initial payload. 
+ if initial_payload is not self.SKIP: + notification_queue.put_nowait(Serializer.serialize(initial_payload)) + # Start an endless task which listens the `notification_queue` # and invokes subscription "resolver" on new notifications. async def notifier(): @@ -716,6 +725,14 @@ async def notifier(): # Assert we run in a proper thread. self._assert_thread() + + # Dirty hack to partially workaround the race between: + # 1) call to `result.subscribe` in `_on_gql_start`; and + # 2) call to `trigger.on_next` below in this function. + # The first call must be earlier. Otherwise, first one or more notifications + # may be lost. + await asyncio.sleep(1) + while True: serialized_payload = await notification_queue.get() diff --git a/channels_graphql_ws/subscription.py b/channels_graphql_ws/subscription.py index cc95756..71941d4 100644 --- a/channels_graphql_ws/subscription.py +++ b/channels_graphql_ws/subscription.py @@ -356,6 +356,7 @@ def __init_subclass_with_meta__( _meta.subscribe = get_function(subscribe) _meta.publish = get_function(publish) _meta.unsubscribed = get_function(unsubscribed) + _meta.initial_payload = options.get("initial_payload", cls.SKIP) super().__init_subclass_with_meta__(_meta=_meta, **options) @@ -422,7 +423,9 @@ def unsubscribed_callback(): # `subscribe`. return result - return register_subscription(groups, publish_callback, unsubscribed_callback) + return register_subscription( + groups, publish_callback, unsubscribed_callback, cls._meta.initial_payload + ) @classmethod def _group_name(cls, group=None): From 3846828400057ebca4eb89718f9d088e623afd1d Mon Sep 17 00:00:00 2001 From: Andrey Zelenchuk Date: Fri, 20 Mar 2020 03:13:07 +0700 Subject: [PATCH 4/5] Fix losing initial payload because of the race. 
--- channels_graphql_ws/graphql_ws_consumer.py | 45 +++++++++------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/channels_graphql_ws/graphql_ws_consumer.py b/channels_graphql_ws/graphql_ws_consumer.py index c34645b..37ac5b5 100644 --- a/channels_graphql_ws/graphql_ws_consumer.py +++ b/channels_graphql_ws/graphql_ws_consumer.py @@ -706,9 +706,6 @@ async def _register_subscription( # `_sids_by_group` without any locks. self._assert_thread() - # The subject we will trigger on the `broadcast` message. - trigger = rx.subjects.Subject() - # The subscription notification queue. notification_queue = asyncio.Queue( maxsize=self.subscription_notification_queue_limit @@ -720,24 +717,16 @@ async def _register_subscription( # Start an endless task which listens the `notification_queue` # and invokes subscription "resolver" on new notifications. - async def notifier(): + async def notifier(observer: rx.Observer): """Watch the notification queue and notify clients.""" # Assert we run in a proper thread. self._assert_thread() - - # Dirty hack to partially workaround the race between: - # 1) call to `result.subscribe` in `_on_gql_start`; and - # 2) call to `trigger.on_next` below in this function. - # The first call must be earlier. Otherwise, first one or more notifications - # may be lost. - await asyncio.sleep(1) - while True: serialized_payload = await notification_queue.get() # Run a subscription's `publish` method (invoked by the - # `trigger.on_next` function) within the threadpool used + # `observer.on_next` function) within the threadpool used # for processing other GraphQL resolver functions. # NOTE: it is important to run the deserialization # in the worker thread as well. @@ -745,31 +734,24 @@ def workload(): try: payload = Serializer.deserialize(serialized_payload) except Exception as ex: # pylint: disable=broad-except - trigger.on_error(f"Cannot deserialize payload. {ex}") + observer.on_error(f"Cannot deserialize payload. 
{ex}") else: - trigger.on_next(payload) + observer.on_next(payload) await self._run_in_worker(workload) # Message processed. This allows `Queue.join` to work. notification_queue.task_done() - # Enqueue the `publish` method execution. But do not notify - # clients when `publish` returns `SKIP`. - stream = trigger.map(publish_callback).filter( # pylint: disable=no-member - lambda publish_returned: publish_returned is not self.SKIP - ) - + def push_payloads(observer: rx.Observer): # Start listening for broadcasts (subscribe to the Channels # groups), spawn the notification processing task and put # subscription information into the registry. # NOTE: Update of `_sids_by_group` & `_subscriptions` must be # atomic i.e. without `awaits` in between. - waitlist = [] for group in groups: self._sids_by_group.setdefault(group, []).append(operation_id) - waitlist.append(self._channel_layer.group_add(group, self.channel_name)) - notifier_task = self._spawn_background_task(notifier()) + notifier_task = self._spawn_background_task(notifier(observer)) self._subscriptions[operation_id] = self._SubInf( groups=groups, sid=operation_id, @@ -778,9 +760,20 @@ def workload(): notifier_task=notifier_task, ) - await asyncio.wait(waitlist) + await asyncio.wait( + [ + self._channel_layer.group_add(group, self.channel_name) + for group in groups + ] + ) - return stream + # Enqueue the `publish` method execution. But do not notify + # clients when `publish` returns `SKIP`. + return ( + rx.Observable.create(push_payloads) # pylint: disable=no-member + .map(publish_callback) + .filter(lambda publish_returned: publish_returned is not self.SKIP) + ) async def _on_gql_stop(self, operation_id): """Process the STOP message. From f2cc027f6d284a0f5ba99dfa22c53cede8e0bf37 Mon Sep 17 00:00:00 2001 From: Andrey Zelenchuk Date: Fri, 20 Mar 2020 03:21:36 +0700 Subject: [PATCH 5/5] Fix (partially) test "tests/test_concurrent.py::test_message_order_in_subscribe_unsubscribe_all_loop[False-async]". 
There is still a race: all calls to `unsubscribe` may finish before the subscription is registered. --- tests/test_concurrent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_concurrent.py b/tests/test_concurrent.py index e6a2c15..ee845f4 100644 --- a/tests/test_concurrent.py +++ b/tests/test_concurrent.py @@ -732,9 +732,9 @@ async def test_message_order_in_subscribe_unsubscribe_all_loop( 'complete' message. """ - NUMBER_OF_UNSUBSCRIBE_CALLS = 50 # pylint: disable=invalid-name + NUMBER_OF_UNSUBSCRIBE_CALLS = 100 # pylint: disable=invalid-name # Delay in seconds. - DELAY_BETWEEN_UNSUBSCRIBE_CALLS = 0.01 # pylint: disable=invalid-name + DELAY_BETWEEN_UNSUBSCRIBE_CALLS = 0.02 # pylint: disable=invalid-name # Gradually stop the test if time is up. TIME_BORDER = 20 # pylint: disable=invalid-name