diff --git a/docs/changelog.md b/docs/changelog.md index 578c909..bd4b288 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -3,6 +3,29 @@ This changelog is used to track all major changes to WTFIX. +## v0.16.0 (2020-09-11) + +**Enhancements** + +- FIX protocol specification: look up class attributes in parent classes as well. This allows new FIX protocol + specifications, which can include custom tags and message types, to be derived from a standard base protocol + definition. +- Stop processing messages as soon as an unhandled exception occurs. This ensures that all apps have the same state + up until the point at which the exception was raised. +- The pipeline will now not process any messages for apps that have already been shut down. +- `BasePipeline.stop()` now accepts an optional keyword argument that can be used to pass the exception that caused + the pipeline to be stopped. This makes it possible to distinguish between normal and abnormal pipeline shutdowns so + that OS exit codes can be set properly by the calling process. + +**Fixes** + +- Remove tag numbers >= 956 from the standard FIX 4.4 protocol definition. These all fall within the customer-defined + number range and do not form part of the official standard. +- Remove non-standard message types from the FIX 4.4. protocol definition. +- Don't re-raise exceptions in asyncio tasks that trigger a pipeline shutdown. This prevents the application's `stop()` + method from being interrupted before it has been fully processed. + + ## v0.15.3 (2020-08-11) **Fixes** diff --git a/run_client.py b/run_client.py index 03e1466..ad73c4d 100644 --- a/run_client.py +++ b/run_client.py @@ -46,7 +46,11 @@ _shutting_down = asyncio.Event() -async def graceful_shutdown(pipeline, sig_name=None): +async def graceful_shutdown(pipeline, sig_name=None, error=None): + if pipeline.stopping_event.is_set(): + # Nothing to do + return + if _shutting_down.is_set(): # Only try to shut down once logger.warning(f"Shutdown already in progress! Ignoring signal '{sig_name}'.") @@ -59,7 +63,7 @@ async def graceful_shutdown(pipeline, sig_name=None): else: logger.info(f"Initiating graceful shutdown...") - await pipeline.stop() + await pipeline.stop(error=error) async def main(): @@ -69,7 +73,7 @@ async def main(): ) args = parser.parse_args() - exit_code = os.EX_UNAVAILABLE + exit_code = os.EX_OK with connection_manager(args.connection) as conn: fix_pipeline = BasePipeline( @@ -91,27 +95,19 @@ async def main(): await fix_pipeline.start() except ImproperlyConfigured as e: - logger.error(e) # User needs to fix config issue before restart is attempted. Set os.EX_OK so that system process # monitors like Supervisor do not attempt a restart immediately. - exit_code = os.EX_OK + await graceful_shutdown(fix_pipeline, error=e) except KeyboardInterrupt: logger.info("Received keyboard interrupt! Initiating shutdown...") - exit_code = os.EX_OK - - except asyncio.exceptions.TimeoutError as e: - logger.error(e) - - except asyncio.exceptions.CancelledError as e: - logger.error(f"Cancelled: connection terminated abnormally! ({e})") + await graceful_shutdown(fix_pipeline) except Exception as e: - logger.exception(e) + await graceful_shutdown(fix_pipeline, error=e) + exit_code = os.EX_UNAVAILABLE # Abnormal termination finally: - await graceful_shutdown(fix_pipeline) - # Report tasks that are still running after shutdown. tasks = [ task diff --git a/setup.py b/setup.py index 790b9f9..cf4a1d3 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup( name="wtfix", - version="0.15.3", + version="0.16.0", author="John Cass", author_email="john.cass77@gmail.com", description="The Pythonic Financial Information eXchange (FIX) client for humans.", diff --git a/wtfix/apps/admin.py b/wtfix/apps/admin.py index 471780c..ba558e4 100644 --- a/wtfix/apps/admin.py +++ b/wtfix/apps/admin.py @@ -20,7 +20,7 @@ import uuid from datetime import datetime, timedelta from enum import Enum, auto -from typing import Callable +from typing import Callable, List from wtfix.apps.base import MessageTypeHandlerApp, on from wtfix.apps.sessions import ClientSessionApp @@ -30,6 +30,7 @@ from wtfix.message import admin from wtfix.core import utils from wtfix.message.message import FIXMessage +from wtfix.pipeline import BasePipeline from wtfix.protocol.contextlib import connection logger = settings.logger @@ -52,7 +53,7 @@ class HeartbeatApp(MessageTypeHandlerApp): name = "heartbeat" - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): super().__init__(pipeline, *args, **kwargs) self._test_request_id = None # A waiting TestRequest message for which no response has been received. @@ -62,7 +63,7 @@ def __init__(self, pipeline, *args, **kwargs): self._server_not_responding = asyncio.Event() @property - def heartbeat_interval(self): + def heartbeat_interval(self) -> int: """ The heartbeat interval is supposed to be agreed between the sender and target as part of the logon process (which is why we do not accept it as a parameter or set it in the constructor). @@ -88,7 +89,7 @@ def test_request_response_delay(self) -> int: """ return 2 * self.heartbeat_interval + 4 - def seconds_to_next_check(self, timer: HeartbeatTimers) -> int: + def seconds_to_next_check(self, timer: HeartbeatTimers) -> float: """ :timer: The timer being checked (sent / received) :return: The number of seconds before the next check is due to occur. @@ -181,22 +182,17 @@ async def heartbeat_monitor( await interval_exceeded_response() # No response received, force logout! - logger.error( - f"{self.name}: No response received for test request '{self._test_request_id}', " - f"initiating shutdown..." + asyncio.create_task( + self.pipeline.stop( + SessionError( + f"{self.name}: No response received for test request '{self._test_request_id}'." + ) + ) ) - asyncio.create_task(self.pipeline.stop()) - except asyncio.exceptions.CancelledError: - logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") - - except Exception: - # Stop monitoring heartbeat - logger.exception( - f"{self.name}: Unhandled exception while monitoring heartbeat! Shutting down pipeline..." - ) - asyncio.create_task(self.pipeline.stop()) - raise + except Exception as e: + # Unhandled exception - abort! + asyncio.create_task(self.pipeline.stop(e)) async def send_test_request(self): """ @@ -347,7 +343,7 @@ async def stop(self, *args, **kwargs): await super().stop(*args, **kwargs) @on(connection.protocol.MsgType.Logon) - async def on_logon(self, message): + async def on_logon(self, message: FIXMessage) -> FIXMessage: """ Confirms all of the session parameters that we sent when logging on. @@ -387,7 +383,7 @@ async def on_logon(self, message): return message @on(connection.protocol.MsgType.Logout) - async def on_logout(self, message): + async def on_logout(self, message: FIXMessage) -> FIXMessage: self.logged_out_event.set() # FIX server has logged us out. asyncio.create_task( @@ -486,7 +482,7 @@ class SeqNumManagerApp(MessageTypeHandlerApp): # How long to wait (in seconds) for resend requests from target before sending our own resend requests. RESEND_WAIT_TIME = 5 - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): self.startup_time = ( None # Needed to check if we should wait for resend requests from target @@ -506,7 +502,7 @@ def __init__(self, pipeline, *args, **kwargs): super().__init__(pipeline, *args, **kwargs) @property - def send_seq_num(self): + def send_seq_num(self) -> int: return self._send_seq_num @send_seq_num.setter @@ -514,7 +510,7 @@ def send_seq_num(self, value: int): self._send_seq_num = int(value) # Make sure we received an int @property - def receive_seq_num(self): + def receive_seq_num(self) -> int: return self._receive_seq_num @receive_seq_num.setter @@ -522,7 +518,7 @@ def receive_seq_num(self, value: int): self._receive_seq_num = int(value) # Make sure we received an int @property - def expected_seq_num(self): + def expected_seq_num(self) -> int: return self.receive_seq_num + 1 async def start(self, *args, **kwargs): @@ -558,7 +554,7 @@ async def start(self, *args, **kwargs): self.startup_time = datetime.utcnow() - async def _check_sequence_number(self, message): + async def _check_sequence_number(self, message: FIXMessage) -> FIXMessage: if int(message.seq_num) < self.expected_seq_num: self._handle_sequence_number_too_low(message) @@ -616,7 +612,7 @@ async def _replay_buffered_messages(self): # Buffer is empty - continue pass - def _handle_sequence_number_too_low(self, message): + def _handle_sequence_number_too_low(self, message: FIXMessage): """ According to the FIX specification, receiving a lower than expected sequence number, that is not a duplicate, is a fatal error that requires manual intervention. Throw an exception to @@ -639,7 +635,7 @@ def _handle_sequence_number_too_low(self, message): raise SessionError(error_msg) - async def _handle_sequence_number_too_high(self, message): + async def _handle_sequence_number_too_high(self, message: FIXMessage) -> FIXMessage: # We've missed some incoming messages if len(self.receive_buffer) == 0: @@ -687,7 +683,7 @@ async def _handle_sequence_number_too_high(self, message): f"(waiting for #{self.expected_seq_num})..." ) - async def _send_resend_request(self, missing_seq_nums): + async def _send_resend_request(self, missing_seq_nums: List[int]): # Wait for opportunity to send resend request. Must: # # 1.) Have waited for resend requests from the target; and @@ -716,7 +712,7 @@ async def _send_resend_request(self, missing_seq_nums): ) ) - async def _handle_resend_request(self, message): + async def _handle_resend_request(self, message: FIXMessage) -> FIXMessage: # Set event marker to block our own gap fill requests until we've responded to this request. self.resend_request_handled_event.clear() diff --git a/wtfix/apps/base.py b/wtfix/apps/base.py index dd7360a..5190da3 100644 --- a/wtfix/apps/base.py +++ b/wtfix/apps/base.py @@ -18,6 +18,7 @@ from wtfix.core.exceptions import ValidationError, MessageProcessingError from wtfix.message.message import FIXMessage +from wtfix.pipeline import BasePipeline class BaseApp: @@ -27,7 +28,7 @@ class BaseApp: name = None - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): """ :param pipeline: The pipeline that this app will be added to. :raises: ValidationError if no name has been specified for this apps. @@ -49,6 +50,8 @@ async def initialize(self, *args, **kwargs): All apps are initialized concurrently and need to complete their initialization routines within the timeout specified by INIT_TIMEOUT. + + Being 'initialized' implies that an app is ready to start receiving messages. """ pass @@ -72,6 +75,8 @@ async def stop(self, *args, **kwargs): Apps are stopped sequentially in the order that they were added to the pipeline and each app is subject to the STOP_TIMEOUT timeout. + + Once an app is 'stopped' it will not receive any more messages. """ pass @@ -147,7 +152,7 @@ class MessageTypeHandlerApp(BaseApp): name = "type_filter" - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): super().__init__(pipeline, *args, **kwargs) self.type_handlers = {} diff --git a/wtfix/apps/brokers.py b/wtfix/apps/brokers.py index 07cbc91..39ef9d4 100644 --- a/wtfix/apps/brokers.py +++ b/wtfix/apps/brokers.py @@ -21,6 +21,7 @@ from wtfix.apps.base import BaseApp from wtfix.conf import settings from wtfix.core import decoders, utils +from wtfix.pipeline import BasePipeline logger = settings.logger @@ -34,7 +35,7 @@ class RedisPubSubApp(BaseApp): SEND_CHANNEL = "channel:send" - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): super().__init__(pipeline, *args, **kwargs) self.redis_pool = None @@ -54,6 +55,9 @@ async def _send_channel_reader(self): self.send(message) ) # Pass message on to pipeline + except asyncio.exceptions.CancelledError: + logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") + except aioredis.ChannelClosedError: # Shutting down... logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.") @@ -73,13 +77,7 @@ async def start(self, *args, **kwargs): async def stop(self, *args, **kwargs): if self._channel_reader_task is not None: self._channel_reader_task.cancel() - try: - await self._channel_reader_task - except asyncio.exceptions.CancelledError: - # Cancellation request received - close connections.... - logger.info( - f"{self.name}: {self._channel_reader_task.get_name()} cancelled!" - ) + await self._channel_reader_task with await self.redis_pool as conn: await conn.unsubscribe(self.SEND_CHANNEL) diff --git a/wtfix/apps/parsers.py b/wtfix/apps/parsers.py index cbfffab..2b8e4d9 100644 --- a/wtfix/apps/parsers.py +++ b/wtfix/apps/parsers.py @@ -19,6 +19,7 @@ from wtfix.core.utils import GroupTemplateMixin from wtfix.message.field import Field from wtfix.message.message import RawMessage, generic_message_factory, FIXMessage +from wtfix.pipeline import BasePipeline class RawMessageParserApp(BaseApp, GroupTemplateMixin): @@ -28,7 +29,7 @@ class RawMessageParserApp(BaseApp, GroupTemplateMixin): name = "raw_message_parser" - def __init__(self, pipeline, *args, **kwargs): + def __init__(self, pipeline: BasePipeline, *args, **kwargs): super().__init__(pipeline, *args, **kwargs) self.group_templates = self.pipeline.settings.GROUP_TEMPLATES diff --git a/wtfix/apps/sessions.py b/wtfix/apps/sessions.py index fa50cd7..c0438f7 100644 --- a/wtfix/apps/sessions.py +++ b/wtfix/apps/sessions.py @@ -24,6 +24,7 @@ from wtfix.apps.base import BaseApp from wtfix.conf import settings from wtfix.core import utils +from wtfix.pipeline import BasePipeline from wtfix.protocol.contextlib import connection @@ -38,7 +39,13 @@ class SessionApp(BaseApp): name = "session" def __init__( - self, pipeline, new_session=False, sid_path=None, sender=None, *args, **kwargs + self, + pipeline: BasePipeline, + new_session: bool = False, + sid_path: Path = None, + sender: str = None, + *args, + **kwargs, ): super().__init__(pipeline, *args, **kwargs) @@ -59,7 +66,7 @@ def __init__( self.writer = None @property - def session_id(self): + def session_id(self) -> str: return self._session_id @property @@ -110,7 +117,13 @@ class ClientSessionApp(SessionApp): name = "client_session" def __init__( - self, pipeline, new_session=False, sender=None, target=None, *args, **kwargs + self, + pipeline: BasePipeline, + new_session: bool = False, + sender: str = None, + target: str = None, + *args, + **kwargs, ): super().__init__( pipeline, new_session=new_session, sender=sender, *args, **kwargs @@ -175,7 +188,10 @@ async def stop(self, *args, **kwargs): logger.info(f"{self.name}: Cancelling listener task...") self._listener_task.cancel() - await self._listener_task + try: + await self._listener_task + except asyncio.exceptions.CancelledError: + logger.info(f"{self.name}: {self._listener_task.get_name()} cancelled!") await super().stop(*args, **kwargs) @@ -230,18 +246,11 @@ async def listen(self): # Something else went wrong, re-raise raise - except asyncio.exceptions.CancelledError: - logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!") - - except Exception: - # Stop monitoring heartbeat - logger.exception( - f"{self.name}: Unhandled exception while listening for messages! Shutting down pipeline..." - ) - asyncio.create_task(self.pipeline.stop()) - raise + except Exception as e: + # Unhandled exception - abort! + asyncio.create_task(self.pipeline.stop(e)) - async def on_send(self, message): + async def on_send(self, message: bytes): """ Writes an encoded message to the StreamWriter. diff --git a/wtfix/apps/tests/conftest.py b/wtfix/apps/tests/conftest.py index 3ae1d78..3ba9a54 100644 --- a/wtfix/apps/tests/conftest.py +++ b/wtfix/apps/tests/conftest.py @@ -69,11 +69,11 @@ async def simulate_heartbeat_response(message): @pytest.fixture -def user_notification_message(): +def email_message(): faker = Faker() return generic_message_factory( - (connection.protocol.Tag.MsgType, connection.protocol.MsgType.UserNotification), + (connection.protocol.Tag.MsgType, connection.protocol.MsgType.Email), (connection.protocol.Tag.MsgSeqNum, 1), ( connection.protocol.Tag.SenderCompID, @@ -93,11 +93,11 @@ def user_notification_message(): @pytest.fixture -def messages(user_notification_message): +def messages(email_message): messages = [] for idx in range(1, 6): - next_message = user_notification_message.copy() + next_message = email_message.copy() next_message.seq_num = idx messages.append(next_message) diff --git a/wtfix/apps/tests/test_admin.py b/wtfix/apps/tests/test_admin.py index c474c1e..83c127f 100644 --- a/wtfix/apps/tests/test_admin.py +++ b/wtfix/apps/tests/test_admin.py @@ -253,33 +253,33 @@ async def test_start_resets_sequence_numbers_for_new_session( assert seq_num_app.receive_seq_num == 0 def test_handle_sequence_number_too_low_raises_exception_if_number_too_low( - self, user_notification_message + self, email_message ): with pytest.raises(SessionError): pipeline_mock = MagicMock(BasePipeline) seq_num_app = SeqNumManagerApp(pipeline_mock) seq_num_app.receive_seq_num = 10 - user_notification_message.MsgSeqNum = 1 + email_message.MsgSeqNum = 1 - seq_num_app._handle_sequence_number_too_low(user_notification_message) + seq_num_app._handle_sequence_number_too_low(email_message) def test_handle_sequence_number_too_low_skips_duplicates_with_low_sequence_numbers( - self, user_notification_message + self, email_message ): with pytest.raises(StopMessageProcessing): pipeline_mock = MagicMock(BasePipeline) seq_num_app = SeqNumManagerApp(pipeline_mock) seq_num_app.receive_seq_num = 10 - user_notification_message.MsgSeqNum = 1 - user_notification_message.PossDupFlag = True + email_message.MsgSeqNum = 1 + email_message.PossDupFlag = True - seq_num_app._handle_sequence_number_too_low(user_notification_message) + seq_num_app._handle_sequence_number_too_low(email_message) @pytest.mark.asyncio async def test_handle_seq_num_too_high_starts_buffer_and_sends_resend_request( - self, pipeline_with_messages, user_notification_message + self, pipeline_with_messages, email_message ): with pytest.raises(StopMessageProcessing): seq_num_app = SeqNumManagerApp(pipeline_with_messages) @@ -287,22 +287,20 @@ async def test_handle_seq_num_too_high_starts_buffer_and_sends_resend_request( seconds=5 ) # Don't wait - user_notification_message.MsgSeqNum = 99 - await seq_num_app._handle_sequence_number_too_high( - user_notification_message - ) + email_message.MsgSeqNum = 99 + await seq_num_app._handle_sequence_number_too_high(email_message) # Wait for separate 'send' tasks to complete tasks = asyncio.all_tasks() await asyncio.wait(tasks, timeout=0.1) assert len(seq_num_app.receive_buffer) == 1 - assert seq_num_app.receive_buffer[0] == user_notification_message + assert seq_num_app.receive_buffer[0] == email_message assert pipeline_with_messages.send.call_count == 1 @pytest.mark.asyncio async def test_handle_seq_num_too_high_buffers_messages_received_out_of_order( - self, pipeline_with_messages, user_notification_message + self, pipeline_with_messages, email_message ): seq_num_app = SeqNumManagerApp(pipeline_with_messages) seq_num_app.startup_time = datetime.utcnow() - timedelta( @@ -310,7 +308,7 @@ async def test_handle_seq_num_too_high_buffers_messages_received_out_of_order( ) # Don't wait for idx in range(5): - out_of_sequence_msg = user_notification_message.copy() + out_of_sequence_msg = email_message.copy() out_of_sequence_msg.MsgSeqNum = 5 + idx try: await seq_num_app._handle_sequence_number_too_high(out_of_sequence_msg) @@ -437,7 +435,7 @@ async def test_handle_resend_request_converts_admin_messages_to_sequence_reset_m @pytest.mark.asyncio async def test_on_receive_handles_gapfill( - self, pipeline_with_messages, user_notification_message + self, pipeline_with_messages, email_message ): seq_num_app = SeqNumManagerApp(pipeline_with_messages) seq_num_app.startup_time = datetime.utcnow() - timedelta( @@ -445,10 +443,10 @@ async def test_on_receive_handles_gapfill( ) # Don't wait for resend requests seq_num_app.receive_seq_num = 5 # 5 Messages received so far - user_notification_message.seq_num = 8 # Simulate missing messages 6 and 7 + email_message.seq_num = 8 # Simulate missing messages 6 and 7 try: - await seq_num_app.on_receive(user_notification_message) + await seq_num_app.on_receive(email_message) assert pipeline_with_messages.send.call_count == 1 # Resend request sent except StopMessageProcessing: # Expected @@ -456,7 +454,7 @@ async def test_on_receive_handles_gapfill( # Simulate resend of 6 and 7 for seq_num in [6, 7]: - message = user_notification_message.copy() + message = email_message.copy() message.seq_num = seq_num message.PossDupFlag = True await seq_num_app.on_receive(message) diff --git a/wtfix/apps/tests/test_store.py b/wtfix/apps/tests/test_store.py index 91dfc15..38dffa6 100644 --- a/wtfix/apps/tests/test_store.py +++ b/wtfix/apps/tests/test_store.py @@ -14,7 +14,7 @@ def test_get_key(self): @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio - async def test_get_returns_message(self, store_class, user_notification_message): + async def test_get_returns_message(self, store_class, email_message): store = store_class() await store.initialize() @@ -23,11 +23,11 @@ async def test_get_returns_message(self, store_class, user_notification_message) await conn.execute("flushall") session_id = uuid.uuid4().hex - await store.set(session_id, "TRADER", user_notification_message) + await store.set(session_id, "TRADER", email_message) assert ( - await store.get(session_id, "TRADER", user_notification_message.seq_num) - == user_notification_message + await store.get(session_id, "TRADER", email_message.seq_num) + == email_message ) await store.finalize() @@ -52,7 +52,7 @@ async def test_get_not_found_returns_none(self, store_class): @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio - async def test_delete(self, store_class, user_notification_message): + async def test_delete(self, store_class, email_message): store = store_class() await store.initialize() @@ -64,8 +64,8 @@ async def test_delete(self, store_class, user_notification_message): # Add some messages for idx in range(5): - await store.set(session_id, "TRADER", user_notification_message) - user_notification_message.seq_num += 1 + await store.set(session_id, "TRADER", email_message) + email_message.seq_num += 1 assert await store.delete(session_id, "TRADER", 3) == 1 assert await store.delete(session_id, "TRADER", 99) == 0 # Does not exist @@ -74,7 +74,7 @@ async def test_delete(self, store_class, user_notification_message): @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio - async def test_filter_all(self, store_class, user_notification_message): + async def test_filter_all(self, store_class, email_message): store = store_class() await store.initialize() @@ -86,8 +86,8 @@ async def test_filter_all(self, store_class, user_notification_message): # Add some messages for idx in range(5): - await store.set(session_id, "TRADER", user_notification_message) - user_notification_message.seq_num += 1 + await store.set(session_id, "TRADER", email_message) + email_message.seq_num += 1 seq_nums = await store.filter() @@ -98,7 +98,7 @@ async def test_filter_all(self, store_class, user_notification_message): @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio - async def test_filter_by_session_id(self, store_class, user_notification_message): + async def test_filter_by_session_id(self, store_class, email_message): store = store_class() await store.initialize() @@ -111,9 +111,9 @@ async def test_filter_by_session_id(self, store_class, user_notification_message # Add some messages for idx in range(5): - await store.set(session_id, "TRADER", user_notification_message) - await store.set(other_session_id, "TRADER", user_notification_message) - user_notification_message.seq_num += 1 + await store.set(session_id, "TRADER", email_message) + await store.set(other_session_id, "TRADER", email_message) + email_message.seq_num += 1 seq_nums = await store.filter(session_id=session_id) @@ -124,9 +124,7 @@ async def test_filter_by_session_id(self, store_class, user_notification_message @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio - async def test_filter_by_originator_id( - self, store_class, user_notification_message - ): + async def test_filter_by_originator_id(self, store_class, email_message): store = store_class() await store.initialize() @@ -138,9 +136,9 @@ async def test_filter_by_originator_id( # Add some messages for idx in range(5): - await store.set(session_id, "TRADER", user_notification_message) - await store.set(session_id, "OTHER_TRADER", user_notification_message) - user_notification_message.seq_num += 1 + await store.set(session_id, "TRADER", email_message) + await store.set(session_id, "OTHER_TRADER", email_message) + email_message.seq_num += 1 seq_nums = await store.filter(originator="TRADER") @@ -152,7 +150,7 @@ async def test_filter_by_originator_id( @pytest.mark.parametrize("store_class", [MemoryStore, RedisStore]) @pytest.mark.asyncio async def test_filter_by_session_and_originator_id( - self, store_class, user_notification_message + self, store_class, email_message ): store = store_class() await store.initialize() @@ -166,9 +164,9 @@ async def test_filter_by_session_and_originator_id( # Add some messages for idx in range(5): - await store.set(session_id, "TRADER", user_notification_message) - await store.set(other_session_id, "OTHER_TRADER", user_notification_message) - user_notification_message.seq_num += 1 + await store.set(session_id, "TRADER", email_message) + await store.set(other_session_id, "OTHER_TRADER", email_message) + email_message.seq_num += 1 seq_nums = await store.filter(session_id=session_id, originator="TRADER") @@ -180,17 +178,17 @@ async def test_filter_by_session_and_originator_id( class TestMemoryStore: @pytest.mark.asyncio - async def test_set(self, user_notification_message): + async def test_set(self, email_message): store = MemoryStore() await store.initialize() session_id = uuid.uuid4().hex - await store.set(session_id, "TRADER", user_notification_message) + await store.set(session_id, "TRADER", email_message) assert len(store._store) == 1 assert ( - store._store[f"{session_id}:TRADER:{user_notification_message.seq_num}"] - == user_notification_message + store._store[f"{session_id}:TRADER:{email_message.seq_num}"] + == email_message ) await store.finalize() @@ -198,7 +196,7 @@ async def test_set(self, user_notification_message): class TestRedisStore: @pytest.mark.asyncio - async def test_initialize_creates_pool(self, user_notification_message): + async def test_initialize_creates_pool(self, email_message): store = RedisStore() await store.initialize() @@ -207,15 +205,15 @@ async def test_initialize_creates_pool(self, user_notification_message): await store.finalize() @pytest.mark.asyncio - async def test_set(self, user_notification_message): + async def test_set(self, email_message): store = RedisStore() await store.initialize() session_id = uuid.uuid4().hex - await store.set(session_id, "TRADER", user_notification_message) + await store.set(session_id, "TRADER", email_message) assert await store.redis_pool.exists( - f"{session_id}:TRADER:{user_notification_message.seq_num}" + f"{session_id}:TRADER:{email_message.seq_num}" ) await store.finalize() diff --git a/wtfix/apps/wire.py b/wtfix/apps/wire.py index 3f39c19..9ada022 100644 --- a/wtfix/apps/wire.py +++ b/wtfix/apps/wire.py @@ -14,6 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . +from typing import Tuple from wtfix.apps.base import BaseApp from wtfix.apps.sessions import ClientSessionApp @@ -124,7 +125,7 @@ async def on_receive(self, data: bytes) -> FIXMessage: raise MessageProcessingError() from e @classmethod - def check_begin_string(cls, data, start=0): + def check_begin_string(cls, data: bytes, start: int = 0) -> Tuple[bytes, int]: """ Checks the BeginString tag (8) for the encoded message data provided. @@ -153,7 +154,9 @@ def check_begin_string(cls, data, start=0): return begin_string, tag_end @classmethod - def check_body_length(cls, data, start=0, body_end=None): + def check_body_length( + cls, data: bytes, start: int = 0, body_end: int = None + ) -> Tuple[int, int]: """ Checks the BodyLength tag (9) for the encoded message data provided. @@ -163,7 +166,7 @@ def check_body_length(cls, data, start=0, body_end=None): is not provided then the data byte string will be parsed to look for the Checksum (10) tag, which should denote the end of the message body. - :return: A tuple consisting of the value of the BodyLength tag in encoded byte format, and the + :return: A tuple consisting of the value of the BodyLength tag as an int, and the index at which the tag ends. :raises: ParsingError if the BodyLength tag can either not be found, or if the actual body @@ -189,7 +192,9 @@ def check_body_length(cls, data, start=0, body_end=None): return body_length, tag_end @classmethod - def check_checksum(cls, data, body_start=0, body_end=None): + def check_checksum( + cls, data: bytes, body_start: int = 0, body_end: int = None + ) -> Tuple[int, int]: """ Checks the Checksum tag (10) for the encoded message data provided. @@ -198,8 +203,7 @@ def check_checksum(cls, data, body_start=0, body_end=None): :param body_end: The index in the encoded message at which the message body ends. If this value is not provided, then it will default to the index at which the Checksum tag starts. - :return: A tuple consisting of the value of the BeginString tag in encoded byte format, and the - index at which the tag ends. + :return: A tuple consisting of the value of the checksum and the index at which the tag ends. :raises: ParsingError if the BeginString tag can either not be found, or if it is not the first tag in the message. diff --git a/wtfix/message/collections.py b/wtfix/message/collections.py index 676d85b..09c614f 100644 --- a/wtfix/message/collections.py +++ b/wtfix/message/collections.py @@ -180,8 +180,8 @@ def __setattr__(self, key, value): :param key: The Field's tag name :param value: The value to set the Field to """ - if key in connection.protocol.Tag.__dict__.keys(): - self[connection.protocol.Tag.__dict__[key]] = value + if key in connection.protocol.Tag.get_attributes().keys(): + self[connection.protocol.Tag.get_attributes()[key]] = value else: super().__setattr__(key, value) @@ -191,8 +191,8 @@ def __delattr__(self, item): :param item: The Field's tag name """ - if item in connection.protocol.Tag.__dict__.keys(): - del self[connection.protocol.Tag.__dict__[item]] + if item in connection.protocol.Tag.get_attributes().keys(): + del self[connection.protocol.Tag.get_attributes()[item]] else: super().__delattr__(item) diff --git a/wtfix/pipeline.py b/wtfix/pipeline.py index dc93863..f124746 100644 --- a/wtfix/pipeline.py +++ b/wtfix/pipeline.py @@ -17,6 +17,7 @@ import asyncio from collections import OrderedDict +from typing import Union, List, Tuple from wtfix.core.klass import get_class_from_module_string from wtfix.conf import ConnectionSettings @@ -26,7 +27,9 @@ StopMessageProcessing, ValidationError, ImproperlyConfigured, + SessionError, ) +from wtfix.message.message import FIXMessage from wtfix.protocol.contextlib import connection logger = settings.logger @@ -40,27 +43,32 @@ class BasePipeline: INBOUND_PROCESSING = 0 OUTBOUND_PROCESSING = 1 - def __init__(self, connection_name, installed_apps=None, **kwargs): + def __init__(self, connection_name: str, installed_apps: List = None, **kwargs): self.settings = ConnectionSettings(connection_name) self._installed_apps = self._load_apps(installed_apps=installed_apps, **kwargs) logger.info( - f"Created new WTFIX application pipeline: {list(self.apps.keys())}." + f"Created new WTFIX application pipeline: {list(self._installed_apps.keys())}." ) + # An app is 'active' if it has (a) been initialized and (b) not been stopped. + self._active_apps = OrderedDict() self.stop_lock = asyncio.Lock() self.stopping_event = asyncio.Event() self.stopped_event = asyncio.Event() + self.errors = [] + @property - def apps(self): + def apps(self) -> OrderedDict: return self._installed_apps - def _load_apps(self, installed_apps=None, **kwargs): + def _load_apps(self, installed_apps: List = None, **kwargs) -> OrderedDict: """ Loads the list of apps to be used for processing messages. :param installed_apps: The list of class paths for the installed apps. + :returns: An ordered dictionary containing all of the apps that have been loaded. """ loaded_apps = OrderedDict() @@ -88,9 +96,12 @@ async def initialize(self): """ logger.info(f"Initializing applications...") - init_calls = (app.initialize for app in reversed(self.apps.values())) + init_calls = (app.initialize for app in self.apps.values()) await asyncio.gather(*(call() for call in init_calls)) + for name, app in self.apps.items(): + self._active_apps[name] = app + logger.info("All apps initialized!") async def start(self): @@ -128,13 +139,28 @@ async def start(self): # Block until the pipeline has been stopped again. await self.stopped_event.wait() - async def stop(self): + if self.errors: + # Re-raise exceptions so that calling process can set proper exit codes. + raise SessionError(f"Pipeline terminated abnormally due to: {self.errors}") + + async def stop(self, error: Exception = None): """ Tries to shut down the pipeline in an orderly fashion. + :param: error: The exception that triggered the pipeline stop or None if the shutdown occurred normally. :raises: TimeoutError if STOP_TIMEOUT is exceeded. """ + if self.stop_lock.locked(): + # Stop already in progress - skip + return + async with self.stop_lock: # Ensure that more than one app does not attempt to initiate a shutdown at once + if error: + logger.exception( + f"Pipeline shutting down due to exception: {error}.", exc_info=error + ) + self.errors.append(error) + if self.stopping_event.is_set() or self.stopped_event.is_set(): # Pipeline has already been stopped or is in the process of shutting down - nothing more to do. return @@ -142,14 +168,17 @@ async def stop(self): self.stopping_event.set() # Mark start of shutdown event. logger.info("Shutting down pipeline...") - for app in self.apps.values(): + for name, app in self.apps.items(): # Stop apps in the reverse order that they were initialized. logger.info(f"Stopping app '{app}'...") try: await asyncio.wait_for(app.stop(), settings.STOP_TIMEOUT) + del self._active_apps[name] + except asyncio.exceptions.TimeoutError: logger.error(f"Timeout waiting for app '{app}' to stop!") continue # Continue trying to stop next app. + except Exception: # Don't allow misbehaving apps to interrupt pipeline shutdown. logger.exception(f"Error trying to stop app '{app}'.") @@ -158,18 +187,20 @@ async def stop(self): self.stopped_event.set() logger.info("Pipeline stopped.") - def _setup_message_handling(self, direction): + def _setup_message_handling(self, direction: int) -> Tuple[str, iter]: if direction is self.INBOUND_PROCESSING: - return "on_receive", reversed(self.apps.values()) + return "on_receive", reversed(self._active_apps.values()) if direction is self.OUTBOUND_PROCESSING: - return "on_send", iter(self.apps.values()) + return "on_send", iter(self._active_apps.values()) raise ValidationError( f"Unknown application chain processing direction '{direction}'." ) - async def _process_message(self, message, direction): + async def _process_message( + self, message: Union[FIXMessage, bytes], direction: int + ) -> Union[FIXMessage, bytes]: """ Process a message by passing it on to the various apps in the pipeline. @@ -214,20 +245,27 @@ async def _process_message(self, message, direction): f"Ignoring connection error during shutdown / logout: {e}" ) else: - - # Log exception in case it is not handled properly in the Future object. - logger.exception( - f"Unhandled exception while doing {method_name}: {e} ({message})." - ) - await self.stop() # Block while we try to stop the pipeline - raise e + # Unhandled exception - abort! + asyncio.create_task(self.stop(e)) return message - async def receive(self, message): + async def receive(self, message: bytes) -> Union[FIXMessage, bytes]: """Receives a new message to be processed""" + if self.errors: + logger.warning( + f"Pipeline errors have occurred, ignoring received message: {message}" + ) + return message + return await self._process_message(message, BasePipeline.INBOUND_PROCESSING) - async def send(self, message): + async def send(self, message: FIXMessage) -> Union[FIXMessage, bytes]: """Processes a new message to be sent""" + if self.errors: + logger.warning( + f"Pipeline errors have occurred, ignoring send message: {message}" + ) + return message + return await self._process_message(message, BasePipeline.OUTBOUND_PROCESSING) diff --git a/wtfix/protocol/fix/_44/message_types.py b/wtfix/protocol/fix/_44/message_types.py index a3e9668..6f38e6a 100644 --- a/wtfix/protocol/fix/_44/message_types.py +++ b/wtfix/protocol/fix/_44/message_types.py @@ -19,16 +19,11 @@ class MsgType(_BaseMsgType): - AdjustedPositionReport = "BL" Advertisement = "7" AllocationInstruction = "J" AllocationInstructionAck = "P" - AllocationInstructionAlert = "BM" AllocationReport = "AS" AllocationReportAck = "AT" - ApplicationMessageReport = "BY" - ApplicationMessageRequest = "BW" - ApplicationMessageRequestAck = "BX" AssignmentReport = "AW" BidRequest = "k" BidResponse = "l" @@ -40,17 +35,14 @@ class MsgType(_BaseMsgType): CollateralRequest = "AX" CollateralResponse = "AZ" Confirmation = "AK" + ConfirmationAck = "AU" ConfirmationRequest = "BH" - Confirmation_Ack = "AU" - ContraryIntentionReport = "BO" - CrossOrderCancelReplaceRequest = "t" CrossOrderCancelRequest = "u" + CrossOrderCancelReplaceRequest = "t" DerivativeSecurityList = "AA" DerivativeSecurityListRequest = "z" - DerivativeSecurityListUpdateReport = "BR" - DontKnowTradeDK = "Q" + DontKnowTrade = "Q" Email = "C" - ExecutionAcknowledgement = "BN" ExecutionReport = "8" Heartbeat = "0" IOI = "6" @@ -62,14 +54,11 @@ class MsgType(_BaseMsgType): Logon = "A" Logout = "5" MarketDataIncrementalRefresh = "X" + MarketDataSnapshotFullRefresh = "W" MarketDataRequest = "V" MarketDataRequestReject = "Y" - MarketDataSnapshotFullRefresh = "W" - MarketDefinition = "BU" - MarketDefinitionRequest = "BT" - MarketDefinitionUpdateReport = "BV" MassQuote = "i" - MassQuoteAcknowledgement = "" + MassQuoteAcknowledgement = "b" MultilegOrderCancelReplace = "AC" NetworkCounterpartySystemStatusRequest = "BC" NetworkCounterpartySystemStatusResponse = "BD" @@ -79,16 +68,12 @@ class MsgType(_BaseMsgType): NewOrderSingle = "D" News = "B" OrderCancelReject = "9" - OrderCancelReplaceRequest = "G" OrderCancelRequest = "F" - OrderMassActionReport = "BZ" - OrderMassActionRequest = "CA" + OrderCancelReplaceRequest = "G" OrderMassCancelReport = "r" OrderMassCancelRequest = "q" OrderMassStatusRequest = "AF" OrderStatusRequest = "H" - PartyDetailsListReport = "CG" - PartyDetailsListRequest = "CF" PositionMaintenanceReport = "AM" PositionMaintenanceRequest = "AL" PositionReport = "AP" @@ -99,19 +84,17 @@ class MsgType(_BaseMsgType): QuoteResponse = "AJ" QuoteStatusReport = "AI" QuoteStatusRequest = "a" - RFQRequest = "AH" RegistrationInstructions = "o" RegistrationInstructionsResponse = "p" Reject = "3" RequestForPositions = "AN" RequestForPositionsAck = "AO" ResendRequest = "2" + RFQRequest = "AH" SecurityDefinition = "d" SecurityDefinitionRequest = "c" - SecurityDefinitionUpdateReport = "BP" SecurityList = "y" SecurityListRequest = "x" - SecurityListUpdateReport = "BK" SecurityStatus = "f" SecurityStatusRequest = "e" SecurityTypeRequest = "v" @@ -119,21 +102,13 @@ class MsgType(_BaseMsgType): SequenceReset = "4" SettlementInstructionRequest = "AV" SettlementInstructions = "T" - SettlementObligationReport = "BQ" - StreamAssignmentReport = "CD" - StreamAssignmentReportACK = "CE" - StreamAssignmentRequest = "CC" TestRequest = "1" TradeCaptureReport = "AE" TradeCaptureReportAck = "AR" TradeCaptureReportRequest = "AD" TradeCaptureReportRequestAck = "AQ" - TradingSessionList = "BJ" - TradingSessionListRequest = "BI" - TradingSessionListUpdateReport = "BS" TradingSessionStatus = "h" TradingSessionStatusRequest = "g" - UserNotification = "C" UserRequest = "BE" UserResponse = "BF" - XML_non_FIX = "n" + XMLMessage = "n" diff --git a/wtfix/protocol/fix/_44/tags.py b/wtfix/protocol/fix/_44/tags.py index 0853205..e02871d 100644 --- a/wtfix/protocol/fix/_44/tags.py +++ b/wtfix/protocol/fix/_44/tags.py @@ -38,11 +38,11 @@ class Tag(_BaseTag): ExecID = 17 ExecInst = 18 ExecRefID = 19 - ExecTransType = 20 + # ExecTransType = 20 (replaced) HandlInst = 21 SecurityIDSource = 22 IOIID = 23 - # IOIOthSvc(nolongerused) = 24 # Original name: IOIOthSvc (no longer used) + # IOIOthSvc = 24 (no longer used) IOIQltyInd = 25 IOIRefID = 26 IOIQty = 27 @@ -64,12 +64,12 @@ class Tag(_BaseTag): PossDupFlag = 43 Price = 44 RefSeqNum = 45 - # RelatdSym(nolongerused) = 46 # Original name: RelatdSym (no longer used) - # Rule80A(NoLongerUsed) = 47 # Original name: Rule80A(No Longer Used) + # RelatdSym = 46 (no longer used) + # Rule80A = 47 (no longer used) SecurityID = 48 SenderCompID = 49 SenderSubID = 50 - # SendingDate(nolongerused) = 51 # Original name: SendingDate (no longer used) + # SendingDate = 51 (no longer used) SendingTime = 52 Quantity = 53 Side = 54 @@ -94,7 +94,7 @@ class Tag(_BaseTag): NoOrders = 73 AvgPxPrecision = 74 TradeDate = 75 - ExecBroker = 76 + # ExecBroker = 76 (replaced) PositionEffect = 77 NoAllocs = 78 AllocAccount = 79 @@ -104,13 +104,13 @@ class Tag(_BaseTag): RptSeq = 83 CxlQty = 84 NoDlvyInst = 85 - DlvyInst = 86 + # DlvyInst = 86 (no longer used) AllocStatus = 87 AllocRejCode = 88 Signature = 89 SecureDataLen = 90 SecureData = 91 - BrokerOfCredit = 92 + # BrokerOfCredit = 92 (replaced) SignatureLength = 93 EmailType = 94 RawDataLength = 95 @@ -122,11 +122,11 @@ class Tag(_BaseTag): CxlRejReason = 102 OrdRejReason = 103 IOIQualifier = 104 - WaveNo = 105 + # WaveNo = 105 (no longer used) Issuer = 106 SecurityDesc = 107 HeartBtInt = 108 - ClientID = 109 + # ClientID = 109 (replaced) MinQty = 110 MaxFloor = 111 TestReqID = 112 @@ -142,7 +142,7 @@ class Tag(_BaseTag): OrigSendingTime = 122 GapFillFlag = 123 NoExecs = 124 - CxlType = 125 + # CxlType = 125 (no longer used) ExpireTime = 126 DKReason = 127 DeliverToCompID = 128 @@ -183,28 +183,28 @@ class Tag(_BaseTag): SettlInstTransType = 163 EmailThreadID = 164 SettlInstSource = 165 - SettlLocation = 166 + # SettlLocation = 166 (replaced) SecurityType = 167 EffectiveTime = 168 StandInstDbType = 169 StandInstDbName = 170 StandInstDbID = 171 SettlDeliveryType = 172 - SettlDepositoryCode = 173 # Original name: SettlDepositoryCode - SettlBrkrCode = 174 # Original name: SettlBrkrCode - SettlInstCode = 175 # Original name: SettlInstCode - SecuritySettlAgentName = 176 - SecuritySettlAgentCode = 177 - SecuritySettlAgentAcctNum = 178 - SecuritySettlAgentAcctName = 179 - SecuritySettlAgentContactName = 180 - SecuritySettlAgentContactPhone = 181 - CashSettlAgentName = 182 - CashSettlAgentCode = 183 - CashSettlAgentAcctNum = 184 - CashSettlAgentAcctName = 185 - CashSettlAgentContactName = 186 - CashSettlAgentContactPhone = 187 + # SettlDepositoryCode = 173 (replaced) + # SettlBrkrCode = 174 (replaced) + # SettlInstCode = 175 (replaced) + # SecuritySettlAgentName = 176 (replaced) + # SecuritySettlAgentCode = 177 (replaced) + # SecuritySettlAgentAcctNum = 178 (replaced) + # SecuritySettlAgentAcctName = 179 (replaced) + # SecuritySettlAgentContactName = 180 (replaced) + # SecuritySettlAgentContactPhone = 181 (replaced) + # CashSettlAgentName = 182 (replaced) + # CashSettlAgentCode = 183 (replaced) + # CashSettlAgentAcctNum = 184 (replaced) + # CashSettlAgentAcctName = 185 (replaced) + # CashSettlAgentContactName = 186 (replaced) + # CashSettlAgentContactPhone = 187 (replaced) BidSpotRate = 188 BidForwardPoints = 189 OfferSpotRate = 190 @@ -218,7 +218,7 @@ class Tag(_BaseTag): SecondaryOrderID = 198 NoIOIQualifiers = 199 MaturityMonthYear = 200 - PutOrCall = 201 + # PutOrCall = 201 (replaced) StrikePrice = 202 CoveredOrUncovered = 203 CustomerOrFirm = 204 @@ -236,15 +236,15 @@ class Tag(_BaseTag): RoutingType = 216 RoutingID = 217 Spread = 218 - Benchmark = 219 + # Benchmark = 219 (no longer used) BenchmarkCurveCurrency = 220 BenchmarkCurveName = 221 BenchmarkCurvePoint = 222 CouponRate = 223 CouponPaymentDate = 224 IssueDate = 225 - RepurchaseTerm = 226 - RepurchaseRate = 227 + # RepurchaseTerm = 226 (deprecated) + # RepurchaseRate = 227 (deprecated) Factor = 228 TradeOriginationDate = 229 ExDate = 230 @@ -256,29 +256,28 @@ class Tag(_BaseTag): Yield = 236 TotalTakedown = 237 Concession = 238 - RepoCollateralSecurityType = 239 - RedemptionDate = 240 + # RepoCollateralSecurityType = 239 (deprecated) + # RedemptionDate = 240 (deprecated) UnderlyingCouponPaymentDate = 241 UnderlyingIssueDate = 242 - UnderlyingRepoCollateralSecurityType = 243 - UnderlyingRepurchaseTerm = 244 - UnderlyingRepurchaseRate = 245 + # UnderlyingRepoCollateralSecurityType = 243 (deprecated) + # UnderlyingRepurchaseTerm = 244 (deprecated) + # UnderlyingRepurchaseRate = 245 (deprecated) UnderlyingFactor = 246 - UnderlyingRedemptionDate = 247 + # UnderlyingRedemptionDate = 247 (deprecated) LegCouponPaymentDate = 248 LegIssueDate = 249 - LegRepoCollateralSecurityType = 250 - LegRepurchaseTerm = 251 - LegRepurchaseRate = 252 + # LegRepoCollateralSecurityType = 250 (deprecated) + # LegRepurchaseTerm = 251 (deprecated) + # LegRepurchaseRate = 252 (deprecated) LegFactor = 253 - LegRedemptionDate = 254 + # LegRedemptionDate = 254 (deprecated) CreditRating = 255 UnderlyingCreditRating = 256 LegCreditRating = 257 TradedFlatSwitch = 258 BasisFeatureDate = 259 BasisFeaturePrice = 260 - # Reserved/AllocatedtotheFixedIncomeproposal = 261 # Original name: Reserved/Allocated to the Fixed Income proposal MDReqID = 262 SubscriptionRequestType = 263 MarketDepth = 264 @@ -331,12 +330,11 @@ class Tag(_BaseTag): UnderlyingSymbol = 311 UnderlyingSymbolSfx = 312 UnderlyingMaturityMonthYear = 313 - UnderlyingMaturityDay = 314 - UnderlyingPutOrCall = 315 - UnderlyingStrikePrice = 316 + # UnderlyingMaturityDay = 314 (replaced) + # UnderlyingStrikePrice = 316 (replaced) UnderlyingOptAttribute = 317 UnderlyingCurrency = 318 - RatioQty = 319 + # RatioQty = 319 (replaced) SecurityReqID = 320 SecurityRequestType = 321 SecurityResponseID = 322 @@ -387,7 +385,7 @@ class Tag(_BaseTag): QuoteSetValidUntilTime = 367 QuoteEntryRejectReason = 368 LastMsgSeqNumProcessed = 369 - OnBehalfOfSendingTime = 370 + # OnBehalfOfSendingTime = 370 (no longer used) RefTagID = 371 RefMsgType = 372 SessionRejectReason = 373 @@ -456,8 +454,8 @@ class Tag(_BaseTag): UnderlyingContractMultiplier = 436 ContraTradeQty = 437 ContraTradeTime = 438 - ClearingFirm = 439 - ClearingAccount = 440 + # ClearingFirm = 439 (replaced) + # ClearingAccount = 440 (replaced) LiquidityNumSecurities = 441 MultiLegReportingType = 442 StrikeTime = 443 @@ -466,8 +464,8 @@ class Tag(_BaseTag): EncodedListStatusText = 446 PartyIDSource = 447 PartyID = 448 - TotalVolumeTradedDate = 449 - TotalVolumeTradedTime = 450 # Original name: TotalVolumeTraded Time + # TotalVolumeTradedDate = 449 (replaced) + # TotalVolumeTradedTime = 450 (replaced) NetChgPrevDay = 451 PartyRole = 452 NoPartyIDs = 453 @@ -482,7 +480,7 @@ class Tag(_BaseTag): UnderlyingProduct = 462 UnderlyingCFICode = 463 TestMessageIndicator = 464 - QuantityType = 465 + # QuantityType = 465 (deprecated) BookingRefID = 466 IndividualAllocID = 467 RoundingDirection = 468 @@ -557,7 +555,7 @@ class Tag(_BaseTag): QuoteType = 537 NestedPartyRole = 538 NoNestedPartyIDs = 539 - TotalAccruedInterestAmt = 540 + # TotalAccruedInterestAmt = 540 (deprecated) MaturityDate = 541 UnderlyingMaturityDate = 542 InstrRegistry = 543 @@ -670,7 +668,7 @@ class Tag(_BaseTag): LegalConfirm = 650 UnderlyingLastPx = 651 UnderlyingLastQty = 652 - SecDefStatus = 653 + # SecDefStatus = 653 (replaced) LegRefID = 654 ContraLegRefID = 655 SettlCurrBidFxRate = 656 @@ -826,7 +824,6 @@ class Tag(_BaseTag): NoNested2PartySubIDs = 806 Nested2PartySubIDType = 807 AllocIntermedReqType = 808 - NoUsernames = 809 UnderlyingPx = 810 PriceDelta = 811 ApplQueueMax = 812 @@ -974,660 +971,3 @@ class Tag(_BaseTag): Nested3PartySubIDType = 954 LegContractSettlMonth = 955 LegInterestAccrualDate = 956 - NoStrategyParameters = 957 - StrategyParameterName = 958 - StrategyParameterType = 959 - StrategyParameterValue = 960 - HostCrossID = 961 - SideTimeInForce = 962 - MDReportID = 963 - SecurityReportID = 964 - SecurityStatus = 965 - SettleOnOpenFlag = 966 - StrikeMultiplier = 967 - StrikeValue = 968 - MinPriceIncrement = 969 - PositionLimit = 970 - NTPositionLimit = 971 - UnderlyingAllocationPercent = 972 - UnderlyingCashAmount = 973 - UnderlyingCashType = 974 - UnderlyingSettlementType = 975 - QuantityDate = 976 - ContIntRptID = 977 - LateIndicator = 978 - InputSource = 979 - SecurityUpdateAction = 980 - NoExpiration = 981 - ExpirationQtyType = 982 - ExpQty = 983 - NoUnderlyingAmounts = 984 - UnderlyingPayAmount = 985 - UnderlyingCollectAmount = 986 - UnderlyingSettlementDate = 987 - UnderlyingSettlementStatus = 988 - SecondaryIndividualAllocID = 989 - LegReportID = 990 - RndPx = 991 - IndividualAllocType = 992 - AllocCustomerCapacity = 993 - TierCode = 994 - UnitOfMeasure = 996 - TimeUnit = 997 - UnderlyingUnitOfMeasure = 998 - LegUnitOfMeasure = 999 - UnderlyingTimeUnit = 1000 - LegTimeUnit = 1001 - AllocMethod = 1002 - TradeID = 1003 - SideTradeReportID = 1005 - SideFillStationCd = 1006 - SideReasonCd = 1007 - SideTrdSubTyp = 1008 - SideLastQty = 1009 - MessageEventSource = 1011 - SideTrdRegTimestamp = 1012 - SideTrdRegTimestampType = 1013 - SideTrdRegTimestampSrc = 1014 - AsOfIndicator = 1015 - NoSideTrdRegTS = 1016 - LegOptionRatio = 1017 - NoInstrumentParties = 1018 - InstrumentPartyID = 1019 - TradeVolume = 1020 - MDBookType = 1021 - MDFeedType = 1022 - MDPriceLevel = 1023 - MDOriginType = 1024 - FirstPx = 1025 - MDEntrySpotRate = 1026 - MDEntryForwardPoints = 1027 - ManualOrderIndicator = 1028 - CustDirectedOrder = 1029 - ReceivedDeptID = 1030 - CustOrderHandlingInst = 1031 - OrderHandlingInstSource = 1032 - DeskType = 1033 - DeskTypeSource = 1034 - DeskOrderHandlingInst = 1035 - ExecAckStatus = 1036 - UnderlyingDeliveryAmount = 1037 - UnderlyingCapValue = 1038 - UnderlyingSettlMethod = 1039 - SecondaryTradeID = 1040 - FirmTradeID = 1041 - SecondaryFirmTradeID = 1042 - CollApplType = 1043 - UnderlyingAdjustedQuantity = 1044 - UnderlyingFXRate = 1045 - UnderlyingFXRateCalc = 1046 - AllocPositionEffect = 1047 - DealingCapacity = 1048 - InstrmtAssignmentMethod = 1049 - InstrumentPartyIDSource = 1050 - InstrumentPartyRole = 1051 - NoInstrumentPartySubIDs = 1052 - InstrumentPartySubID = 1053 - InstrumentPartySubIDType = 1054 - PositionCurrency = 1055 - CalculatedCcyLastQty = 1056 - AggressorIndicator = 1057 - NoUndlyInstrumentParties = 1058 - UnderlyingInstrumentPartyID = 1059 - UnderlyingInstrumentPartyIDSource = 1060 - UnderlyingInstrumentPartyRole = 1061 - NoUndlyInstrumentPartySubIDs = 1062 - UnderlyingInstrumentPartySubID = 1063 - UnderlyingInstrumentPartySubIDType = 1064 - BidSwapPoints = 1065 - OfferSwapPoints = 1066 - LegBidForwardPoints = 1067 - LegOfferForwardPoints = 1068 - SwapPoints = 1069 - MDQuoteType = 1070 - LastSwapPoints = 1071 - SideGrossTradeAmt = 1072 - LegLastForwardPoints = 1073 - LegCalculatedCcyLastQty = 1074 - LegGrossTradeAmt = 1075 - MaturityTime = 1079 - RefOrderID = 1080 - RefOrderIDSource = 1081 - SecondaryDisplayQty = 1082 - DisplayWhen = 1083 - DisplayMethod = 1084 - DisplayLowQty = 1085 - DisplayHighQty = 1086 - DisplayMinIncr = 1087 - RefreshQty = 1088 - MatchIncrement = 1089 - MaxPriceLevels = 1090 - PreTradeAnonymity = 1091 - PriceProtectionScope = 1092 - LotType = 1093 - PegPriceType = 1094 - PeggedRefPrice = 1095 - PegSecurityIDSource = 1096 - PegSecurityID = 1097 - PegSymbol = 1098 - PegSecurityDesc = 1099 - TriggerType = 1100 - TriggerAction = 1101 - TriggerPrice = 1102 - TriggerSymbol = 1103 - TriggerSecurityID = 1104 - TriggerSecurityIDSource = 1105 - TriggerSecurityDesc = 1106 - TriggerPriceType = 1107 - TriggerPriceTypeScope = 1108 - TriggerPriceDirection = 1109 - TriggerNewPrice = 1110 - TriggerOrderType = 1111 - TriggerNewQty = 1112 - TriggerTradingSessionID = 1113 - TriggerTradingSessionSubID = 1114 - OrderCategory = 1115 - NoRootPartyIDs = 1116 - RootPartyID = 1117 - RootPartyIDSource = 1118 - RootPartyRole = 1119 - NoRootPartySubIDs = 1120 - RootPartySubID = 1121 - RootPartySubIDType = 1122 - TradeHandlingInstr = 1123 - OrigTradeHandlingInstr = 1124 - OrigTradeDate = 1125 - OrigTradeID = 1126 - OrigSecondaryTradeID = 1127 - ApplVerID = 1128 - CstmApplVerID = 1129 - RefApplVerID = 1130 - RefCstmApplVerID = 1131 - TZTransactTime = 1132 - ExDestinationIDSource = 1133 - ReportedPxDiff = 1134 - RptSys = 1135 - AllocClearingFeeIndicator = 1136 - DefaultApplVerID = 1137 - DisplayQty = 1138 - ExchangeSpecialInstructions = 1139 - MaxTradeVol = 1140 - NoMDFeedTypes = 1141 - MatchAlgorithm = 1142 - MaxPriceVariation = 1143 - ImpliedMarketIndicator = 1144 - EventTime = 1145 - MinPriceIncrementAmount = 1146 - UnitOfMeasureQty = 1147 - LowLimitPrice = 1148 - HighLimitPrice = 1149 - TradingReferencePrice = 1150 - SecurityGroup = 1151 - LegNumber = 1152 - SettlementCycleNo = 1153 - SideCurrency = 1154 - SideSettlCurrency = 1155 - ApplExtID = 1156 - CcyAmt = 1157 - NoSettlDetails = 1158 - SettlObligMode = 1159 - SettlObligMsgID = 1160 - SettlObligID = 1161 - SettlObligTransType = 1162 - SettlObligRefID = 1163 - SettlObligSource = 1164 - NoSettlOblig = 1165 - QuoteMsgID = 1166 - QuoteEntryStatus = 1167 - TotNoCxldQuotes = 1168 - TotNoAccQuotes = 1169 - TotNoRejQuotes = 1170 - PrivateQuote = 1171 - RespondentType = 1172 - MDSubBookType = 1173 - SecurityTradingEvent = 1174 - NoStatsIndicators = 1175 - StatsType = 1176 - NoOfSecSizes = 1177 - MDSecSizeType = 1178 - MDSecSize = 1179 - ApplID = 1180 - ApplSeqNum = 1181 - ApplBegSeqNum = 1182 - ApplEndSeqNum = 1183 - SecurityXMLLen = 1184 - SecurityXML = 1185 - SecurityXMLSchema = 1186 - RefreshIndicator = 1187 - Volatility = 1188 - TimeToExpiration = 1189 - RiskFreeRate = 1190 - PriceUnitOfMeasure = 1191 - PriceUnitOfMeasureQty = 1192 - SettlMethod = 1193 - ExerciseStyle = 1194 - OptPayoutAmount = 1195 - PriceQuoteMethod = 1196 - ValuationMethod = 1197 - ListMethod = 1198 - CapPrice = 1199 - FloorPrice = 1200 - NoStrikeRules = 1201 - StartStrikePxRange = 1202 - EndStrikePxRange = 1203 - StrikeIncrement = 1204 - NoTickRules = 1205 - StartTickPriceRange = 1206 - EndTickPriceRange = 1207 - TickIncrement = 1208 - TickRuleType = 1209 - NestedInstrAttribType = 1210 - NestedInstrAttribValue = 1211 - LegMaturityTime = 1212 - UnderlyingMaturityTime = 1213 - DerivativeSymbol = 1214 - DerivativeSymbolSfx = 1215 - DerivativeSecurityID = 1216 - DerivativeSecurityIDSource = 1217 - NoDerivativeSecurityAltID = 1218 - DerivativeSecurityAltID = 1219 - DerivativeSecurityAltIDSource = 1220 - SecondaryLowLimitPrice = 1221 - MaturityRuleID = 1222 - StrikeRuleID = 1223 - LegUnitOfMeasureQty = 1224 - DerivativeOptPayAmount = 1225 - EndMaturityMonthYear = 1226 - ProductComplex = 1227 - DerivativeProductComplex = 1228 - MaturityMonthYearIncrement = 1229 - SecondaryHighLimitPrice = 1230 - MinLotSize = 1231 - NoExecInstRules = 1232 - NoLotTypeRules = 1234 - NoMatchRules = 1235 - NoMaturityRules = 1236 - NoOrdTypeRules = 1237 - NoTimeInForceRules = 1239 - SecondaryTradingReferencePrice = 1240 - StartMaturityMonthYear = 1241 - FlexProductEligibilityIndicator = 1242 - DerivFlexProductEligibilityIndicator = 1243 - FlexibleIndicator = 1244 - TradingCurrency = 1245 - DerivativeProduct = 1246 - DerivativeSecurityGroup = 1247 - DerivativeCFICode = 1248 - DerivativeSecurityType = 1249 - DerivativeSecuritySubType = 1250 - DerivativeMaturityMonthYear = 1251 - DerivativeMaturityDate = 1252 - DerivativeMaturityTime = 1253 - DerivativeSettleOnOpenFlag = 1254 - DerivativeInstrmtAssignmentMethod = 1255 - DerivativeSecurityStatus = 1256 - DerivativeInstrRegistry = 1257 - DerivativeCountryOfIssue = 1258 - DerivativeStateOrProvinceOfIssue = 1259 - DerivativeLocaleOfIssue = 1260 - DerivativeStrikePrice = 1261 - DerivativeStrikeCurrency = 1262 - DerivativeStrikeMultiplier = 1263 - DerivativeStrikeValue = 1264 - DerivativeOptAttribute = 1265 - DerivativeContractMultiplier = 1266 - DerivativeMinPriceIncrement = 1267 - DerivativeMinPriceIncrementAmount = 1268 - DerivativeUnitOfMeasure = 1269 - DerivativeUnitOfMeasureQty = 1270 - DerivativeTimeUnit = 1271 - DerivativeSecurityExchange = 1272 - DerivativePositionLimit = 1273 - DerivativeNTPositionLimit = 1274 - DerivativeIssuer = 1275 - DerivativeIssueDate = 1276 - DerivativeEncodedIssuerLen = 1277 - DerivativeEncodedIssuer = 1278 - DerivativeSecurityDesc = 1279 - DerivativeEncodedSecurityDescLen = 1280 - DerivativeEncodedSecurityDesc = 1281 - DerivativeSecurityXMLLen = 1282 - DerivativeSecurityXML = 1283 - DerivativeSecurityXMLSchema = 1284 - DerivativeContractSettlMonth = 1285 - NoDerivativeEvents = 1286 - DerivativeEventType = 1287 - DerivativeEventDate = 1288 - DerivativeEventTime = 1289 - DerivativeEventPx = 1290 - DerivativeEventText = 1291 - NoDerivativeInstrumentParties = 1292 - DerivativeInstrumentPartyID = 1293 - DerivativeInstrumentPartyIDSource = 1294 - DerivativeInstrumentPartyRole = 1295 - NoDerivativeInstrumentPartySubIDs = 1296 - DerivativeInstrumentPartySubID = 1297 - DerivativeInstrumentPartySubIDType = 1298 - DerivativeExerciseStyle = 1299 - MarketSegmentID = 1300 - MarketID = 1301 - MaturityMonthYearIncrementUnits = 1302 - MaturityMonthYearFormat = 1303 - StrikeExerciseStyle = 1304 - SecondaryPriceLimitType = 1305 - PriceLimitType = 1306 - DerivativeSecurityListRequestType = 1307 - ExecInstValue = 1308 - NoTradingSessionRules = 1309 - NoMarketSegments = 1310 - NoDerivativeInstrAttrib = 1311 - NoNestedInstrAttrib = 1312 - DerivativeInstrAttribType = 1313 - DerivativeInstrAttribValue = 1314 - DerivativePriceUnitOfMeasure = 1315 - DerivativePriceUnitOfMeasureQty = 1316 - DerivativeSettlMethod = 1317 - DerivativePriceQuoteMethod = 1318 - DerivativeValuationMethod = 1319 - DerivativeListMethod = 1320 - DerivativeCapPrice = 1321 - DerivativeFloorPrice = 1322 - DerivativePutOrCall = 1323 - ListUpdateAction = 1324 - ParentMktSegmID = 1325 - TradingSessionDesc = 1326 - TradSesUpdateAction = 1327 - RejectText = 1328 - FeeMultiplier = 1329 - UnderlyingLegSymbol = 1330 - UnderlyingLegSymbolSfx = 1331 - UnderlyingLegSecurityID = 1332 - UnderlyingLegSecurityIDSource = 1333 - NoUnderlyingLegSecurityAltID = 1334 - UnderlyingLegSecurityAltID = 1335 - UnderlyingLegSecurityAltIDSource = 1336 - UnderlyingLegSecurityType = 1337 - UnderlyingLegSecuritySubType = 1338 - UnderlyingLegMaturityMonthYear = 1339 - UnderlyingLegStrikePrice = 1340 - UnderlyingLegSecurityExchange = 1341 - NoOfLegUnderlyings = 1342 - UnderlyingLegPutOrCall = 1343 - UnderlyingLegCFICode = 1344 - UnderlyingLegMaturityDate = 1345 - ApplReqID = 1346 - ApplReqType = 1347 - ApplResponseType = 1348 - ApplTotalMessageCount = 1349 - ApplLastSeqNum = 1350 - NoApplIDs = 1351 - ApplResendFlag = 1352 - ApplResponseID = 1353 - ApplResponseError = 1354 - RefApplID = 1355 - ApplReportID = 1356 - RefApplLastSeqNum = 1357 - LegPutOrCall = 1358 - EncodedSymbolLen = 1359 - EncodedSymbol = 1360 - TotNoFills = 1361 - NoFills = 1362 - FillExecID = 1363 - FillPx = 1364 - FillQty = 1365 - LegAllocID = 1366 - LegAllocSettlCurrency = 1367 - TradSesEvent = 1368 - MassActionReportID = 1369 - NoNotAffectedOrders = 1370 - NotAffectedOrderID = 1371 - NotAffOrigClOrdID = 1372 - MassActionType = 1373 - MassActionScope = 1374 - MassActionResponse = 1375 - MassActionRejectReason = 1376 - MultilegModel = 1377 - MultilegPriceMethod = 1378 - LegVolatility = 1379 - DividendYield = 1380 - LegDividendYield = 1381 - CurrencyRatio = 1382 - LegCurrencyRatio = 1383 - LegExecInst = 1384 - ContingencyType = 1385 - ListRejectReason = 1386 - NoTrdRepIndicators = 1387 - TrdRepPartyRole = 1388 - TrdRepIndicator = 1389 - TradePublishIndicator = 1390 - UnderlyingLegOptAttribute = 1391 - UnderlyingLegSecurityDesc = 1392 - MarketReqID = 1393 - MarketReportID = 1394 - MarketUpdateAction = 1395 - MarketSegmentDesc = 1396 - EncodedMktSegmDescLen = 1397 - EncodedMktSegmDesc = 1398 - ApplNewSeqNum = 1399 - EncryptedPasswordMethod = 1400 - EncryptedPasswordLen = 1401 - EncryptedPassword = 1402 - EncryptedNewPasswordLen = 1403 - EncryptedNewPassword = 1404 - UnderlyingLegMaturityTime = 1405 - RefApplExtID = 1406 - DefaultApplExtID = 1407 - DefaultCstmApplVerID = 1408 - SessionStatus = 1409 - DefaultVerIndicator = 1410 - Nested4PartySubIDType = 1411 - Nested4PartySubID = 1412 - NoNested4PartySubIDs = 1413 - NoNested4PartyIDs = 1414 - Nested4PartyID = 1415 - Nested4PartyIDSource = 1416 - Nested4PartyRole = 1417 - LegLastQty = 1418 - UnderlyingExerciseStyle = 1419 - LegExerciseStyle = 1420 - LegPriceUnitOfMeasure = 1421 - LegPriceUnitOfMeasureQty = 1422 - UnderlyingUnitOfMeasureQty = 1423 - UnderlyingPriceUnitOfMeasure = 1424 - UnderlyingPriceUnitOfMeasureQty = 1425 - ApplReportType = 1426 - SideExecID = 1427 - OrderDelay = 1428 - OrderDelayUnit = 1429 - VenueType = 1430 - RefOrdIDReason = 1431 - OrigCustOrderCapacity = 1432 - RefApplReqID = 1433 - ModelType = 1434 - ContractMultiplierUnit = 1435 - LegContractMultiplierUnit = 1436 - UnderlyingContractMultiplierUnit = 1437 - DerivativeContractMultiplierUnit = 1438 - FlowScheduleType = 1439 - LegFlowScheduleType = 1440 - UnderlyingFlowScheduleType = 1441 - DerivativeFlowScheduleType = 1442 - FillLiquidityInd = 1443 - SideLiquidityInd = 1444 - NoRateSources = 1445 - RateSource = 1446 - RateSourceType = 1447 - ReferencePage = 1448 - RestructuringType = 1449 - Seniority = 1450 - NotionalPercentageOutstanding = 1451 - OriginalNotionalPercentageOutstanding = 1452 - UnderlyingRestructuringType = 1453 - UnderlyingSeniority = 1454 - UnderlyingNotionalPercentageOutstanding = 1455 - UnderlyingOriginalNotionalPercentageOutstanding = 1456 - AttachmentPoint = 1457 - DetachmentPoint = 1458 - UnderlyingAttachmentPoint = 1459 - UnderlyingDetachmentPoint = 1460 - NoTargetPartyIDs = 1461 - TargetPartyID = 1462 - TargetPartyIDSource = 1463 - TargetPartyRole = 1464 - SecurityListID = 1465 - SecurityListRefID = 1466 - SecurityListDesc = 1467 - EncodedSecurityListDescLen = 1468 - EncodedSecurityListDesc = 1469 - SecurityListType = 1470 - SecurityListTypeSource = 1471 - NewsID = 1472 - NewsCategory = 1473 - LanguageCode = 1474 - NoNewsRefIDs = 1475 - NewsRefID = 1476 - NewsRefType = 1477 - StrikePriceDeterminationMethod = 1478 - StrikePriceBoundaryMethod = 1479 - StrikePriceBoundaryPrecision = 1480 - UnderlyingPriceDeterminationMethod = 1481 - OptPayoutType = 1482 - NoComplexEvents = 1483 - ComplexEventType = 1484 - ComplexOptPayoutAmount = 1485 - ComplexEventPrice = 1486 - ComplexEventPriceBoundaryMethod = 1487 - ComplexEventPriceBoundaryPrecision = 1488 - ComplexEventPriceTimeType = 1489 - ComplexEventCondition = 1490 - NoComplexEventDates = 1491 - ComplexEventStartDate = 1492 - ComplexEventEndDate = 1493 - NoComplexEventTimes = 1494 - ComplexEventStartTime = 1495 - ComplexEventEndTime = 1496 - StreamAsgnReqID = 1497 - StreamAsgnReqType = 1498 - NoAsgnReqs = 1499 - MDStreamID = 1500 - StreamAsgnRptID = 1501 - StreamAsgnRejReason = 1502 - StreamAsgnAckType = 1503 - RelSymTransactTime = 1504 - PartyDetailsListRequestID = 1505 - NoPartyListResponseTypes = 1506 - PartyListResponseType = 1507 - NoRequestedPartyRoles = 1508 - RequestedPartyRole = 1509 - PartyDetailsListReportID = 1510 - PartyDetailsRequestResult = 1511 - TotNoPartyList = 1512 - NoPartyList = 1513 - NoPartyRelationships = 1514 - PartyRelationship = 1515 - NoPartyAltIDs = 1516 - PartyAltID = 1517 - PartyAltIDSource = 1518 - NoPartyAltSubIDs = 1519 - PartyAltSubID = 1520 - PartyAltSubIDType = 1521 - NoContextPartyIDs = 1522 - ContextPartyID = 1523 - ContextPartyIDSource = 1524 - ContextPartyRole = 1525 - NoContextPartySubIDs = 1526 - ContextPartySubID = 1527 - ContextPartySubIDType = 1528 - NoRiskLimits = 1529 - RiskLimitType = 1530 - RiskLimitAmount = 1531 - RiskLimitCurrency = 1532 - RiskLimitPlatform = 1533 - NoRiskInstruments = 1534 - RiskInstrumentOperator = 1535 - RiskSymbol = 1536 - RiskSymbolSfx = 1537 - RiskSecurityID = 1538 - RiskSecurityIDSource = 1539 - NoRiskSecurityAltID = 1540 - RiskSecurityAltID = 1541 - RiskSecurityAltIDSource = 1542 - RiskProduct = 1543 - RiskProductComplex = 1544 - RiskSecurityGroup = 1545 - RiskCFICode = 1546 - RiskSecurityType = 1547 - RiskSecuritySubType = 1548 - RiskMaturityMonthYear = 1549 - RiskMaturityTime = 1550 - RiskRestructuringType = 1551 - RiskSeniority = 1552 - RiskPutOrCall = 1553 - RiskFlexibleIndicator = 1554 - RiskCouponRate = 1555 - RiskSecurityDesc = 1556 - RiskInstrumentSettlType = 1557 - RiskInstrumentMultiplier = 1558 - NoRiskWarningLevels = 1559 - RiskWarningLevelPercent = 1560 - RiskWarningLevelName = 1561 - NoRelatedPartyIDs = 1562 - RelatedPartyID = 1563 - RelatedPartyIDSource = 1564 - RelatedPartyRole = 1565 - NoRelatedPartySubIDs = 1566 - RelatedPartySubID = 1567 - RelatedPartySubIDType = 1568 - NoRelatedPartyAltIDs = 1569 - RelatedPartyAltID = 1570 - RelatedPartyAltIDSource = 1571 - NoRelatedPartyAltSubIDs = 1572 - RelatedPartyAltSubID = 1573 - RelatedPartyAltSubIDType = 1574 - NoRelatedContextPartyIDs = 1575 - RelatedContextPartyID = 1576 - RelatedContextPartyIDSource = 1577 - RelatedContextPartyRole = 1578 - NoRelatedContextPartySubIDs = 1579 - RelatedContextPartySubID = 1580 - RelatedContextPartySubIDType = 1581 - NoRelationshipRiskLimits = 1582 - RelationshipRiskLimitType = 1583 - RelationshipRiskLimitAmount = 1584 - RelationshipRiskLimitCurrency = 1585 - RelationshipRiskLimitPlatform = 1586 - NoRelationshipRiskInstruments = 1587 - RelationshipRiskInstrumentOperator = 1588 - RelationshipRiskSymbol = 1589 - RelationshipRiskSymbolSfx = 1590 - RelationshipRiskSecurityID = 1591 - RelationshipRiskSecurityIDSource = 1592 - NoRelationshipRiskSecurityAltID = 1593 - RelationshipRiskSecurityAltID = 1594 - RelationshipRiskSecurityAltIDSource = 1595 - RelationshipRiskProduct = 1596 - RelationshipRiskProductComplex = 1597 - RelationshipRiskSecurityGroup = 1598 - RelationshipRiskCFICode = 1599 - RelationshipRiskSecurityType = 1600 - RelationshipRiskSecuritySubType = 1601 - RelationshipRiskMaturityMonthYear = 1602 - RelationshipRiskMaturityTime = 1603 - RelationshipRiskRestructuringType = 1604 - RelationshipRiskSeniority = 1605 - RelationshipRiskPutOrCall = 1606 - RelationshipRiskFlexibleIndicator = 1607 - RelationshipRiskCouponRate = 1608 - RelationshipRiskSecurityExchange = 1609 - RelationshipRiskSecurityDesc = 1610 - RelationshipRiskInstrumentSettlType = 1611 - RelationshipRiskInstrumentMultiplier = 1612 - NoRelationshipRiskWarningLevels = 1613 - RelationshipRiskWarningLevelPercent = 1614 - RelationshipRiskWarningLevelName = 1615 - RiskSecurityExchange = 1616 - StreamAsgnType = 1617 - RelationshipRiskEncodedSecurityDescLen = 1618 - RelationshipRiskEncodedSecurityDesc = 1619 - RiskEncodedSecurityDescLen = 1620 - RiskEncodedSecurityDesc = 1621 diff --git a/wtfix/protocol/spec.py b/wtfix/protocol/spec.py index c42b7b5..b755d4b 100644 --- a/wtfix/protocol/spec.py +++ b/wtfix/protocol/spec.py @@ -79,18 +79,31 @@ class AttributeValueMappingsMixin: message types used in the FIX specification more easily. """ + @classmethod + @lru_cache(maxsize=2) + def get_attributes(cls): + """ + Get all attributes of this class, including attributes of parent classes. + :return: A dictionary of attribute names and their values. + """ + return { + name: value + for name, value in inspect.getmembers( + cls, lambda a: not (inspect.isroutine(a)) + ) + } + @classmethod @lru_cache(maxsize=2) def get_attribute_value_mappings(cls): """ Create a reverse mapping of all of the attributes that have been defined in this class. """ - attributes = inspect.getmembers(cls, lambda a: not (inspect.isroutine(a))) # Skip attributes that start with an underscore mappings = { - attribute[1]: attribute[0] - for attribute in attributes - if attribute[0][0] != "_" + value: name + for name, value in cls.get_attributes().items() + if name[0] != "_" } # Won't be able to look up names reliably if duplicate attribute values exist @@ -117,4 +130,4 @@ def get_value(cls, name): :param name: a type name :return: the value associated with the type name. """ - return cls.__dict__[name] + return cls.get_attributes()[name] diff --git a/wtfix/tests/test_pipeline.py b/wtfix/tests/test_pipeline.py index ce6893e..2fac694 100644 --- a/wtfix/tests/test_pipeline.py +++ b/wtfix/tests/test_pipeline.py @@ -1,4 +1,5 @@ import asyncio +from collections import OrderedDict from unittest.mock import MagicMock import pytest @@ -45,6 +46,9 @@ def test_prep_processing_pipeline_inbound_order(self, three_level_app_chain): connection_name=conn.name, installed_apps=three_level_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + func, app_chain = pipeline._setup_message_handling( pipeline.INBOUND_PROCESSING ) @@ -59,6 +63,9 @@ def test_pre_processing_pipeline_outbound_order(self, three_level_app_chain): connection_name=conn.name, installed_apps=three_level_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + func, app_chain = pipeline._setup_message_handling( pipeline.OUTBOUND_PROCESSING ) @@ -201,6 +208,9 @@ async def test_stop_stops_apps_in_top_down_order(self, three_level_app_chain): pipeline._installed_apps[key] = app_mock + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + await pipeline.stop() for app in pipeline.apps.values(): @@ -228,6 +238,9 @@ async def test_stop_allows_only_one_stop_process_to_run_concurrently( pipeline._installed_apps[key] = app_mock + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + asyncio.create_task(pipeline.stop()) asyncio.create_task(pipeline.stop()) asyncio.create_task(pipeline.stop()) @@ -253,6 +266,9 @@ async def test_stop_no_op_if_already_stopped(self, three_level_app_chain): pipeline._installed_apps[key] = app_mock + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + await pipeline.stop() await pipeline.stop() @@ -267,6 +283,9 @@ async def test_receive(self, three_level_app_chain): connection_name=conn.name, installed_apps=three_level_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + message = await pipeline.receive(admin.TestRequestMessage("Test")) assert message.TestReqID == "Test r1 r2 r3" @@ -277,6 +296,9 @@ async def test_receive_stop(self, three_level_stop_app_chain): connection_name=conn.name, installed_apps=three_level_stop_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + message = await pipeline.receive(admin.TestRequestMessage("Test")) assert message.TestReqID == "Test r1" @@ -287,6 +309,9 @@ async def test_send(self, three_level_app_chain): connection_name=conn.name, installed_apps=three_level_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + message = await pipeline.send(admin.TestRequestMessage("Test")) assert message.TestReqID == "Test s3 s2 s1" @@ -297,5 +322,8 @@ async def test_send_stop(self, three_level_stop_app_chain): connection_name=conn.name, installed_apps=three_level_stop_app_chain ) + # Simulate all apps active + pipeline._active_apps = OrderedDict(pipeline.apps.items()) + message = await pipeline.send(admin.TestRequestMessage("Test")) assert message.TestReqID == "Test s3"