From 9a26a535b164bcee75e4ffa14b22df1a8f45462e Mon Sep 17 00:00:00 2001 From: Letian Yi Date: Mon, 12 Apr 2021 19:02:01 +0800 Subject: [PATCH] Optimize performance --- .gitignore | 1 + acceptor.go | 4 ++-- config/configuration.go | 3 +++ in_session.go | 2 ++ initiator.go | 4 ++-- internal/session_settings.go | 3 +++ mongostore.go | 1 + session.go | 14 +++++++++++++- session_factory.go | 27 +++++++++++++++++++++++++++ 9 files changed, 54 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index ff2f9b241..4393f2a4b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ _test/echo_server _test/tmp _vendor* gen +.idea diff --git a/acceptor.go b/acceptor.go index 879cdb17f..89e8daeda 100644 --- a/acceptor.go +++ b/acceptor.go @@ -339,8 +339,8 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { } a.sessionAddr.Store(sessID, netConn.RemoteAddr()) - msgIn := make(chan fixIn) - msgOut := make(chan []byte) + msgIn := make(chan fixIn, session.ReceiveQueueLength) + msgOut := make(chan []byte, session.SendQueueLength) if err := session.connect(msgIn, msgOut); err != nil { a.globalLog.OnEventf("Unable to accept session %v connection: %v", sessID, err.Error()) diff --git a/config/configuration.go b/config/configuration.go index 1d70f75c7..762d6eec1 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -67,4 +67,7 @@ const ( RejectInvalidMessage string = "RejectInvalidMessage" DynamicSessions string = "DynamicSessions" DynamicQualifier string = "DynamicQualifier" + SendBufferSize string = "SendBufferSize" + SendQueueLength string = "SendQueueLength" + ReceiveQueueLength string = "ReceiveQueueLength" ) diff --git a/in_session.go b/in_session.go index 50666a703..2b83ae3a4 100644 --- a/in_session.go +++ b/in_session.go @@ -255,6 +255,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int session.log.OnEventf("Resending Message: %v", sentMessageSeqNum) msgBytes = msg.build() session.EnqueueBytesAndSend(msgBytes) + session.log.OnOutgoing(msgBytes) seqNum = sentMessageSeqNum + 1 nextSeqNum = seqNum @@ -396,6 +397,7 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, msgBytes := sequenceReset.build() session.EnqueueBytesAndSend(msgBytes) + session.log.OnOutgoing(msgBytes) session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo) return diff --git a/initiator.go b/initiator.go index a3d64afc3..4098cd5e1 100644 --- a/initiator.go +++ b/initiator.go @@ -184,8 +184,8 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di netConn = tlsConn } - msgIn = make(chan fixIn) - msgOut = make(chan []byte) + msgIn = make(chan fixIn, session.ReceiveQueueLength) + msgOut = make(chan []byte, session.SendQueueLength) if err := session.connect(msgIn, msgOut); err != nil { session.log.OnEventf("Failed to initiate: %v", err) goto reconnect diff --git a/internal/session_settings.go b/internal/session_settings.go index e003f54e4..f12adb87f 100644 --- a/internal/session_settings.go +++ b/internal/session_settings.go @@ -17,6 +17,9 @@ type SessionSettings struct { SkipCheckLatency bool MaxLatency time.Duration DisableMessagePersist bool + SendBufferSize int + SendQueueLength int + ReceiveQueueLength int // Required on logon for FIX.T.1 messages. DefaultApplVerID string diff --git a/mongostore.go b/mongostore.go index 7c61ef29f..ac6a61a1e 100644 --- a/mongostore.go +++ b/mongostore.go @@ -352,6 +352,7 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, for cursor.Next(context.Background()) { if err = cursor.Decode(&msgFilter); err != nil { + _ = cursor.Close(context.Background()) return } msgs = append(msgs, msgFilter.Message) diff --git a/session.go b/session.go index b359245e2..ae978a2e9 100644 --- a/session.go +++ b/session.go @@ -347,8 +347,20 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { } func (s *session) sendQueued() { + var queueBuffer = make([]byte, 0, s.SendBufferSize) for _, msgBytes := range s.toSend { - s.sendBytes(msgBytes) + s.log.OnOutgoing(msgBytes) + if len(queueBuffer)+len(msgBytes) < s.SendBufferSize { + queueBuffer = append(queueBuffer, msgBytes...) + } else { + s.sendBytes(queueBuffer) + queueBuffer = make([]byte, 0, s.SendBufferSize) + queueBuffer = append(queueBuffer, msgBytes...) + } + } + + if len(queueBuffer) > 0 { + s.sendBytes(queueBuffer) } s.dropQueued() diff --git a/session_factory.go b/session_factory.go index 3a6be68a5..2205e2fb7 100644 --- a/session_factory.go +++ b/session_factory.go @@ -322,6 +322,33 @@ func (f sessionFactory) newSession( s.DisableMessagePersist = !persistMessages } + s.SendBufferSize = 1000 + if settings.HasSetting(config.SendBufferSize) { + var sendBufferSize int + if sendBufferSize, err = settings.IntSetting(config.SendBufferSize); err != nil { + return + } + s.SendBufferSize = sendBufferSize + } + + s.SendQueueLength = 1 + if settings.HasSetting(config.SendQueueLength) { + var sendQueueLength int + if sendQueueLength, err = settings.IntSetting(config.SendQueueLength); err != nil { + return + } + s.SendQueueLength = sendQueueLength + } + + s.SendQueueLength = 1 + if settings.HasSetting(config.ReceiveQueueLength) { + var receiveQueueLength int + if receiveQueueLength, err = settings.IntSetting(config.ReceiveQueueLength); err != nil { + return + } + s.ReceiveQueueLength = receiveQueueLength + } + if f.BuildInitiators { if err = f.buildInitiatorSettings(s, settings); err != nil { return