Skip to content

Commit

Permalink
Merge pull request #680 from michaelwilner/file-store-uses-files-excl…
Browse files Browse the repository at this point in the history
…usively
  • Loading branch information
ackleymi authored Nov 25, 2024
2 parents c7392e8 + 00263ae commit 5185ff8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
19 changes: 15 additions & 4 deletions internal/testsuite/store_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package testsuite

import (
"sort"
"time"

"github.com/quickfixgo/quickfix"
Expand Down Expand Up @@ -106,8 +107,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() {
2: "they were forced to eat Robin's minstrels",
3: "and there was much rejoicing",
}
for seqNum, msg := range expectedMsgsBySeqNum {
s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(msg)))
var seqNums []int
for seqNum := range expectedMsgsBySeqNum {
seqNums = append(seqNums, seqNum)
}
sort.Ints(seqNums)
for _, seqNum := range seqNums {
s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(expectedMsgsBySeqNum[seqNum])))
}

// When the messages are retrieved from the MessageStore
Expand Down Expand Up @@ -141,8 +147,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() {
2: "they were forced to eat Robin's minstrels",
3: "and there was much rejoicing",
}
for seqNum, msg := range expectedMsgsBySeqNum {
s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(msg)))
var seqNums []int
for seqNum := range expectedMsgsBySeqNum {
seqNums = append(seqNums, seqNum)
}
sort.Ints(seqNums)
for _, seqNum := range seqNums {
s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(expectedMsgsBySeqNum[seqNum])))
}
s.Equal(423, s.MsgStore.NextSenderMsgSeqNum())

Expand Down
78 changes: 30 additions & 48 deletions store/file/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,20 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"

"github.com/quickfixgo/quickfix"
"github.com/quickfixgo/quickfix/config"
)

type msgDef struct {
offset int64
size int
}

type fileStoreFactory struct {
settings *quickfix.Settings
}

type fileStore struct {
sessionID quickfix.SessionID
cache quickfix.MessageStore
offsets sync.Map
bodyFname string
headerFname string
sessionFname string
Expand Down Expand Up @@ -107,7 +99,6 @@ func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) (
store := &fileStore{
sessionID: sessionID,
cache: memStore,
offsets: sync.Map{},
bodyFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "body")),
headerFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "header")),
sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")),
Expand Down Expand Up @@ -199,18 +190,6 @@ func (store *fileStore) Refresh() (err error) {
}

func (store *fileStore) populateCache() (creationTimePopulated bool, err error) {
if tmpHeaderFile, err := os.Open(store.headerFname); err == nil {
defer tmpHeaderFile.Close()
for {
var seqNum, size int
var offset int64
if cnt, err := fmt.Fscanf(tmpHeaderFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil || cnt != 3 {
break
}
store.offsets.Store(seqNum, msgDef{offset: offset, size: size})
}
}

if timeBytes, err := os.ReadFile(store.sessionFname); err == nil {
var ctime time.Time
if err := ctime.UnmarshalText(timeBytes); err == nil {
Expand Down Expand Up @@ -348,7 +327,6 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
}
}

store.offsets.Store(seqNum, msgDef{offset: offset, size: len(msg)})
return nil
}

Expand All @@ -360,34 +338,38 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
return store.IncrNextSenderMsgSeqNum()
}

func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) {
msgInfoTemp, found := store.offsets.Load(seqNum)
if !found {
return
}
msgInfo, ok := msgInfoTemp.(msgDef)
if !ok {
return nil, true, fmt.Errorf("incorrect msgInfo type while reading file: %s", store.bodyFname)
}

msg = make([]byte, msgInfo.size)
if _, err = store.bodyFile.ReadAt(msg, msgInfo.offset); err != nil {
return nil, true, fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
}

return msg, true, nil
}

func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
m, found, err := store.getMessage(seqNum)
if err != nil {
return err
}
if found {
if err = cb(m); err != nil {
return err
// Sync files and seek to start of header file
if err := store.bodyFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
} else if err = store.headerFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
} else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error())
}

// Iterate over the header file
for {
var seqNum, size int
var offset int64
if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("unable to read from file: %s: %s", store.headerFname, err.Error())
} else if cnt < 3 || seqNum > endSeqNum {
// If we have reached the end of possible iteration then break
break
} else if seqNum < beginSeqNum {
// If we have not yet reached the starting sequence number then continue
continue
}
// Otherwise process the file
msg := make([]byte, size)
if _, err := store.bodyFile.ReadAt(msg, offset); err != nil {
return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
} else if err = cb(msg); err != nil {
return err
}
}
return nil
Expand Down

0 comments on commit 5185ff8

Please sign in to comment.