Skip to content

Commit

Permalink
Network msg's refactor (#489)
Browse files Browse the repository at this point in the history
* ws ping pong logs

* push msg to reader

* deploy exporter stage

* deploy exporter prod

* network mediator

* stage deploy

* ibft controller disable listeners

* deploy stage

* deploy stage

* deploy prod

* mediator logs

* deploy exporter stage

* mediator default in select

* deploy exporter stage

* fix mediator resolving

* deploy stage

* fix exporter mediator

* deploy stage

* deploy prod

* exporter fix mediator handler

* deploy exporter prod

* lint & logs remove

* lint fix

* test rename
  • Loading branch information
nivBlox authored Jan 2, 2022
1 parent 921a1d4 commit 2aad2d4
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 77 deletions.
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ p2p:
# TcpPort:
# UdpPort:
# mdns for local network setup
# DiscoveryType: mdns
DiscoveryType: mdns

ssv:
GenesisEpoch:
Expand Down
16 changes: 14 additions & 2 deletions exporter/api/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,23 @@ func (c *conn) ReadLoop() {
_ = c.ws.Close()
}()
c.ws.SetReadLimit(maxMessageSize)
_ = c.ws.SetReadDeadline(time.Now().Add(pongWait))
err := c.ws.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
c.logger.Error("read loop stopped by set read deadline", zap.Error(err))
return
}
c.ws.SetPongHandler(func(string) error {
// extend read limit on every pong message
// this will keep the connection alive from our POV
_ = c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.logger.Debug("pong received")
err := c.ws.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
c.logger.Error("pong handler - readDeadline", zap.Error(err))
}
return err
})
c.ws.SetPingHandler(func(string) error {
c.logger.Debug("ping received")
return nil
})
for {
Expand Down
56 changes: 31 additions & 25 deletions exporter/ibft/decided_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ibft

import (
"context"
"fmt"
"github.com/bloxapp/ssv/beacon"
"github.com/bloxapp/ssv/exporter/api"
"github.com/bloxapp/ssv/ibft"
Expand Down Expand Up @@ -104,9 +105,6 @@ func (r *decidedReader) Start() error {
if err := r.waitForMinPeers(ctx, r.validatorShare.PublicKey, 1); err != nil {
return errors.Wrap(err, "could not wait for min peers")
}
cn, done := r.network.ReceivedDecidedChan()
defer done()
r.listenToNetwork(cn)
return nil
}

Expand All @@ -122,27 +120,35 @@ func (r *decidedReader) sync() error {
return err
}

func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
r.logger.Debug("listening to decided messages")
for msg := range cn {
if err := validateMsg(msg, string(r.identifier)); err != nil {
continue
}
logger := r.logger.With(messageFields(msg)...)
if err := validateDecidedMsg(msg, r.validatorShare); err != nil {
logger.Debug("received invalid decided message")
continue
}
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
if !saved {
logger.Error("could not handle decided message", zap.Error(err))
}
logger.Error("could not check highest decided", zap.Error(err))
}
}(msg)
// GetMsgResolver returns proper handler for msg based on msg type
func (r *decidedReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) {
switch networkMsg {
case network.NetworkMsg_DecidedType:
return r.onMessage
}
return func(msg *proto.SignedMessage) {
r.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg))
}
}

func (r *decidedReader) onMessage(msg *proto.SignedMessage) {
if err := validateMsg(msg, r.identifier); err != nil {
return
}
logger := r.logger.With(messageFields(msg)...)
if err := validateDecidedMsg(msg, r.validatorShare); err != nil {
logger.Debug("received invalid decided message")
return
}
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
if !saved {
logger.Error("could not handle decided message", zap.Error(err))
}
logger.Error("could not check highest decided", zap.Error(err))
}
}(msg)
}

// handleNewDecidedMessage saves an incoming (valid) decided message
Expand Down Expand Up @@ -236,10 +242,10 @@ func validateDecidedMsg(msg *proto.SignedMessage, share *storage.Share) error {
return p.Run(msg)
}

