Merge pull request #591 from bloxapp/stage
Stage to Main (v0.2.0 preparation) Part 3
amirylm authored Jun 15, 2022
2 parents 2ad57de + fbecb96 commit 18ab5ac
Showing 29 changed files with 65 additions and 87 deletions.
11 changes: 1 addition & 10 deletions cli/operator/node.go
@@ -24,7 +24,6 @@ import (
     forksv0 "github.com/bloxapp/ssv/network/forks/v0"
     p2pv1 "github.com/bloxapp/ssv/network/p2p"
     "github.com/bloxapp/ssv/operator"
-    "github.com/bloxapp/ssv/operator/duties"
     "github.com/bloxapp/ssv/operator/validator"
     forksprotocol "github.com/bloxapp/ssv/protocol/forks"
     beaconprotocol "github.com/bloxapp/ssv/protocol/v1/blockchain/beacon"
@@ -51,9 +50,7 @@ type config struct {
     NetworkPrivateKey string `yaml:"NetworkPrivateKey" env:"NETWORK_PRIVATE_KEY" env-description:"private key for network identity"`
     ClearNetworkKey bool `yaml:"ClearNetworkKey" env:"CLEAR_NETWORK_KEY" env-description:"flag that turns on/off network key revocation"`
 
-    ReadOnlyMode bool `yaml:"ReadOnlyMode" env:"READ_ONLY_MODE" env-description:"a flag to turn on read only operator"`
-
-    ForkV1Epoch uint64 `yaml:"ForkV1Epoch" env:"FORKV1_EPOCH" env-description:"Target epoch for fork v1"`
+    ForkV1Epoch uint64 `yaml:"ForkV1Epoch" env:"FORKV1_EPOCH" env-default:"102594" env-description:"Target epoch for fork v1"`
 }
 
 var cfg config
@@ -160,9 +157,6 @@ var StartNodeCmd = &cobra.Command{
     cfg.SSVOptions.Beacon = beaconClient
     cfg.SSVOptions.ETHNetwork = eth2Network
     cfg.SSVOptions.Network = p2pNet
-
-    //cfg.SSVOptions.UseMainTopic = false // which topics needs to be subscribed is determined by ssv protocol
-
     cfg.SSVOptions.ValidatorOptions.ForkVersion = ssvForkVersion
     cfg.SSVOptions.ValidatorOptions.ETHNetwork = eth2Network
     cfg.SSVOptions.ValidatorOptions.Logger = Logger
@@ -201,9 +195,6 @@ var StartNodeCmd = &cobra.Command{
 
     validatorCtrl := validator.NewController(cfg.SSVOptions.ValidatorOptions)
     cfg.SSVOptions.ValidatorController = validatorCtrl
-    if cfg.ReadOnlyMode {
-        cfg.SSVOptions.DutyExec = duties.NewReadOnlyExecutor(Logger)
-    }
     operatorNode = operator.New(cfg.SSVOptions)
 
     if cfg.MetricsAPIPort > 0 {
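Note: the single addition in this file is the env-default:"102594" tag on ForkV1Epoch, so the fork epoch no longer has to be supplied explicitly. A minimal sketch of how such a default resolves, assuming the cleanenv-style tag convention these fields use (the struct and wiring here are illustrative, not the project's real config):

    package main

    import (
        "fmt"

        "github.com/ilyakaznacheev/cleanenv"
    )

    type exampleConfig struct {
        // same tag style as the diff above; 102594 applies when FORKV1_EPOCH is unset
        ForkV1Epoch uint64 `yaml:"ForkV1Epoch" env:"FORKV1_EPOCH" env-default:"102594"`
    }

    func main() {
        var cfg exampleConfig
        if err := cleanenv.ReadEnv(&cfg); err != nil { // env vars first, env-default as fallback
            panic(err)
        }
        fmt.Println(cfg.ForkV1Epoch) // prints 102594 unless FORKV1_EPOCH overrides it
    }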
2 changes: 1 addition & 1 deletion docs/resources/cov-badge.svg
2 changes: 1 addition & 1 deletion eth1/sync.go
@@ -84,7 +84,7 @@ func SyncEth1Events(logger *zap.Logger, client Client, storage SyncOffsetStorage
     syncWg.Wait()
 
     if len(errs) > 0 {
-        logger.Error("failed to handle all events from sync", zap.Any("errs", errs))
+        logger.Warn("failed to handle all events from sync", zap.Any("errs", errs))
         return errors.New("failed to handle all events from sync")
     }
 
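Note: this Error→Warn downgrade is the recurring theme of the commit: failures the node recovers from on its own (sync retries, stream teardown, metadata refresh) are demoted so that Error is left for genuinely unexpected conditions. The zap convention in isolation, with doSync as a hypothetical stand-in:

    package main

    import "go.uber.org/zap"

    func doSync() error { return nil } // hypothetical stand-in for the sync call

    func main() {
        logger, _ := zap.NewProduction()
        defer func() { _ = logger.Sync() }()
        if err := doSync(); err != nil {
            // recoverable: the caller retries on the next round, so Warn rather than Error
            logger.Warn("failed to handle all events from sync", zap.Error(err))
        }
    }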
7 changes: 0 additions & 7 deletions ibft/conversion/converters.go
@@ -182,13 +182,6 @@ func ToSignedMessageV1(sm *proto.SignedMessage) (*message.SignedMessage, error)
 }
 
 func toV1ChangeRound(changeRoundData []byte) ([]byte, error) {
-    // TODO need to remove log once done with testing
-    r, err := json.Marshal(changeRoundData)
-    if err == nil {
-        logex.GetLogger().Debug("------ convert change round v0 -> v1", zap.String("data marshaled", string(r)), zap.ByteString("data byte", changeRoundData))
-    } else {
-        logex.GetLogger().Debug("------ FAILED convert change round v0 -> v1", zap.Error(err))
-    }
     ret := &proto.ChangeRoundData{}
     if err := json.Unmarshal(changeRoundData, ret); err != nil {
         logex.GetLogger().Warn("failed to unmarshal v0 change round struct", zap.Error(err))
3 changes: 1 addition & 2 deletions network/discovery/local_service.go
@@ -19,8 +19,7 @@ const (
     // localDiscoveryInterval is how often we re-publish our mDNS records.
     localDiscoveryInterval = time.Second / 2
     // LocalDiscoveryServiceTag is used in our mDNS advertisements to discover other peers
-    LocalDiscoveryServiceTag = "bloxstaking.ssv"
-    //LocalDiscoveryServiceTag = "ssv.discovery" // TODO: change
+    LocalDiscoveryServiceTag = "ssv.discovery"
 )
 
 // localDiscovery implements ssv_discovery.Service using mDNS and KAD-DHT
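Note: LocalDiscoveryServiceTag is the rendezvous string advertised over mDNS, so only peers announcing the same tag discover each other; renaming it from "bloxstaking.ssv" to "ssv.discovery" effectively separates local discovery from older builds. A sketch of the consuming side, assuming the Notifee interface of go-libp2p's mDNS discovery (service construction varies across libp2p versions):

    package discovery

    import (
        "context"

        "github.com/libp2p/go-libp2p-core/host"
        "github.com/libp2p/go-libp2p-core/peer"
    )

    // discoveryNotifee is invoked by the mDNS service for every peer that
    // advertises the same service tag ("ssv.discovery").
    type discoveryNotifee struct {
        h host.Host
    }

    func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
        // best-effort connect; duplicates and failures are expected on a LAN
        _ = n.h.Connect(context.Background(), pi)
    }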
8 changes: 2 additions & 6 deletions network/p2p/config.go
@@ -22,10 +22,6 @@ import (
 
 // Config holds the configuration options for p2p network
 type Config struct {
-    // TODO: ENR for compatibility
-    // Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QMmL9hLJ1csDN4rQoSjlJGE2SvsXOETfcLH8uAVrxlHaELF0u3NeKCTY2eO_X1zy5eEKcHruyaAsGNiyyG4QWUQBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"`
-    // stage enr
-    // Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QDAmZK-69qRU5q-cxW6BqLwIlWoYH-BoRlX2N7D9rXBlM7OJ9tWRRtryqvCW04geHC_ab8QmWT9QULnT0Tc5S1cBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhArqAsGJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"`
     Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QMmL9hLJ1csDN4rQoSjlJGE2SvsXOETfcLH8uAVrxlHaELF0u3NeKCTY2eO_X1zy5eEKcHruyaAsGNiyyG4QWUQBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"`
 
     TCPPort int `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"`
@@ -34,8 +30,8 @@ type Config struct {
     HostDNS string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery"`
 
     RequestTimeout time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"5s"`
-    MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"50" env-description:"Maximum number of returned objects in a batch"`
-    MaxPeers int `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"150" env-description:"Connected peers limit for outbound connections, inbound connections can grow up to 2 times of this value"`
+    MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"25" env-description:"Maximum number of returned objects in a batch"`
+    MaxPeers int `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"250" env-description:"Connected peers limit for outbound connections, inbound connections can grow up to 2 times of this value"`
     // PubSubScoring is a flag to turn on/off pubsub scoring
     PubSubScoring bool `yaml:"PubSubScoring" env:"PUBSUB_SCORING" env-description:"Flag to turn on/off pubsub scoring"`
     // PubSubTrace is a flag to turn on/off pubsub tracing in logs
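Note: only the defaults changed here (MaxBatchResponse 50→25, MaxPeers 150→250); Bootnodes stays a single ';'-separated ENR string rather than a YAML list. A minimal sketch of splitting such a value before handing it to discovery (the helper name is illustrative):

    package p2p

    import "strings"

    // parseBootnodes splits the ';'-separated ENR list from config,
    // dropping empty entries left by stray separators.
    func parseBootnodes(raw string) []string {
        var enrs []string
        for _, s := range strings.Split(raw, ";") {
            if s = strings.TrimSpace(s); s != "" {
                enrs = append(enrs, s)
            }
        }
        return enrs
    }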
6 changes: 3 additions & 3 deletions network/p2p/p2p.go
@@ -90,13 +90,13 @@ func (n *p2pNetwork) Close() error {
     defer atomic.StoreInt32(&n.state, stateClosed)
     n.cancel()
     if err := n.disc.Close(); err != nil {
-        n.logger.Error("could not close discovery", zap.Error(err))
+        n.logger.Warn("could not close discovery", zap.Error(err))
     }
     if err := n.idx.Close(); err != nil {
-        n.logger.Error("could not close index", zap.Error(err))
+        n.logger.Warn("could not close index", zap.Error(err))
     }
     if err := n.topicsCtrl.Close(); err != nil {
-        n.logger.Error("could not close topics controller", zap.Error(err))
+        n.logger.Warn("could not close topics controller", zap.Error(err))
     }
     return n.host.Close()
 }
5 changes: 3 additions & 2 deletions network/p2p/p2p_setup.go
@@ -53,13 +53,14 @@ func (n *p2pNetwork) Setup() error {
     if err != nil {
         return err
     }
-    n.logger.Debug("p2p host was configured", zap.String("peer", n.host.ID().String()))
+    n.logger = n.logger.With(zap.String("selfPeer", n.host.ID().String()))
+    n.logger.Debug("p2p host was configured")
 
     err = n.SetupServices()
     if err != nil {
         return err
     }
-    n.logger.Debug("p2p services were configured", zap.String("peer", n.host.ID().String()))
+    n.logger.Debug("p2p services were configured")
 
     return nil
 }
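Note: instead of repeating the peer ID at each call site, Setup now rebinds the struct's logger once with With, so every subsequent entry carries the selfPeer field. The zap pattern in isolation (the peer ID value is illustrative):

    package main

    import "go.uber.org/zap"

    func main() {
        logger, _ := zap.NewDevelopment()
        // With returns a child logger; selfPeer rides along on all entries below
        logger = logger.With(zap.String("selfPeer", "16Uiu2HAkv...")) // illustrative ID
        logger.Debug("p2p host was configured")      // includes selfPeer
        logger.Debug("p2p services were configured") // includes selfPeer
    }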
4 changes: 2 additions & 2 deletions network/peers/conn_handler.go
@@ -31,8 +31,8 @@ func HandleConnections(ctx context.Context, logger *zap.Logger, handshaker Hands
     // the handshake might have been triggered by the other node,
     // therefore the handshake might be pending
     if err == ErrIndexingInProcess || err == errHandshakeInProcess {
-        _logger.Debug("peer handshake already in process")
-        return err
+        //_logger.Debug("peer handshake already in process")
+        return nil
     }
     _logger.Warn("could not handshake with peer", zap.Error(err))
     err := net.ClosePeer(id)
3 changes: 1 addition & 2 deletions network/peers/handshaker.go
@@ -94,7 +94,7 @@ func (h *handshaker) Handler() libp2pnetwork.StreamHandler {
         h.logger.Warn("could not consume node info request", zap.Error(err))
         return
     }
-    h.logger.Debug("handling handshake request from peer", zap.Any("info", ni))
+    //h.logger.Debug("handling handshake request from peer", zap.Any("info", ni))
     if !h.applyFilters(&ni) {
         h.logger.Debug("filtering peer", zap.Any("info", ni))
         return
@@ -251,7 +251,6 @@ func (h *handshaker) applyFilters(nodeInfo *records.NodeInfo) bool {
         return false
     }
     if !ok {
-        h.logger.Debug("filtering peer", zap.Any("identity", nodeInfo))
         return false
     }
 }
5 changes: 2 additions & 3 deletions network/streams/controller.go
@@ -56,7 +56,7 @@ func (n *streamCtrl) Request(peerID peer.ID, protocol protocol.ID, data []byte)
     stream := NewStream(s)
     defer func() {
         if err := stream.Close(); err != nil {
-            n.logger.Error("could not close stream", zap.Error(err))
+            n.logger.Warn("could not close stream", zap.Error(err))
         }
     }()
     metricsStreamOutgoingRequests.WithLabelValues(string(protocol)).Inc()
@@ -86,10 +86,9 @@ func (n *streamCtrl) HandleStream(stream core.Stream) ([]byte, StreamResponder,
     protocolID := stream.Protocol()
     metricsStreamRequests.WithLabelValues(string(protocolID)).Inc()
     logger := n.logger.With(zap.String("protocol", string(protocolID)), zap.String("streamID", streamID))
-    logger.Debug("handle stream", zap.Duration("timeout", n.requestTimeout))
     done := func() {
         if err := s.Close(); err != nil {
-            logger.Error("could not close stream", zap.Error(err))
+            logger.Warn("could not close stream", zap.Error(err))
         }
     }
     data, err := s.ReadWithTimeout(n.requestTimeout)
2 changes: 0 additions & 2 deletions network/topics/controller.go
@@ -275,7 +275,6 @@ func (ctrl *topicsCtrl) listen(sub *pubsub.Subscription) error {
         continue
     }
     metricsPubsubInbound.WithLabelValues(ctrl.fork.GetTopicBaseName(topicName)).Inc()
-    //logger.Debug("got message from topic", zap.String("topic", topicName))
     if err := ctrl.msgHandler(topicName, msg); err != nil {
         logger.Debug("could not handle msg", zap.Error(err))
     }
@@ -293,7 +292,6 @@ func (ctrl *topicsCtrl) setupTopicValidator(name string) error {
         pubsub.WithValidatorConcurrency(256)) // TODO: find the best value for concurrency
     // TODO: check pubsub.WithValidatorInline() and pubsub.WithValidatorTimeout()
     if err != nil {
-        //ctrl.logger.Warn("could not register topic validator", zap.Error(err))
         return errors.Wrap(err, "could not register topic validator")
     }
 }
8 changes: 0 additions & 8 deletions network/topics/gossipsub_params.go
@@ -39,11 +39,3 @@ func gossipSubParam() pubsub.GossipSubParams {
 
     return params
 }
-
-// TODO: check if needed
-// We have to unfortunately set this globally in order
-// to configure our message id time-cache rather than instantiating
-// it with a router instance.
-//func setGlobalPubSubParams() {
-//    pubsub.TimeCacheDuration = 550 * gsHeartbeatInterval
-//}
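Note: the deleted stub pointed at pubsub.TimeCacheDuration, which go-libp2p-pubsub exposes only as a package-level variable: the seen-messages cache cannot be configured per router, which is what the removed comment lamented. If the tuning were ever revived it would look roughly like this (gsHeartbeatInterval's value here is illustrative; the 550x multiplier comes from the removed code):

    package topics

    import (
        "time"

        pubsub "github.com/libp2p/go-libp2p-pubsub"
    )

    // illustrative value; the real constant lives elsewhere in this package
    const gsHeartbeatInterval = 700 * time.Millisecond

    func init() {
        // global by necessity: the message-id time cache is package-level state,
        // not a per-instance option
        pubsub.TimeCacheDuration = 550 * gsHeartbeatInterval
    }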
10 changes: 10 additions & 0 deletions network/topics/msg_validator.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "github.com/bloxapp/ssv/network/forks"
+    "github.com/bloxapp/ssv/protocol/v1/message"
     "github.com/libp2p/go-libp2p-core/peer"
     pubsub "github.com/libp2p/go-libp2p-pubsub"
     "go.uber.org/zap"
@@ -35,6 +36,15 @@ func NewSSVMsgValidator(plogger *zap.Logger, fork forks.Fork, self peer.ID) func
         reportValidationResult(validationResultEncoding)
         return pubsub.ValidationReject
     }
+    // check decided topic
+    if msg.MsgType == message.SSVDecidedMsgType {
+        if decidedTopic := fork.DecidedTopic(); len(decidedTopic) > 0 {
+            if fork.GetTopicFullName(decidedTopic) == pmsg.GetTopic() {
+                reportValidationResult(validationResultValid)
+                return pubsub.ValidationAccept
+            }
+        }
+    }
     topics := fork.ValidatorTopicID(msg.GetIdentifier().GetValidatorPK())
     // wrong topic
     if fork.GetTopicFullName(topics[0]) != pmsg.GetTopic() {
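Note: the new branch accepts decided-type messages arriving on the fork's dedicated decided topic before the per-validator topic check, which would otherwise reject them as off-topic. For context, a validator like this is registered per topic in go-libp2p-pubsub; a sketch assuming ps (*pubsub.PubSub), fork, logger, and self are in scope:

    // val is the validator func returned by NewSSVMsgValidator above
    val := NewSSVMsgValidator(logger, fork, self)
    topic := fork.GetTopicFullName(fork.DecidedTopic())
    if err := ps.RegisterTopicValidator(topic, val,
        pubsub.WithValidatorConcurrency(256)); err != nil {
        logger.Warn("could not register topic validator", zap.Error(err))
    }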
6 changes: 3 additions & 3 deletions operator/duties/controller.go
@@ -115,7 +115,7 @@ func (dc *dutyController) ExecuteDuty(duty *beaconprotocol.Duty) error {
     // force the validator to be started (subscribed to validator's topic and synced)
     // TODO: handle error (return error
     if err := v.Start(); err != nil {
-        logger.Error("could not start validator", zap.Error(err))
+        logger.Warn("could not start validator", zap.Error(err))
         return
     }
     logger.Info("starting duty processing")
@@ -137,7 +137,7 @@ func (dc *dutyController) listenToTicker(slots <-chan types.Slot) {
     dc.logger.Debug("slot ticker", zap.Uint64("slot", uint64(currentSlot)))
     duties, err := dc.fetcher.GetDuties(uint64(currentSlot))
     if err != nil {
-        dc.logger.Error("failed to get duties", zap.Error(err))
+        dc.logger.Warn("failed to get duties", zap.Error(err))
     }
     for i := range duties {
         go dc.onDuty(&duties[i])
@@ -157,7 +157,7 @@ func (dc *dutyController) onDuty(duty *beaconprotocol.Duty) {
     if dc.shouldExecute(duty) {
         logger.Debug("duty was sent to execution")
         if err := dc.ExecuteDuty(duty); err != nil {
-            logger.Error("could not dispatch duty", zap.Error(err))
+            logger.Warn("could not dispatch duty", zap.Error(err))
             return
         }
         return
2 changes: 1 addition & 1 deletion operator/duties/fetcher.go
@@ -78,7 +78,7 @@ func (df *dutyFetcher) GetDuties(slot uint64) ([]beacon.Duty, error) {
     } else {
         // epoch's duties does not exist in cache -> fetch
         if err := df.updateDutiesFromBeacon(slot); err != nil {
-            logger.Error("failed to get duties", zap.Error(err))
+            logger.Warn("failed to get duties", zap.Error(err))
             return nil, err
         }
         if raw, exist := df.cache.Get(cacheKey); exist {
16 changes: 8 additions & 8 deletions operator/validator/controller.go
@@ -55,7 +55,7 @@ type ControllerOptions struct {
     Shares []ShareOptions `yaml:"Shares"`
     ShareEncryptionKeyProvider ShareEncryptionKeyProvider
     CleanRegistryData bool
-    FullNode bool `yaml:"FullNode" env:"FORCE_HISTORY_KEY" env-default:"false" env-description:"Flag that indicates whether the node saves decided history or just the latest messages"`
+    FullNode bool `yaml:"FullNode" env:"FULLNODE" env-default:"false" env-description:"Flag that indicates whether the node saves decided history or just the latest messages"`
     KeyManager beaconprotocol.KeyManager
     OperatorPubKey string
     RegistryStorage registrystorage.OperatorsCollection
@@ -269,10 +269,10 @@ func (c *controller) ListenToEth1Events(feed *event.Feed) {
     select {
     case e := <-cn:
         if err := handler(*e); err != nil {
-            c.logger.Error("could not process ongoing eth1 event", zap.Error(err))
+            c.logger.Warn("could not process ongoing eth1 event", zap.Error(err))
         }
     case err := <-sub.Err():
-        c.logger.Error("event feed subscription error", zap.Error(err))
+        c.logger.Warn("event feed subscription error", zap.Error(err))
     }
 }
 }
@@ -348,7 +348,7 @@ func (c *controller) updateValidatorsMetadata(pubKeys [][]byte) {
     if len(pubKeys) > 0 {
         c.logger.Debug("updating validators", zap.Int("count", len(pubKeys)))
         if err := beaconprotocol.UpdateValidatorsMetadata(pubKeys, c, c.beacon, c.onMetadataUpdated); err != nil {
-            c.logger.Error("could not update all validators", zap.Error(err))
+            c.logger.Warn("could not update all validators", zap.Error(err))
         }
     }
 }
@@ -365,7 +365,7 @@ func (c *controller) UpdateValidatorMetadata(pk string, metadata *beaconprotocol
     }
     _, err := c.startValidator(v)
     if err != nil {
-        c.logger.Error("could not start validator", zap.Error(err))
+        c.logger.Warn("could not start validator", zap.Error(err))
     }
 }
 return nil
@@ -391,7 +391,7 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex {
         return nil
     })
     if err != nil {
-        c.logger.Error("failed to get all validators public keys", zap.Error(err))
+        c.logger.Warn("failed to get all validators public keys", zap.Error(err))
     }
 
     go c.updateValidatorsMetadata(toFetch)
@@ -417,7 +417,7 @@ func (c *controller) onMetadataUpdated(pk string, meta *beaconprotocol.Validator
     }
     _, err := c.startValidator(v)
     if err != nil {
-        c.logger.Error("could not start validator after metadata update",
+        c.logger.Warn("could not start validator after metadata update",
             zap.String("pk", pk), zap.Error(err), zap.Any("metadata", meta))
     }
 }
@@ -522,7 +522,7 @@ func (c *controller) UpdateValidatorMetaDataLoop() {
 
     shares, err := c.collection.GetEnabledOperatorValidatorShares(c.operatorPubKey)
     if err != nil {
-        c.logger.Error("could not get validators shares for metadata update", zap.Error(err))
+        c.logger.Warn("could not get validators shares for metadata update", zap.Error(err))
         continue
     }
     var pks [][]byte
5 changes: 0 additions & 5 deletions protocol/v1/message/consensus.go
@@ -4,9 +4,6 @@ import (
     "bytes"
     "crypto/sha256"
     "encoding/json"
-    "github.com/bloxapp/ssv/utils/logex"
-    "go.uber.org/zap"
-
     "github.com/herumi/bls-eth-go-binary/bls"
     "github.com/pkg/errors"
 
@@ -427,8 +424,6 @@ func (msg *ConsensusMessage) convertToV0Root() ([]byte, error) {
     return nil, errors.Wrap(err, "could not encode message")
 }
 
-    logex.GetLogger().Debug("---- root ----", zap.String("r", string(marshaledRoot)))
-
 hasher := sha256.New()
 _, err = hasher.Write(marshaledRoot)
 if err != nil {
6 changes: 5 additions & 1 deletion protocol/v1/qbft/controller/controller_decided.go
@@ -93,7 +93,11 @@ func (c *Controller) processDecidedMessage(msg *message.SignedMessage) error {
     if currentInstance := c.getCurrentInstance(); currentInstance != nil {
         currentInstance.Stop()
     }
-    if err := c.syncDecided(knownMsg); err != nil {
+    lastKnown, err := c.decidedStrategy.GetLastDecided(c.Identifier) // knownMsg can be nil in fullSync mode so need to fetch last known.
+    if err != nil {
+        logger.Error("failed to get last known decided", zap.Error(err))
+    }
+    if err := c.syncDecided(lastKnown); err != nil {
         logger.Error("failed sync after decided received", zap.Error(err))
     }
 
Expand Down
(diffs for the remaining 10 of the 29 changed files did not load and are not shown)