diff --git a/_test/test-server/main.go b/_test/test-server/main.go index 470cd7c22..d3e680d9b 100644 --- a/_test/test-server/main.go +++ b/_test/test-server/main.go @@ -17,6 +17,7 @@ import ( "github.com/quickfixgo/quickfix/config" field "github.com/quickfixgo/quickfix/gen/field" tag "github.com/quickfixgo/quickfix/gen/tag" + filelog "github.com/quickfixgo/quickfix/log/file" "github.com/quickfixgo/quickfix/store/file" "github.com/quickfixgo/quickfix/store/mongo" ) @@ -132,7 +133,7 @@ func main() { return } - fileLogFactory, err := quickfix.NewFileLogFactory(appSettings) + fileLogFactory, err := filelog.NewLogFactory(appSettings) if err != nil { fmt.Println("Error creating file log factory:", err) return diff --git a/accepter_test.go b/acceptor_test.go similarity index 98% rename from accepter_test.go rename to acceptor_test.go index 5cf888a8c..059c42715 100644 --- a/accepter_test.go +++ b/acceptor_test.go @@ -98,7 +98,7 @@ func TestAcceptor_SetTLSConfig(t *testing.T) { _, err := genericSettings.AddSession(sessionSettings) require.NoError(t, err) - logger, err := NewScreenLogFactory().Create() + logger, err := NewNullLogFactory().Create() require.NoError(t, err) acceptor := &Acceptor{settings: genericSettings, globalLog: logger} defer acceptor.Stop() diff --git a/config/configuration.go b/config/configuration.go index f4f6fed97..a0223f9fc 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -849,6 +849,100 @@ const ( // Valid Values: // - A valid path FileLogPath string = "FileLogPath" + + // SQLLogDriver sets the name of the database driver to use for application logs (see https://go.dev/wiki/SQLDrivers for the list of available drivers). + // SQLLogDriver is only relevant if also using sql.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // Required: Only if using a sql db as your Log + // + // Default: N/A + // + // Valid Values: + // - See https://go.dev/wiki/SQLDrivers + SQLLogDriver string = "SQLLogDriver" + + // SQLLogDataSourceName sets the driver-specific data source name of the database to use for application logs. + // This usually consists of at least a database name and connection information. + // SQLLogDataSourceName is only relevant if also using sql.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // See https://pkg.go.dev/database/sql#Open for more information. + // + // Required: Only if using a sql db as your Log. + // + // Default: N/A + // + // Valid Values: + // - A string correspondinng to a datasource + SQLLogDataSourceName string = "SQLLogDataSourceName" + + // SQLLogConnMaxLifetime sets the maximum duration of time that a database connection may be reused. + // See https://pkg.go.dev/database/sql#DB.SetConnMaxLifetime for more information. + // + // If your database server has a config option to close inactive connections after some duration (e.g. MySQL "wait_timeout"), + // set SQLLogConnMaxLifetime to a value less than that duration. + // + // Example Values: + // - SQLLogConnMaxLifetime=14400s # 14400 seconds + // - SQLLogConnMaxLifetime=2h45m # 2 hours and 45 minutes + // + // SQLLogConnMaxLifetime is only relevant if also using sql.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // Required: No + // + // Default: 0 (forever) + // + // Valid Values: + // - A valid go time.Duration + SQLLogConnMaxLifetime string = "SQLLogConnMaxLifetime" + + // MongoLogConnection sets the MongoDB connection URL to use for application logs. + // + // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. + // + // MongoLogConnection is only relevant if also using mongo.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // Required: Only if using MongoDB as your Log. + // + // Default: N/A + // + // Valid Values: + // - A string representing a MongoDB connection + MongoLogConnection string = "MongoLogConnection" + + // MongoLogDatabase sets the MongoDB-specific name of the database to use for application logs. + // + // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. + // + // MongoLogDatabase is only relevant if also using mongo.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // Required: Only if using MongoDB as your Log. + // + // Default: N/A + // + // Valid Values: + // - A string corresponding to a MongoDB database + MongoLogDatabase string = "MongoLogDatabase" + + // MongoLogReplicaSet sets the MongoDB replica set to use for application logs. + // This is optional. + // + // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. + // + // MongoLogReplicaSet is only relevant if also using mongo.NewLogFactory(..) in code + // when creating your LogFactory for your initiator or acceptor. + // + // Required: No + // + // Default: N/A + // + // Valid Values: + // - A string corresponding to a MongoDB replica set + MongoLogReplicaSet string = "MongoLogReplicaSet" ) const ( @@ -895,7 +989,7 @@ const ( // - N FileStoreSync string = "FileStoreSync" - // SQLStoreDriver sets the name of the database driver to use (see https://go.dev/wiki/SQLDrivers for the list of available drivers). + // SQLStoreDriver sets the name of the database driver to use for message storage (see https://go.dev/wiki/SQLDrivers for the list of available drivers). // SQLStoreDriver is only relevant if also using sql.NewStoreFactory(..) in code // when creating your MessageStoreFactory for your initiator or acceptor. // @@ -907,7 +1001,7 @@ const ( // - See https://go.dev/wiki/SQLDrivers SQLStoreDriver string = "SQLStoreDriver" - // SQLStoreDataSourceName sets the driver-specific data source name of the database to use. + // SQLStoreDataSourceName sets the driver-specific data source name of the database to use for messagge storage. // This usually consists of at least a database name and connection information. // SQLStoreDataSourceName is only relevant if also using sql.NewStoreFactory(..) in code // when creating your MessageStoreFactory for your initiator or acceptor. @@ -943,7 +1037,7 @@ const ( // - A valid go time.Duration SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime" - // MongoStoreConnection sets the MongoDB connection URL to use. + // MongoStoreConnection sets the MongoDB connection URL to use for message storage. // // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. // @@ -958,7 +1052,7 @@ const ( // - A string representing a MongoDB connection MongoStoreConnection string = "MongoStoreConnection" - // MongoStoreDatabase sets the MongoDB-specific name of the database to use. + // MongoStoreDatabase sets the MongoDB-specific name of the database to use for message storage. // // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. // @@ -973,7 +1067,7 @@ const ( // - A string corresponding to a MongoDB database MongoStoreDatabase string = "MongoStoreDatabase" - // MongoStoreReplicaSet sets the MongoDB replica set to use. + // MongoStoreReplicaSet sets the MongoDB replica set to use for message storage. // This is optional. // // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. diff --git a/file_log.go b/log/file/file_log.go similarity index 84% rename from file_log.go rename to log/file/file_log.go index 85eefab1d..9fd36a851 100644 --- a/file_log.go +++ b/log/file/file_log.go @@ -13,7 +13,7 @@ // Contact ask@quickfixengine.org if any conditions of this licensing // are not clear to you. -package quickfix +package file import ( "fmt" @@ -21,6 +21,7 @@ import ( "os" "path" + "github.com/quickfixgo/quickfix" "github.com/quickfixgo/quickfix/config" ) @@ -47,12 +48,12 @@ func (l fileLog) OnEventf(format string, v ...interface{}) { type fileLogFactory struct { globalLogPath string - sessionLogPaths map[SessionID]string + sessionLogPaths map[quickfix.SessionID]string } -// NewFileLogFactory creates an instance of LogFactory that writes messages and events to file. +// NewLogFactory creates an instance of LogFactory that writes messages and events to file. // The location of global and session log files is configured via FileLogPath. -func NewFileLogFactory(settings *Settings) (LogFactory, error) { +func NewLogFactory(settings *quickfix.Settings) (quickfix.LogFactory, error) { logFactory := fileLogFactory{} var err error @@ -60,7 +61,7 @@ func NewFileLogFactory(settings *Settings) (LogFactory, error) { return logFactory, err } - logFactory.sessionLogPaths = make(map[SessionID]string) + logFactory.sessionLogPaths = make(map[quickfix.SessionID]string) for sid, sessionSettings := range settings.SessionSettings() { logPath, err := sessionSettings.Setting(config.FileLogPath) @@ -101,11 +102,11 @@ func newFileLog(prefix string, logPath string) (fileLog, error) { return l, nil } -func (f fileLogFactory) Create() (Log, error) { +func (f fileLogFactory) Create() (quickfix.Log, error) { return newFileLog("GLOBAL", f.globalLogPath) } -func (f fileLogFactory) CreateSessionLog(sessionID SessionID) (Log, error) { +func (f fileLogFactory) CreateSessionLog(sessionID quickfix.SessionID) (quickfix.Log, error) { logPath, ok := f.sessionLogPaths[sessionID] if !ok { diff --git a/file_log_test.go b/log/file/file_log_test.go similarity index 93% rename from file_log_test.go rename to log/file/file_log_test.go index b303ec290..cf25e8474 100644 --- a/file_log_test.go +++ b/log/file/file_log_test.go @@ -13,7 +13,7 @@ // Contact ask@quickfixengine.org if any conditions of this licensing // are not clear to you. -package quickfix +package file import ( "bufio" @@ -22,11 +22,13 @@ import ( "path" "strings" "testing" + + "github.com/quickfixgo/quickfix" ) func TestFileLog_NewFileLogFactory(t *testing.T) { - _, err := NewFileLogFactory(NewSettings()) + _, err := NewLogFactory(quickfix.NewSettings()) if err == nil { t.Error("Should expect error when settings have no file log path") @@ -52,9 +54,9 @@ TargetCompID=ARCA SessionQualifier=BS ` stringReader := strings.NewReader(cfg) - settings, _ := ParseSettings(stringReader) + settings, _ := quickfix.ParseSettings(stringReader) - factory, err := NewFileLogFactory(settings) + factory, err := NewLogFactory(settings) if err != nil { t.Error("Did not expect error", err) @@ -68,7 +70,7 @@ SessionQualifier=BS type fileLogHelper struct { LogPath string Prefix string - Log Log + Log quickfix.Log } func newFileLogHelper(t *testing.T) *fileLogHelper { diff --git a/fileutil.go b/log/file/file_util.go similarity index 93% rename from fileutil.go rename to log/file/file_util.go index 73bc4a6b2..34e999a43 100644 --- a/fileutil.go +++ b/log/file/file_util.go @@ -13,15 +13,17 @@ // Contact ask@quickfixengine.org if any conditions of this licensing // are not clear to you. -package quickfix +package file import ( "fmt" "os" "strings" + + "github.com/quickfixgo/quickfix" ) -func sessionIDFilenamePrefix(s SessionID) string { +func sessionIDFilenamePrefix(s quickfix.SessionID) string { sender := []string{s.SenderCompID} if s.SenderSubID != "" { sender = append(sender, s.SenderSubID) diff --git a/fileutil_test.go b/log/file/file_util_test.go similarity index 91% rename from fileutil_test.go rename to log/file/file_util_test.go index 0f9095364..77a95b6ab 100644 --- a/fileutil_test.go +++ b/log/file/file_util_test.go @@ -13,7 +13,7 @@ // Contact ask@quickfixengine.org if any conditions of this licensing // are not clear to you. -package quickfix +package file import ( "fmt" @@ -21,6 +21,7 @@ import ( "path" "testing" + "github.com/quickfixgo/quickfix" "github.com/stretchr/testify/require" ) @@ -37,7 +38,7 @@ func requireFileExists(t *testing.T, fname string) { func TestSessionIDFilename_MinimallyQualifiedSessionID(t *testing.T) { // When the session ID is - sessionID := SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} // Then the filename should be require.Equal(t, "FIX.4.4-SENDER-TARGET", sessionIDFilenamePrefix(sessionID)) @@ -45,7 +46,7 @@ func TestSessionIDFilename_MinimallyQualifiedSessionID(t *testing.T) { func TestSessionIDFilename_FullyQualifiedSessionID(t *testing.T) { // When the session ID is - sessionID := SessionID{ + sessionID := quickfix.SessionID{ BeginString: "FIX.4.4", SenderCompID: "A", SenderSubID: "B", diff --git a/log/mongo/mongo_log.go b/log/mongo/mongo_log.go new file mode 100644 index 000000000..253e6bdb9 --- /dev/null +++ b/log/mongo/mongo_log.go @@ -0,0 +1,222 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package mongo + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/quickfixgo/quickfix" + "github.com/quickfixgo/quickfix/config" +) + +type mongoLogFactory struct { + settings *quickfix.Settings + messagesLogCollection string + eventLogCollection string +} + +type mongoLog struct { + sessionID quickfix.SessionID + mongoURL string + mongoDatabase string + db *mongo.Client + messagesLogCollection string + eventLogCollection string + allowTransactions bool +} + +// NewLogFactory returns a mongo-based implementation of LogFactory. +func NewLogFactory(settings *quickfix.Settings) quickfix.LogFactory { + return NewLogFactoryPrefixed(settings, "") +} + +// NewLogFactoryPrefixed returns a mongo-based implementation of LogFactory, with prefix on collections. +func NewLogFactoryPrefixed(settings *quickfix.Settings, collectionsPrefix string) quickfix.LogFactory { + return mongoLogFactory{ + settings: settings, + messagesLogCollection: collectionsPrefix + "messages_log", + eventLogCollection: collectionsPrefix + "event_log", + } +} + +// Create creates a new mongo implementation of the Log interface. +func (f mongoLogFactory) Create() (l quickfix.Log, err error) { + globalSettings := f.settings.GlobalSettings() + + mongoConnectionURL, err := globalSettings.Setting(config.MongoLogConnection) + if err != nil { + return nil, err + } + mongoDatabase, err := globalSettings.Setting(config.MongoLogDatabase) + if err != nil { + return nil, err + } + + // Optional. + mongoReplicaSet, _ := globalSettings.Setting(config.MongoLogReplicaSet) + + return newmongoLog(quickfix.SessionID{}, mongoConnectionURL, mongoDatabase, mongoReplicaSet, f.messagesLogCollection, f.eventLogCollection) +} + +// CreateSessionLog creates a new mongo implementation of the Log interface. +func (f mongoLogFactory) CreateSessionLog(sessionID quickfix.SessionID) (l quickfix.Log, err error) { + globalSettings := f.settings.GlobalSettings() + dynamicSessions, _ := globalSettings.BoolSetting(config.DynamicSessions) + + sessionSettings, ok := f.settings.SessionSettings()[sessionID] + if !ok { + if dynamicSessions { + sessionSettings = globalSettings + } else { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + } + mongoConnectionURL, err := sessionSettings.Setting(config.MongoLogConnection) + if err != nil { + return nil, err + } + mongoDatabase, err := sessionSettings.Setting(config.MongoLogDatabase) + if err != nil { + return nil, err + } + + // Optional. + mongoReplicaSet, _ := sessionSettings.Setting(config.MongoLogReplicaSet) + + return newmongoLog(sessionID, mongoConnectionURL, mongoDatabase, mongoReplicaSet, f.messagesLogCollection, f.eventLogCollection) +} + +func newmongoLog(sessionID quickfix.SessionID, mongoURL, mongoDatabase, mongoReplicaSet, messagesLogCollection, eventLogCollection string) (l *mongoLog, err error) { + + allowTransactions := len(mongoReplicaSet) > 0 + l = &mongoLog{ + sessionID: sessionID, + mongoURL: mongoURL, + mongoDatabase: mongoDatabase, + messagesLogCollection: messagesLogCollection, + eventLogCollection: eventLogCollection, + allowTransactions: allowTransactions, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + l.db, err = mongo.Connect(ctx, options.Client().ApplyURI(mongoURL).SetDirect(len(mongoReplicaSet) == 0).SetReplicaSet(mongoReplicaSet)) + if err != nil { + return + } + + return +} + +func (l mongoLog) OnIncoming(msg []byte) { + l.insert(l.messagesLogCollection, msg) +} + +func (l mongoLog) OnOutgoing(msg []byte) { + l.insert(l.messagesLogCollection, msg) +} + +func (l mongoLog) OnEvent(msg string) { + l.insert(l.eventLogCollection, []byte(msg)) +} + +func (l mongoLog) OnEventf(format string, v ...interface{}) { + l.insert(l.eventLogCollection, []byte(fmt.Sprintf(format, v...))) +} + +func generateEntry(s *quickfix.SessionID) (entry *entryData) { + entry = &entryData{ + BeginString: s.BeginString, + SessionQualifier: s.Qualifier, + SenderCompID: s.SenderCompID, + SenderSubID: s.SenderSubID, + SenderLocID: s.SenderLocationID, + TargetCompID: s.TargetCompID, + TargetSubID: s.TargetSubID, + TargetLocID: s.TargetLocationID, + } + return +} + +type entryData struct { + Time time.Time `bson:"time,omitempty"` + BeginString string `bson:"begin_string"` + SenderCompID string `bson:"sender_comp_id"` + SenderSubID string `bson:"sender_sub_id"` + SenderLocID string `bson:"sender_loc_id"` + TargetCompID string `bson:"target_comp_id"` + TargetSubID string `bson:"target_sub_id"` + TargetLocID string `bson:"target_loc_id"` + SessionQualifier string `bson:"session_qualifier"` + Text []byte `bson:"text,omitempty"` +} + +func (l *mongoLog) insert(collection string, text []byte) { + entry := generateEntry(&l.sessionID) + entry.Text = text + entry.Time = time.Now() + _, err := l.db.Database(l.mongoDatabase).Collection(collection).InsertOne(context.Background(), entry) + if err != nil { + log.Println(err) + } +} + +func (l *mongoLog) iterate(coll string, cb func(string) error) error { + entry := generateEntry(&l.sessionID) + + cursor, err := l.db.Database(l.mongoDatabase).Collection(coll).Find(context.Background(), bson.D{}, nil) + if err != nil { + return err + } + defer func() { _ = cursor.Close(context.Background()) }() + for cursor.Next(context.Background()) { + if err = cursor.Decode(&entry); err != nil { + return err + } else if err = cb(string(entry.Text)); err != nil { + return err + } + } + return nil +} + +func (l *mongoLog) getEntries(coll string) ([]string, error) { + var txts []string + err := l.iterate(coll, func(text string) error { + txts = append(txts, text) + return nil + }) + return txts, err +} + +// close closes the l's database connection. +func (l *mongoLog) close() error { + if l.db != nil { + err := l.db.Disconnect(context.Background()) + if err != nil { + return errors.Wrap(err, "error disconnecting from database") + } + l.db = nil + } + return nil +} diff --git a/log/mongo/mongo_log_test.go b/log/mongo/mongo_log_test.go new file mode 100644 index 000000000..771db953c --- /dev/null +++ b/log/mongo/mongo_log_test.go @@ -0,0 +1,99 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package mongo + +import ( + "context" + "fmt" + "log" + "os" + "strings" + "testing" + + "github.com/quickfixgo/quickfix" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// MongoLogTestSuite runs tests for the MongoLog impl of Log. +type MongoLogTestSuite struct { + suite.Suite + log *mongoLog +} + +func (suite *MongoLogTestSuite) SetupTest() { + mongoDbCxn := os.Getenv("MONGODB_TEST_CXN") + if len(mongoDbCxn) <= 0 { + log.Println("MONGODB_TEST_CXN environment arg is not provided, skipping...") + suite.T().SkipNow() + } + mongoDatabase := "automated_testing_database" + mongoReplicaSet := "replicaset" + + // create settings + sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` +[DEFAULT] +MongoLogConnection=%s +MongoLogDatabase=%s +MongoLogReplicaSet=%s + +[SESSION] +BeginString=%s +SenderCompID=%s +TargetCompID=%s`, mongoDbCxn, mongoDatabase, mongoReplicaSet, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) + require.Nil(suite.T(), err) + + // create log + log, err := NewLogFactory(settings).CreateSessionLog(sessionID) + require.Nil(suite.T(), err) + suite.log = log.(*mongoLog) +} + +func (suite *MongoLogTestSuite) TestMongoLog() { + suite.log.OnIncoming([]byte("Cool1")) + suite.log.OnOutgoing([]byte("Cool2")) + entries, err := suite.log.getEntries("messages_log") + require.Nil(suite.T(), err) + require.Len(suite.T(), entries, 2) + require.Equal(suite.T(), "Cool1", entries[0]) + require.Equal(suite.T(), "Cool2", entries[1]) + + suite.log.OnEvent("Cool3") + suite.log.OnEvent("Cool4") + entries, err = suite.log.getEntries("event_log") + require.Nil(suite.T(), err) + require.Len(suite.T(), entries, 2) + require.Equal(suite.T(), "Cool3", entries[0]) + require.Equal(suite.T(), "Cool4", entries[1]) +} + +func (suite *MongoLogTestSuite) TearDownTest() { + entry := generateEntry(&suite.log.sessionID) + _, err := suite.log.db.Database(suite.log.mongoDatabase).Collection(suite.log.messagesLogCollection).DeleteMany(context.Background(), entry) + require.Nil(suite.T(), err) + + entry2 := generateEntry(&suite.log.sessionID) + _, err = suite.log.db.Database(suite.log.mongoDatabase).Collection(suite.log.eventLogCollection).DeleteMany(context.Background(), entry2) + require.Nil(suite.T(), err) + + err = suite.log.close() + require.Nil(suite.T(), err) +} + +func TestMongoLogTestSuite(t *testing.T) { + suite.Run(t, new(MongoLogTestSuite)) +} diff --git a/screen_log.go b/log/screen/screen_log.go similarity index 79% rename from screen_log.go rename to log/screen/screen_log.go index c17d67ecb..91e3f95aa 100644 --- a/screen_log.go +++ b/log/screen/screen_log.go @@ -13,11 +13,13 @@ // Contact ask@quickfixengine.org if any conditions of this licensing // are not clear to you. -package quickfix +package screen import ( "fmt" "time" + + "github.com/quickfixgo/quickfix" ) type screenLog struct { @@ -45,17 +47,17 @@ func (l screenLog) OnEventf(format string, a ...interface{}) { type screenLogFactory struct{} -func (screenLogFactory) Create() (Log, error) { +func (screenLogFactory) Create() (quickfix.Log, error) { log := screenLog{"GLOBAL"} return log, nil } -func (screenLogFactory) CreateSessionLog(sessionID SessionID) (Log, error) { +func (screenLogFactory) CreateSessionLog(sessionID quickfix.SessionID) (quickfix.Log, error) { log := screenLog{sessionID.String()} return log, nil } -// NewScreenLogFactory creates an instance of LogFactory that writes messages and events to stdout. -func NewScreenLogFactory() LogFactory { +// NewLogFactory creates an instance of LogFactory that writes messages and events to stdout. +func NewLogFactory() quickfix.LogFactory { return screenLogFactory{} } diff --git a/log/sql/sql_log.go b/log/sql/sql_log.go new file mode 100644 index 000000000..10858bdf7 --- /dev/null +++ b/log/sql/sql_log.go @@ -0,0 +1,226 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package sql + +import ( + "database/sql" + "fmt" + "log" + "regexp" + "time" + + "github.com/quickfixgo/quickfix" + "github.com/quickfixgo/quickfix/config" +) + +type sqlLogFactory struct { + settings *quickfix.Settings +} + +type sqlLog struct { + sessionID quickfix.SessionID + sqlDriver string + sqlDataSourceName string + sqlConnMaxLifetime time.Duration + db *sql.DB + placeholder placeholderFunc +} + +type placeholderFunc func(int) string + +var rePlaceholder = regexp.MustCompile(`\?`) + +func sqlString(raw string, placeholder placeholderFunc) string { + if placeholder == nil { + return raw + } + idx := 0 + return rePlaceholder.ReplaceAllStringFunc(raw, func(_ string) string { + p := placeholder(idx) + idx++ + return p + }) +} + +func postgresPlaceholder(i int) string { + return fmt.Sprintf("$%d", i+1) +} + +// NewLogFactory returns a sql-based implementation of LogFactory. +func NewLogFactory(settings *quickfix.Settings) quickfix.LogFactory { + return sqlLogFactory{settings: settings} +} + +// Create creates a new SQLLog implementation of the Log interface. +func (f sqlLogFactory) Create() (log quickfix.Log, err error) { + globalSettings := f.settings.GlobalSettings() + + sqlDriver, err := globalSettings.Setting(config.SQLLogDriver) + if err != nil { + return nil, err + } + sqlDataSourceName, err := globalSettings.Setting(config.SQLLogDataSourceName) + if err != nil { + return nil, err + } + sqlConnMaxLifetime := 0 * time.Second + if globalSettings.HasSetting(config.SQLLogConnMaxLifetime) { + sqlConnMaxLifetime, err = globalSettings.DurationSetting(config.SQLLogConnMaxLifetime) + if err != nil { + return nil, err + } + } + + return newSQLLog(quickfix.SessionID{}, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime) +} + +// CreateSessionLog creates a new SQLLog implementation of the Log interface. +func (f sqlLogFactory) CreateSessionLog(sessionID quickfix.SessionID) (log quickfix.Log, err error) { + globalSettings := f.settings.GlobalSettings() + dynamicSessions, _ := globalSettings.BoolSetting(config.DynamicSessions) + + sessionSettings, ok := f.settings.SessionSettings()[sessionID] + if !ok { + if dynamicSessions { + sessionSettings = globalSettings + } else { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + } + + sqlDriver, err := sessionSettings.Setting(config.SQLLogDriver) + if err != nil { + return nil, err + } + sqlDataSourceName, err := sessionSettings.Setting(config.SQLLogDataSourceName) + if err != nil { + return nil, err + } + sqlConnMaxLifetime := 0 * time.Second + if sessionSettings.HasSetting(config.SQLLogConnMaxLifetime) { + sqlConnMaxLifetime, err = sessionSettings.DurationSetting(config.SQLLogConnMaxLifetime) + if err != nil { + return nil, err + } + } + return newSQLLog(sessionID, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime) +} + +func newSQLLog(sessionID quickfix.SessionID, driver string, dataSourceName string, connMaxLifetime time.Duration) (l *sqlLog, err error) { + l = &sqlLog{ + sessionID: sessionID, + sqlDriver: driver, + sqlDataSourceName: dataSourceName, + sqlConnMaxLifetime: connMaxLifetime, + } + + if l.sqlDriver == "postgres" || l.sqlDriver == "pgx" { + l.placeholder = postgresPlaceholder + } + + if l.db, err = sql.Open(l.sqlDriver, l.sqlDataSourceName); err != nil { + return nil, err + } + l.db.SetConnMaxLifetime(l.sqlConnMaxLifetime) + + if err = l.db.Ping(); err != nil { // ensure immediate connection + return nil, err + } + + return l, nil +} + +func (l sqlLog) OnIncoming(msg []byte) { + l.insert("messages_log", string(msg)) +} + +func (l sqlLog) OnOutgoing(msg []byte) { + l.insert("messages_log", string(msg)) +} + +func (l sqlLog) OnEvent(msg string) { + l.insert("event_log", msg) +} + +func (l sqlLog) OnEventf(format string, v ...interface{}) { + l.insert("event_log", fmt.Sprintf(format, v...)) +} + +func (l sqlLog) insert(table string, value string) { + s := l.sessionID + + _, err := l.db.Exec(sqlString(`INSERT INTO `+table+` ( + time, + beginstring, session_qualifier, + sendercompid, sendersubid, senderlocid, + targetcompid, targetsubid, targetlocid, + text) + VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, l.placeholder), + time.Now(), + s.BeginString, s.Qualifier, + s.SenderCompID, s.SenderSubID, s.SenderLocationID, + s.TargetCompID, s.TargetSubID, s.TargetLocationID, + value, + ) + if err != nil { + log.Println(err) + } +} + +func (l *sqlLog) iterate(table string, cb func(string) error) error { + s := l.sessionID + rows, err := l.db.Query(sqlString(`SELECT text FROM `+table+` + WHERE beginstring=? AND session_qualifier=? + AND sendercompid=? AND sendersubid=? AND senderlocid=? + AND targetcompid=? AND targetsubid=? AND targetlocid=?`, + l.placeholder), + s.BeginString, s.Qualifier, + s.SenderCompID, s.SenderSubID, s.SenderLocationID, + s.TargetCompID, s.TargetSubID, s.TargetLocationID) + if err != nil { + return err + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var txt string + if err = rows.Scan(&txt); err != nil { + return err + } else if err = cb(txt); err != nil { + return err + } + } + + return rows.Err() +} + +func (l *sqlLog) getEntries(table string) ([]string, error) { + var msgs []string + err := l.iterate(table, func(msg string) error { + msgs = append(msgs, msg) + return nil + }) + return msgs, err +} + +// Close closes the log's database connection. +func (l *sqlLog) close() error { + if l.db != nil { + l.db.Close() + l.db = nil + } + return nil +} diff --git a/log/sql/sql_log_test.go b/log/sql/sql_log_test.go new file mode 100644 index 000000000..513371da4 --- /dev/null +++ b/log/sql/sql_log_test.go @@ -0,0 +1,110 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package sql + +import ( + "database/sql" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/quickfixgo/quickfix" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// SQLLogTestSuite runs tests for the SQLLog impl of Log. +type SQLLogTestSuite struct { + suite.Suite + sqlLogRootPath string + log *sqlLog +} + +func (suite *SQLLogTestSuite) SetupTest() { + suite.sqlLogRootPath = path.Join(os.TempDir(), fmt.Sprintf("SQLLogTestSuite-%d", os.Getpid())) + err := os.MkdirAll(suite.sqlLogRootPath, os.ModePerm) + require.Nil(suite.T(), err) + sqlDriver := "sqlite3" + sqlDsn := path.Join(suite.sqlLogRootPath, fmt.Sprintf("%d.db", time.Now().UnixNano())) + + // create tables + db, err := sql.Open(sqlDriver, sqlDsn) + require.Nil(suite.T(), err) + ddlFnames, err := filepath.Glob(fmt.Sprintf("../../_sql/%s/*.sql", sqlDriver)) + require.Nil(suite.T(), err) + for _, fname := range ddlFnames { + sqlBytes, err := os.ReadFile(fname) + require.Nil(suite.T(), err) + _, err = db.Exec(string(sqlBytes)) + require.Nil(suite.T(), err) + } + + // create settings + sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` +[DEFAULT] +SQLLogDriver=%s +SQLLogDataSourceName=%s +SQLLogConnMaxLifetime=14400s + +[SESSION] +BeginString=%s +SenderCompID=%s +TargetCompID=%s`, sqlDriver, sqlDsn, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) + require.Nil(suite.T(), err) + + // create log + log, err := NewLogFactory(settings).CreateSessionLog(sessionID) + require.Nil(suite.T(), err) + suite.log = log.(*sqlLog) +} + +func (suite *SQLLogTestSuite) TestSQLLog() { + suite.log.OnIncoming([]byte("Cool1")) + suite.log.OnOutgoing([]byte("Cool2")) + entries, err := suite.log.getEntries("messages_log") + require.Nil(suite.T(), err) + require.Len(suite.T(), entries, 2) + require.Equal(suite.T(), "Cool1", entries[0]) + require.Equal(suite.T(), "Cool2", entries[1]) + + suite.log.OnEvent("Cool3") + suite.log.OnEvent("Cool4") + entries, err = suite.log.getEntries("event_log") + require.Nil(suite.T(), err) + require.Len(suite.T(), entries, 2) + require.Equal(suite.T(), "Cool3", entries[0]) + require.Equal(suite.T(), "Cool4", entries[1]) +} + +func (suite *SQLLogTestSuite) TestSqlPlaceholderReplacement() { + got := sqlString("A ? B ? C ?", postgresPlaceholder) + suite.Equal("A $1 B $2 C $3", got) +} + +func (suite *SQLLogTestSuite) TearDownTest() { + suite.log.close() + os.RemoveAll(suite.sqlLogRootPath) +} + +func TestSQLLogTestSuite(t *testing.T) { + suite.Run(t, new(SQLLogTestSuite)) +} diff --git a/memorystore.go b/memory_store.go similarity index 100% rename from memorystore.go rename to memory_store.go diff --git a/store/file/filestore.go b/store/file/file_store.go similarity index 100% rename from store/file/filestore.go rename to store/file/file_store.go diff --git a/store/file/filestore_test.go b/store/file/file_store_test.go similarity index 100% rename from store/file/filestore_test.go rename to store/file/file_store_test.go diff --git a/store/memory/memorystore_test.go b/store/memory/memory_store_test.go similarity index 100% rename from store/memory/memorystore_test.go rename to store/memory/memory_store_test.go diff --git a/store/mongo/mongostore.go b/store/mongo/mongo_store.go similarity index 100% rename from store/mongo/mongostore.go rename to store/mongo/mongo_store.go diff --git a/store/mongo/mongostore_test.go b/store/mongo/mongo_store_test.go similarity index 100% rename from store/mongo/mongostore_test.go rename to store/mongo/mongo_store_test.go diff --git a/store/sql/sqlstore.go b/store/sql/sql_store.go similarity index 100% rename from store/sql/sqlstore.go rename to store/sql/sql_store.go diff --git a/store/sql/sqlstore_test.go b/store/sql/sql_store_test.go similarity index 100% rename from store/sql/sqlstore_test.go rename to store/sql/sql_store_test.go