Skip to content

Commit

Permalink
Merge pull request #3 from calgray/qachecks
Browse files Browse the repository at this point in the history
Restore QA Checks and Update Readme
  • Loading branch information
calgray authored Dec 6, 2023
2 parents 16aa19a + 31e2d11 commit c4e03d8
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 35 deletions.
15 changes: 15 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[flake8]
max-line-length = 99
exclude = tests/*,
extend-ignore =
E203,
D200,
D205,
D415,
D105,
# D1, # Uncomment to disable enforcing docstrings
required-plugins =
flake8-docstring,
docstring-convention = google
# per-file-ignores =
# tests/*:D100,D104 # Uncomment to disable enforcing module-level docstring in tests/
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ repos:
rev: 6.1.0
hooks:
- id: flake8
additional_dependencies: [flake8-docstrings, flake8-pyproject]
additional_dependencies: [flake8-docstrings]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.5.1"
hooks:
Expand Down
101 changes: 83 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,101 @@

[![Test and build](https://github.com/calgray/athreading/actions/workflows/ci.yml/badge.svg)](https://github.com/calgray/athreading/actions/workflows/ci.yml)

`athreading` is asynchronous threading library for running and synchronizing worker threads with asyncio.
`athreading` is an asynchronous threading library for running and synchronizing worker threads with asyncio.

## Usage

Although the python GIL prevents true parallelism, existing source code using synchronous sleep/wait calls can be offloaded to worker threads to avoid blocking the async I/O loop.

### Callable → Coroutine

Use `athread.call` to wrap a function/`Callable` to an async function/`Couroutine`:

#### Synchronous<!--1-->

```python
import athreading
import asyncio
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
def print_sqrt(x):
time.sleep(0.5)
result = math.sqrt(x)
print(datetime.datetime.now(), result)
return result

res = (print_sqrt(2), print_sqrt(3), print_sqrt(4))
print(res)
```

output:

```log
2023-12-05 14:45:57.716696 1.4142135623730951
2023-12-05 14:45:58.217192 1.7320508075688772
2023-12-05 14:45:58.717934 2.0
(1.4142135623730951 1.7320508075688772 2.0)
```

#### Asynchronous<!--1-->

```python

async def amain():
res = await asyncio.gather(
athreading.call(print_sqrt)(2),
athreading.call(print_sqrt)(3),
athreading.call(print_sqrt)(4)
)
print(res)

asyncio.run(amain())
```

def worker(delay, n):
output:

```log
2023-12-05 14:45:59.219461 1.4142135623730951
2023-12-05 14:45:59.220492 1.7320508075688772
2023-12-05 14:45:59.221174 2.0
(1.4142135623730951 1.7320508075688772 2.0)
```

### Iterator → AsyncIterator

Use `athreading.iterate` to convert an `Iterable` interface to an `AsyncIterator` for iterating on the main thread.

#### Synchronous<!--2-->

```python
def worker(n):
for i in range(n):
time.sleep(delay)
yield datetime.datetime.now()
time.sleep(0.5)
yield datetime.datetime.now()

def print_stream(id, stream):
for value in stream:
print("thread:", id, "time:", value)

print_stream(0, worker(3))
print_stream(1, worker(3))
print_stream(2, worker(3))
print_stream(3, worker(3))
```

#### Asynchronous<!--3-->

```python
async def print_stream(id, stream):
async with stream:
async for value in stream:
print(id, value)
async with stream:
async for value in stream:
print("thread:", id, "time:", value)


async def arun():
executor = ThreadPoolExecutor(max_workers=4)
await asyncio.gather(
*[
print_stream(tid, athreading.iterate(worker(1.0, 3), executor))
for tid in range(4)
]
)
*[
print_stream(tid, athreading.iterate(worker(1.0, 3), executor))
for tid in range(4)
]
)

asyncio.run(arun())
```
Expand All @@ -55,10 +118,12 @@ output:
3 2023-12-05 09:37:17.836755
```

## Developement
## Maintenance

This is a minimal Python 3.11 application that uses [poetry](https://python-poetry.org) for packaging and dependency management. It also provides [pre-commit](https://pre-commit.com/) hooks (for [isort](https://pycqa.github.io/isort/), [Black](https://black.readthedocs.io/en/stable/), [Flake8](https://flake8.pycqa.org/en/latest/) and [mypy](https://mypy.readthedocs.io/en/stable/)) and automated tests using [pytest](https://pytest.org/) and [GitHub Actions](https://github.com/features/actions). Pre-commit hooks are automatically kept updated with a dedicated GitHub Action, this can be removed and replace with [pre-commit.ci](https://pre-commit.ci) if using an public repo. It was developed by the [Imperial College Research Computing Service](https://www.imperial.ac.uk/admin-services/ict/self-service/research-support/rcs/).

### Development

To modify, test and request changes to this repository:

1. [Download and install Poetry](https://python-poetry.org/docs/#installation) following the instructions for your OS.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ disallow_untyped_defs = false
[tool.flake8]
max-line-length = 99
exclude = [
'./tests'
'tests/*'
]
extend-ignore = [
'E203',
Expand Down
4 changes: 2 additions & 2 deletions src/athreading/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

from .callable import call
from .generator import ThreadedAsyncGenerator, generate
from .iterator import ThreadedAsyncIterator, fiterate, iterate
from .iterator import ThreadedAsyncIterator, _fiterate, iterate

__version__ = "0.1.1"


__all__ = (
"call",
"iterate",
"fiterate",
"_fiterate",
"generate",
"ThreadedAsyncIterator",
"ThreadedAsyncGenerator",
Expand Down
4 changes: 2 additions & 2 deletions src/athreading/callable.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


def call(
f: Callable[ParamsT, ReturnT],
fn: Callable[ParamsT, ReturnT],
executor: ThreadPoolExecutor | None = None,
) -> Callable[ParamsT, Coroutine[None, None, ReturnT]]:
"""Wraps a callable to a Coroutine for calling using a ThreadPoolExecutor."""
Expand All @@ -21,7 +21,7 @@ def call(
executor = executor if executor is not None else ThreadPoolExecutor()

def call_handler(*args: ParamsT.args, **kwargs: ParamsT.kwargs) -> None:
result = f(*args, **kwargs)
result = fn(*args, **kwargs)
q.put(result)
loop.call_soon_threadsafe(event.set)

Expand Down
12 changes: 7 additions & 5 deletions src/athreading/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import queue
import threading
from collections.abc import AsyncGenerator, Generator
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import Future, ThreadPoolExecutor, wait
from contextlib import AbstractAsyncContextManager
from types import TracebackType
from typing import TypeVar
Expand Down Expand Up @@ -61,10 +61,11 @@ def __init__(
self._loop = asyncio.get_running_loop()
self._generator = generator
self._executor = executor if executor is not None else ThreadPoolExecutor()
self._stream_future: Future[None] | None = None

@override
async def __aenter__(self) -> ThreadedAsyncGenerator[YieldT, SendT]:
self.stream_future = self._executor.submit(self.__stream)
self._stream_future = self._executor.submit(self.__stream)
return self

@override
Expand All @@ -74,16 +75,17 @@ async def __aexit__(
__val: BaseException | None,
__tb: TracebackType | None,
) -> None:
assert self._stream_future is not None
self._event.set()
self._semaphore.release()
self._send_queue.put(None)
wait([self.stream_future])
wait([self._stream_future])

@override
async def __anext__(self) -> YieldT:
assert (
self.stream_future is not None
), "Iterator started before entering thread context"
self._stream_future is not None
), "Generator started before entering thread context"
self._send_queue.put(None)
return await self.__get()

Expand Down
8 changes: 5 additions & 3 deletions src/athreading/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import queue
import threading
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import Future, ThreadPoolExecutor, wait
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from types import TracebackType
from typing import TypeVar
Expand All @@ -34,7 +34,7 @@ def iterate(


@asynccontextmanager
async def fiterate(
async def _fiterate(
iterable: Iterable[YieldT], executor: ThreadPoolExecutor | None = None
) -> AsyncGenerator[AsyncIterator[YieldT], None]:
"""Wraps a synchronous generator to an AsyncGenerator for running using a ThreadPoolExecutor.
Expand Down Expand Up @@ -116,6 +116,7 @@ def __init__(
self._queue: queue.Queue[YieldT] = queue.Queue()
self._iterable = iterable
self._executor = executor if executor is not None else ThreadPoolExecutor()
self._stream_future: Future[None] | None = None

@override
async def __aenter__(self) -> ThreadedAsyncIterator[YieldT]:
Expand All @@ -130,14 +131,15 @@ async def __aexit__(
__val: BaseException | None,
__tb: TracebackType | None,
) -> None:
assert self._stream_future is not None
self._event.set()
self._semaphore.release()
wait([self._stream_future])

async def __anext__(self) -> YieldT:
assert (
self._stream_future is not None
), "Iterator started before entering thread context"
), "Iterator started before entering context"
if not self._event.is_set() or not self._queue.empty():
await self._semaphore.acquire()
if not self._queue.empty():
Expand Down
6 changes: 3 additions & 3 deletions tests/test_iterate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def generate(delay=0.0):
"streamcontext",
[
lambda delay: aiostream.stream.iterate(generate(delay)).stream(),
lambda delay: athreading.fiterate(generate(delay)),
lambda delay: athreading._fiterate(generate(delay)),
lambda delay: athreading.iterate(generate(delay)),
lambda delay: athreading.generate(generate(delay)),
],
Expand All @@ -42,7 +42,7 @@ async def test_threaded_async_iterate_single(streamcontext, worker_delay, main_d
@pytest.mark.parametrize(
"streamcontext",
[
lambda delay, e: athreading.fiterate(generate(delay), e),
lambda delay, e: athreading._fiterate(generate(delay), e),
lambda delay, e: athreading.iterate(generate(delay), e),
lambda delay, e: athreading.generate(generate(delay), e),
],
Expand Down Expand Up @@ -87,7 +87,7 @@ def generate_infinite(delay=0.0):
"streamcontext",
[
lambda delay: astream.iterate(generate_infinite(delay)).stream(),
lambda delay: athreading.fiterate(generate_infinite(delay)),
lambda delay: athreading._fiterate(generate_infinite(delay)),
lambda delay: athreading.iterate(generate_infinite(delay)),
lambda delay: athreading.generate(generate_infinite(delay)),
],
Expand Down

0 comments on commit c4e03d8

Please sign in to comment.