Skip to content

Commit

Permalink
Fixes #68: fix deadlock in base Channel class
Browse files Browse the repository at this point in the history
  • Loading branch information
dolamroth committed Feb 6, 2024
1 parent b653ab7 commit 10bc798
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 19 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ jobs:
run: |
cp .env.template .env
env >> .env
ls -lah
- name: Build image
run: docker image build -t starlette-web .
Expand Down
2 changes: 1 addition & 1 deletion starlette_web/common/caches/cache_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class CacheHandler:
def __init__(self):
self._caches: Dict[str, BaseCache] = dict()

def __getitem__(self, alias):
def __getitem__(self, alias) -> BaseCache:
try:
return self._caches[alias]
except KeyError:
Expand Down
18 changes: 9 additions & 9 deletions starlette_web/common/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ async def disconnect(self) -> None:
await self._channel_layer.disconnect()

async def _listener(self) -> None:
while True:
try:
event = await self._channel_layer.next_published()
except ListenerClosed:
break
async with anyio.create_task_group() as task_group:
while True:
try:
event = await self._channel_layer.next_published()
except ListenerClosed:
break

async with self._manager_lock:
subscribers_list = list(self._subscribers.get(event.group, []))
async with self._manager_lock:
subscribers_list = list(self._subscribers.get(event.group, []))

async with anyio.create_task_group() as nursery:
for send_stream in subscribers_list:
nursery.start_soon(send_stream.send, event)
task_group.start_soon(send_stream.send, event)

async with self._manager_lock:
for group in self._subscribers.keys():
Expand Down
5 changes: 4 additions & 1 deletion starlette_web/common/channels/layers/local_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ async def unsubscribe(self, group: str, **kwargs) -> None:
self._subscribed.remove(group)

async def publish(self, group: str, message: Any, **kwargs) -> None:
if not self._send_stream:
raise RuntimeError(".publish() requires not-null self._send_stream")

event = Event(group=group, message=message)
await self._send_stream.send(event)

async def next_published(self) -> Event:
if not self._receive_stream:
raise RuntimeError("Getting next published requires not-null self._receive_stream")
raise RuntimeError(".next_published() requires not-null self._receive_stream")

while True:
event = await self._receive_stream.receive()
Expand Down
15 changes: 8 additions & 7 deletions starlette_web/tests/contrib/test_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ def test_channels_crossprocess(self):

async def run_command_in_process(command_name, command_args):
cmd = (
f"cd {settings.PROJECT_ROOT_DIR} && " f"{sys.executable} command.py {command_name}"
f"cd {settings.PROJECT_ROOT_DIR} && "
f"{sys.executable} command.py {command_name}"
)

if command_args:
Expand All @@ -105,24 +106,24 @@ async def run_command_in_process(command_name, command_args):
await process.wait()

async def task_coroutine():
async with anyio.create_task_group() as nursery:
nursery.cancel_scope.deadline = anyio.current_time() + 10
nursery.start_soon(
async with anyio.create_task_group() as task_group:
task_group.cancel_scope.deadline = anyio.current_time() + 10
task_group.start_soon(
run_command_in_process,
"test_channels_publisher",
[f"--group={test_group_name}"],
)
nursery.start_soon(
task_group.start_soon(
run_command_in_process,
"test_channels_subscriber",
[f"--group={test_group_name}", f"--subscriber={subscriber_1}"],
)
nursery.start_soon(
task_group.start_soon(
run_command_in_process,
"test_channels_subscriber",
[f"--group={test_group_name}", f"--subscriber={subscriber_2}"],
)
nursery.start_soon(
task_group.start_soon(
run_command_in_process,
"test_channels_subscriber",
[f"--group={test_group_name}", f"--subscriber={subscriber_3}"],
Expand Down

0 comments on commit 10bc798

Please sign in to comment.