Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace locking fieldmap with concurrent safe haxmap #685

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.swp
*.swo
.idea
*.iml
vendor
_test/test
_test/echo_server
Expand Down
193 changes: 38 additions & 155 deletions field_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package quickfix

import (
"bytes"
"sort"
"sync"
"slices"
"time"

"github.com/alphadose/haxmap"
)

// field stores a slice of TagValues.
Expand All @@ -40,46 +41,33 @@ func writeField(f field, buffer *bytes.Buffer) {
}

// tagOrder true if tag i should occur before tag j.
type tagOrder func(i, j Tag) bool

type tagSort struct {
tags []Tag
compare tagOrder
}

func (t tagSort) Len() int { return len(t.tags) }
func (t tagSort) Swap(i, j int) { t.tags[i], t.tags[j] = t.tags[j], t.tags[i] }
func (t tagSort) Less(i, j int) bool { return t.compare(t.tags[i], t.tags[j]) }
type tagOrder func(i, j Tag) int

// FieldMap is a collection of fix fields that make up a fix message.
type FieldMap struct {
tagLookup map[Tag]field
tagSort
rwLock *sync.RWMutex
tagLookup *haxmap.Map[Tag, field]
compare tagOrder
}

// ascending tags.
func normalFieldOrder(i, j Tag) bool { return i < j }
func normalFieldOrder(i, j Tag) int { return int(i - j) }

func (m *FieldMap) init() {
m.initWithOrdering(normalFieldOrder)
}

func (m *FieldMap) initWithOrdering(ordering tagOrder) {
m.rwLock = &sync.RWMutex{}
m.tagLookup = make(map[Tag]field)
m.tagLookup = haxmap.New[Tag, field]()
m.compare = ordering
}

// Tags returns all of the Field Tags in this FieldMap.
func (m FieldMap) Tags() []Tag {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

tags := make([]Tag, 0, len(m.tagLookup))
for t := range m.tagLookup {
tags = append(tags, t)
}
var tags []Tag
m.tagLookup.ForEach(func(tag Tag, _ field) bool {
tags = append(tags, tag)
return true
})

return tags
}
Expand All @@ -91,33 +79,13 @@ func (m FieldMap) Get(parser Field) MessageRejectError {

// Has returns true if the Tag is present in this FieldMap.
func (m FieldMap) Has(tag Tag) bool {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

_, ok := m.tagLookup[tag]
_, ok := m.tagLookup.Get(tag)
return ok
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}

if err := parser.Read(f[0].value); err != nil {
return IncorrectDataFormatForValue(tag)
}

return nil
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageRejectError {
f, ok := m.tagLookup[tag]
f, ok := m.tagLookup.Get(tag)
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}
Expand All @@ -131,20 +99,7 @@ func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageReject

// GetBytes is a zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[tag]
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}

return f[0].value, nil
}

// getBytesNoLock is a lock free zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) getBytesNoLock(tag Tag) ([]byte, MessageRejectError) {
f, ok := m.tagLookup[tag]
f, ok := m.tagLookup.Get(tag)
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}
Expand Down Expand Up @@ -176,26 +131,8 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
return int(val), err
}

// GetInt is a lock free GetField wrapper for int fields.
func (m FieldMap) getIntNoLock(tag Tag) (int, MessageRejectError) {
bytes, err := m.getBytesNoLock(tag)
if err != nil {
return 0, err
}

var val FIXInt
if val.Read(bytes) != nil {
err = IncorrectDataFormatForValue(tag)
}

return int(val), err
}

// GetTime is a GetField wrapper for utc timestamp fields.
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

bytes, err := m.GetBytes(tag)
if err != nil {
return
Expand All @@ -218,21 +155,9 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
return string(val), nil
}

// GetString is a GetField wrapper for string fields.
func (m FieldMap) getStringNoLock(tag Tag) (string, MessageRejectError) {
var val FIXString
if err := m.getFieldNoLock(tag, &val); err != nil {
return "", err
}
return string(val), nil
}

// GetGroup is a Get function specific to Group Fields.
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

