Skip to content

Commit

Permalink
Fixes #77: documented possible backpressure problem in Channels
Browse files Browse the repository at this point in the history
  • Loading branch information
dolamroth committed Feb 9, 2024
1 parent 33c91cc commit 41ddbe5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
39 changes: 18 additions & 21 deletions docs/common/channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,37 +92,34 @@ async with Channel(InMemoryChannelLayer()) as channel:

If you want to publish messages and guarantee, that recipient has got them, you need to use
broker that allows acknowledgement for messages. Again, this is not provided by default, and
you'll have to define a custom channel layer for this purpose. For an example, see kafka
backend and proposed mqtt backend for `encode/broadcaster`:
you'll have to define a custom channel layer for this purpose.
For an example, see kafka backend for `encode/broadcaster`:

- https://github.com/encode/broadcaster/blob/956571d030d33d6cb820758ec5ed8fe79c3288c6/broadcaster/_backends/kafka.py
- https://github.com/encode/broadcaster/blob/0e48df3a129998036b2e454b5b54e5dddc00d8dc/broadcaster/_backends/mqtt.py

For Redis, use redis Streams which support acknowledgment:
- https://redis.io/commands/xack/
- https://github.com/encode/broadcaster/blob/3cfcc8b41339862b1f5d50f42ab027bcae92d78c/broadcaster/_backends/redis_stream.py

## Limitations & Caveats

Channels cannot be instantiated project-wise, in the same way as caches.
In `channels`, the channel layer is instantiated for the whole duration of `async with` block,
and holds a set of memory streams, which it uses to fire messages to subscribers.
**Subscribers may be instantiated in a different thread**, then the main application,
which goes against the fact that all `anyio` operations are **by-design not threadsafe**.
#### Channel initialization

The exact reason boils down to the fact, that async Queues, which are used for synchronization,
depend on async Events, which do not function between different event-loops/threads.
A cross-thread/cross-event-loop pub-sub would require using synchronous `threading.Queue` instead,
which is out of scope of responsibility of `starlette_web.common.channels`.
It is preferable to **only use channels as async context manager**,
since it registers its own `anyio.TaskGroup`.
In some cases, like websockets, when you need to control channel creation and deletion,
there are available synchronisation mechanisms with `anyio.Event`.

In comparison, this is not an issue for caches, since all operations in caches are atomic
and do not require pair-wise synchronization.
#### Backpressure

In practice, this means, that you have to instantiate channels with `async with` block,
every time you need to use them. See `starlette_web.tests.contrib.test_channels.TestChannelLayers`
for examples of usage.
Built-in `InMemoryChannelLayer` may be prone to
[backpressure](https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#bug-1-backpressure)
problem, if publish happens much more often, than listening to messages. In case of other channel layers,
backpressure is theoretically possible on the broker side. This is not something you should expect on a
daily basis (it concerns cases with 10k+ messages per minute),
but it is preferable to design exchange with relatively short messages.

It is preferable to **only use channels as async context manager**, since it registers its own `anyio.TaskGroup`.
In some cases, when you need to split channel creation and deletion, like with websockets, there are available
synchronisation mechanisms with `anyio.Event`. See websocket chat test
`starlette_web.tests.views.websocket.ChatWebsocketTestEndpoint` for example of usage.
## Examples

- `starlette_web.tests.contrib.test_channels.TestChannelLayers`
- `starlette_web.tests.views.websocket.ChatWebsocketTestEndpoint`
9 changes: 7 additions & 2 deletions starlette_web/common/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ async def publish(self, group: str, message: Any, **kwargs) -> None:
await self._channel_layer.publish(group, message, **kwargs)

@asynccontextmanager
async def subscribe(self, group: str, **kwargs) -> AsyncGenerator["Subscriber", None]:
send_stream, receive_stream = anyio.create_memory_object_stream()
async def subscribe(
self,
group: str,
max_buffer_size: float = 0,
**kwargs,
) -> AsyncGenerator["Subscriber", None]:
send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size)

try:
async with self._manager_lock:
Expand Down

0 comments on commit 41ddbe5

Please sign in to comment.