func validateMsg(msg *proto.SignedMessage, identifier string) error {
func validateMsg(msg *proto.SignedMessage, identifier []byte) error {
p := pipeline.Combine(
auth.BasicMsgValidation(),
auth.ValidateLambdas([]byte(identifier)),
auth.ValidateLambdas(identifier),
)
return p.Run(msg)
}
Expand Down
61 changes: 33 additions & 28 deletions exporter/ibft/incoming_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ibft

import (
"context"
"fmt"
"github.com/bloxapp/ssv/beacon"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/network"
Expand Down Expand Up @@ -50,40 +51,44 @@ func (i *incomingMsgsReader) Start() error {
if err := i.waitForMinPeers(ctx, i.publicKey, 1); err != nil {
return errors.Wrap(err, "could not wait for min peers")
}
cn, done := i.network.ReceivedMsgChan()
defer done()
i.listenToNetwork(cn)
return nil
}

func (i *incomingMsgsReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
func (i *incomingMsgsReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) {
switch networkMsg {
case network.NetworkMsg_IBFTType:
return i.onMessage
}
return func(msg *proto.SignedMessage) {
i.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg))
}
}

func (i *incomingMsgsReader) onMessage(msg *proto.SignedMessage) {
identifier := format.IdentifierFormat(i.publicKey.Serialize(), beacon.RoleTypeAttester.String())
i.logger.Debug("listening to network messages")
for msg := range cn {
if msg == nil || msg.Message == nil {
i.logger.Info("received invalid msg")
continue
}
// filtering irrelevant messages
// TODO: handle other types of roles
if identifier != string(msg.Message.Lambda) {
continue
}
if msg == nil || msg.Message == nil {
i.logger.Info("received invalid msg")
return
}
// filtering irrelevant messages
// TODO: handle other types of roles
if identifier != string(msg.Message.Lambda) {
return
}

fields := messageFields(msg)
fields := messageFields(msg)

switch msg.Message.Type {
case proto.RoundState_PrePrepare:
i.logger.Info("pre-prepare msg", fields...)
case proto.RoundState_Prepare:
i.logger.Info("prepare msg", fields...)
case proto.RoundState_Commit:
i.logger.Info("commit msg", fields...)
case proto.RoundState_ChangeRound:
i.logger.Info("change round msg", fields...)
default:
i.logger.Warn("undefined message type", zap.Any("msg", msg))
}
switch msg.Message.Type {
case proto.RoundState_PrePrepare:
i.logger.Info("pre-prepare msg", fields...)
case proto.RoundState_Prepare:
i.logger.Info("prepare msg", fields...)
case proto.RoundState_Commit:
i.logger.Info("commit msg", fields...)
case proto.RoundState_ChangeRound:
i.logger.Info("change round msg", fields...)
default:
i.logger.Warn("undefined message type", zap.Any("msg", msg))
}
}

Expand Down
38 changes: 34 additions & 4 deletions exporter/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/bloxapp/ssv/exporter/api"
"github.com/bloxapp/ssv/exporter/ibft"
"github.com/bloxapp/ssv/exporter/storage"
ibftController "github.com/bloxapp/ssv/ibft/controller"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/monitoring/metrics"
"github.com/bloxapp/ssv/network"
Expand Down Expand Up @@ -86,6 +87,8 @@ type exporter struct {
decidedReadersQueue tasks.Queue
networkReadersQueue tasks.Queue
metaDataReadersQueue tasks.Queue

networkMsgMediator ibftController.Mediator
}

// New creates a new Exporter instance
Expand All @@ -109,10 +112,13 @@ func New(opts Options) Exporter {
decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval),
networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval),
metaDataReadersQueue: tasks.NewExecutionQueue(metaDataReaderQueuesInterval),
ws: opts.WS,
readersMut: sync.RWMutex{},
decidedReaders: map[string]ibft.Reader{},
netReaders: map[string]ibft.Reader{},

networkMsgMediator: ibftController.NewMediator(opts.Logger),

ws: opts.WS,
readersMut: sync.RWMutex{},
decidedReaders: map[string]ibft.Reader{},
netReaders: map[string]ibft.Reader{},
commitReader: ibft.NewCommitReader(ibft.CommitReaderOptions{
Logger: opts.Logger,
Network: opts.Network,
Expand Down Expand Up @@ -174,6 +180,8 @@ func (exp *exporter) Start() error {

go exp.startMainTopic()

exp.startNetworkMediators()

go exp.reportOperators()

return exp.ws.Start(fmt.Sprintf(":%d", exp.wsAPIPort))
Expand Down Expand Up @@ -367,3 +375,25 @@ func (exp *exporter) reportOperators() {
reportOperatorIndex(exp.logger, &operators[i])
}
}

func (exp *exporter) startNetworkMediators() {
msgChan, msgDone := exp.network.ReceivedMsgChan()
decidedChan, decidedDone := exp.network.ReceivedDecidedChan()

exp.networkMsgMediator.AddListener(network.NetworkMsg_IBFTType, msgChan, msgDone, func(publicKey string) (ibftController.MediatorReader, bool) {
exp.readersMut.Lock()
defer exp.readersMut.Unlock()
if reader, ok := exp.netReaders[publicKey]; ok {
return reader.(ibftController.MediatorReader), ok
}
return nil, false
})
exp.networkMsgMediator.AddListener(network.NetworkMsg_DecidedType, decidedChan, decidedDone, func(publicKey string) (ibftController.MediatorReader, bool) {
exp.readersMut.Lock()
defer exp.readersMut.Unlock()
if reader, ok := exp.decidedReaders[publicKey]; ok {
return reader.(ibftController.MediatorReader), ok
}
return nil, false
})
}
2 changes: 0 additions & 2 deletions ibft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func (i *Controller) Init() error {
i.processDecidedQueueMessages()
i.processSyncQueueMessages()
i.listenToSyncMessages()
i.listenToNetworkMessages()
i.listenToNetworkDecidedMessages()
i.initHandlers.Set(true)
i.logger.Debug("managed to setup iBFT handlers")
}
Expand Down
15 changes: 0 additions & 15 deletions ibft/controller/controller_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,6 @@ func (i *Controller) listenToNetworkMessages() {
}()
}

func (i *Controller) listenToNetworkDecidedMessages() {
decidedChan, done := i.network.ReceivedDecidedChan()
go func() {
defer done()
for msg := range decidedChan {
if msg.Message != nil && i.equalIdentifier(msg.Message.Lambda) {
i.msgQueue.AddMessage(&network.Message{
SignedMessage: msg,
Type: network.NetworkMsg_DecidedType,
})
}
}
}()
}

func (i *Controller) listenToSyncMessages() {
syncChan, done := i.network.ReceivedSyncMsgChan()
go func() {
Expand Down
52 changes: 52 additions & 0 deletions ibft/controller/mediator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package controller

import (
"github.com/bloxapp/ssv/ibft/pipeline/auth"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/network"
"github.com/bloxapp/ssv/utils/format"
"go.uber.org/zap"
)

// MediatorReader is an interface for components that resolving network msg's
type MediatorReader interface {
GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage)
}

// Mediator between network and redirect the proper msg to the proper MediatorReader
type Mediator struct {
logger *zap.Logger
}

// NewMediator returns new Mediator
func NewMediator(logger *zap.Logger) Mediator {
return Mediator{
logger: logger,
}
}

// AddListener listen to channel and use redirect func to push to the right place
func (m *Mediator) AddListener(ibftType network.NetworkMsg, ch <-chan *proto.SignedMessage, done func(), handler func(publicKey string) (MediatorReader, bool)) {
go func() {
defer done()
for msg := range ch {
m.redirect(ibftType, handler, msg)
}

m.logger.Debug("mediator stopped listening to network", zap.String("type", ibftType.String()))
}()
}

// redirect network msg to proper MediatorReader. Also validate the msg itself
func (m *Mediator) redirect(ibftType network.NetworkMsg, readerHandler func(publicKey string) (MediatorReader, bool), msg *proto.SignedMessage) {
if err := auth.BasicMsgValidation().Run(msg); err != nil {
return
}
publicKey, role := format.IdentifierUnformat(string(msg.Message.Lambda)) // TODO need to support multi role types
logger := m.logger.With(zap.String("publicKey", publicKey), zap.String("role", role), zap.String("type", ibftType.String()))
if reader, ok := readerHandler(publicKey); ok {
reader.GetMsgResolver(ibftType)(msg)
} else {
logger.Warn("failed to find validator reader")
}
}
Loading

0 comments on commit 2aad2d4

Please sign in to comment.