Skip to content

Commit

Permalink
rebase amend
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 30, 2024
1 parent 667f299 commit d1f585a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/plumpy/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@


class Coordinator(Protocol):
# XXX: naming - 'add_message_handler'
def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ...

# XXX: naming - 'add_broadcast_handler'
def add_broadcast_subscriber(
self,
subscriber: 'BroadcastSubscriber',
Expand All @@ -26,6 +28,7 @@ def add_broadcast_subscriber(
identifier: 'ID_TYPE | None' = None,
) -> Any: ...

# XXX: naming - absorbed into 'add_message_handler'
def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ...

def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ...
Expand Down
5 changes: 3 additions & 2 deletions src/plumpy/rmq/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import kiwipy

from plumpy import futures
from plumpy.futures import create_task
from plumpy.rmq.futures import wrap_to_concurrent_future
from plumpy.utils import ensure_coroutine

__all__ = [
Expand Down Expand Up @@ -72,7 +73,7 @@ def converted(communicator: kiwipy.Communicator, *args: Any, **kwargs: Any) -> k
return kiwi_future

msg_fn = functools.partial(coro, communicator, *args, **kwargs)
task_future = futures.create_task(msg_fn, loop)
task_future = create_task(msg_fn, loop)
return wrap_to_concurrent_future(task_future)

return converted
Expand Down
4 changes: 2 additions & 2 deletions tests/rmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def add_rpc_subscriber(self, subscriber, identifier=None):
def add_broadcast_subscriber(
self,
subscriber,
subject_filter=None,
subject_filters=None,
identifier=None,
):
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filter)
subscriber = kiwipy.BroadcastFilter(subscriber, subject=subject_filters)
return self._comm.add_broadcast_subscriber(subscriber, identifier)

# XXX: naming - `add_reciver_task` (can be combined with two above maybe??)
Expand Down

0 comments on commit d1f585a

Please sign in to comment.