Skip to content

Commit

Permalink
Release v0.16.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Sep 11, 2020
2 parents 96b14ef + 61b84da commit 5f14788
Show file tree
Hide file tree
Showing 18 changed files with 328 additions and 906 deletions.
23 changes: 23 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,29 @@
This changelog is used to track all major changes to WTFIX.


## v0.16.0 (2020-09-11)

**Enhancements**

- FIX protocol specification: look up class attributes in parent classes as well. This allows new FIX protocol
specifications, which can include custom tags and message types, to be derived from a standard base protocol
definition.
- Stop processing messages as soon as an unhandled exception occurs. This ensures that all apps have the same state
up until the point at which the exception was raised.
- The pipeline will now not process any messages for apps that have already been shut down.
- `BasePipeline.stop()` now accepts an optional keyword argument that can be used to pass the exception that caused
the pipeline to be stopped. This makes it possible to distinguish between normal and abnormal pipeline shutdowns so
that OS exit codes can be set properly by the calling process.

**Fixes**

- Remove tag numbers >= 956 from the standard FIX 4.4 protocol definition. These all fall within the customer-defined
number range and do not form part of the official standard.
- Remove non-standard message types from the FIX 4.4. protocol definition.
- Don't re-raise exceptions in asyncio tasks that trigger a pipeline shutdown. This prevents the application's `stop()`
method from being interrupted before it has been fully processed.


## v0.15.3 (2020-08-11)

**Fixes**
Expand Down
26 changes: 11 additions & 15 deletions run_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
_shutting_down = asyncio.Event()


async def graceful_shutdown(pipeline, sig_name=None):
async def graceful_shutdown(pipeline, sig_name=None, error=None):
if pipeline.stopping_event.is_set():
# Nothing to do
return

if _shutting_down.is_set():
# Only try to shut down once
logger.warning(f"Shutdown already in progress! Ignoring signal '{sig_name}'.")
Expand All @@ -59,7 +63,7 @@ async def graceful_shutdown(pipeline, sig_name=None):
else:
logger.info(f"Initiating graceful shutdown...")

await pipeline.stop()
await pipeline.stop(error=error)


async def main():
Expand All @@ -69,7 +73,7 @@ async def main():
)

args = parser.parse_args()
exit_code = os.EX_UNAVAILABLE
exit_code = os.EX_OK

with connection_manager(args.connection) as conn:
fix_pipeline = BasePipeline(
Expand All @@ -91,27 +95,19 @@ async def main():
await fix_pipeline.start()

except ImproperlyConfigured as e:
logger.error(e)
# User needs to fix config issue before restart is attempted. Set os.EX_OK so that system process
# monitors like Supervisor do not attempt a restart immediately.
exit_code = os.EX_OK
await graceful_shutdown(fix_pipeline, error=e)

except KeyboardInterrupt:
logger.info("Received keyboard interrupt! Initiating shutdown...")
exit_code = os.EX_OK

except asyncio.exceptions.TimeoutError as e:
logger.error(e)

except asyncio.exceptions.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
await graceful_shutdown(fix_pipeline)

except Exception as e:
logger.exception(e)
await graceful_shutdown(fix_pipeline, error=e)
exit_code = os.EX_UNAVAILABLE # Abnormal termination

finally:
await graceful_shutdown(fix_pipeline)

# Report tasks that are still running after shutdown.
tasks = [
task
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name="wtfix",
version="0.15.3",
version="0.16.0",
author="John Cass",
author_email="[email protected]",
description="The Pythonic Financial Information eXchange (FIX) client for humans.",
Expand Down
54 changes: 25 additions & 29 deletions wtfix/apps/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import uuid
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Callable
from typing import Callable, List

from wtfix.apps.base import MessageTypeHandlerApp, on
from wtfix.apps.sessions import ClientSessionApp
Expand All @@ -30,6 +30,7 @@
from wtfix.message import admin
from wtfix.core import utils
from wtfix.message.message import FIXMessage
from wtfix.pipeline import BasePipeline
from wtfix.protocol.contextlib import connection

logger = settings.logger
Expand All @@ -52,7 +53,7 @@ class HeartbeatApp(MessageTypeHandlerApp):

name = "heartbeat"

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):
super().__init__(pipeline, *args, **kwargs)

self._test_request_id = None # A waiting TestRequest message for which no response has been received.
Expand All @@ -62,7 +63,7 @@ def __init__(self, pipeline, *args, **kwargs):
self._server_not_responding = asyncio.Event()

@property
def heartbeat_interval(self):
def heartbeat_interval(self) -> int:
"""
The heartbeat interval is supposed to be agreed between the sender and target as part of the logon
process (which is why we do not accept it as a parameter or set it in the constructor).
Expand All @@ -88,7 +89,7 @@ def test_request_response_delay(self) -> int:
"""
return 2 * self.heartbeat_interval + 4

def seconds_to_next_check(self, timer: HeartbeatTimers) -> int:
def seconds_to_next_check(self, timer: HeartbeatTimers) -> float:
"""
:timer: The timer being checked (sent / received)
:return: The number of seconds before the next check is due to occur.
Expand Down Expand Up @@ -181,22 +182,17 @@ async def heartbeat_monitor(
await interval_exceeded_response()

# No response received, force logout!
logger.error(
f"{self.name}: No response received for test request '{self._test_request_id}', "
f"initiating shutdown..."
asyncio.create_task(
self.pipeline.stop(
SessionError(
f"{self.name}: No response received for test request '{self._test_request_id}'."
)
)
)
asyncio.create_task(self.pipeline.stop())

except asyncio.exceptions.CancelledError:
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

except Exception:
# Stop monitoring heartbeat
logger.exception(
f"{self.name}: Unhandled exception while monitoring heartbeat! Shutting down pipeline..."
)
asyncio.create_task(self.pipeline.stop())
raise
except Exception as e:
# Unhandled exception - abort!
asyncio.create_task(self.pipeline.stop(e))

async def send_test_request(self):
"""
Expand Down Expand Up @@ -347,7 +343,7 @@ async def stop(self, *args, **kwargs):
await super().stop(*args, **kwargs)

@on(connection.protocol.MsgType.Logon)
async def on_logon(self, message):
async def on_logon(self, message: FIXMessage) -> FIXMessage:
"""
Confirms all of the session parameters that we sent when logging on.
Expand Down Expand Up @@ -387,7 +383,7 @@ async def on_logon(self, message):
return message

@on(connection.protocol.MsgType.Logout)
async def on_logout(self, message):
async def on_logout(self, message: FIXMessage) -> FIXMessage:
self.logged_out_event.set() # FIX server has logged us out.

asyncio.create_task(
Expand Down Expand Up @@ -486,7 +482,7 @@ class SeqNumManagerApp(MessageTypeHandlerApp):
# How long to wait (in seconds) for resend requests from target before sending our own resend requests.
RESEND_WAIT_TIME = 5

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):

self.startup_time = (
None # Needed to check if we should wait for resend requests from target
Expand All @@ -506,23 +502,23 @@ def __init__(self, pipeline, *args, **kwargs):
super().__init__(pipeline, *args, **kwargs)

@property
def send_seq_num(self):
def send_seq_num(self) -> int:
return self._send_seq_num

@send_seq_num.setter
def send_seq_num(self, value: int):
self._send_seq_num = int(value) # Make sure we received an int

@property
def receive_seq_num(self):
def receive_seq_num(self) -> int:
return self._receive_seq_num

@receive_seq_num.setter
def receive_seq_num(self, value: int):
self._receive_seq_num = int(value) # Make sure we received an int

@property
def expected_seq_num(self):
def expected_seq_num(self) -> int:
return self.receive_seq_num + 1

async def start(self, *args, **kwargs):
Expand Down Expand Up @@ -558,7 +554,7 @@ async def start(self, *args, **kwargs):

self.startup_time = datetime.utcnow()

async def _check_sequence_number(self, message):
async def _check_sequence_number(self, message: FIXMessage) -> FIXMessage:

if int(message.seq_num) < self.expected_seq_num:
self._handle_sequence_number_too_low(message)
Expand Down Expand Up @@ -616,7 +612,7 @@ async def _replay_buffered_messages(self):
# Buffer is empty - continue
pass

def _handle_sequence_number_too_low(self, message):
def _handle_sequence_number_too_low(self, message: FIXMessage):
"""
According to the FIX specification, receiving a lower than expected sequence number, that is
not a duplicate, is a fatal error that requires manual intervention. Throw an exception to
Expand All @@ -639,7 +635,7 @@ def _handle_sequence_number_too_low(self, message):

raise SessionError(error_msg)

async def _handle_sequence_number_too_high(self, message):
async def _handle_sequence_number_too_high(self, message: FIXMessage) -> FIXMessage:

# We've missed some incoming messages
if len(self.receive_buffer) == 0:
Expand Down Expand Up @@ -687,7 +683,7 @@ async def _handle_sequence_number_too_high(self, message):
f"(waiting for #{self.expected_seq_num})..."
)

