Skip to content

Commit

Permalink
Export plumpy.futures.Future
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Jan 17, 2025
1 parent 39d363f commit 4972dd2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
12 changes: 1 addition & 11 deletions src/plumpy/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import contextlib
from typing import Any, Awaitable, Callable, Generator, Optional

__all__ = ['CancellableAction', 'capture_exceptions', 'create_task', 'create_task']
__all__ = ['CancellableAction', 'Future', 'capture_exceptions', 'create_task', 'create_task']


class InvalidFutureError(Exception):
Expand Down Expand Up @@ -78,14 +78,4 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr
"""
loop = loop or asyncio.get_event_loop()

# future = loop.create_future()
#
# async def run_task() -> None:
# with capture_exceptions(future):
# res = await coro()
# future.set_result(res)
#
# asyncio.run_coroutine_threadsafe(run_task(), loop)
# return future

return asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro(), loop))
2 changes: 2 additions & 0 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ def init(self) -> None:

try:
# filter out state change broadcasts
# XXX: remove dep on kiwipy
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
identifier = self._coordinator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
# identifier = self._coordinator.add_broadcast_subscriber(
Expand Down Expand Up @@ -1332,6 +1333,7 @@ async def step(self) -> None:
self._stepping = True
next_state = None
try:
# XXX: debug log when need to step to next state
next_state = await self._run_task(self._state.execute)
except process_states.Interruption as exception:
# If the interruption was caused by a call to a Process method then there should
Expand Down
21 changes: 17 additions & 4 deletions src/plumpy/rmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
# -*- coding: utf-8 -*-
# mypy: disable-error-code=name-defined
from .communications import *
from .futures import *
from .process_control import *
from .communications import Communicator, DeliveryFailed, RemoteException, TaskRejected, wrap_communicator
from .futures import unwrap_kiwi_future, wrap_to_concurrent_future
from .process_control import RemoteProcessController, RemoteProcessThreadController

__all__ = communications.__all__ + futures.__all__ + process_control.__all__
__all__ = [
# communications
'Communicator',
'DeliveryFailed',
'RemoteException',
# process_control
'RemoteProcessController',
'RemoteProcessThreadController',
'TaskRejected',
# futures
'unwrap_kiwi_future',
'wrap_communicator',
'wrap_to_concurrent_future',
]

0 comments on commit 4972dd2

Please sign in to comment.