diff --git a/src/plumpy/coordinator.py b/src/plumpy/coordinator.py index dc501c4e..702ea5f5 100644 --- a/src/plumpy/coordinator.py +++ b/src/plumpy/coordinator.py @@ -16,8 +16,10 @@ class Coordinator(Protocol): + # XXX: naming - 'add_message_handler' def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ... + # XXX: naming - 'add_broadcast_handler' def add_broadcast_subscriber( self, subscriber: 'BroadcastSubscriber', @@ -26,6 +28,7 @@ def add_broadcast_subscriber( identifier: 'ID_TYPE | None' = None, ) -> Any: ... + # XXX: naming - absorbed into 'add_message_handler' def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ... def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ... diff --git a/src/plumpy/rmq/communications.py b/src/plumpy/rmq/communications.py index 61f89fc1..b27b65a1 100644 --- a/src/plumpy/rmq/communications.py +++ b/src/plumpy/rmq/communications.py @@ -9,7 +9,8 @@ import kiwipy -from plumpy import futures +from plumpy.futures import create_task +from plumpy.rmq.futures import wrap_to_concurrent_future from plumpy.utils import ensure_coroutine __all__ = [ @@ -72,7 +73,7 @@ def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> k return kiwi_future msg_fn = functools.partial(coro, communicator, *args, **kwargs) - task_future = futures.create_task(msg_fn, loop) + task_future = create_task(msg_fn, loop) return wrap_to_concurrent_future(task_future) return converted diff --git a/tests/rmq/__init__.py b/tests/rmq/__init__.py index 72078829..2845b1bb 100644 --- a/tests/rmq/__init__.py +++ b/tests/rmq/__init__.py @@ -17,10 +17,10 @@ def add_rpc_subscriber(self, subscriber, identifier=None): def add_broadcast_subscriber( self, subscriber, - subject_filter=None, + subject_filters=None, identifier=None, ): - subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter) + subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filters) return self._comm.add_broadcast_subscriber(subscriber, identifier) # XXX: naming - `add_reciver_task` (can be combined with two above maybe??)