diff --git a/be1-go/cli/cli.go b/be1-go/cli/cli.go index 1b5f44f3d1..4eef41c365 100644 --- a/be1-go/cli/cli.go +++ b/be1-go/cli/cli.go @@ -100,6 +100,11 @@ func (s *ServerConfig) newHub(l *zerolog.Logger) (hub.Hub, error) { } } + err = db.StoreFirstRumor() + if err != nil { + return nil, err + } + utils.InitUtils(l, schemaValidator) state.InitState(l) diff --git a/be1-go/cli/pop_test.go b/be1-go/cli/pop_test.go index a53eadb5f5..191a43fd83 100644 --- a/be1-go/cli/pop_test.go +++ b/be1-go/cli/pop_test.go @@ -1,5 +1,6 @@ package main +// //import ( // "context" // "os" diff --git a/be1-go/cli/test_config_files/valid_config_watcher.json b/be1-go/cli/test_config_files/valid_config_watcher.json index 61606ffc4d..e5fb3ee5a6 100644 --- a/be1-go/cli/test_config_files/valid_config_watcher.json +++ b/be1-go/cli/test_config_files/valid_config_watcher.json @@ -1 +1 @@ -{"public-key":"","client-address":"ws://localhost:9000/client","server-address":"ws://localhost:9001/server","server-public-address":"localhost","server-listen-address":"localhost","auth-server-address":"localhost","client-port":9000,"server-port":9001,"auth-port":9100,"other-servers":[]} \ No newline at end of file +{"public-key":"","client-address":"ws://localhost:9000/client","server-address":"ws://localhost:9001/server","server-public-address":"localhost","server-listen-address":"localhost","auth-server-address":"localhost","client-port":9000,"server-port":9001,"auth-port":9100,"other-servers":[],"database-path":""} \ No newline at end of file diff --git a/be1-go/configServer3.json b/be1-go/configServer3.json new file mode 100644 index 0000000000..bccea9dcfc --- /dev/null +++ b/be1-go/configServer3.json @@ -0,0 +1,15 @@ +{ + "public-key" : "", + "server-address" : "ws://127.0.0.1:9005/server", + "client-address" : "ws://127.0.0.1:9004/client", + "server-public-address" : "localhost", + "server-listen-address" : "localhost", + "auth-server-address" : "localhost", + "client-port" : 9004, + "server-port" : 9005, + "auth-port" : 9201, + "other-servers": [ + "localhost:9003" + ], + "database-path" : "./database-c/sqlite.db" +} diff --git a/be1-go/internal/popserver/database/database.go b/be1-go/internal/popserver/database/database.go index b59609beca..3fcf848db4 100644 --- a/be1-go/internal/popserver/database/database.go +++ b/be1-go/internal/popserver/database/database.go @@ -30,6 +30,10 @@ func getInstance() (repository.Repository, *answer.Error) { return instance, nil } +func GetRumorSenderRepositoryInstance() (repository.RumorSenderRepository, *answer.Error) { + return getInstance() +} + func GetQueryRepositoryInstance() (repository.QueryRepository, *answer.Error) { return getInstance() } diff --git a/be1-go/internal/popserver/database/repository/mock_repository.go b/be1-go/internal/popserver/database/repository/mock_repository.go index baa1bbc592..d7dcadd727 100644 --- a/be1-go/internal/popserver/database/repository/mock_repository.go +++ b/be1-go/internal/popserver/database/repository/mock_repository.go @@ -8,6 +8,8 @@ import ( kyber "go.dedis.ch/kyber/v3" + method "popstellar/message/query/method" + mock "github.com/stretchr/testify/mock" types "popstellar/internal/popserver/types" @@ -18,6 +20,34 @@ type MockRepository struct { mock.Mock } +// AddMessageToMyRumor provides a mock function with given fields: messageID +func (_m *MockRepository) AddMessageToMyRumor(messageID string) (int, error) { + ret := _m.Called(messageID) + + if len(ret) == 0 { + panic("no return value specified for AddMessageToMyRumor") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func(string) (int, error)); ok { + return rf(messageID) + } + if rf, ok := ret.Get(0).(func(string) int); ok { + r0 = rf(messageID) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(messageID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CheckPrevCreateOrCloseID provides a mock function with given fields: channel, nextID func (_m *MockRepository) CheckPrevCreateOrCloseID(channel string, nextID string) (bool, error) { ret := _m.Called(channel, nextID) @@ -74,6 +104,34 @@ func (_m *MockRepository) CheckPrevOpenOrReopenID(channel string, nextID string) return r0, r1 } +// CheckRumor provides a mock function with given fields: senderID, rumorID +func (_m *MockRepository) CheckRumor(senderID string, rumorID int) (bool, error) { + ret := _m.Called(senderID, rumorID) + + if len(ret) == 0 { + panic("no return value specified for CheckRumor") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string, int) (bool, error)); ok { + return rf(senderID, rumorID) + } + if rf, ok := ret.Get(0).(func(string, int) bool); ok { + r0 = rf(senderID, rumorID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string, int) error); ok { + r1 = rf(senderID, rumorID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetAllMessagesFromChannel provides a mock function with given fields: channelID func (_m *MockRepository) GetAllMessagesFromChannel(channelID string) ([]message.Message, error) { ret := _m.Called(channelID) @@ -104,6 +162,41 @@ func (_m *MockRepository) GetAllMessagesFromChannel(channelID string) ([]message return r0, r1 } +// GetAndIncrementMyRumor provides a mock function with given fields: +func (_m *MockRepository) GetAndIncrementMyRumor() (bool, method.Rumor, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetAndIncrementMyRumor") + } + + var r0 bool + var r1 method.Rumor + var r2 error + if rf, ok := ret.Get(0).(func() (bool, method.Rumor, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func() method.Rumor); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(method.Rumor) + } + + if rf, ok := ret.Get(2).(func() error); ok { + r2 = rf() + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // GetChannelType provides a mock function with given fields: channel func (_m *MockRepository) GetChannelType(channel string) (string, error) { ret := _m.Called(channel) @@ -697,6 +790,36 @@ func (_m *MockRepository) GetServerKeys() (kyber.Point, kyber.Scalar, error) { return r0, r1, r2 } +// GetUnprocessedMessagesByChannel provides a mock function with given fields: +func (_m *MockRepository) GetUnprocessedMessagesByChannel() (map[string][]message.Message, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetUnprocessedMessagesByChannel") + } + + var r0 map[string][]message.Message + var r1 error + if rf, ok := ret.Get(0).(func() (map[string][]message.Message, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() map[string][]message.Message); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string][]message.Message) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HasChannel provides a mock function with given fields: channel func (_m *MockRepository) HasChannel(channel string) (bool, error) { ret := _m.Called(channel) @@ -1027,6 +1150,24 @@ func (_m *MockRepository) StoreRollCallClose(channels []string, laoID string, ms return r0 } +// StoreRumor provides a mock function with given fields: rumorID, sender, unprocessed, processed +func (_m *MockRepository) StoreRumor(rumorID int, sender string, unprocessed map[string][]message.Message, processed []string) error { + ret := _m.Called(rumorID, sender, unprocessed, processed) + + if len(ret) == 0 { + panic("no return value specified for StoreRumor") + } + + var r0 error + if rf, ok := ret.Get(0).(func(int, string, map[string][]message.Message, []string) error); ok { + r0 = rf(rumorID, sender, unprocessed, processed) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // StoreServerKeys provides a mock function with given fields: electionPubKey, electionSecretKey func (_m *MockRepository) StoreServerKeys(electionPubKey kyber.Point, electionSecretKey kyber.Scalar) error { ret := _m.Called(electionPubKey, electionSecretKey) diff --git a/be1-go/internal/popserver/database/repository/repository.go b/be1-go/internal/popserver/database/repository/repository.go index 666316d43c..b3dbbb6350 100644 --- a/be1-go/internal/popserver/database/repository/repository.go +++ b/be1-go/internal/popserver/database/repository/repository.go @@ -4,6 +4,7 @@ import ( "go.dedis.ch/kyber/v3" "popstellar/internal/popserver/types" "popstellar/message/messagedata" + "popstellar/message/query/method" "popstellar/message/query/method/message" ) @@ -17,6 +18,7 @@ type Repository interface { ChirpRepository CoinRepository ReactionRepository + RumorSenderRepository FederationRepository // StoreServerKeys stores the keys of the server @@ -35,6 +37,14 @@ type Repository interface { GetMessageByID(ID string) (message.Message, error) } +type RumorSenderRepository interface { + // AddMessageToMyRumor adds the message to the last rumor of the server and returns the current number of message inside the last rumor + AddMessageToMyRumor(messageID string) (int, error) + + // GetAndIncrementMyRumor return false if the last rumor is empty otherwise returns the new rumor to send and create the next rumor + GetAndIncrementMyRumor() (bool, method.Rumor, error) +} + // ======================= Query ========================== type QueryRepository interface { @@ -47,6 +57,15 @@ type QueryRepository interface { GetAllMessagesFromChannel(channelID string) ([]message.Message, error) GetParamsHeartbeat() (map[string][]string, error) + + // CheckRumor returns true if the rumor already exists + CheckRumor(senderID string, rumorID int) (bool, error) + + // StoreRumor stores the new rumor with its processed and unprocessed messages + StoreRumor(rumorID int, sender string, unprocessed map[string][]message.Message, processed []string) error + + // GetUnprocessedMessagesByChannel returns all the unprocessed messages by channel + GetUnprocessedMessagesByChannel() (map[string][]message.Message, error) } // ======================= Answer ========================== diff --git a/be1-go/internal/popserver/database/sqlite/sqlite.go b/be1-go/internal/popserver/database/sqlite/sqlite.go index 5e55b91e03..d18d553745 100644 --- a/be1-go/internal/popserver/database/sqlite/sqlite.go +++ b/be1-go/internal/popserver/database/sqlite/sqlite.go @@ -10,49 +10,36 @@ import ( _ "modernc.org/sqlite" "popstellar/crypto" "popstellar/internal/popserver/types" + jsonrpc "popstellar/message" "popstellar/message/messagedata" + "popstellar/message/query" + "popstellar/message/query/method" "popstellar/message/query/method/message" "strings" "time" ) -func (s *SQLite) StoreServerKeys(electionPubKey kyber.Point, electionSecretKey kyber.Scalar) error { +func (s *SQLite) GetServerKeys() (kyber.Point, kyber.Scalar, error) { dbLock.Lock() defer dbLock.Unlock() - tx, err := s.database.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - electionPubBuf, err := electionPubKey.MarshalBinary() - if err != nil { - return err - } - electionSecBuf, err := electionSecretKey.MarshalBinary() + var serverPubBuf64 string + var serverSecBuf64 string + err := s.database.QueryRow(selectKeys, serverKeysPath).Scan(&serverPubBuf64, &serverSecBuf64) if err != nil { - return err + return nil, nil, err } - _, err = tx.Exec(insertKeys, serverKeysPath, electionPubBuf, electionSecBuf) + serverPubBuf, err := base64.URLEncoding.DecodeString(serverPubBuf64) if err != nil { - return err + return nil, nil, err } - return tx.Commit() -} - -func (s *SQLite) GetServerKeys() (kyber.Point, kyber.Scalar, error) { - dbLock.Lock() - defer dbLock.Unlock() - - var serverPubBuf []byte - var serverSecBuf []byte - err := s.database.QueryRow(selectKeys, serverKeysPath).Scan(&serverPubBuf, &serverSecBuf) + serverSecBuf, err := base64.URLEncoding.DecodeString(serverSecBuf64) if err != nil { return nil, nil, err } + serverPubKey := crypto.Suite.Point() err = serverPubKey.UnmarshalBinary(serverPubBuf) if err != nil { @@ -67,6 +54,24 @@ func (s *SQLite) GetServerKeys() (kyber.Point, kyber.Scalar, error) { return serverPubKey, serverSecKey, nil } +func (s *SQLite) insertMessageHelper(tx *sql.Tx, messageID string, msg, messageData []byte, storedTime int64) error { + _, err := tx.Exec(insertMessage, messageID, msg, messageData, storedTime) + if err != nil { + return err + + } + _, err = tx.Exec(tranferUnprocessedMessageRumor, messageID) + if err != nil { + return err + } + _, err = tx.Exec(deleteUnprocessedMessageRumor, messageID) + if err != nil { + return err + } + _, err = tx.Exec(deleteUnprocessedMessage, messageID) + return err +} + func (s *SQLite) StoreMessageAndData(channelPath string, msg message.Message) error { dbLock.Lock() defer dbLock.Unlock() @@ -90,16 +95,17 @@ func (s *SQLite) StoreMessageAndData(channelPath string, msg message.Message) er if err != nil { return err } - _, err = tx.Exec(insertMessage, msg.MessageID, msgByte, messageData, time.Now().UnixNano()) + err = s.insertMessageHelper(tx, msg.MessageID, msgByte, messageData, time.Now().UnixNano()) if err != nil { return err - } + _, err = tx.Exec(insertChannelMessage, channelPath, msg.MessageID, true) if err != nil { return err } + return tx.Commit() } @@ -108,7 +114,7 @@ func addPendingSignatures(tx *sql.Tx, msg *message.Message) error { if err != nil { return err } - + defer rows.Close() for rows.Next() { var witness string var signature string @@ -150,6 +156,7 @@ func (s *SQLite) GetMessagesByID(IDs []string) (map[string]message.Message, erro } else if errors.Is(err, sql.ErrNoRows) { return make(map[string]message.Message), nil } + defer rows.Close() messagesByID := make(map[string]message.Message, len(IDs)) for rows.Next() { @@ -243,6 +250,7 @@ func (s *SQLite) GetAllChannels() ([]string, error) { if err != nil { return nil, err } + defer rows.Close() var channels []string for rows.Next() { @@ -283,6 +291,7 @@ func (s *SQLite) GetAllMessagesFromChannel(channelPath string) ([]message.Messag if err != nil { return nil, err } + defer rows.Close() messages := make([]message.Message, 0) for rows.Next() { @@ -328,6 +337,7 @@ func (s *SQLite) GetResultForGetMessagesByID(params map[string][]string) (map[st if err != nil { return nil, err } + defer rows.Close() result := make(map[string][]message.Message) for rows.Next() { @@ -358,6 +368,7 @@ func (s *SQLite) GetParamsHeartbeat() (map[string][]string, error) { if err != nil { return nil, err } + defer rows.Close() result := make(map[string][]string) for rows.Next() { @@ -403,6 +414,7 @@ func (s *SQLite) GetParamsForGetMessageByID(params map[string][]string) (map[str if err != nil { return nil, err } + defer rows.Close() result := make(map[string]struct{}) for rows.Next() { @@ -480,7 +492,6 @@ func (s *SQLite) StoreLaoWithLaoGreet( if err != nil { return err } - defer tx.Rollback() msgByte, err := json.Marshal(msg) if err != nil { @@ -509,7 +520,7 @@ func (s *SQLite) StoreLaoWithLaoGreet( } } - _, err = tx.Exec(insertMessage, msg.MessageID, msgByte, messageData, storedTime) + err = s.insertMessageHelper(tx, msg.MessageID, msgByte, messageData, storedTime) if err != nil { return err } @@ -687,7 +698,7 @@ func (s *SQLite) StoreRollCallClose(channels []string, laoPath string, msg messa return err } - _, err = tx.Exec(insertMessage, msg.MessageID, msgBytes, messageData, time.Now().UnixNano()) + err = s.insertMessageHelper(tx, msg.MessageID, msgBytes, messageData, time.Now().UnixNano()) if err != nil { return err } @@ -695,6 +706,11 @@ func (s *SQLite) StoreRollCallClose(channels []string, laoPath string, msg messa if err != nil { return err } + + if len(channels) == 0 { + return tx.Commit() + } + for _, channel := range channels { _, err = tx.Exec(insertChannel, channel, channelTypeToID[ChirpType], laoPath) if err != nil { @@ -735,7 +751,7 @@ func (s *SQLite) storeElectionHelper( return err } - _, err = tx.Exec(insertMessage, msg.MessageID, msgBytes, messageData, storedTime) + err = s.insertMessageHelper(tx, msg.MessageID, msgBytes, messageData, storedTime) if err != nil { return err } @@ -879,6 +895,7 @@ func (s *SQLite) GetElectionSecretKey(electionPath string) (kyber.Scalar, error) } func (s *SQLite) getElectionState(electionPath string) (string, error) { + var state string err := s.database.QueryRow(selectLastElectionMessage, electionPath, messagedata.ElectionObject, messagedata.VoteActionCastVote).Scan(&state) if err != nil && !errors.Is(err, sql.ErrNoRows) { @@ -975,6 +992,7 @@ func (s *SQLite) GetElectionAttendees(electionPath string) (map[string]struct{}, } func (s *SQLite) getElectionSetup(electionPath string, tx *sql.Tx) (messagedata.ElectionSetup, error) { + var electionSetupBytes []byte err := tx.QueryRow(selectElectionSetup, electionPath, messagedata.ElectionObject, messagedata.ElectionActionSetup).Scan(&electionSetupBytes) if err != nil { @@ -1042,6 +1060,7 @@ func (s *SQLite) GetElectionQuestionsWithValidVotes(electionPath string) (map[st if err != nil { return nil, err } + defer rows.Close() for rows.Next() { var voteBytes []byte @@ -1136,9 +1155,10 @@ func (s *SQLite) StoreElectionEndWithResult(channelPath string, msg, electionRes } storedTime := time.Now().UnixNano() - _, err = tx.Exec(insertMessage, msg.MessageID, msgBytes, messageData, storedTime) + err = s.insertMessageHelper(tx, msg.MessageID, msgBytes, messageData, storedTime) if err != nil { return err + } _, err = tx.Exec(insertChannelMessage, channelPath, msg.MessageID, true) if err != nil { @@ -1188,7 +1208,7 @@ func (s *SQLite) StoreChirpMessages(channel, generalChannel string, msg, general } storedTime := time.Now().UnixNano() - _, err = tx.Exec(insertMessage, msg.MessageID, msgBytes, messageData, storedTime) + err = s.insertMessageHelper(tx, msg.MessageID, msgBytes, messageData, storedTime) if err != nil { return err } @@ -1258,6 +1278,197 @@ func (s *SQLite) GetReactionSender(messageID string) (string, error) { return sender, nil } +func (s *SQLite) CheckRumor(senderID string, rumorID int) (bool, error) { + dbLock.Lock() + defer dbLock.Unlock() + + var id int + if rumorID == 0 { + err := s.database.QueryRow(selectAnyRumor, senderID).Scan(&id) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return false, err + } else if errors.Is(err, sql.ErrNoRows) { + return true, nil + } + return false, nil + } + + err := s.database.QueryRow(selectLastRumor, senderID).Scan(&id) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return false, err + } else if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return id == rumorID-1, nil +} + +func (s *SQLite) StoreRumor(rumorID int, sender string, unprocessed map[string][]message.Message, processed []string) error { + dbLock.Lock() + defer dbLock.Unlock() + + tx, err := s.database.Begin() + if err != nil { + return err + } + + _, err = tx.Exec(insertRumor, rumorID, sender) + if err != nil { + return err + } + + for channelPath, messages := range unprocessed { + for _, msg := range messages { + _, err = tx.Exec(insertUnprocessedMessage, msg.MessageID, channelPath, msg) + if err != nil { + return err + } + _, err = tx.Exec(insertUnprocessedMessageRumor, msg.MessageID, rumorID, sender) + if err != nil { + return err + } + } + } + + for _, msgID := range processed { + _, err = tx.Exec(insertMessageRumor, msgID, rumorID, sender) + if err != nil { + return err + } + } + + return tx.Commit() +} + +func (s *SQLite) GetUnprocessedMessagesByChannel() (map[string][]message.Message, error) { + dbLock.Lock() + defer dbLock.Unlock() + + rows, err := s.database.Query(selectAllUnprocessedMessages) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make(map[string][]message.Message) + + for rows.Next() { + var channelPath string + var messageByte []byte + if err = rows.Scan(&channelPath, &messageByte); err != nil { + return nil, err + } + var msg message.Message + if err = json.Unmarshal(messageByte, &msg); err != nil { + return nil, err + } + result[channelPath] = append(result[channelPath], msg) + } + return result, nil +} + +func (s *SQLite) AddMessageToMyRumor(messageID string) (int, error) { + dbLock.Lock() + defer dbLock.Unlock() + + tx, err := s.database.Begin() + if err != nil { + return -1, err + } + defer tx.Rollback() + + _, err = s.database.Exec(insertMessageToMyRumor, messageID, serverKeysPath) + if err != nil { + return -1, err + } + var count int + err = s.database.QueryRow(selectCountMyRumor, serverKeysPath).Scan(&count) + if err != nil { + return -1, err + } + + err = tx.Commit() + if err != nil { + return -1, err + } + return count, nil +} + +func (s *SQLite) GetAndIncrementMyRumor() (bool, method.Rumor, error) { + dbLock.Lock() + defer dbLock.Unlock() + + tx, err := s.database.Begin() + if err != nil { + return false, method.Rumor{}, err + } + defer tx.Rollback() + + rows, err := s.database.Query(selectMyRumorMessages, true, serverKeysPath, serverKeysPath) + if err != nil { + return false, method.Rumor{}, err + } + defer rows.Close() + + messages := make(map[string][]message.Message) + for rows.Next() { + var msgBytes []byte + var channelPath string + if err = rows.Scan(&msgBytes, &channelPath); err != nil { + return false, method.Rumor{}, err + } + + var msg message.Message + if err = json.Unmarshal(msgBytes, &msg); err != nil { + return false, method.Rumor{}, err + } + + messages[channelPath] = append(messages[channelPath], msg) + } + + if len(messages) == 0 { + return false, method.Rumor{}, nil + } + + var rumorID int + var sender string + err = tx.QueryRow(selectMyRumorInfos, serverKeysPath).Scan(&rumorID, &sender) + if err != nil { + return false, method.Rumor{}, err + } + + rumor := newRumor(rumorID, sender, messages) + + _, err = tx.Exec(insertRumor, rumorID+1, sender) + if err != nil { + return false, method.Rumor{}, err + } + + err = tx.Commit() + if err != nil { + return false, method.Rumor{}, err + } + + return true, rumor, nil +} + +func newRumor(rumorID int, sender string, messages map[string][]message.Message) method.Rumor { + params := method.ParamsRumor{ + RumorID: rumorID, + SenderID: sender, + Messages: messages, + } + + return method.Rumor{ + Base: query.Base{ + JSONRPCBase: jsonrpc.JSONRPCBase{ + JSONRPC: "2.0", + }, + Method: "rumor", + }, + Params: params, + } +} + //====================================================================================================================== // FederationRepository interface implementation //====================================================================================================================== diff --git a/be1-go/internal/popserver/database/sqlite/sqlite_const.go b/be1-go/internal/popserver/database/sqlite/sqlite_const.go index 9c242472b3..ec688ee254 100644 --- a/be1-go/internal/popserver/database/sqlite/sqlite_const.go +++ b/be1-go/internal/popserver/database/sqlite/sqlite_const.go @@ -100,16 +100,68 @@ const ( signature TEXT UNIQUE, PRIMARY KEY (messageID, witness) )` + + createRumor = ` + CREATE TABLE IF NOT EXISTS rumor ( + ID INTEGER, + sender TEXT, + PRIMARY KEY (ID, sender) + )` + + createMessageRumor = ` + CREATE TABLE IF NOT EXISTS messageRumor ( + messageID TEXT, + rumorID INTEGER, + sender TEXT, + FOREIGN KEY (messageID) REFERENCES message(messageID), + FOREIGN KEY (rumorID, sender) REFERENCES rumor(ID, sender), + PRIMARY KEY (messageID, rumorID, sender) + )` + + createUnprocessedMessage = ` + CREATE TABLE IF NOT EXISTS unprocessedMessage ( + messageID TEXT, + channelPath TEXT, + message TEXT, + PRIMARY KEY (messageID) + )` + + createUnprocessedMessageRumor = ` + CREATE TABLE IF NOT EXISTS unprocessedMessageRumor ( + messageID TEXT, + rumorID INTEGER, + sender TEXT, + FOREIGN KEY (messageID) REFERENCES unprocessedMessage(messageID), + FOREIGN KEY (rumorID, sender) REFERENCES rumor(ID, sender), + PRIMARY KEY (messageID, rumorID, sender) + )` ) const ( - insertChannelMessage = `INSERT INTO channelMessage (channelPath, messageID, isBaseChannel) VALUES (?, ?, ?)` - insertMessage = `INSERT INTO message (messageID, message, messageData, storedTime) VALUES (?, ?, ?, ?)` - insertChannel = `INSERT INTO channel (channelPath, typeID, laoPath) VALUES (?, ?, ?)` - insertOrIgnoreChannel = `INSERT OR IGNORE INTO channel (channelPath, typeID, laoPath) VALUES (?, ?, ?)` - insertKeys = `INSERT INTO key (channelPath, publicKey, secretKey) VALUES (?, ?, ?)` - insertPublicKey = `INSERT INTO key (channelPath, publicKey) VALUES (?, ?)` - insertPendingSignatures = `INSERT INTO pendingSignatures (messageID, witness, signature) VALUES (?, ?, ?)` + insertChannelMessage = `INSERT INTO channelMessage (channelPath, messageID, isBaseChannel) VALUES (?, ?, ?)` + insertMessage = `INSERT INTO message (messageID, message, messageData, storedTime) VALUES (?, ?, ?, ?)` + insertChannel = `INSERT INTO channel (channelPath, typeID, laoPath) VALUES (?, ?, ?)` + insertOrIgnoreChannel = `INSERT OR IGNORE INTO channel (channelPath, typeID, laoPath) VALUES (?, ?, ?)` + insertKeys = `INSERT INTO key (channelPath, publicKey, secretKey) VALUES (?, ?, ?)` + insertPublicKey = `INSERT INTO key (channelPath, publicKey) VALUES (?, ?)` + insertPendingSignatures = `INSERT INTO pendingSignatures (messageID, witness, signature) VALUES (?, ?, ?)` + insertRumor = `INSERT INTO rumor (ID, sender) VALUES (?, ?)` + insertUnprocessedMessage = `INSERT INTO unprocessedMessage (messageID, channelPath, message) VALUES (?, ?, ?)` + insertUnprocessedMessageRumor = `INSERT INTO unprocessedMessageRumor (messageID, rumorID, sender) VALUES (?, ?, ?)` + insertMessageRumor = `INSERT INTO messageRumor (messageID, rumorID, sender) VALUES (?, ?, ?)` + tranferUnprocessedMessageRumor = `INSERT INTO messageRumor (messageID, rumorID, sender) SELECT messageID, rumorID, sender FROM unprocessedMessageRumor WHERE messageID = ?` + insertMessageToMyRumor = ` + INSERT INTO messageRumor (messageID, rumorID, sender) + SELECT ?, max(ID), sender + FROM rumor + WHERE sender = ( + SELECT publicKey + FROM key + WHERE channelPath = ? + ) + LIMIT 1` + + insertFirstRumor = `INSERT OR IGNORE INTO rumor (ID, sender) SELECT ?, publicKey FROM key WHERE channelPath = ?` ) const ( @@ -288,6 +340,25 @@ const ( FROM message WHERE messageID = ?` + selectAnyRumor = `SELECT ID FROM rumor WHERE sender = ?` + + selectAllUnprocessedMessages = `SELECT channelPath, message FROM unprocessedMessage` + + selectCountMyRumor = `SELECT count(*) FROM messageRumor WHERE rumorID = (SELECT max(ID) FROM rumor WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?))` + + selectMyRumorMessages = ` + select message, channelPath + FROM message JOIN channelMessage ON message.messageID = channelMessage.messageID + WHERE isBaseChannel = ? + AND message.messageID IN + (SELECT messageID + FROM messageRumor + WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?) AND rumorID = (SELECT max(ID) FROM rumor + WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?)))` + + selectMyRumorInfos = `SELECT max(ID), sender FROM rumor WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?)` + selectLastRumor = `SELECT max(ID) FROM rumor WHERE sender = ?` + selectValidFederationChallenges = ` SELECT messageData FROM ( @@ -330,7 +401,9 @@ const ( ) const ( - deletePendingSignatures = `DELETE FROM pendingSignatures WHERE messageID = ?` + deletePendingSignatures = `DELETE FROM pendingSignatures WHERE messageID = ?` + deleteUnprocessedMessage = `DELETE FROM unprocessedMessage WHERE messageID = ?` + deleteUnprocessedMessageRumor = `DELETE FROM unprocessedMessageRumor WHERE messageID = ?` ) const ( diff --git a/be1-go/internal/popserver/database/sqlite/sqlite_init.go b/be1-go/internal/popserver/database/sqlite/sqlite_init.go index cbbf0a0ed8..e20e03cf67 100644 --- a/be1-go/internal/popserver/database/sqlite/sqlite_init.go +++ b/be1-go/internal/popserver/database/sqlite/sqlite_init.go @@ -2,6 +2,8 @@ package sqlite import ( "database/sql" + "encoding/base64" + "go.dedis.ch/kyber/v3" database2 "popstellar/internal/popserver/database/repository" "sync" ) @@ -91,6 +93,12 @@ func NewSQLite(path string, foreignKeyOn bool) (SQLite, error) { return SQLite{}, err } + err = initRumorTables(tx) + if err != nil { + db.Close() + return SQLite{}, err + } + err = tx.Commit() if err != nil { db.Close() @@ -100,6 +108,30 @@ func NewSQLite(path string, foreignKeyOn bool) (SQLite, error) { return SQLite{database: db}, nil } +func initRumorTables(tx *sql.Tx) error { + _, err := tx.Exec(createRumor) + if err != nil { + return err + } + + _, err = tx.Exec(createMessageRumor) + if err != nil { + return err + } + + _, err = tx.Exec(createUnprocessedMessage) + if err != nil { + return err + } + + _, err = tx.Exec(createUnprocessedMessageRumor) + if err != nil { + return err + } + + return nil +} + // Close closes the SQLite database. func (s *SQLite) Close() error { dbLock.Lock() @@ -108,6 +140,41 @@ func (s *SQLite) Close() error { return s.database.Close() } +func (s *SQLite) StoreServerKeys(serverPubKey kyber.Point, serverSecretKey kyber.Scalar) error { + dbLock.Lock() + defer dbLock.Unlock() + + tx, err := s.database.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + serverPubBuf, err := serverPubKey.MarshalBinary() + if err != nil { + return err + } + serverSecBuf, err := serverSecretKey.MarshalBinary() + if err != nil { + return err + } + + _, err = tx.Exec(insertKeys, serverKeysPath, base64.URLEncoding.EncodeToString(serverPubBuf), + base64.URLEncoding.EncodeToString(serverSecBuf)) + if err != nil { + return err + } + + return tx.Commit() +} + +func (s *SQLite) StoreFirstRumor() error { + dbLock.Lock() + defer dbLock.Unlock() + _, err := s.database.Exec(insertFirstRumor, 0, serverKeysPath) + return err +} + func fillChannelTypes(tx *sql.Tx) error { for _, channelType := range channelTypes { _, err := tx.Exec("INSERT INTO channelType (type) VALUES (?)", channelType) diff --git a/be1-go/internal/popserver/generatortest/query.go b/be1-go/internal/popserver/generatortest/query.go index 43803a9308..5e735a0969 100644 --- a/be1-go/internal/popserver/generatortest/query.go +++ b/be1-go/internal/popserver/generatortest/query.go @@ -149,3 +149,25 @@ func NewGetMessagesByIDQuery(t *testing.T, queryID int, msgIDsByChannel map[stri return getMessagesByIDBuf } + +func NewRumorQuery(t *testing.T, queryID int, senderID string, rumorID int, messages map[string][]message.Message) []byte { + rumor := method.Rumor{ + Base: query.Base{ + JSONRPCBase: jsonrpc.JSONRPCBase{ + JSONRPC: "2.0", + }, + Method: query.MethodRumor, + }, + ID: queryID, + Params: method.ParamsRumor{ + SenderID: senderID, + RumorID: rumorID, + Messages: messages, + }, + } + + rumorBuf, err := json.Marshal(&rumor) + require.NoError(t, err) + + return rumorBuf +} diff --git a/be1-go/internal/popserver/handler/answer.go b/be1-go/internal/popserver/handler/answer.go index 73e37de183..f0cf1c81c7 100644 --- a/be1-go/internal/popserver/handler/answer.go +++ b/be1-go/internal/popserver/handler/answer.go @@ -2,14 +2,18 @@ package handler import ( "encoding/json" + "math/rand" + "popstellar" "popstellar/internal/popserver/state" - "popstellar/internal/popserver/utils" "popstellar/message/answer" "popstellar/message/query/method/message" "sort" ) -const maxRetry = 10 +const ( + maxRetry = 10 + continueMongering = 0.5 +) func handleAnswer(msg []byte) *answer.Error { var answerMsg answer.Answer @@ -20,18 +24,27 @@ func handleAnswer(msg []byte) *answer.Error { return errAnswer.Wrap("handleAnswer") } + isRumor, errAnswer := state.IsRumorQuery(*answerMsg.ID) + if errAnswer != nil { + return errAnswer + } + if isRumor { + return handleRumorAnswer(answerMsg) + } + if answerMsg.Result == nil { - utils.LogInfo("received an error, nothing to handle") + popstellar.Logger.Info().Msg("received an error, nothing to handle") // don't send any error to avoid infinite error loop as a server will // send an error to another server that will create another error return nil } + if answerMsg.Result.IsEmpty() { - utils.LogInfo("expected isn't an answer to a popquery, nothing to handle") + popstellar.Logger.Info().Msg("expected isn't an answer to a popquery, nothing to handle") return nil } - errAnswer := state.SetQueryReceived(*answerMsg.ID) + errAnswer = state.SetQueryReceived(*answerMsg.ID) if errAnswer != nil { return errAnswer.Wrap("handleAnswer") } @@ -44,6 +57,45 @@ func handleAnswer(msg []byte) *answer.Error { return nil } +func handleRumorAnswer(msg answer.Answer) *answer.Error { + errAnswer := state.SetQueryReceived(*msg.ID) + if errAnswer != nil { + return errAnswer + } + + popstellar.Logger.Debug().Msgf("received an answer to rumor query %d", *msg.ID) + + if msg.Error != nil { + popstellar.Logger.Debug().Msgf("received an answer error to rumor query %d", *msg.ID) + if msg.Error.Code != answer.DuplicateResourceErrorCode { + popstellar.Logger.Debug().Msgf("invalid error code to rumor query %d", *msg.ID) + return nil + } + + stop := rand.Float64() < continueMongering + + if stop { + popstellar.Logger.Debug().Msgf("stop mongering rumor query %d", *msg.ID) + return nil + } + + popstellar.Logger.Debug().Msgf("continue mongering rumor query %d", *msg.ID) + } + + popstellar.Logger.Debug().Msgf("sender rumor need to continue sending query %d", *msg.ID) + rumor, ok, errAnswer := state.GetRumorFromPastQuery(*msg.ID) + if errAnswer != nil { + return errAnswer + } + if !ok { + return answer.NewInternalServerError("rumor query %d doesn't exist", *msg.ID) + } + + SendRumor(nil, rumor) + + return nil +} + func handleGetMessagesByIDAnswer(msg answer.Answer) *answer.Error { result := msg.Result.GetMessagesByChannel() msgsByChan := make(map[string]map[string]message.Message) @@ -60,7 +112,7 @@ func handleGetMessagesByIDAnswer(msg answer.Answer) *answer.Error { } errAnswer := answer.NewInvalidMessageFieldError("failed to unmarshal: %v", err) - utils.LogError(errAnswer.Wrap("handleGetMessagesByIDAnswer")) + popstellar.Logger.Error().Err(errAnswer) } if len(msgsByChan[channelID]) == 0 { @@ -92,7 +144,7 @@ func tryToHandleMessages(msgsByChannel map[string]map[string]message.Message, so for _, channelID := range sortedChannelIDs { msgs := msgsByChannel[channelID] for msgID, msg := range msgs { - errAnswer := handleChannel(channelID, msg) + errAnswer := handleChannel(channelID, msg, false) if errAnswer == nil { delete(msgsByChannel[channelID], msgID) continue @@ -103,7 +155,7 @@ func tryToHandleMessages(msgsByChannel map[string]map[string]message.Message, so } errAnswer = errAnswer.Wrap(msgID).Wrap("tryToHandleMessages") - utils.LogError(errAnswer) + popstellar.Logger.Error().Err(errAnswer) } if len(msgsByChannel[channelID]) == 0 { diff --git a/be1-go/internal/popserver/handler/channel.go b/be1-go/internal/popserver/handler/channel.go index 275e37dd10..cf023ce35c 100644 --- a/be1-go/internal/popserver/handler/channel.go +++ b/be1-go/internal/popserver/handler/channel.go @@ -20,7 +20,7 @@ import ( "popstellar/validation" ) -func handleChannel(channelPath string, msg message.Message) *answer.Error { +func handleChannel(channelPath string, msg message.Message, fromRumor bool) *answer.Error { errAnswer := verifyMessage(msg) if errAnswer != nil { return errAnswer.Wrap("handleChannel") @@ -36,6 +36,9 @@ func handleChannel(channelPath string, msg message.Message) *answer.Error { errAnswer := answer.NewQueryDatabaseError("if message exists: %v", err) return errAnswer.Wrap("handleChannel") } + if msgAlreadyExists && fromRumor { + return nil + } if msgAlreadyExists { errAnswer := answer.NewInvalidActionError("message %s was already received", msg.MessageID) return errAnswer.Wrap("handleChannel") diff --git a/be1-go/internal/popserver/handler/channel_test.go b/be1-go/internal/popserver/handler/channel_test.go index b76b95d064..13b9734d39 100644 --- a/be1-go/internal/popserver/handler/channel_test.go +++ b/be1-go/internal/popserver/handler/channel_test.go @@ -169,7 +169,7 @@ func Test_handleChannel(t *testing.T) { for _, arg := range args { t.Run(arg.name, func(t *testing.T) { - errAnswer := handleChannel(arg.channel, arg.message) + errAnswer := handleChannel(arg.channel, arg.message, false) require.NotNil(t, errAnswer) require.Contains(t, errAnswer.Error(), arg.contains) }) diff --git a/be1-go/internal/popserver/handler/lao.go b/be1-go/internal/popserver/handler/lao.go index 92e02aebe5..d9a925db2e 100644 --- a/be1-go/internal/popserver/handler/lao.go +++ b/be1-go/internal/popserver/handler/lao.go @@ -161,9 +161,6 @@ func handleRollCallClose(msg message.Message, channelPath string) *answer.Error if errAnswer != nil { return errAnswer.Wrap("handleRollCallClose") } - if len(newChannels) == 0 { - return nil - } err = db.StoreRollCallClose(newChannels, channelPath, msg) if err != nil { diff --git a/be1-go/internal/popserver/handler/publish.go b/be1-go/internal/popserver/handler/publish.go new file mode 100644 index 0000000000..a781d1885d --- /dev/null +++ b/be1-go/internal/popserver/handler/publish.go @@ -0,0 +1,60 @@ +package handler + +import ( + "encoding/json" + "popstellar" + "popstellar/internal/popserver/database" + "popstellar/internal/popserver/state" + "popstellar/message/answer" + "popstellar/message/query/method" + "popstellar/network/socket" + "strings" +) + +const thresholdMessagesByRumor = 1 + +func handlePublish(socket socket.Socket, msg []byte) (*int, *answer.Error) { + var publish method.Publish + + err := json.Unmarshal(msg, &publish) + if err != nil { + errAnswer := answer.NewJsonUnmarshalError(err.Error()) + return nil, errAnswer.Wrap("handlePublish") + } + + errAnswer := handleChannel(publish.Params.Channel, publish.Params.Message, false) + if errAnswer != nil { + return &publish.ID, errAnswer.Wrap("handlePublish") + } + + socket.SendResult(publish.ID, nil, nil) + + if strings.Contains(publish.Params.Channel, "federation") { + return nil, nil + } + + db, errAnswer := database.GetRumorSenderRepositoryInstance() + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + return nil, nil + } + + popstellar.Logger.Debug().Msgf("sender rumor need to add message %s", publish.Params.Message.MessageID) + nbMessagesInsideRumor, err := db.AddMessageToMyRumor(publish.Params.Message.MessageID) + if err != nil { + popstellar.Logger.Error().Err(err) + return nil, nil + } + + if nbMessagesInsideRumor < thresholdMessagesByRumor { + popstellar.Logger.Debug().Msgf("no enough message to send rumor %s", publish.Params.Message.MessageID) + return nil, nil + } + + errAnswer = state.NotifyResetRumorSender() + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + } + + return nil, nil +} diff --git a/be1-go/internal/popserver/handler/query.go b/be1-go/internal/popserver/handler/query.go index dbe9635ca5..a2bdab6901 100644 --- a/be1-go/internal/popserver/handler/query.go +++ b/be1-go/internal/popserver/handler/query.go @@ -40,6 +40,8 @@ func handleQuery(socket socket.Socket, msg []byte) *answer.Error { id, errAnswer = handleSubscribe(socket, msg) case query.MethodUnsubscribe: id, errAnswer = handleUnsubscribe(socket, msg) + case query.MethodRumor: + id, errAnswer = handleRumor(socket, msg) default: errAnswer = answer.NewInvalidResourceError("unexpected method: '%s'", queryBase.Method) } @@ -160,25 +162,6 @@ func handleUnsubscribe(socket socket.Socket, msg []byte) (*int, *answer.Error) { return &unsubscribe.ID, nil } -func handlePublish(socket socket.Socket, msg []byte) (*int, *answer.Error) { - var publish method.Publish - - err := json.Unmarshal(msg, &publish) - if err != nil { - errAnswer := answer.NewJsonUnmarshalError(err.Error()) - return nil, errAnswer.Wrap("handlePublish") - } - - errAnswer := handleChannel(publish.Params.Channel, publish.Params.Message) - if errAnswer != nil { - return &publish.ID, errAnswer.Wrap("handlePublish") - } - - socket.SendResult(publish.ID, nil, nil) - - return &publish.ID, nil -} - func handleCatchUp(socket socket.Socket, msg []byte) (*int, *answer.Error) { var catchup method.Catchup diff --git a/be1-go/internal/popserver/handler/rumor.go b/be1-go/internal/popserver/handler/rumor.go new file mode 100644 index 0000000000..c93eed48c1 --- /dev/null +++ b/be1-go/internal/popserver/handler/rumor.go @@ -0,0 +1,158 @@ +package handler + +import ( + "encoding/json" + "popstellar" + "popstellar/internal/popserver/database" + "popstellar/internal/popserver/state" + "popstellar/internal/popserver/utils" + "popstellar/message/answer" + "popstellar/message/query/method" + "popstellar/message/query/method/message" + "popstellar/network/socket" + "sort" +) + +func handleRumor(socket socket.Socket, msg []byte) (*int, *answer.Error) { + var rumor method.Rumor + + err := json.Unmarshal(msg, &rumor) + if err != nil { + errAnswer := answer.NewJsonUnmarshalError(err.Error()) + return nil, errAnswer.Wrap("handleRumor") + } + + popstellar.Logger.Debug().Msgf("received rumor %s-%d from query %d", + rumor.Params.SenderID, rumor.Params.RumorID, rumor.ID) + + db, errAnswer := database.GetQueryRepositoryInstance() + if errAnswer != nil { + return &rumor.ID, errAnswer.Wrap("handleRumor") + } + + ok, err := db.CheckRumor(rumor.Params.SenderID, rumor.Params.RumorID) + if err != nil { + errAnswer := answer.NewQueryDatabaseError("if rumor is not valid: %v", err) + return &rumor.ID, errAnswer.Wrap("handleRumor") + } + if !ok { + errAnswer := answer.NewInvalidResourceError("rumor %s: %v is not valid", + rumor.Params.SenderID, rumor.Params.RumorID) + return &rumor.ID, errAnswer + } + + socket.SendResult(rumor.ID, nil, nil) + + SendRumor(socket, rumor) + + processedMsgs := tryHandlingMessagesByChannel(rumor.Params.Messages) + + err = db.StoreRumor(rumor.Params.RumorID, rumor.Params.SenderID, rumor.Params.Messages, processedMsgs) + if err != nil { + utils.LogError(err) + return &rumor.ID, nil + } + + messages, err := db.GetUnprocessedMessagesByChannel() + if err != nil { + errAnswer := answer.NewQueryDatabaseError("unprocessed messages: %v", err) + return &rumor.ID, errAnswer.Wrap("handleRumor") + } + + _ = tryHandlingMessagesByChannel(messages) + + return &rumor.ID, nil +} + +func tryHandlingMessagesByChannel(unprocessedMsgsByChannel map[string][]message.Message) []string { + processedMsgs := make([]string, 0) + + sortedChannels := sortChannels(unprocessedMsgsByChannel) + + for _, channel := range sortedChannels { + unprocessedMsgs, newProcessedMsgs := tryHandlingMessages(channel, unprocessedMsgsByChannel[channel]) + + if len(newProcessedMsgs) > 0 { + processedMsgs = append(processedMsgs, newProcessedMsgs...) + } + + if len(unprocessedMsgs) > 0 { + unprocessedMsgsByChannel[channel] = unprocessedMsgs + } else { + delete(unprocessedMsgsByChannel, channel) + } + } + + return processedMsgs +} + +func tryHandlingMessages(channel string, unprocessedMsgs []message.Message) ([]message.Message, []string) { + processedMsgs := make([]string, 0) + + for i := 0; i < maxRetry; i++ { + nbProcessed := 0 + for index, msg := range unprocessedMsgs { + errAnswer := handleChannel(channel, msg, true) + if errAnswer == nil { + unprocessedMsgs = removeMessage(index-nbProcessed, unprocessedMsgs) + processedMsgs = append(processedMsgs, msg.MessageID) + nbProcessed++ + continue + } + + errAnswer = errAnswer.Wrap(msg.MessageID).Wrap("tryHandlingMessages") + popstellar.Logger.Error().Err(errAnswer) + } + + if len(unprocessedMsgs) == 0 { + break + } + } + + return unprocessedMsgs, processedMsgs +} + +func removeMessage(index int, messages []message.Message) []message.Message { + result := make([]message.Message, 0) + result = append(result, messages[:index]...) + return append(result, messages[index+1:]...) +} + +func sortChannels(msgsByChannel map[string][]message.Message) []string { + sortedChannelIDs := make([]string, 0) + for channelID := range msgsByChannel { + sortedChannelIDs = append(sortedChannelIDs, channelID) + } + sort.Slice(sortedChannelIDs, func(i, j int) bool { + return len(sortedChannelIDs[i]) < len(sortedChannelIDs[j]) + }) + return sortedChannelIDs +} + +func SendRumor(socket socket.Socket, rumor method.Rumor) { + id, errAnswer := state.GetNextID() + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + return + } + + rumor.ID = id + + errAnswer = state.AddRumorQuery(id, rumor) + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + return + } + + buf, err := json.Marshal(rumor) + if err != nil { + popstellar.Logger.Error().Err(err) + return + } + + popstellar.Logger.Debug().Msgf("sending rumor %s-%d query %d", rumor.Params.SenderID, rumor.Params.RumorID, rumor.ID) + errAnswer = state.SendRumor(socket, rumor.Params.SenderID, rumor.Params.RumorID, buf) + if errAnswer != nil { + popstellar.Logger.Err(errAnswer) + } +} diff --git a/be1-go/internal/popserver/hub.go b/be1-go/internal/popserver/hub.go index 0cac7aeb52..97ad6f617b 100644 --- a/be1-go/internal/popserver/hub.go +++ b/be1-go/internal/popserver/hub.go @@ -8,7 +8,6 @@ import ( "popstellar/internal/popserver/database" "popstellar/internal/popserver/handler" "popstellar/internal/popserver/state" - "popstellar/internal/popserver/types" "popstellar/internal/popserver/utils" jsonrpc "popstellar/message" "popstellar/message/query" @@ -18,14 +17,16 @@ import ( "time" ) -const heartbeatDelay = 30 * time.Second +const ( + heartbeatDelay = time.Second * 30 + rumorDelay = time.Second * 5 +) type Hub struct { wg *sync.WaitGroup messageChan chan socket.IncomingMessage stop chan struct{} closedSockets chan string - serverSockets types.Sockets } func NewHub() *Hub { @@ -58,20 +59,22 @@ func NewHub() *Hub { messageChan: messageChan, stop: stop, closedSockets: closedSockets, - serverSockets: types.NewSockets(), } } func (h *Hub) NotifyNewServer(socket socket.Socket) { - h.serverSockets.Upsert(socket) + errAnswer := state.Upsert(socket) + if errAnswer != nil { + popstellar.Logger.Err(errAnswer) + } } func (h *Hub) Start() { - h.wg.Add(2) + h.wg.Add(3) go func() { - defer h.wg.Done() ticker := time.NewTicker(heartbeatDelay) defer ticker.Stop() + defer h.wg.Done() for { select { @@ -83,8 +86,37 @@ func (h *Hub) Start() { } } }() + go func() { + ticker := time.NewTicker(rumorDelay) + defer ticker.Stop() + defer h.wg.Done() + defer popstellar.Logger.Info().Msg("stopping rumor sender") + + popstellar.Logger.Info().Msg("starting rumor sender") + + reset, errAnswer := state.GetResetRumorSender() + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + return + } + + for { + select { + case <-ticker.C: + popstellar.Logger.Debug().Msgf("sender rumor trigerred") + h.tryToSendRumor() + case <-reset: + popstellar.Logger.Debug().Msgf("sender rumor reset") + ticker.Reset(rumorDelay) + h.tryToSendRumor() + case <-h.stop: + return + } + } + }() go func() { defer h.wg.Done() + utils.LogInfo("start the Hub") for { utils.LogInfo("waiting for a new message") @@ -159,17 +191,6 @@ func (h *Hub) SendGreetServer(socket socket.Socket) error { // sendHeartbeatToServers sends a heartbeat message to all servers func (h *Hub) sendHeartbeatToServers() { - - db, errAnswer := database.GetQueryRepositoryInstance() - if errAnswer != nil { - return - } - - params, err := db.GetParamsHeartbeat() - if err != nil { - return - } - heartbeatMessage := method.Heartbeat{ Base: query.Base{ JSONRPCBase: jsonrpc.JSONRPCBase{ @@ -177,12 +198,36 @@ func (h *Hub) sendHeartbeatToServers() { }, Method: "heartbeat", }, - Params: params, + Params: make(map[string][]string), } buf, err := json.Marshal(heartbeatMessage) if err != nil { - utils.LogError(err) + popstellar.Logger.Err(err) + } + + errAnswer := state.SendToAllServer(buf) + if errAnswer != nil { + popstellar.Logger.Err(errAnswer) } - h.serverSockets.SendToAll(buf) +} + +func (h *Hub) tryToSendRumor() { + db, errAnswer := database.GetRumorSenderRepositoryInstance() + if errAnswer != nil { + popstellar.Logger.Error().Err(errAnswer) + return + } + + ok, rumor, err := db.GetAndIncrementMyRumor() + if err != nil { + popstellar.Logger.Error().Err(err) + return + } + if !ok { + popstellar.Logger.Debug().Msg("no new rumor to send") + return + } + + handler.SendRumor(nil, rumor) } diff --git a/be1-go/internal/popserver/state/hub_parameter.go b/be1-go/internal/popserver/state/hub_parameter.go new file mode 100644 index 0000000000..179196643c --- /dev/null +++ b/be1-go/internal/popserver/state/hub_parameter.go @@ -0,0 +1,58 @@ +package state + +import ( + "popstellar/message/answer" + "popstellar/network/socket" + "sync" +) + +type HubParameter interface { + GetWaitGroup() *sync.WaitGroup + GetMessageChan() chan socket.IncomingMessage + GetStopChan() chan struct{} + GetClosedSockets() chan string +} + +func getHubParams() (HubParameter, *answer.Error) { + if instance == nil || instance.hubParams == nil { + return nil, answer.NewInternalServerError("hubparams was not instantiated") + } + + return instance.hubParams, nil +} + +func GetWaitGroup() (*sync.WaitGroup, *answer.Error) { + hubParams, errAnswer := getHubParams() + if errAnswer != nil { + return nil, errAnswer + } + + return hubParams.GetWaitGroup(), nil +} + +func GetMessageChan() (chan socket.IncomingMessage, *answer.Error) { + hubParams, errAnswer := getHubParams() + if errAnswer != nil { + return nil, errAnswer + } + + return hubParams.GetMessageChan(), nil +} + +func GetStopChan() (chan struct{}, *answer.Error) { + hubParams, errAnswer := getHubParams() + if errAnswer != nil { + return nil, errAnswer + } + + return hubParams.GetStopChan(), nil +} + +func GetClosedSockets() (chan string, *answer.Error) { + hubParams, errAnswer := getHubParams() + if errAnswer != nil { + return nil, errAnswer + } + + return hubParams.GetClosedSockets(), nil +} diff --git a/be1-go/internal/popserver/state/peerer.go b/be1-go/internal/popserver/state/peerer.go new file mode 100644 index 0000000000..8d3e379e86 --- /dev/null +++ b/be1-go/internal/popserver/state/peerer.go @@ -0,0 +1,65 @@ +package state + +import ( + "popstellar/message/answer" + "popstellar/message/query/method" +) + +type Peerer interface { + AddPeerInfo(socketID string, info method.GreetServerParams) error + AddPeerGreeted(socketID string) + GetAllPeersInfo() []method.GreetServerParams + IsPeerGreeted(socketID string) bool +} + +func getPeers() (Peerer, *answer.Error) { + if instance == nil || instance.peers == nil { + return nil, answer.NewInternalServerError("peerer was not instantiated") + } + + return instance.peers, nil +} + +func AddPeerInfo(socketID string, info method.GreetServerParams) *answer.Error { + peers, errAnswer := getPeers() + if errAnswer != nil { + return errAnswer + } + + err := peers.AddPeerInfo(socketID, info) + if err != nil { + errAnswer := answer.NewInvalidActionError("failed to add peer: %v", err) + return errAnswer + } + + return nil +} + +func AddPeerGreeted(socketID string) *answer.Error { + peers, errAnswer := getPeers() + if errAnswer != nil { + return errAnswer + } + + peers.AddPeerGreeted(socketID) + + return nil +} + +func GetAllPeersInfo() ([]method.GreetServerParams, *answer.Error) { + peers, errAnswer := getPeers() + if errAnswer != nil { + return nil, errAnswer + } + + return peers.GetAllPeersInfo(), nil +} + +func IsPeerGreeted(socketID string) (bool, *answer.Error) { + peers, errAnswer := getPeers() + if errAnswer != nil { + return false, errAnswer + } + + return peers.IsPeerGreeted(socketID), nil +} diff --git a/be1-go/internal/popserver/state/querier.go b/be1-go/internal/popserver/state/querier.go new file mode 100644 index 0000000000..ec16cd357a --- /dev/null +++ b/be1-go/internal/popserver/state/querier.go @@ -0,0 +1,90 @@ +package state + +import ( + "popstellar/message/answer" + "popstellar/message/query/method" +) + +type Querier interface { + GetQueryState(ID int) (bool, error) + GetNextID() int + SetQueryReceived(ID int) error + AddQuery(ID int, query method.GetMessagesById) + AddRumorQuery(id int, query method.Rumor) + IsRumorQuery(queryID int) bool + GetRumorFromPastQuery(queryID int) (method.Rumor, bool) +} + +func getQueries() (Querier, *answer.Error) { + if instance == nil || instance.queries == nil { + return nil, answer.NewInternalServerError("querier was not instantiated") + } + + return instance.queries, nil +} + +func GetNextID() (int, *answer.Error) { + queries, errAnswer := getQueries() + if errAnswer != nil { + return -1, errAnswer + } + + return queries.GetNextID(), nil +} + +func SetQueryReceived(ID int) *answer.Error { + queries, errAnswer := getQueries() + if errAnswer != nil { + return errAnswer + } + + err := queries.SetQueryReceived(ID) + if err != nil { + errAnswer := answer.NewInvalidActionError("%v", err) + return errAnswer + } + + return nil +} + +func AddQuery(ID int, query method.GetMessagesById) *answer.Error { + queries, errAnswer := getQueries() + if errAnswer != nil { + return errAnswer + } + + queries.AddQuery(ID, query) + + return nil +} + +func AddRumorQuery(ID int, query method.Rumor) *answer.Error { + queries, errAnswer := getQueries() + if errAnswer != nil { + return errAnswer + } + + queries.AddRumorQuery(ID, query) + + return nil +} + +func IsRumorQuery(ID int) (bool, *answer.Error) { + queries, errAnswer := getQueries() + if errAnswer != nil { + return false, errAnswer + } + + return queries.IsRumorQuery(ID), nil +} + +func GetRumorFromPastQuery(ID int) (method.Rumor, bool, *answer.Error) { + queries, errAnswer := getQueries() + if errAnswer != nil { + return method.Rumor{}, false, errAnswer + } + + rumor, ok := queries.GetRumorFromPastQuery(ID) + + return rumor, ok, nil +} diff --git a/be1-go/internal/popserver/state/socketer.go b/be1-go/internal/popserver/state/socketer.go new file mode 100644 index 0000000000..dbdb68321f --- /dev/null +++ b/be1-go/internal/popserver/state/socketer.go @@ -0,0 +1,67 @@ +package state + +import ( + "popstellar/message/answer" + "popstellar/network/socket" +) + +type Socketer interface { + SendToAll(buf []byte) + SendRumor(socket socket.Socket, senderID string, rumorID int, buf []byte) + Upsert(socket socket.Socket) + Delete(ID string) bool +} + +func getSockets() (Socketer, *answer.Error) { + if instance == nil || instance.sockets == nil { + return nil, answer.NewInternalServerError("sockets was not instantiated") + } + + return instance.sockets, nil +} + +func SendToAllServer(buf []byte) *answer.Error { + sockets, errAnswer := getSockets() + if errAnswer != nil { + return errAnswer + } + + sockets.SendToAll(buf) + + return nil +} + +func SendRumor(socket socket.Socket, senderID string, rumorID int, buf []byte) *answer.Error { + sockets, errAnswer := getSockets() + if errAnswer != nil { + return errAnswer + } + + sockets.SendRumor(socket, senderID, rumorID, buf) + + return nil +} + +// Upsert upserts a socket into the Sockets store. +func Upsert(socket socket.Socket) *answer.Error { + sockets, errAnswer := getSockets() + if errAnswer != nil { + return errAnswer + } + + sockets.Upsert(socket) + + return nil +} + +// Delete deletes a socket from the store. Returns false +// if the socket is not present in the store and true +// on success. +func Delete(ID string) (bool, *answer.Error) { + sockets, errAnswer := getSockets() + if errAnswer != nil { + return false, errAnswer + } + + return sockets.Delete(ID), nil +} diff --git a/be1-go/internal/popserver/state/state.go b/be1-go/internal/popserver/state/state.go index 8083e53945..b441a78d0c 100644 --- a/be1-go/internal/popserver/state/state.go +++ b/be1-go/internal/popserver/state/state.go @@ -4,8 +4,6 @@ import ( "github.com/rs/zerolog" "popstellar/internal/popserver/types" "popstellar/message/answer" - "popstellar/message/query/method" - "popstellar/network/socket" "sync" ) @@ -13,49 +11,23 @@ var once sync.Once var instance *state type state struct { - subs Subscriber - peers Peerer - queries Querier - hubParams HubParameter -} - -type HubParameter interface { - GetWaitGroup() *sync.WaitGroup - GetMessageChan() chan socket.IncomingMessage - GetStopChan() chan struct{} - GetClosedSockets() chan string -} - -type Subscriber interface { - AddChannel(channel string) *answer.Error - HasChannel(channel string) bool - Subscribe(channel string, socket socket.Socket) *answer.Error - Unsubscribe(channel string, socket socket.Socket) *answer.Error - UnsubscribeFromAll(socketID string) - SendToAll(buf []byte, channel string) *answer.Error -} - -type Peerer interface { - AddPeerInfo(socketID string, info method.GreetServerParams) error - AddPeerGreeted(socketID string) - GetAllPeersInfo() []method.GreetServerParams - IsPeerGreeted(socketID string) bool -} - -type Querier interface { - GetQueryState(ID int) (bool, error) - GetNextID() int - SetQueryReceived(ID int) error - AddQuery(ID int, query method.GetMessagesById) + subs Subscriber + peers Peerer + queries Querier + hubParams HubParameter + sockets Socketer + resetRumorSender chan struct{} } func InitState(log *zerolog.Logger) { once.Do(func() { instance = &state{ - subs: types.NewSubscribers(), - peers: types.NewPeers(), - queries: types.NewQueries(log), - hubParams: types.NewHubParams(), + subs: types.NewSubscribers(), + peers: types.NewPeers(), + queries: types.NewQueries(log), + hubParams: types.NewHubParams(), + sockets: types.NewSockets(), + resetRumorSender: make(chan struct{}), } }) } @@ -71,205 +43,23 @@ func SetState(subs Subscriber, peers Peerer, queries Querier, hubParams HubParam } } -func getSubs() (Subscriber, *answer.Error) { - if instance == nil || instance.subs == nil { - return nil, answer.NewInternalServerError("subscriber was not instantiated") - } - - return instance.subs, nil -} - -func AddChannel(channel string) *answer.Error { - subs, errAnswer := getSubs() - if errAnswer != nil { - return errAnswer - } - - return subs.AddChannel(channel) -} - -func HasChannel(channel string) (bool, *answer.Error) { - subs, errAnswer := getSubs() - if errAnswer != nil { - return false, errAnswer - } - - return subs.HasChannel(channel), nil -} - -func Subscribe(socket socket.Socket, channel string) *answer.Error { - subs, errAnswer := getSubs() - if errAnswer != nil { - return errAnswer +func GetResetRumorSender() (chan struct{}, *answer.Error) { + if instance == nil || instance.resetRumorSender == nil { + return nil, answer.NewInternalServerError("resetRumorSender was not instantiated") } - return subs.Subscribe(channel, socket) + return instance.resetRumorSender, nil } -func Unsubscribe(socket socket.Socket, channel string) *answer.Error { - subs, errAnswer := getSubs() - if errAnswer != nil { - return errAnswer +func NotifyResetRumorSender() *answer.Error { + if instance == nil || instance.resetRumorSender == nil { + return answer.NewInternalServerError("resetRumorSender was not instantiated") } - return subs.Unsubscribe(channel, socket) -} - -func UnsubscribeFromAll(socketID string) *answer.Error { - subs, errAnswer := getSubs() - if errAnswer != nil { - return errAnswer + select { + case instance.resetRumorSender <- struct{}{}: + case <-instance.hubParams.GetStopChan(): } - subs.UnsubscribeFromAll(socketID) - return nil } - -func SendToAll(buf []byte, channel string) *answer.Error { - subs, errAnswer := getSubs() - if errAnswer != nil { - return errAnswer - } - - return subs.SendToAll(buf, channel) -} - -func getPeers() (Peerer, *answer.Error) { - if instance == nil || instance.peers == nil { - return nil, answer.NewInternalServerError("peerer was not instantiated") - } - - return instance.peers, nil -} - -func AddPeerInfo(socketID string, info method.GreetServerParams) *answer.Error { - peers, errAnswer := getPeers() - if errAnswer != nil { - return errAnswer - } - - err := peers.AddPeerInfo(socketID, info) - if err != nil { - errAnswer := answer.NewInvalidActionError("failed to add peer: %v", err) - return errAnswer - } - - return nil -} - -func AddPeerGreeted(socketID string) *answer.Error { - peers, errAnswer := getPeers() - if errAnswer != nil { - return errAnswer - } - - peers.AddPeerGreeted(socketID) - - return nil -} - -func GetAllPeersInfo() ([]method.GreetServerParams, *answer.Error) { - peers, errAnswer := getPeers() - if errAnswer != nil { - return nil, errAnswer - } - - return peers.GetAllPeersInfo(), nil -} - -func IsPeerGreeted(socketID string) (bool, *answer.Error) { - peers, errAnswer := getPeers() - if errAnswer != nil { - return false, errAnswer - } - - return peers.IsPeerGreeted(socketID), nil -} - -func getQueries() (Querier, *answer.Error) { - if instance == nil || instance.queries == nil { - return nil, answer.NewInternalServerError("querier was not instantiated") - } - - return instance.queries, nil -} - -func GetNextID() (int, *answer.Error) { - queries, errAnswer := getQueries() - if errAnswer != nil { - return -1, errAnswer - } - - return queries.GetNextID(), nil -} - -func SetQueryReceived(ID int) *answer.Error { - queries, errAnswer := getQueries() - if errAnswer != nil { - return errAnswer - } - - err := queries.SetQueryReceived(ID) - if err != nil { - errAnswer := answer.NewInvalidActionError("%v", err) - return errAnswer - } - - return nil -} - -func AddQuery(ID int, query method.GetMessagesById) *answer.Error { - queries, errAnswer := getQueries() - if errAnswer != nil { - return errAnswer - } - - queries.AddQuery(ID, query) - - return nil -} - -func getHubParams() (HubParameter, *answer.Error) { - if instance == nil || instance.hubParams == nil { - return nil, answer.NewInternalServerError("hubparams was not instantiated") - } - - return instance.hubParams, nil -} - -func GetWaitGroup() (*sync.WaitGroup, *answer.Error) { - hubParams, errAnswer := getHubParams() - if errAnswer != nil { - return nil, errAnswer - } - - return hubParams.GetWaitGroup(), nil -} - -func GetMessageChan() (chan socket.IncomingMessage, *answer.Error) { - hubParams, errAnswer := getHubParams() - if errAnswer != nil { - return nil, errAnswer - } - - return hubParams.GetMessageChan(), nil -} - -func GetStopChan() (chan struct{}, *answer.Error) { - hubParams, errAnswer := getHubParams() - if errAnswer != nil { - return nil, errAnswer - } - - return hubParams.GetStopChan(), nil -} - -func GetClosedSockets() (chan string, *answer.Error) { - hubParams, errAnswer := getHubParams() - if errAnswer != nil { - return nil, errAnswer - } - - return hubParams.GetClosedSockets(), nil -} diff --git a/be1-go/internal/popserver/state/subscriber.go b/be1-go/internal/popserver/state/subscriber.go new file mode 100644 index 0000000000..c94514d488 --- /dev/null +++ b/be1-go/internal/popserver/state/subscriber.go @@ -0,0 +1,79 @@ +package state + +import ( + "popstellar/message/answer" + "popstellar/network/socket" +) + +type Subscriber interface { + AddChannel(channel string) *answer.Error + HasChannel(channel string) bool + Subscribe(channel string, socket socket.Socket) *answer.Error + Unsubscribe(channel string, socket socket.Socket) *answer.Error + UnsubscribeFromAll(socketID string) + SendToAll(buf []byte, channel string) *answer.Error +} + +func getSubs() (Subscriber, *answer.Error) { + if instance == nil || instance.subs == nil { + return nil, answer.NewInternalServerError("subscriber was not instantiated") + } + + return instance.subs, nil +} + +func AddChannel(channel string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + return subs.AddChannel(channel) +} + +func HasChannel(channel string) (bool, *answer.Error) { + subs, errAnswer := getSubs() + if errAnswer != nil { + return false, errAnswer + } + + return subs.HasChannel(channel), nil +} + +func Subscribe(socket socket.Socket, channel string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + return subs.Subscribe(channel, socket) +} + +func Unsubscribe(socket socket.Socket, channel string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + return subs.Unsubscribe(channel, socket) +} + +func UnsubscribeFromAll(socketID string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + subs.UnsubscribeFromAll(socketID) + + return nil +} + +func SendToAll(buf []byte, channel string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + return subs.SendToAll(buf, channel) +} diff --git a/be1-go/internal/popserver/types/queries.go b/be1-go/internal/popserver/types/queries.go index 6d154f857b..a56a6b842a 100644 --- a/be1-go/internal/popserver/types/queries.go +++ b/be1-go/internal/popserver/types/queries.go @@ -16,6 +16,8 @@ type Queries struct { state map[int]bool // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. getMessagesByIdQueries map[int]method.GetMessagesById + getRumorQueries map[int]method.Rumor + // nextID store the ID of the next query nextID int // zerolog @@ -27,6 +29,7 @@ func NewQueries(log *zerolog.Logger) *Queries { return &Queries{ state: make(map[int]bool), getMessagesByIdQueries: make(map[int]method.GetMessagesById), + getRumorQueries: make(map[int]method.Rumor), log: log, } } @@ -81,3 +84,32 @@ func (q *Queries) AddQuery(id int, query method.GetMessagesById) { q.getMessagesByIdQueries[id] = query q.state[id] = false } + +func (q *Queries) AddRumorQuery(id int, query method.Rumor) { + q.Lock() + defer q.Unlock() + + q.getRumorQueries[id] = query + q.state[id] = false +} + +func (q *Queries) IsRumorQuery(queryID int) bool { + q.Lock() + defer q.Unlock() + + _, ok := q.getRumorQueries[queryID] + + return ok +} + +func (q *Queries) GetRumorFromPastQuery(queryID int) (method.Rumor, bool) { + q.Lock() + defer q.Unlock() + + rumor, ok := q.getRumorQueries[queryID] + if !ok { + return method.Rumor{}, false + } + + return rumor, true +} diff --git a/be1-go/internal/popserver/types/sockets.go b/be1-go/internal/popserver/types/sockets.go index 189a09d1e7..5f3d4c5b1a 100644 --- a/be1-go/internal/popserver/types/sockets.go +++ b/be1-go/internal/popserver/types/sockets.go @@ -1,26 +1,48 @@ package types import ( + "fmt" + "math/rand" + "popstellar" "popstellar/network/socket" "sync" ) // NewSockets returns a new initialized Sockets -func NewSockets() Sockets { - return Sockets{ - store: make(map[string]socket.Socket), +func NewSockets() *Sockets { + return &Sockets{ + rState: make(map[string]rumorState), + socketIDs: make([]string, 0), + store: make(map[string]socket.Socket), } } +type rumorState struct { + counter int + index int + bannedSocket string +} + // Sockets provides thread-functionalities around a socket store. type Sockets struct { sync.RWMutex - store map[string]socket.Socket + rState map[string]rumorState + socketIDs []string + store map[string]socket.Socket } -// Len returns the number of Sockets. -func (s *Sockets) Len() int { - return len(s.store) +func (s *Sockets) newRumorState(socket socket.Socket) rumorState { + bannedSocket := "" + + if socket != nil { + bannedSocket = socket.ID() + } + + return rumorState{ + counter: 0, + index: rand.Intn(len(s.store)), + bannedSocket: bannedSocket, + } } // SendToAll sends a message to all Sockets. @@ -28,16 +50,57 @@ func (s *Sockets) SendToAll(buf []byte) { s.RLock() defer s.RUnlock() - for _, s := range s.store { - s.Send(buf) + for _, v := range s.store { + v.Send(buf) } } +func (s *Sockets) SendRumor(socket socket.Socket, senderID string, rumorID int, buf []byte) { + s.Lock() + defer s.Unlock() + + if len(s.store) == 0 { + return + } + + senderRumorID := fmt.Sprintf("%s%d", senderID, rumorID) + + rState, ok := s.rState[senderRumorID] + if !ok { + rState = s.newRumorState(socket) + s.rState[senderRumorID] = rState + } else { + // to be sure to not overflow + rState.index %= len(s.store) + s.rState[senderRumorID] = rState + } + + if s.socketIDs[rState.index] == rState.bannedSocket { + rState.index += 1 + rState.index %= len(s.store) + rState.counter += 1 + s.rState[senderRumorID] = rState + } + + if rState.counter >= len(s.store) { + popstellar.Logger.Debug().Msgf("stop sending rumor because completed cycle") + return + } + + s.store[s.socketIDs[rState.index]].Send(buf) + + rState.index += 1 + rState.index %= len(s.store) + rState.counter += 1 + s.rState[senderRumorID] = rState +} + // Upsert upserts a socket into the Sockets store. func (s *Sockets) Upsert(socket socket.Socket) { s.Lock() defer s.Unlock() + s.socketIDs = append(s.socketIDs, socket.ID()) s.store[socket.ID()] = socket } @@ -55,5 +118,21 @@ func (s *Sockets) Delete(ID string) bool { delete(s.store, ID) + index := -1 + + for i, socketID := range s.socketIDs { + if socketID == ID { + index = i + } + } + + if index == -1 { + return false + } + + socketIDs := make([]string, 0) + socketIDs = append(socketIDs, s.socketIDs[:index]...) + s.socketIDs = append(socketIDs, s.socketIDs[index+1:]...) + return true } diff --git a/be1-go/logger.go b/be1-go/logger.go index ec3ebd6f50..725784ea3c 100644 --- a/be1-go/logger.go +++ b/be1-go/logger.go @@ -36,7 +36,7 @@ var ShortSHA = "unknown" // level. const EnvLogLevel = "LLVL" -const defaultLevel = zerolog.InfoLevel +const defaultLevel = zerolog.DebugLevel func init() { lvl := os.Getenv(EnvLogLevel) diff --git a/be1-go/message/query/method/rumor.go b/be1-go/message/query/method/rumor.go new file mode 100644 index 0000000000..fa39c852aa --- /dev/null +++ b/be1-go/message/query/method/rumor.go @@ -0,0 +1,19 @@ +package method + +import ( + "popstellar/message/query" + "popstellar/message/query/method/message" +) + +type ParamsRumor struct { + SenderID string `json:"sender_id"` + RumorID int `json:"rumor_id"` + Messages map[string][]message.Message `json:"messages"` +} + +// Rumor defines a JSON RPC rumor message +type Rumor struct { + query.Base + ID int `json:"id"` + Params ParamsRumor `json:"params"` +} diff --git a/be1-go/message/query/query.go b/be1-go/message/query/query.go index aa38afff6b..d499859c88 100644 --- a/be1-go/message/query/query.go +++ b/be1-go/message/query/query.go @@ -11,6 +11,7 @@ const ( MethodHeartbeat = "heartbeat" MethodGetMessagesById = "get_messages_by_id" MethodGreetServer = "greet_server" + MethodRumor = "rumor" ) // Base defines all the common attributes for a Query RPC message