diff --git a/config/config.yaml b/config/config.yaml index ed0d9ceffa..2469c5a02a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -18,7 +18,7 @@ p2p: # TcpPort: # UdpPort: # mdns for local network setup -# DiscoveryType: mdns + DiscoveryType: mdns ssv: GenesisEpoch: diff --git a/exporter/api/conn.go b/exporter/api/conn.go index 0e947ac76d..76be3fb8c1 100644 --- a/exporter/api/conn.go +++ b/exporter/api/conn.go @@ -153,11 +153,23 @@ func (c *conn) ReadLoop() { _ = c.ws.Close() }() c.ws.SetReadLimit(maxMessageSize) - _ = c.ws.SetReadDeadline(time.Now().Add(pongWait)) + err := c.ws.SetReadDeadline(time.Now().Add(pongWait)) + if err != nil { + c.logger.Error("read loop stopped by set read deadline", zap.Error(err)) + return + } c.ws.SetPongHandler(func(string) error { // extend read limit on every pong message // this will keep the connection alive from our POV - _ = c.ws.SetReadDeadline(time.Now().Add(pongWait)) + c.logger.Debug("pong received") + err := c.ws.SetReadDeadline(time.Now().Add(pongWait)) + if err != nil { + c.logger.Error("pong handler - readDeadline", zap.Error(err)) + } + return err + }) + c.ws.SetPingHandler(func(string) error { + c.logger.Debug("ping received") return nil }) for { diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index 7fe3f1a127..0a9f7e8fe6 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -2,6 +2,7 @@ package ibft import ( "context" + "fmt" "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/exporter/api" "github.com/bloxapp/ssv/ibft" @@ -104,9 +105,6 @@ func (r *decidedReader) Start() error { if err := r.waitForMinPeers(ctx, r.validatorShare.PublicKey, 1); err != nil { return errors.Wrap(err, "could not wait for min peers") } - cn, done := r.network.ReceivedDecidedChan() - defer done() - r.listenToNetwork(cn) return nil } @@ -122,27 +120,35 @@ func (r *decidedReader) sync() error { return err } -func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) { - r.logger.Debug("listening to decided messages") - for msg := range cn { - if err := validateMsg(msg, string(r.identifier)); err != nil { - continue - } - logger := r.logger.With(messageFields(msg)...) - if err := validateDecidedMsg(msg, r.validatorShare); err != nil { - logger.Debug("received invalid decided message") - continue - } - go func(msg *proto.SignedMessage) { - defer logger.Debug("done with decided msg") - if saved, err := r.handleNewDecidedMessage(msg); err != nil { - if !saved { - logger.Error("could not handle decided message", zap.Error(err)) - } - logger.Error("could not check highest decided", zap.Error(err)) - } - }(msg) +// GetMsgResolver returns proper handler for msg based on msg type +func (r *decidedReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) { + switch networkMsg { + case network.NetworkMsg_DecidedType: + return r.onMessage } + return func(msg *proto.SignedMessage) { + r.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg)) + } +} + +func (r *decidedReader) onMessage(msg *proto.SignedMessage) { + if err := validateMsg(msg, r.identifier); err != nil { + return + } + logger := r.logger.With(messageFields(msg)...) + if err := validateDecidedMsg(msg, r.validatorShare); err != nil { + logger.Debug("received invalid decided message") + return + } + go func(msg *proto.SignedMessage) { + defer logger.Debug("done with decided msg") + if saved, err := r.handleNewDecidedMessage(msg); err != nil { + if !saved { + logger.Error("could not handle decided message", zap.Error(err)) + } + logger.Error("could not check highest decided", zap.Error(err)) + } + }(msg) } // handleNewDecidedMessage saves an incoming (valid) decided message @@ -236,10 +242,10 @@ func validateDecidedMsg(msg *proto.SignedMessage, share *storage.Share) error { return p.Run(msg) } -func validateMsg(msg *proto.SignedMessage, identifier string) error { +func validateMsg(msg *proto.SignedMessage, identifier []byte) error { p := pipeline.Combine( auth.BasicMsgValidation(), - auth.ValidateLambdas([]byte(identifier)), + auth.ValidateLambdas(identifier), ) return p.Run(msg) } diff --git a/exporter/ibft/incoming_msgs.go b/exporter/ibft/incoming_msgs.go index 70f8a7d186..15341ef866 100644 --- a/exporter/ibft/incoming_msgs.go +++ b/exporter/ibft/incoming_msgs.go @@ -2,6 +2,7 @@ package ibft import ( "context" + "fmt" "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/ibft/proto" "github.com/bloxapp/ssv/network" @@ -50,40 +51,44 @@ func (i *incomingMsgsReader) Start() error { if err := i.waitForMinPeers(ctx, i.publicKey, 1); err != nil { return errors.Wrap(err, "could not wait for min peers") } - cn, done := i.network.ReceivedMsgChan() - defer done() - i.listenToNetwork(cn) return nil } -func (i *incomingMsgsReader) listenToNetwork(cn <-chan *proto.SignedMessage) { +func (i *incomingMsgsReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) { + switch networkMsg { + case network.NetworkMsg_IBFTType: + return i.onMessage + } + return func(msg *proto.SignedMessage) { + i.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg)) + } +} + +func (i *incomingMsgsReader) onMessage(msg *proto.SignedMessage) { identifier := format.IdentifierFormat(i.publicKey.Serialize(), beacon.RoleTypeAttester.String()) - i.logger.Debug("listening to network messages") - for msg := range cn { - if msg == nil || msg.Message == nil { - i.logger.Info("received invalid msg") - continue - } - // filtering irrelevant messages - // TODO: handle other types of roles - if identifier != string(msg.Message.Lambda) { - continue - } + if msg == nil || msg.Message == nil { + i.logger.Info("received invalid msg") + return + } + // filtering irrelevant messages + // TODO: handle other types of roles + if identifier != string(msg.Message.Lambda) { + return + } - fields := messageFields(msg) + fields := messageFields(msg) - switch msg.Message.Type { - case proto.RoundState_PrePrepare: - i.logger.Info("pre-prepare msg", fields...) - case proto.RoundState_Prepare: - i.logger.Info("prepare msg", fields...) - case proto.RoundState_Commit: - i.logger.Info("commit msg", fields...) - case proto.RoundState_ChangeRound: - i.logger.Info("change round msg", fields...) - default: - i.logger.Warn("undefined message type", zap.Any("msg", msg)) - } + switch msg.Message.Type { + case proto.RoundState_PrePrepare: + i.logger.Info("pre-prepare msg", fields...) + case proto.RoundState_Prepare: + i.logger.Info("prepare msg", fields...) + case proto.RoundState_Commit: + i.logger.Info("commit msg", fields...) + case proto.RoundState_ChangeRound: + i.logger.Info("change round msg", fields...) + default: + i.logger.Warn("undefined message type", zap.Any("msg", msg)) } } diff --git a/exporter/node.go b/exporter/node.go index a1ae2613b0..ba9d341011 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -9,6 +9,7 @@ import ( "github.com/bloxapp/ssv/exporter/api" "github.com/bloxapp/ssv/exporter/ibft" "github.com/bloxapp/ssv/exporter/storage" + ibftController "github.com/bloxapp/ssv/ibft/controller" "github.com/bloxapp/ssv/ibft/proto" "github.com/bloxapp/ssv/monitoring/metrics" "github.com/bloxapp/ssv/network" @@ -86,6 +87,8 @@ type exporter struct { decidedReadersQueue tasks.Queue networkReadersQueue tasks.Queue metaDataReadersQueue tasks.Queue + + networkMsgMediator ibftController.Mediator } // New creates a new Exporter instance @@ -109,10 +112,13 @@ func New(opts Options) Exporter { decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), metaDataReadersQueue: tasks.NewExecutionQueue(metaDataReaderQueuesInterval), - ws: opts.WS, - readersMut: sync.RWMutex{}, - decidedReaders: map[string]ibft.Reader{}, - netReaders: map[string]ibft.Reader{}, + + networkMsgMediator: ibftController.NewMediator(opts.Logger), + + ws: opts.WS, + readersMut: sync.RWMutex{}, + decidedReaders: map[string]ibft.Reader{}, + netReaders: map[string]ibft.Reader{}, commitReader: ibft.NewCommitReader(ibft.CommitReaderOptions{ Logger: opts.Logger, Network: opts.Network, @@ -174,6 +180,8 @@ func (exp *exporter) Start() error { go exp.startMainTopic() + exp.startNetworkMediators() + go exp.reportOperators() return exp.ws.Start(fmt.Sprintf(":%d", exp.wsAPIPort)) @@ -367,3 +375,25 @@ func (exp *exporter) reportOperators() { reportOperatorIndex(exp.logger, &operators[i]) } } + +func (exp *exporter) startNetworkMediators() { + msgChan, msgDone := exp.network.ReceivedMsgChan() + decidedChan, decidedDone := exp.network.ReceivedDecidedChan() + + exp.networkMsgMediator.AddListener(network.NetworkMsg_IBFTType, msgChan, msgDone, func(publicKey string) (ibftController.MediatorReader, bool) { + exp.readersMut.Lock() + defer exp.readersMut.Unlock() + if reader, ok := exp.netReaders[publicKey]; ok { + return reader.(ibftController.MediatorReader), ok + } + return nil, false + }) + exp.networkMsgMediator.AddListener(network.NetworkMsg_DecidedType, decidedChan, decidedDone, func(publicKey string) (ibftController.MediatorReader, bool) { + exp.readersMut.Lock() + defer exp.readersMut.Unlock() + if reader, ok := exp.decidedReaders[publicKey]; ok { + return reader.(ibftController.MediatorReader), ok + } + return nil, false + }) +} diff --git a/ibft/controller/controller.go b/ibft/controller/controller.go index f95e968a6d..298be47efa 100644 --- a/ibft/controller/controller.go +++ b/ibft/controller/controller.go @@ -97,8 +97,6 @@ func (i *Controller) Init() error { i.processDecidedQueueMessages() i.processSyncQueueMessages() i.listenToSyncMessages() - i.listenToNetworkMessages() - i.listenToNetworkDecidedMessages() i.initHandlers.Set(true) i.logger.Debug("managed to setup iBFT handlers") } diff --git a/ibft/controller/controller_network.go b/ibft/controller/controller_network.go index 69c5c04c41..4d543f3607 100644 --- a/ibft/controller/controller_network.go +++ b/ibft/controller/controller_network.go @@ -50,21 +50,6 @@ func (i *Controller) listenToNetworkMessages() { }() } -func (i *Controller) listenToNetworkDecidedMessages() { - decidedChan, done := i.network.ReceivedDecidedChan() - go func() { - defer done() - for msg := range decidedChan { - if msg.Message != nil && i.equalIdentifier(msg.Message.Lambda) { - i.msgQueue.AddMessage(&network.Message{ - SignedMessage: msg, - Type: network.NetworkMsg_DecidedType, - }) - } - } - }() -} - func (i *Controller) listenToSyncMessages() { syncChan, done := i.network.ReceivedSyncMsgChan() go func() { diff --git a/ibft/controller/mediator.go b/ibft/controller/mediator.go new file mode 100644 index 0000000000..54b5bebe3d --- /dev/null +++ b/ibft/controller/mediator.go @@ -0,0 +1,52 @@ +package controller + +import ( + "github.com/bloxapp/ssv/ibft/pipeline/auth" + "github.com/bloxapp/ssv/ibft/proto" + "github.com/bloxapp/ssv/network" + "github.com/bloxapp/ssv/utils/format" + "go.uber.org/zap" +) + +// MediatorReader is an interface for components that resolving network msg's +type MediatorReader interface { + GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) +} + +// Mediator between network and redirect the proper msg to the proper MediatorReader +type Mediator struct { + logger *zap.Logger +} + +// NewMediator returns new Mediator +func NewMediator(logger *zap.Logger) Mediator { + return Mediator{ + logger: logger, + } +} + +// AddListener listen to channel and use redirect func to push to the right place +func (m *Mediator) AddListener(ibftType network.NetworkMsg, ch <-chan *proto.SignedMessage, done func(), handler func(publicKey string) (MediatorReader, bool)) { + go func() { + defer done() + for msg := range ch { + m.redirect(ibftType, handler, msg) + } + + m.logger.Debug("mediator stopped listening to network", zap.String("type", ibftType.String())) + }() +} + +// redirect network msg to proper MediatorReader. Also validate the msg itself +func (m *Mediator) redirect(ibftType network.NetworkMsg, readerHandler func(publicKey string) (MediatorReader, bool), msg *proto.SignedMessage) { + if err := auth.BasicMsgValidation().Run(msg); err != nil { + return + } + publicKey, role := format.IdentifierUnformat(string(msg.Message.Lambda)) // TODO need to support multi role types + logger := m.logger.With(zap.String("publicKey", publicKey), zap.String("role", role), zap.String("type", ibftType.String())) + if reader, ok := readerHandler(publicKey); ok { + reader.GetMsgResolver(ibftType)(msg) + } else { + logger.Warn("failed to find validator reader") + } +} diff --git a/ibft/controller/mediator_test.go b/ibft/controller/mediator_test.go new file mode 100644 index 0000000000..cc14a600fb --- /dev/null +++ b/ibft/controller/mediator_test.go @@ -0,0 +1,73 @@ +package controller + +import ( + "encoding/binary" + "fmt" + "github.com/bloxapp/ssv/ibft/proto" + "github.com/bloxapp/ssv/network" + "github.com/bloxapp/ssv/storage/basedb" + "github.com/bloxapp/ssv/storage/kv" + "github.com/bloxapp/ssv/utils/logex" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "testing" + "time" +) + +type reader struct { + db basedb.IDb +} + +func init() { + logex.Build("", zapcore.DebugLevel, nil) +} + +func (r reader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) { + return func(msg *proto.SignedMessage) { + seq := make([]byte, 8) + binary.LittleEndian.PutUint64(seq, msg.Message.SeqNumber) + if err := r.db.Set([]byte("test"), seq, []byte("val")); err != nil { + panic(fmt.Errorf("faild to set value - %s", err)) + } + } +} + +func TestMediator_AddListener(t *testing.T) { + db, err := kv.New(basedb.Options{ + Type: "badger-memory", + Path: "", + Logger: logex.GetLogger(), + }) + require.NoError(t, err) + + mediator := NewMediator(logex.GetLogger()) + reader := &reader{db: db} + + cn := make(chan *proto.SignedMessage) + mediator.AddListener(network.NetworkMsg_DecidedType, cn, func() { + close(cn) + }, func(publicKey string) (MediatorReader, bool) { + return reader, true + }) + + for i := 1; i < 5; i++ { + cn <- &proto.SignedMessage{ + Message: &proto.Message{ + Type: 0, + Round: 0, + Lambda: nil, + SeqNumber: uint64(i), + Value: nil, + }, + Signature: nil, + SignerIds: nil, + } + } + + time.Sleep(time.Millisecond * 100) + objs, err := db.GetAllByCollection([]byte("test")) + require.NoError(t, err) + require.Equal(t, 4, len(objs)) + require.Equal(t, uint64(1), binary.LittleEndian.Uint64(objs[0].Key)) + require.Equal(t, uint64(4), binary.LittleEndian.Uint64(objs[3].Key)) +} diff --git a/operator/node.go b/operator/node.go index 5f15800409..71eb50544e 100644 --- a/operator/node.go +++ b/operator/node.go @@ -101,6 +101,7 @@ func (n *operatorNode) init(opts Options) error { func (n *operatorNode) Start() error { n.logger.Info("All required services are ready. OPERATOR SUCCESSFULLY CONFIGURED AND NOW RUNNING!") n.validatorsCtrl.StartValidators() + n.validatorsCtrl.StartNetworkMediators() if err := tasks.Retry(n.net.SubscribeToMainTopic, 3); err != nil { n.logger.Error("failed to subscribe to main topic", zap.Error(err)) } diff --git a/validator/controller.go b/validator/controller.go index 6b798c8d7e..e822ffcc98 100644 --- a/validator/controller.go +++ b/validator/controller.go @@ -7,6 +7,7 @@ import ( "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/eth1" "github.com/bloxapp/ssv/eth1/abiparser" + controller2 "github.com/bloxapp/ssv/ibft/controller" "github.com/bloxapp/ssv/network" "github.com/bloxapp/ssv/operator/forks" "github.com/bloxapp/ssv/storage/basedb" @@ -52,6 +53,7 @@ type Controller interface { GetValidatorsIndices() []spec.ValidatorIndex GetValidator(pubKey string) (*Validator, bool) UpdateValidatorMetaDataLoop() + StartNetworkMediators() } // controller implements Controller @@ -68,6 +70,8 @@ type controller struct { metadataUpdateQueue tasks.Queue metadataUpdateInterval time.Duration + + networkMediator controller2.Mediator } // NewController creates a new validator controller instance @@ -100,6 +104,8 @@ func NewController(options ControllerOptions) Controller { metadataUpdateQueue: tasks.NewExecutionQueue(10 * time.Millisecond), metadataUpdateInterval: options.MetadataUpdateInterval, + + networkMediator: controller2.NewMediator(options.Logger), } if err := ctrl.initShares(options); err != nil { @@ -182,6 +188,18 @@ func (c *controller) setupValidators(shares []*validatorstorage.Share) { go c.updateValidatorsMetadata(fetchMetadata) } +func (c *controller) StartNetworkMediators() { + msgChan, msgDone := c.validatorsMap.optsTemplate.Network.ReceivedMsgChan() + decidedChan, decidedDone := c.validatorsMap.optsTemplate.Network.ReceivedDecidedChan() + + c.networkMediator.AddListener(network.NetworkMsg_IBFTType, msgChan, msgDone, c.getReader) + c.networkMediator.AddListener(network.NetworkMsg_DecidedType, decidedChan, decidedDone, c.getReader) +} + +func (c *controller) getReader(publicKey string) (controller2.MediatorReader, bool) { + return c.validatorsMap.GetValidator(publicKey) +} + // updateValidatorsMetadata updates metadata of the given public keys. // as part of the flow in beacon.UpdateValidatorsMetadata, // UpdateValidatorMetadata is called to persist metadata and start a specific validator diff --git a/validator/validator.go b/validator/validator.go index 987e22042b..7d35246c19 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -3,6 +3,7 @@ package validator import ( "bytes" "context" + "fmt" spec "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/bloxapp/eth2-key-manager/core" "github.com/bloxapp/ssv/beacon" @@ -144,6 +145,33 @@ func (v *Validator) getSlotStartTime(slot uint64) time.Time { return start } +// GetMsgResolver returns proper handler for msg based on msg type +func (v *Validator) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) { + switch networkMsg { + case network.NetworkMsg_IBFTType: + return v.listenToNetworkMessages + case network.NetworkMsg_DecidedType: + return v.listenToNetworkDecidedMessages + } + return func(msg *proto.SignedMessage) { + v.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg)) + } +} + +func (v *Validator) listenToNetworkMessages(msg *proto.SignedMessage) { + v.msgQueue.AddMessage(&network.Message{ + SignedMessage: msg, + Type: network.NetworkMsg_IBFTType, + }) +} + +func (v *Validator) listenToNetworkDecidedMessages(msg *proto.SignedMessage) { + v.msgQueue.AddMessage(&network.Message{ + SignedMessage: msg, + Type: network.NetworkMsg_DecidedType, + }) +} + func setupIbftController( role beacon.RoleType, logger *zap.Logger,