async def _send_resend_request(self, missing_seq_nums):
async def _send_resend_request(self, missing_seq_nums: List[int]):
# Wait for opportunity to send resend request. Must:
#
# 1.) Have waited for resend requests from the target; and
Expand Down Expand Up @@ -716,7 +712,7 @@ async def _send_resend_request(self, missing_seq_nums):
)
)

async def _handle_resend_request(self, message):
async def _handle_resend_request(self, message: FIXMessage) -> FIXMessage:

# Set event marker to block our own gap fill requests until we've responded to this request.
self.resend_request_handled_event.clear()
Expand Down
9 changes: 7 additions & 2 deletions wtfix/apps/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from wtfix.core.exceptions import ValidationError, MessageProcessingError
from wtfix.message.message import FIXMessage
from wtfix.pipeline import BasePipeline


class BaseApp:
Expand All @@ -27,7 +28,7 @@ class BaseApp:

name = None

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):
"""
:param pipeline: The pipeline that this app will be added to.
:raises: ValidationError if no name has been specified for this apps.
Expand All @@ -49,6 +50,8 @@ async def initialize(self, *args, **kwargs):
All apps are initialized concurrently and need to complete their initialization routines within the
timeout specified by INIT_TIMEOUT.
Being 'initialized' implies that an app is ready to start receiving messages.
"""
pass

Expand All @@ -72,6 +75,8 @@ async def stop(self, *args, **kwargs):
Apps are stopped sequentially in the order that they were added to the pipeline and each app is subject
to the STOP_TIMEOUT timeout.
Once an app is 'stopped' it will not receive any more messages.
"""
pass

Expand Down Expand Up @@ -147,7 +152,7 @@ class MessageTypeHandlerApp(BaseApp):

name = "type_filter"

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):
super().__init__(pipeline, *args, **kwargs)

self.type_handlers = {}
Expand Down
14 changes: 6 additions & 8 deletions wtfix/apps/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from wtfix.apps.base import BaseApp
from wtfix.conf import settings
from wtfix.core import decoders, utils
from wtfix.pipeline import BasePipeline

logger = settings.logger

Expand All @@ -34,7 +35,7 @@ class RedisPubSubApp(BaseApp):

SEND_CHANNEL = "channel:send"

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):
super().__init__(pipeline, *args, **kwargs)

self.redis_pool = None
Expand All @@ -54,6 +55,9 @@ async def _send_channel_reader(self):
self.send(message)
) # Pass message on to pipeline

except asyncio.exceptions.CancelledError:
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

except aioredis.ChannelClosedError:
# Shutting down...
logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.")
Expand All @@ -73,13 +77,7 @@ async def start(self, *args, **kwargs):
async def stop(self, *args, **kwargs):
if self._channel_reader_task is not None:
self._channel_reader_task.cancel()
try:
await self._channel_reader_task
except asyncio.exceptions.CancelledError:
# Cancellation request received - close connections....
logger.info(
f"{self.name}: {self._channel_reader_task.get_name()} cancelled!"
)
await self._channel_reader_task

with await self.redis_pool as conn:
await conn.unsubscribe(self.SEND_CHANNEL)
Expand Down
3 changes: 2 additions & 1 deletion wtfix/apps/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from wtfix.core.utils import GroupTemplateMixin
from wtfix.message.field import Field
from wtfix.message.message import RawMessage, generic_message_factory, FIXMessage
from wtfix.pipeline import BasePipeline


class RawMessageParserApp(BaseApp, GroupTemplateMixin):
Expand All @@ -28,7 +29,7 @@ class RawMessageParserApp(BaseApp, GroupTemplateMixin):

name = "raw_message_parser"

def __init__(self, pipeline, *args, **kwargs):
def __init__(self, pipeline: BasePipeline, *args, **kwargs):
super().__init__(pipeline, *args, **kwargs)

self.group_templates = self.pipeline.settings.GROUP_TEMPLATES
Expand Down
Loading

0 comments on commit 5f14788

Please sign in to comment.