From dba647bb277314854b6a089623141550a0039f17 Mon Sep 17 00:00:00 2001 From: jcass77 Date: Thu, 30 Jul 2020 17:59:12 +0200 Subject: [PATCH 1/8] Prepare release v0.15.1 --- docs/changelog.md | 7 +++++++ wtfix/apps/sessions.py | 5 ----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index 9358bca..ec6e11a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -3,6 +3,13 @@ This changelog is used to track all major changes to WTFIX. +## v0.15.2 (UNRELEASED) + +**Fixes** + +- client_session: Don't wait for `writer` to close when shutting down in order to avoid hangs due to network errors. + + ## v0.15.1 (2020-07-28) **Fixes** diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index b67eaa5..5fcf5b5 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -177,7 +177,6 @@ async def stop(self, *args, **kwargs): ) self.writer.close() - await self.writer.wait_closed() logger.info(f"{self.name}: Session closed!") async def listen(self): @@ -248,10 +247,6 @@ async def listen(self): # Cancellation request received - close writer logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") - except Exception as e: - logger.error(f"{self.name}: Unexpected error {e}. Initiating shutdown...") - asyncio.create_task(self.pipeline.stop()) - async def on_send(self, message): """ Writes an encoded message to the StreamWriter. From 5e336e5ce716484b389c4d2ffab56689b6df84ca Mon Sep 17 00:00:00 2001 From: jcass77 Date: Sat, 1 Aug 2020 04:44:01 +0200 Subject: [PATCH 2/8] enh(tasks): Tweak app stopping tasks. --- run_client.py | 24 +++++++++++++----------- wtfix/apps/brokers.py | 12 ++++++------ wtfix/apps/sessions.py | 18 +++++++++--------- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/run_client.py b/run_client.py index 9538164..d0ee5ae 100644 --- a/run_client.py +++ b/run_client.py @@ -43,19 +43,22 @@ help=f"reset sequence numbers and start a new session", ) +_shutting_down = asyncio.Event() + async def graceful_shutdown(pipeline, sig_name=None): + if _shutting_down.is_set(): + # Only try to shut down once + return + + _shutting_down.set() + if sig_name is not None: logger.info(f"Received signal {sig_name}! Initiating graceful shutdown...") else: logger.info(f"Initiating graceful shutdown...") - try: - await pipeline.stop() - - except asyncio.exceptions.CancelledError as e: - logger.error(f"Cancelled: connection terminated abnormally! ({e})") - sys.exit(os.EX_UNAVAILABLE) + await pipeline.stop() async def main(): @@ -65,6 +68,7 @@ async def main(): ) args = parser.parse_args() + exit_code = os.EX_UNAVAILABLE with connection_manager(args.connection) as conn: fix_pipeline = BasePipeline( @@ -89,26 +93,24 @@ async def main(): logger.error(e) # User needs to fix config issue before restart is attempted. Set os.EX_OK so that system process # monitors like Supervisor do not attempt a restart immediately. - sys.exit(os.EX_OK) + exit_code = os.EX_OK except KeyboardInterrupt: logger.info("Received keyboard interrupt! Initiating shutdown...") - sys.exit(os.EX_OK) + exit_code = os.EX_OK except asyncio.exceptions.TimeoutError as e: logger.error(e) - sys.exit(os.EX_UNAVAILABLE) except asyncio.exceptions.CancelledError as e: logger.error(f"Cancelled: connection terminated abnormally! ({e})") - sys.exit(os.EX_UNAVAILABLE) except Exception as e: logger.exception(e) - sys.exit(os.EX_UNAVAILABLE) finally: await graceful_shutdown(fix_pipeline) + sys.exit(exit_code) if __name__ == "__main__": diff --git a/wtfix/apps/brokers.py b/wtfix/apps/brokers.py index ece3c21..120285a 100644 --- a/wtfix/apps/brokers.py +++ b/wtfix/apps/brokers.py @@ -58,6 +58,12 @@ async def _send_channel_reader(self): # Cancellation request received - close connections.... logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") + with await self.redis_pool as conn: + await conn.unsubscribe(self.SEND_CHANNEL) + + self.redis_pool.close() + await self.redis_pool.wait_closed() # Closing all open connections + except aioredis.ChannelClosedError: # Shutting down... logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.") @@ -80,9 +86,3 @@ async def stop(self, *args, **kwargs): if self._channel_reader_task is not None: self._channel_reader_task.cancel() await self._channel_reader_task - - with await self.redis_pool as conn: - await conn.unsubscribe(self.SEND_CHANNEL) - - self.redis_pool.close() - await self.redis_pool.wait_closed() # Closing all open connections diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index 5fcf5b5..47416da 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -170,15 +170,6 @@ async def stop(self, *args, **kwargs): self._listener_task.cancel() await self._listener_task - if self.writer is not None: - logger.info( - f"{self.name}: Initiating disconnect from " - f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..." - ) - - self.writer.close() - logger.info(f"{self.name}: Session closed!") - async def listen(self): """ Listen for new messages that are sent by the server. @@ -247,6 +238,15 @@ async def listen(self): # Cancellation request received - close writer logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") + if self.writer is not None: + logger.info( + f"{self.name}: Initiating disconnect from " + f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..." + ) + + self.writer.close() + logger.info(f"{self.name}: Session closed!") + async def on_send(self, message): """ Writes an encoded message to the StreamWriter. From 4ef48802692044300cf530fc403488ff8bbbdd6f Mon Sep 17 00:00:00 2001 From: jcass77 Date: Sat, 1 Aug 2020 05:21:36 +0200 Subject: [PATCH 3/8] enh(tasks): Tweak app stopping tasks. --- wtfix/apps/admin.py | 38 +++++++------- wtfix/apps/brokers.py | 24 ++++----- wtfix/apps/sessions.py | 109 ++++++++++++++++++++--------------------- 3 files changed, 85 insertions(+), 86 deletions(-) diff --git a/wtfix/apps/admin.py b/wtfix/apps/admin.py index 6daf45e..8988607 100644 --- a/wtfix/apps/admin.py +++ b/wtfix/apps/admin.py @@ -148,6 +148,9 @@ async def stop(self, *args, **kwargs): loop = asyncio.get_running_loop() for cancel_task in cancel_tasks: + if cancel_task.cancelled(): + logger.info(f"{self.name}: {cancel_task.get_name()} cancelled!") + continue if cancel_task.exception() is not None: loop.call_exception_handler( { @@ -166,27 +169,22 @@ async def heartbeat_monitor( :timer: The timer to use as reference against the heartbeat interval :interval_exceeded_response: The response to take if the interval is exceeded. Must be an awaitable. """ - try: - while not self._server_not_responding.is_set(): - # Keep sending heartbeats until the server stops responding. - await asyncio.sleep( - self.seconds_to_next_check(timer) - ) # Wait until the next scheduled heartbeat check. - - if self.seconds_to_next_check(timer) == 0: - # Heartbeat exceeded, send response - await interval_exceeded_response() - - # No response received, force logout! - logger.error( - f"{self.name}: No response received for test request '{self._test_request_id}', " - f"initiating shutdown..." - ) - asyncio.create_task(self.pipeline.stop()) + while not self._server_not_responding.is_set(): + # Keep sending heartbeats until the server stops responding. + await asyncio.sleep( + self.seconds_to_next_check(timer) + ) # Wait until the next scheduled heartbeat check. - except asyncio.exceptions.CancelledError: - # Cancellation request received - logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") + if self.seconds_to_next_check(timer) == 0: + # Heartbeat exceeded, send response + await interval_exceeded_response() + + # No response received, force logout! + logger.error( + f"{self.name}: No response received for test request '{self._test_request_id}', " + f"initiating shutdown..." + ) + asyncio.create_task(self.pipeline.stop()) async def send_test_request(self): """ diff --git a/wtfix/apps/brokers.py b/wtfix/apps/brokers.py index 120285a..f8101cf 100644 --- a/wtfix/apps/brokers.py +++ b/wtfix/apps/brokers.py @@ -54,16 +54,6 @@ async def _send_channel_reader(self): self.send(message) ) # Pass message on to pipeline - except asyncio.exceptions.CancelledError: - # Cancellation request received - close connections.... - logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") - - with await self.redis_pool as conn: - await conn.unsubscribe(self.SEND_CHANNEL) - - self.redis_pool.close() - await self.redis_pool.wait_closed() # Closing all open connections - except aioredis.ChannelClosedError: # Shutting down... logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.") @@ -85,4 +75,16 @@ async def stop(self, *args, **kwargs): if self._channel_reader_task is not None: self._channel_reader_task.cancel() - await self._channel_reader_task + try: + await self._channel_reader_task + except asyncio.exceptions.CancelledError: + # Cancellation request received - close connections.... + logger.info( + f"{self.name}: {self._channel_reader_task.get_name()} cancelled!" + ) + + with await self.redis_pool as conn: + await conn.unsubscribe(self.SEND_CHANNEL) + + self.redis_pool.close() + await self.redis_pool.wait_closed() # Closing all open connections diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index 47416da..c65f1ea 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -168,7 +168,20 @@ async def stop(self, *args, **kwargs): logger.info(f"{self.name}: Cancelling listener task...") self._listener_task.cancel() - await self._listener_task + try: + await self._listener_task + except asyncio.exceptions.CancelledError: + # Cancellation request received - close writer + logger.info(f"{self.name}: {self._listener_task.get_name()} cancelled!") + + if self.writer is not None: + logger.info( + f"{self.name}: Initiating disconnect from " + f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..." + ) + + self.writer.close() + logger.info(f"{self.name}: Session closed!") async def listen(self): """ @@ -184,68 +197,54 @@ async def listen(self): data = [] - try: - while not self.writer.is_closing(): # Listen forever for new messages - try: - # Try to read a complete message. - data = await self.reader.readuntil( - begin_string - ) # Detect beginning of message. - # TODO: should there be a timeout for reading an entire message? - data += await self.reader.readuntil( - checksum_start - ) # Detect start of checksum field. - data += await self.reader.readuntil( - settings.SOH - ) # Detect final message delimiter. - - await self.pipeline.receive(data) - data = None - - except IncompleteReadError as e: - # Connection was closed before a complete message could be received. - if ( + while not self.writer.is_closing(): # Listen forever for new messages + try: + # Try to read a complete message. + data = await self.reader.readuntil( + begin_string + ) # Detect beginning of message. + # TODO: should there be a timeout for reading an entire message? + data += await self.reader.readuntil( + checksum_start + ) # Detect start of checksum field. + data += await self.reader.readuntil( + settings.SOH + ) # Detect final message delimiter. + + await self.pipeline.receive(data) + data = None + + except IncompleteReadError as e: + # Connection was closed before a complete message could be received. + if ( + data + and utils.encode( + f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}" + ) + + settings.SOH + in data + ): + await self.pipeline.receive( data - and utils.encode( - f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}" - ) - + settings.SOH - in data - ): - await self.pipeline.receive( - data - ) # Process logout message in the pipeline as per normal - - raise e - - else: - logger.error( - f"{self.name}: Unexpected EOF waiting for next chunk of partial data " - f"'{utils.decode(e.partial)}' ({e})." - ) - - raise e - - except LimitOverrunError as e: - # Buffer limit reached before a complete message could be read - abort! + ) # Process logout message in the pipeline as per normal + + raise e + + else: logger.error( - f"{self.name}: Stream reader buffer limit exceeded! ({e})." + f"{self.name}: Unexpected EOF waiting for next chunk of partial data " + f"'{utils.decode(e.partial)}' ({e})." ) raise e - except asyncio.exceptions.CancelledError: - # Cancellation request received - close writer - logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") - - if self.writer is not None: - logger.info( - f"{self.name}: Initiating disconnect from " - f"{self.pipeline.settings.HOST}:{self.pipeline.settings.PORT}..." + except LimitOverrunError as e: + # Buffer limit reached before a complete message could be read - abort! + logger.error( + f"{self.name}: Stream reader buffer limit exceeded! ({e})." ) - self.writer.close() - logger.info(f"{self.name}: Session closed!") + raise e async def on_send(self, message): """ From 0e1188519a2b5493ece94903c54cd3dcec32dfb0 Mon Sep 17 00:00:00 2001 From: jcass77 Date: Tue, 4 Aug 2020 05:09:51 +0200 Subject: [PATCH 4/8] refactor: Prefer using `create_task` to old style `ensure_future`. --- setup.py | 2 +- wtfix/apps/api/rest.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 21f5069..ceb33dd 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup( name="wtfix", - version="0.15.1", + version="0.15.2", author="John Cass", author_email="john.cass77@gmail.com", description="The Pythonic Financial Information eXchange (FIX) client for humans.", diff --git a/wtfix/apps/api/rest.py b/wtfix/apps/api/rest.py index 5fae7fb..d069d60 100644 --- a/wtfix/apps/api/rest.py +++ b/wtfix/apps/api/rest.py @@ -59,8 +59,7 @@ def post(self): args = self.parser.parse_args() message = decoders.from_json(args["message"]) - loop = asyncio.get_event_loop() - asyncio.ensure_future(self.app.send(message), loop=loop) + asyncio.create_task(self.app.send(message)) return JsonResultResponse( True, From 919327ec3e323f026b196cae61f97575d4400e14 Mon Sep 17 00:00:00 2001 From: jcass77 Date: Tue, 4 Aug 2020 05:16:17 +0200 Subject: [PATCH 5/8] refactor: Prefer using `create_task` to old style `ensure_future`. --- run_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/run_client.py b/run_client.py index d0ee5ae..b007e48 100644 --- a/run_client.py +++ b/run_client.py @@ -49,6 +49,7 @@ async def graceful_shutdown(pipeline, sig_name=None): if _shutting_down.is_set(): # Only try to shut down once + logger.warning(f"Shutdown already in progress! Ignoring signal '{sig_name}'.") return _shutting_down.set() @@ -82,7 +83,7 @@ async def main(): for sig_name in {"SIGINT", "SIGTERM"}: loop.add_signal_handler( getattr(signal, sig_name), - lambda: asyncio.ensure_future( + lambda: asyncio.create_task( graceful_shutdown(fix_pipeline, sig_name=sig_name) ), ) From 3ac27c612c8c19a368096d68d153d30d4278da3e Mon Sep 17 00:00:00 2001 From: jcass77 Date: Wed, 5 Aug 2020 09:33:14 +0200 Subject: [PATCH 6/8] wip: Resolve process hangs during shutdown. --- wtfix/apps/sessions.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index c65f1ea..4d0df3a 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -228,15 +228,13 @@ async def listen(self): data ) # Process logout message in the pipeline as per normal - raise e - else: logger.error( f"{self.name}: Unexpected EOF waiting for next chunk of partial data " f"'{utils.decode(e.partial)}' ({e})." ) - raise e + asyncio.create_task(self.pipeline.stop()) except LimitOverrunError as e: # Buffer limit reached before a complete message could be read - abort! @@ -244,7 +242,7 @@ async def listen(self): f"{self.name}: Stream reader buffer limit exceeded! ({e})." ) - raise e + asyncio.create_task(self.pipeline.stop()) async def on_send(self, message): """ From 53c30062abdba957850a0d6658cabd369ad3cbac Mon Sep 17 00:00:00 2001 From: jcass77 Date: Wed, 5 Aug 2020 09:42:18 +0200 Subject: [PATCH 7/8] wip: Resolve process hangs during shutdown. --- wtfix/apps/sessions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index 4d0df3a..ac5376b 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -234,7 +234,9 @@ async def listen(self): f"'{utils.decode(e.partial)}' ({e})." ) + # Stop listening for messages asyncio.create_task(self.pipeline.stop()) + break except LimitOverrunError as e: # Buffer limit reached before a complete message could be read - abort! @@ -242,7 +244,9 @@ async def listen(self): f"{self.name}: Stream reader buffer limit exceeded! ({e})." ) + # Stop listening for messages asyncio.create_task(self.pipeline.stop()) + break async def on_send(self, message): """ From df7ce54ac249dc6d9961a534f39bb24e4d5cc72c Mon Sep 17 00:00:00 2001 From: jcass77 Date: Wed, 5 Aug 2020 09:52:38 +0200 Subject: [PATCH 8/8] Prepare release v0.15.2 --- docs/changelog.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index ec6e11a..2ace2fb 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -3,11 +3,13 @@ This changelog is used to track all major changes to WTFIX. -## v0.15.2 (UNRELEASED) +## v0.15.2 (2020-08-05) **Fixes** -- client_session: Don't wait for `writer` to close when shutting down in order to avoid hangs due to network errors. +- `client_session`: Don't wait for `writer` to close when shutting down in order to avoid hangs due to network errors. +- Use the recommended `asyncio.create_task` to create new Tasks, which is preferred to `asyncio.ensure_future`. +- Fix issue that caused the `client_session` listener task to hang during shutdown. ## v0.15.1 (2020-07-28)