Skip to content

Commit

Permalink
Per RustSocket eventloops
Browse files Browse the repository at this point in the history
  • Loading branch information
olijeffers0n committed Jan 11, 2023
1 parent c440c92 commit 97262e1
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 23 deletions.
11 changes: 5 additions & 6 deletions rustplus/api/base_rust_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ async def connect(
:return: None
"""
if self.event_loop is None:
EventLoopManager._loop = asyncio.get_event_loop()
else:
EventLoopManager._loop = self.event_loop
EventLoopManager.set_loop(self.event_loop if self.event_loop is not None else asyncio.get_event_loop(),
self.server_id)

try:
if self.remote.ws is None:
await self.remote.connect(
Expand Down Expand Up @@ -215,7 +214,7 @@ async def switch_server(
self.remote.command_handler.command_options = command_options
else:
self.remote.use_commands = True
self.remote.command_handler = CommandHandler(self.command_options)
self.remote.command_handler = CommandHandler(self.command_options, self)

self.raise_ratelimit_exception = raise_ratelimit_exception

Expand Down Expand Up @@ -354,7 +353,7 @@ def entity_event_callback(future_inner: Future):
), self.server_id
)

future = asyncio.run_coroutine_threadsafe(get_entity(self, eid), EventLoopManager.get_loop())
future = asyncio.run_coroutine_threadsafe(get_entity(self, eid), EventLoopManager.get_loop(self.server_id))
future.add_done_callback(entity_event_callback)

return RegisteredListener(eid, coro)
Expand Down
8 changes: 4 additions & 4 deletions rustplus/api/remote/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,28 @@ def run_entity_event(self, name, app_message, server_id) -> None:
for handler in handlers.copy():
coro, event_type = handler.data

self._schedule_event(EventLoopManager.get_loop(), coro, EntityEvent(app_message, event_type))
self._schedule_event(EventLoopManager.get_loop(server_id), coro, EntityEvent(app_message, event_type))

def run_team_event(self, app_message, server_id) -> None:

handlers: Set[RegisteredListener] = TeamEvent.handlers.get_handlers(server_id)
for handler in handlers.copy():
coro = handler.data

self._schedule_event(EventLoopManager.get_loop(), coro, TeamEvent(app_message))
self._schedule_event(EventLoopManager.get_loop(server_id), coro, TeamEvent(app_message))

def run_chat_event(self, app_message, server_id) -> None:

handlers: Set[RegisteredListener] = ChatEvent.handlers.get_handlers(server_id)
for handler in handlers.copy():
coro = handler.data

self._schedule_event(EventLoopManager.get_loop(), coro, ChatEvent(app_message))
self._schedule_event(EventLoopManager.get_loop(server_id), coro, ChatEvent(app_message))

def run_proto_event(self, byte_data: bytes, server_id) -> None:

handlers: Set[RegisteredListener] = ProtobufEvent.handlers.get_handlers(server_id)
for handler in handlers.copy():
coro = handler.data

self._schedule_event(EventLoopManager.get_loop(), coro, ProtobufEvent(byte_data))
self._schedule_event(EventLoopManager.get_loop(server_id), coro, ProtobufEvent(byte_data))
18 changes: 13 additions & 5 deletions rustplus/api/remote/events/event_loop_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import asyncio
from ....utils import ServerID


class EventLoopManager:

_loop = None
_loop = {}

@staticmethod
def get_loop():
if EventLoopManager._loop is None:
def get_loop(server_id: ServerID) -> asyncio.AbstractEventLoop:
if EventLoopManager._loop is None or EventLoopManager._loop.get(server_id) is None:
raise RuntimeError("Event loop is not set")

if EventLoopManager._loop.is_closed():
if EventLoopManager._loop.get(server_id).is_closed():
raise RuntimeError("Event loop is not running")

return EventLoopManager._loop
return EventLoopManager._loop.get(server_id)

@staticmethod
def set_loop(loop: asyncio.AbstractEventLoop, server_id: ServerID) -> None:
EventLoopManager._loop[server_id] = loop
4 changes: 2 additions & 2 deletions rustplus/api/remote/events/map_event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _run(self) -> None:
try:

future = asyncio.run_coroutine_threadsafe(
self.api.get_markers(), EventLoopManager.get_loop()
self.api.get_markers(), EventLoopManager.get_loop(self.api.server_id)
)
new_highest_id = 0
for marker in future.result():
Expand Down Expand Up @@ -80,7 +80,7 @@ def _run(self) -> None:
def call_event(self, marker, is_new) -> None:
for listener in self.listeners:
asyncio.run_coroutine_threadsafe(
listener.get_coro()(MarkerEvent(marker, is_new)), EventLoopManager.get_loop()
listener.get_coro()(MarkerEvent(marker, is_new)), EventLoopManager.get_loop(self.api.server_id)
).result()


Expand Down
2 changes: 1 addition & 1 deletion rustplus/api/remote/rust_remote_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
self.use_commands = False
else:
self.use_commands = True
self.command_handler = CommandHandler(self.command_options)
self.command_handler = CommandHandler(self.command_options, api)

self.event_handler = EventHandler()

Expand Down
2 changes: 1 addition & 1 deletion rustplus/api/remote/rustws.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def run(self) -> None:
f"{datetime.now().strftime('%d/%m/%Y %H:%M:%S')} [RustPlus.py] Connection interrupted, Retrying"
)
asyncio.run_coroutine_threadsafe(
self.connect(ignore_open_value=True), EventLoopManager.get_loop()
self.connect(ignore_open_value=True), EventLoopManager.get_loop(self.server_id)
).result()
continue
return
Expand Down
7 changes: 4 additions & 3 deletions rustplus/commands/command_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@


class CommandHandler:
def __init__(self, command_options: CommandOptions) -> None:
def __init__(self, command_options: CommandOptions, api) -> None:
self.command_options = command_options
self.commands = {}
self.api = api

def register_command(self, data: CommandData) -> None:

Expand Down Expand Up @@ -40,7 +41,7 @@ def run_command(self, message: RustChatMessage, prefix) -> None:
data = self.commands[command]

self._schedule_event(
EventLoopManager.get_loop(),
EventLoopManager.get_loop(self.api.server_id),
data.coro,
Command(
message.name,
Expand All @@ -57,7 +58,7 @@ def run_command(self, message: RustChatMessage, prefix) -> None:

if command in data.aliases or data.callable_func(command):
self._schedule_event(
EventLoopManager.get_loop(),
EventLoopManager.get_loop(self.api.server_id),
data.coro,
Command(
message.name,
Expand Down
2 changes: 1 addition & 1 deletion rustplus/conversation/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def start(self) -> None:
await self.send_prompt(await self._prompts[0].prompt())

def run_coro(self, coro, args):
return asyncio.run_coroutine_threadsafe(coro(*args), EventLoopManager.get_loop()).result()
return asyncio.run_coroutine_threadsafe(coro(*args), EventLoopManager.get_loop(self._api.server_id)).result()

def get_answers(self) -> List[str]:
return self._answers

0 comments on commit 97262e1

Please sign in to comment.