Skip to content

Commit

Permalink
Optimize performance
Browse files Browse the repository at this point in the history
  • Loading branch information
letian0805 authored and Letian Yi committed May 16, 2023
1 parent 186598c commit 9a26a53
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ _test/echo_server
_test/tmp
_vendor*
gen
.idea
4 changes: 2 additions & 2 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}

a.sessionAddr.Store(sessID, netConn.RemoteAddr())
msgIn := make(chan fixIn)
msgOut := make(chan []byte)
msgIn := make(chan fixIn, session.ReceiveQueueLength)
msgOut := make(chan []byte, session.SendQueueLength)

if err := session.connect(msgIn, msgOut); err != nil {
a.globalLog.OnEventf("Unable to accept session %v connection: %v", sessID, err.Error())
Expand Down
3 changes: 3 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@ const (
RejectInvalidMessage string = "RejectInvalidMessage"
DynamicSessions string = "DynamicSessions"
DynamicQualifier string = "DynamicQualifier"
SendBufferSize string = "SendBufferSize"
SendQueueLength string = "SendQueueLength"
ReceiveQueueLength string = "ReceiveQueueLength"
)
2 changes: 2 additions & 0 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
msgBytes = msg.build()
session.EnqueueBytesAndSend(msgBytes)
session.log.OnOutgoing(msgBytes)

seqNum = sentMessageSeqNum + 1
nextSeqNum = seqNum
Expand Down Expand Up @@ -396,6 +397,7 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int,
msgBytes := sequenceReset.build()

session.EnqueueBytesAndSend(msgBytes)
session.log.OnOutgoing(msgBytes)
session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo)

return
Expand Down
4 changes: 2 additions & 2 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
netConn = tlsConn
}

msgIn = make(chan fixIn)
msgOut = make(chan []byte)
msgIn = make(chan fixIn, session.ReceiveQueueLength)
msgOut = make(chan []byte, session.SendQueueLength)
if err := session.connect(msgIn, msgOut); err != nil {
session.log.OnEventf("Failed to initiate: %v", err)
goto reconnect
Expand Down
3 changes: 3 additions & 0 deletions internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type SessionSettings struct {
SkipCheckLatency bool
MaxLatency time.Duration
DisableMessagePersist bool
SendBufferSize int
SendQueueLength int
ReceiveQueueLength int

// Required on logon for FIX.T.1 messages.
DefaultApplVerID string
Expand Down
1 change: 1 addition & 0 deletions mongostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte,

for cursor.Next(context.Background()) {
if err = cursor.Decode(&msgFilter); err != nil {
_ = cursor.Close(context.Background())
return
}
msgs = append(msgs, msgFilter.Message)
Expand Down
14 changes: 13 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,20 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
var queueBuffer = make([]byte, 0, s.SendBufferSize)
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
s.log.OnOutgoing(msgBytes)
if len(queueBuffer)+len(msgBytes) < s.SendBufferSize {
queueBuffer = append(queueBuffer, msgBytes...)
} else {
s.sendBytes(queueBuffer)
queueBuffer = make([]byte, 0, s.SendBufferSize)
queueBuffer = append(queueBuffer, msgBytes...)
}
}

if len(queueBuffer) > 0 {
s.sendBytes(queueBuffer)
}

s.dropQueued()
Expand Down
27 changes: 27 additions & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,33 @@ func (f sessionFactory) newSession(
s.DisableMessagePersist = !persistMessages
}

s.SendBufferSize = 1000
if settings.HasSetting(config.SendBufferSize) {
var sendBufferSize int
if sendBufferSize, err = settings.IntSetting(config.SendBufferSize); err != nil {
return
}
s.SendBufferSize = sendBufferSize
}

s.SendQueueLength = 1
if settings.HasSetting(config.SendQueueLength) {
var sendQueueLength int
if sendQueueLength, err = settings.IntSetting(config.SendQueueLength); err != nil {
return
}
s.SendQueueLength = sendQueueLength
}

s.SendQueueLength = 1
if settings.HasSetting(config.ReceiveQueueLength) {
var receiveQueueLength int
if receiveQueueLength, err = settings.IntSetting(config.ReceiveQueueLength); err != nil {
return
}
s.ReceiveQueueLength = receiveQueueLength
}

if f.BuildInitiators {
if err = f.buildInitiatorSettings(s, settings); err != nil {
return
Expand Down

0 comments on commit 9a26a53

Please sign in to comment.