f, ok := m.tagLookup[parser.Tag()]
f, ok := m.tagLookup.Get(parser.Tag())
if !ok {
return ConditionallyRequiredFieldMissing(parser.Tag())
}
Expand Down Expand Up @@ -277,67 +202,38 @@ func (m *FieldMap) SetString(tag Tag, value string) *FieldMap {

// Remove removes a tag from field map.
func (m *FieldMap) Remove(tag Tag) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

delete(m.tagLookup, tag)
m.tagLookup.Del(tag)
}

// Clear purges all fields from field map.
func (m *FieldMap) Clear() {
m.rwLock.Lock()
defer m.rwLock.Unlock()

m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
}

func (m *FieldMap) clearNoLock() {
m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
m.tagLookup.Clear()
}

// CopyInto overwrites the given FieldMap with this one.
func (m *FieldMap) CopyInto(to *FieldMap) {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

to.tagLookup = make(map[Tag]field)
for tag, f := range m.tagLookup {
to.tagLookup = haxmap.New[Tag, field]()
m.tagLookup.ForEach(func(tag Tag, f field) bool {
clone := make(field, 1)
clone[0] = f[0]
to.tagLookup[tag] = clone
}
to.tags = make([]Tag, len(m.tags))
copy(to.tags, m.tags)
to.tagLookup.Set(tag, clone)
return true
})
to.compare = m.compare
}

func (m *FieldMap) add(f field) {
t := fieldTag(f)
if _, ok := m.tagLookup[t]; !ok {
m.tags = append(m.tags, t)
}

m.tagLookup[t] = f
m.tagLookup.Set(fieldTag(f), f)
}

func (m *FieldMap) getOrCreate(tag Tag) field {
m.rwLock.Lock()
defer m.rwLock.Unlock()

if f, ok := m.tagLookup[tag]; ok {
if f, ok := m.tagLookup.Get(tag); ok {
f = f[:1]
return f
}

f := make(field, 1)
m.tagLookup[tag] = f
m.tags = append(m.tags, tag)
m.tagLookup.Set(tag, f)
return f
}

Expand All @@ -350,65 +246,52 @@ func (m *FieldMap) Set(field FieldWriter) *FieldMap {

// SetGroup is a setter specific to group fields.
func (m *FieldMap) SetGroup(field FieldGroupWriter) *FieldMap {
m.rwLock.Lock()
defer m.rwLock.Unlock()

_, ok := m.tagLookup[field.Tag()]
if !ok {
m.tags = append(m.tags, field.Tag())
}
m.tagLookup[field.Tag()] = field.Write()
m.tagLookup.Set(field.Tag(), field.Write())
return m
}

func (m *FieldMap) sortedTags() []Tag {
sort.Sort(m)
return m.tags
tags := m.Tags()
slices.SortFunc(tags, m.compare)
return tags
}

func (m FieldMap) write(buffer *bytes.Buffer) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

for _, tag := range m.sortedTags() {
if f, ok := m.tagLookup[tag]; ok {
if f, ok := m.tagLookup.Get(tag); ok {
writeField(f, buffer)
}
}
}

func (m FieldMap) total() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

total := 0
for _, fields := range m.tagLookup {
m.tagLookup.ForEach(func(_ Tag, fields field) bool {
for _, tv := range fields {
switch tv.tag {
case tagCheckSum: // Tag does not contribute to total.
default:
total += tv.total()
}
}
}
return true
})

return total
}

func (m FieldMap) length() int {
m.rwLock.RLock()
defer m.rwLock.RUnlock()

length := 0
for _, fields := range m.tagLookup {
m.tagLookup.ForEach(func(_ Tag, fields field) bool {
for _, tv := range fields {
switch tv.tag {
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
default:
length += tv.length()
}
}
}
return true
})

return length
}
20 changes: 11 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,32 @@ module github.com/quickfixgo/quickfix
go 1.21

require (
github.com/alphadose/haxmap v1.4.1
github.com/mattn/go-sqlite3 v1.14.22
github.com/pires/go-proxyproto v0.7.0
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.8.4
go.mongodb.org/mongo-driver v1.15.0
golang.org/x/net v0.24.0
github.com/stretchr/testify v1.10.0
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/net v0.33.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/montanaflynn/stats v0.6.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.14.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading