diff --git a/cli/operator/node.go b/cli/operator/node.go index bd91a70548..f32db49ace 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -24,7 +24,6 @@ import ( forksv0 "github.com/bloxapp/ssv/network/forks/v0" p2pv1 "github.com/bloxapp/ssv/network/p2p" "github.com/bloxapp/ssv/operator" - "github.com/bloxapp/ssv/operator/duties" "github.com/bloxapp/ssv/operator/validator" forksprotocol "github.com/bloxapp/ssv/protocol/forks" beaconprotocol "github.com/bloxapp/ssv/protocol/v1/blockchain/beacon" @@ -51,9 +50,7 @@ type config struct { NetworkPrivateKey string `yaml:"NetworkPrivateKey" env:"NETWORK_PRIVATE_KEY" env-description:"private key for network identity"` ClearNetworkKey bool `yaml:"ClearNetworkKey" env:"CLEAR_NETWORK_KEY" env-description:"flag that turns on/off network key revocation"` - ReadOnlyMode bool `yaml:"ReadOnlyMode" env:"READ_ONLY_MODE" env-description:"a flag to turn on read only operator"` - - ForkV1Epoch uint64 `yaml:"ForkV1Epoch" env:"FORKV1_EPOCH" env-description:"Target epoch for fork v1"` + ForkV1Epoch uint64 `yaml:"ForkV1Epoch" env:"FORKV1_EPOCH" env-default:"102594" env-description:"Target epoch for fork v1"` } var cfg config @@ -160,9 +157,6 @@ var StartNodeCmd = &cobra.Command{ cfg.SSVOptions.Beacon = beaconClient cfg.SSVOptions.ETHNetwork = eth2Network cfg.SSVOptions.Network = p2pNet - - //cfg.SSVOptions.UseMainTopic = false // which topics needs to be subscribed is determined by ssv protocol - cfg.SSVOptions.ValidatorOptions.ForkVersion = ssvForkVersion cfg.SSVOptions.ValidatorOptions.ETHNetwork = eth2Network cfg.SSVOptions.ValidatorOptions.Logger = Logger @@ -201,9 +195,6 @@ var StartNodeCmd = &cobra.Command{ validatorCtrl := validator.NewController(cfg.SSVOptions.ValidatorOptions) cfg.SSVOptions.ValidatorController = validatorCtrl - if cfg.ReadOnlyMode { - cfg.SSVOptions.DutyExec = duties.NewReadOnlyExecutor(Logger) - } operatorNode = operator.New(cfg.SSVOptions) if cfg.MetricsAPIPort > 0 { diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 95e485324d..76ca939b50 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 51.2%coverage51.2% \ No newline at end of file +coverage: 52.0%coverage52.0% \ No newline at end of file diff --git a/eth1/sync.go b/eth1/sync.go index d942615c61..2692db4c8b 100644 --- a/eth1/sync.go +++ b/eth1/sync.go @@ -84,7 +84,7 @@ func SyncEth1Events(logger *zap.Logger, client Client, storage SyncOffsetStorage syncWg.Wait() if len(errs) > 0 { - logger.Error("failed to handle all events from sync", zap.Any("errs", errs)) + logger.Warn("failed to handle all events from sync", zap.Any("errs", errs)) return errors.New("failed to handle all events from sync") } diff --git a/ibft/conversion/converters.go b/ibft/conversion/converters.go index 77ea262896..3dc1eec78d 100644 --- a/ibft/conversion/converters.go +++ b/ibft/conversion/converters.go @@ -182,13 +182,6 @@ func ToSignedMessageV1(sm *proto.SignedMessage) (*message.SignedMessage, error) } func toV1ChangeRound(changeRoundData []byte) ([]byte, error) { - // TODO need to remove log once done with testing - r, err := json.Marshal(changeRoundData) - if err == nil { - logex.GetLogger().Debug("------ convert change round v0 -> v1", zap.String("data marshaled", string(r)), zap.ByteString("data byte", changeRoundData)) - } else { - logex.GetLogger().Debug("------ FAILED convert change round v0 -> v1", zap.Error(err)) - } ret := &proto.ChangeRoundData{} if err := json.Unmarshal(changeRoundData, ret); err != nil { logex.GetLogger().Warn("failed to unmarshal v0 change round struct", zap.Error(err)) diff --git a/network/discovery/local_service.go b/network/discovery/local_service.go index c347a54f8a..9951559eca 100644 --- a/network/discovery/local_service.go +++ b/network/discovery/local_service.go @@ -19,8 +19,7 @@ const ( // localDiscoveryInterval is how often we re-publish our mDNS records. localDiscoveryInterval = time.Second / 2 // LocalDiscoveryServiceTag is used in our mDNS advertisements to discover other peers - LocalDiscoveryServiceTag = "bloxstaking.ssv" - //LocalDiscoveryServiceTag = "ssv.discovery" // TODO: change + LocalDiscoveryServiceTag = "ssv.discovery" ) // localDiscovery implements ssv_discovery.Service using mDNS and KAD-DHT diff --git a/network/p2p/config.go b/network/p2p/config.go index 1af8bb29b9..9864cee220 100644 --- a/network/p2p/config.go +++ b/network/p2p/config.go @@ -22,10 +22,6 @@ import ( // Config holds the configuration options for p2p network type Config struct { - // TODO: ENR for compatibility - // Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QMmL9hLJ1csDN4rQoSjlJGE2SvsXOETfcLH8uAVrxlHaELF0u3NeKCTY2eO_X1zy5eEKcHruyaAsGNiyyG4QWUQBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"` - // stage enr - // Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QDAmZK-69qRU5q-cxW6BqLwIlWoYH-BoRlX2N7D9rXBlM7OJ9tWRRtryqvCW04geHC_ab8QmWT9QULnT0Tc5S1cBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhArqAsGJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"` Bootnodes string `yaml:"Bootnodes" env:"BOOTNODES" env-description:"Bootnodes to use to start discovery, seperated with ';'" env-default:"enr:-LK4QMmL9hLJ1csDN4rQoSjlJGE2SvsXOETfcLH8uAVrxlHaELF0u3NeKCTY2eO_X1zy5eEKcHruyaAsGNiyyG4QWUQBh2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQO8KQz5L1UEXzEr-CXFFq1th0eG6gopbdul2OQVMuxfMoN0Y3CCE4iDdWRwgg-g"` TCPPort int `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"` @@ -34,8 +30,8 @@ type Config struct { HostDNS string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery"` RequestTimeout time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"5s"` - MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"50" env-description:"Maximum number of returned objects in a batch"` - MaxPeers int `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"150" env-description:"Connected peers limit for outbound connections, inbound connections can grow up to 2 times of this value"` + MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"25" env-description:"Maximum number of returned objects in a batch"` + MaxPeers int `yaml:"MaxPeers" env:"P2P_MAX_PEERS" env-default:"250" env-description:"Connected peers limit for outbound connections, inbound connections can grow up to 2 times of this value"` // PubSubScoring is a flag to turn on/off pubsub scoring PubSubScoring bool `yaml:"PubSubScoring" env:"PUBSUB_SCORING" env-description:"Flag to turn on/off pubsub scoring"` // PubSubTrace is a flag to turn on/off pubsub tracing in logs diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 7d48f960ec..26f740fe89 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -90,13 +90,13 @@ func (n *p2pNetwork) Close() error { defer atomic.StoreInt32(&n.state, stateClosed) n.cancel() if err := n.disc.Close(); err != nil { - n.logger.Error("could not close discovery", zap.Error(err)) + n.logger.Warn("could not close discovery", zap.Error(err)) } if err := n.idx.Close(); err != nil { - n.logger.Error("could not close index", zap.Error(err)) + n.logger.Warn("could not close index", zap.Error(err)) } if err := n.topicsCtrl.Close(); err != nil { - n.logger.Error("could not close topics controller", zap.Error(err)) + n.logger.Warn("could not close topics controller", zap.Error(err)) } return n.host.Close() } diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index b998b7ed9e..5b01793d02 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -53,13 +53,14 @@ func (n *p2pNetwork) Setup() error { if err != nil { return err } - n.logger.Debug("p2p host was configured", zap.String("peer", n.host.ID().String())) + n.logger = n.logger.With(zap.String("selfPeer", n.host.ID().String())) + n.logger.Debug("p2p host was configured") err = n.SetupServices() if err != nil { return err } - n.logger.Debug("p2p services were configured", zap.String("peer", n.host.ID().String())) + n.logger.Debug("p2p services were configured") return nil } diff --git a/network/peers/conn_handler.go b/network/peers/conn_handler.go index f87edd4b5c..e0bea6b9d2 100644 --- a/network/peers/conn_handler.go +++ b/network/peers/conn_handler.go @@ -31,8 +31,8 @@ func HandleConnections(ctx context.Context, logger *zap.Logger, handshaker Hands // the handshake might have been triggered by the other node, // therefore the handshake might be pending if err == ErrIndexingInProcess || err == errHandshakeInProcess { - _logger.Debug("peer handshake already in process") - return err + //_logger.Debug("peer handshake already in process") + return nil } _logger.Warn("could not handshake with peer", zap.Error(err)) err := net.ClosePeer(id) diff --git a/network/peers/handshaker.go b/network/peers/handshaker.go index 676023c52d..9f4c9278a3 100644 --- a/network/peers/handshaker.go +++ b/network/peers/handshaker.go @@ -94,7 +94,7 @@ func (h *handshaker) Handler() libp2pnetwork.StreamHandler { h.logger.Warn("could not consume node info request", zap.Error(err)) return } - h.logger.Debug("handling handshake request from peer", zap.Any("info", ni)) + //h.logger.Debug("handling handshake request from peer", zap.Any("info", ni)) if !h.applyFilters(&ni) { h.logger.Debug("filtering peer", zap.Any("info", ni)) return @@ -251,7 +251,6 @@ func (h *handshaker) applyFilters(nodeInfo *records.NodeInfo) bool { return false } if !ok { - h.logger.Debug("filtering peer", zap.Any("identity", nodeInfo)) return false } } diff --git a/network/streams/controller.go b/network/streams/controller.go index c83d930619..6343e2256b 100644 --- a/network/streams/controller.go +++ b/network/streams/controller.go @@ -56,7 +56,7 @@ func (n *streamCtrl) Request(peerID peer.ID, protocol protocol.ID, data []byte) stream := NewStream(s) defer func() { if err := stream.Close(); err != nil { - n.logger.Error("could not close stream", zap.Error(err)) + n.logger.Warn("could not close stream", zap.Error(err)) } }() metricsStreamOutgoingRequests.WithLabelValues(string(protocol)).Inc() @@ -86,10 +86,9 @@ func (n *streamCtrl) HandleStream(stream core.Stream) ([]byte, StreamResponder, protocolID := stream.Protocol() metricsStreamRequests.WithLabelValues(string(protocolID)).Inc() logger := n.logger.With(zap.String("protocol", string(protocolID)), zap.String("streamID", streamID)) - logger.Debug("handle stream", zap.Duration("timeout", n.requestTimeout)) done := func() { if err := s.Close(); err != nil { - logger.Error("could not close stream", zap.Error(err)) + logger.Warn("could not close stream", zap.Error(err)) } } data, err := s.ReadWithTimeout(n.requestTimeout) diff --git a/network/topics/controller.go b/network/topics/controller.go index 07b3242526..c423cb3362 100644 --- a/network/topics/controller.go +++ b/network/topics/controller.go @@ -275,7 +275,6 @@ func (ctrl *topicsCtrl) listen(sub *pubsub.Subscription) error { continue } metricsPubsubInbound.WithLabelValues(ctrl.fork.GetTopicBaseName(topicName)).Inc() - //logger.Debug("got message from topic", zap.String("topic", topicName)) if err := ctrl.msgHandler(topicName, msg); err != nil { logger.Debug("could not handle msg", zap.Error(err)) } @@ -293,7 +292,6 @@ func (ctrl *topicsCtrl) setupTopicValidator(name string) error { pubsub.WithValidatorConcurrency(256)) // TODO: find the best value for concurrency // TODO: check pubsub.WithValidatorInline() and pubsub.WithValidatorTimeout() if err != nil { - //ctrl.logger.Warn("could not register topic validator", zap.Error(err)) return errors.Wrap(err, "could not register topic validator") } } diff --git a/network/topics/gossipsub_params.go b/network/topics/gossipsub_params.go index 143b50d905..4994f3e930 100644 --- a/network/topics/gossipsub_params.go +++ b/network/topics/gossipsub_params.go @@ -39,11 +39,3 @@ func gossipSubParam() pubsub.GossipSubParams { return params } - -// TODO: check if needed -// We have to unfortunately set this globally in order -// to configure our message id time-cache rather than instantiating -// it with a router instance. -//func setGlobalPubSubParams() { -// pubsub.TimeCacheDuration = 550 * gsHeartbeatInterval -//} diff --git a/network/topics/msg_validator.go b/network/topics/msg_validator.go index ba1fee848a..f357f6fdc6 100644 --- a/network/topics/msg_validator.go +++ b/network/topics/msg_validator.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "github.com/bloxapp/ssv/network/forks" + "github.com/bloxapp/ssv/protocol/v1/message" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/zap" @@ -35,6 +36,15 @@ func NewSSVMsgValidator(plogger *zap.Logger, fork forks.Fork, self peer.ID) func reportValidationResult(validationResultEncoding) return pubsub.ValidationReject } + // check decided topic + if msg.MsgType == message.SSVDecidedMsgType { + if decidedTopic := fork.DecidedTopic(); len(decidedTopic) > 0 { + if fork.GetTopicFullName(decidedTopic) == pmsg.GetTopic() { + reportValidationResult(validationResultValid) + return pubsub.ValidationAccept + } + } + } topics := fork.ValidatorTopicID(msg.GetIdentifier().GetValidatorPK()) // wrong topic if fork.GetTopicFullName(topics[0]) != pmsg.GetTopic() { diff --git a/operator/duties/controller.go b/operator/duties/controller.go index 4af57759c8..1a1103a396 100644 --- a/operator/duties/controller.go +++ b/operator/duties/controller.go @@ -115,7 +115,7 @@ func (dc *dutyController) ExecuteDuty(duty *beaconprotocol.Duty) error { // force the validator to be started (subscribed to validator's topic and synced) // TODO: handle error (return error if err := v.Start(); err != nil { - logger.Error("could not start validator", zap.Error(err)) + logger.Warn("could not start validator", zap.Error(err)) return } logger.Info("starting duty processing") @@ -137,7 +137,7 @@ func (dc *dutyController) listenToTicker(slots <-chan types.Slot) { dc.logger.Debug("slot ticker", zap.Uint64("slot", uint64(currentSlot))) duties, err := dc.fetcher.GetDuties(uint64(currentSlot)) if err != nil { - dc.logger.Error("failed to get duties", zap.Error(err)) + dc.logger.Warn("failed to get duties", zap.Error(err)) } for i := range duties { go dc.onDuty(&duties[i]) @@ -157,7 +157,7 @@ func (dc *dutyController) onDuty(duty *beaconprotocol.Duty) { if dc.shouldExecute(duty) { logger.Debug("duty was sent to execution") if err := dc.ExecuteDuty(duty); err != nil { - logger.Error("could not dispatch duty", zap.Error(err)) + logger.Warn("could not dispatch duty", zap.Error(err)) return } return diff --git a/operator/duties/fetcher.go b/operator/duties/fetcher.go index d862038944..4cb957745f 100644 --- a/operator/duties/fetcher.go +++ b/operator/duties/fetcher.go @@ -78,7 +78,7 @@ func (df *dutyFetcher) GetDuties(slot uint64) ([]beacon.Duty, error) { } else { // epoch's duties does not exist in cache -> fetch if err := df.updateDutiesFromBeacon(slot); err != nil { - logger.Error("failed to get duties", zap.Error(err)) + logger.Warn("failed to get duties", zap.Error(err)) return nil, err } if raw, exist := df.cache.Get(cacheKey); exist { diff --git a/operator/validator/controller.go b/operator/validator/controller.go index e575121081..1b7f3f1532 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -55,7 +55,7 @@ type ControllerOptions struct { Shares []ShareOptions `yaml:"Shares"` ShareEncryptionKeyProvider ShareEncryptionKeyProvider CleanRegistryData bool - FullNode bool `yaml:"FullNode" env:"FORCE_HISTORY_KEY" env-default:"false" env-description:"Flag that indicates whether the node saves decided history or just the latest messages"` + FullNode bool `yaml:"FullNode" env:"FULLNODE" env-default:"false" env-description:"Flag that indicates whether the node saves decided history or just the latest messages"` KeyManager beaconprotocol.KeyManager OperatorPubKey string RegistryStorage registrystorage.OperatorsCollection @@ -269,10 +269,10 @@ func (c *controller) ListenToEth1Events(feed *event.Feed) { select { case e := <-cn: if err := handler(*e); err != nil { - c.logger.Error("could not process ongoing eth1 event", zap.Error(err)) + c.logger.Warn("could not process ongoing eth1 event", zap.Error(err)) } case err := <-sub.Err(): - c.logger.Error("event feed subscription error", zap.Error(err)) + c.logger.Warn("event feed subscription error", zap.Error(err)) } } } @@ -348,7 +348,7 @@ func (c *controller) updateValidatorsMetadata(pubKeys [][]byte) { if len(pubKeys) > 0 { c.logger.Debug("updating validators", zap.Int("count", len(pubKeys))) if err := beaconprotocol.UpdateValidatorsMetadata(pubKeys, c, c.beacon, c.onMetadataUpdated); err != nil { - c.logger.Error("could not update all validators", zap.Error(err)) + c.logger.Warn("could not update all validators", zap.Error(err)) } } } @@ -365,7 +365,7 @@ func (c *controller) UpdateValidatorMetadata(pk string, metadata *beaconprotocol } _, err := c.startValidator(v) if err != nil { - c.logger.Error("could not start validator", zap.Error(err)) + c.logger.Warn("could not start validator", zap.Error(err)) } } return nil @@ -391,7 +391,7 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex { return nil }) if err != nil { - c.logger.Error("failed to get all validators public keys", zap.Error(err)) + c.logger.Warn("failed to get all validators public keys", zap.Error(err)) } go c.updateValidatorsMetadata(toFetch) @@ -417,7 +417,7 @@ func (c *controller) onMetadataUpdated(pk string, meta *beaconprotocol.Validator } _, err := c.startValidator(v) if err != nil { - c.logger.Error("could not start validator after metadata update", + c.logger.Warn("could not start validator after metadata update", zap.String("pk", pk), zap.Error(err), zap.Any("metadata", meta)) } } @@ -522,7 +522,7 @@ func (c *controller) UpdateValidatorMetaDataLoop() { shares, err := c.collection.GetEnabledOperatorValidatorShares(c.operatorPubKey) if err != nil { - c.logger.Error("could not get validators shares for metadata update", zap.Error(err)) + c.logger.Warn("could not get validators shares for metadata update", zap.Error(err)) continue } var pks [][]byte diff --git a/protocol/v1/message/consensus.go b/protocol/v1/message/consensus.go index 7f6df07084..c8330d019e 100644 --- a/protocol/v1/message/consensus.go +++ b/protocol/v1/message/consensus.go @@ -4,9 +4,6 @@ import ( "bytes" "crypto/sha256" "encoding/json" - "github.com/bloxapp/ssv/utils/logex" - "go.uber.org/zap" - "github.com/herumi/bls-eth-go-binary/bls" "github.com/pkg/errors" @@ -427,8 +424,6 @@ func (msg *ConsensusMessage) convertToV0Root() ([]byte, error) { return nil, errors.Wrap(err, "could not encode message") } - logex.GetLogger().Debug("---- root ----", zap.String("r", string(marshaledRoot))) - hasher := sha256.New() _, err = hasher.Write(marshaledRoot) if err != nil { diff --git a/protocol/v1/qbft/controller/controller_decided.go b/protocol/v1/qbft/controller/controller_decided.go index e81738963f..6b3254fbd2 100644 --- a/protocol/v1/qbft/controller/controller_decided.go +++ b/protocol/v1/qbft/controller/controller_decided.go @@ -93,7 +93,11 @@ func (c *Controller) processDecidedMessage(msg *message.SignedMessage) error { if currentInstance := c.getCurrentInstance(); currentInstance != nil { currentInstance.Stop() } - if err := c.syncDecided(knownMsg); err != nil { + lastKnown, err := c.decidedStrategy.GetLastDecided(c.Identifier) // knownMsg can be nil in fullSync mode so need to fetch last known. + if err != nil { + logger.Error("failed to get last known decided", zap.Error(err)) + } + if err := c.syncDecided(lastKnown); err != nil { logger.Error("failed sync after decided received", zap.Error(err)) } diff --git a/protocol/v1/qbft/controller/duty_execution.go b/protocol/v1/qbft/controller/duty_execution.go index 63a5d42fbe..c28157b266 100644 --- a/protocol/v1/qbft/controller/duty_execution.go +++ b/protocol/v1/qbft/controller/duty_execution.go @@ -19,13 +19,13 @@ func (c *Controller) ProcessSignatureMessage(msg *message.SignedPostConsensusMes } // validate message - if len(msg.GetSigners()) == 0 { // no KeyManager, empty sig - c.logger.Error("missing KeyManager id", zap.Any("msg", msg)) + if len(msg.GetSigners()) == 0 { + c.logger.Warn("missing signers", zap.Any("msg", msg)) return nil } //if len(msg.GetSignature()) == 0 { // no KeyManager, empty sig if len(msg.Message.DutySignature) == 0 { // TODO need to add sig to msg and not use this sig - c.logger.Error("missing sig", zap.Any("msg", msg)) + c.logger.Warn("missing duty signature", zap.Any("msg", msg)) return nil } logger := c.logger.With(zap.Uint64("signer_id", uint64(msg.GetSigners()[0]))) @@ -39,7 +39,7 @@ func (c *Controller) ProcessSignatureMessage(msg *message.SignedPostConsensusMes logger.Info("collected valid signature", zap.String("sig", hex.EncodeToString(msg.Message.DutySignature)), zap.Any("msg", msg)) if err := c.verifyPartialSignature(msg.Message.DutySignature, c.signatureState.root, msg.GetSigners()[0], c.ValidatorShare.Committee); err != nil { // TODO need to add sig to msg and not use this sig - c.logger.Error("received invalid signature", zap.Error(err)) + c.logger.Warn("received invalid signature", zap.Error(err)) return nil } diff --git a/protocol/v1/qbft/controller/proccess_message.go b/protocol/v1/qbft/controller/proccess_message.go index a5b9bc10ea..93d651c36c 100644 --- a/protocol/v1/qbft/controller/proccess_message.go +++ b/protocol/v1/qbft/controller/proccess_message.go @@ -6,7 +6,6 @@ import ( "github.com/bloxapp/ssv/protocol/v1/message" "github.com/bloxapp/ssv/protocol/v1/qbft" - "github.com/bloxapp/ssv/utils/logex" ) func (c *Controller) processConsensusMsg(signedMessage *message.SignedMessage) error { @@ -54,7 +53,7 @@ func (c *Controller) processCommitMsg(signedMessage *message.SignedMessage) (boo } } - logger := logex.GetLogger(zap.String("who", "ProcessLateCommitMsg"), + logger := c.logger.With(zap.String("who", "ProcessLateCommitMsg"), //zap.Bool("is_full_sync", c.isFullNode()), zap.Uint64("seq", uint64(signedMessage.Message.Height)), zap.String("identifier", signedMessage.Message.Identifier.String()), @@ -80,9 +79,10 @@ func (c *Controller) processCommitMsg(signedMessage *message.SignedMessage) (boo Data: data, } if err := c.network.Broadcast(ssvMsg); err != nil { - logger.Error("could not broadcast decided message", zap.Error(err)) + logger.Warn("could not broadcast decided message", zap.Error(err)) + } else { + logger.Debug("updated decided was broadcasted") } - logger.Debug("updated decided was broadcasted") qbft.ReportDecided(c.ValidatorShare.PublicKey.SerializeToHexStr(), updated) } return true, nil diff --git a/protocol/v1/qbft/controller/signature.go b/protocol/v1/qbft/controller/signature.go index cf5944e706..b24927864f 100644 --- a/protocol/v1/qbft/controller/signature.go +++ b/protocol/v1/qbft/controller/signature.go @@ -77,7 +77,7 @@ func (s *SignatureState) start(logger *zap.Logger, height message.Height, signat logger.Debug("signatures were collected before timeout", zap.Int("received", len(s.signatures))) return } - logger.Error("could not process post consensus signature", zap.Error(errors.Errorf("timed out waiting for post consensus signatures, received %d", len(s.signatures)))) + logger.Warn("could not process post consensus signature", zap.Error(errors.Errorf("timed out waiting for post consensus signatures, received %d", len(s.signatures)))) }) //s.timer = time.NewTimer(s.SignatureCollectionTimeout) s.state.Store(StateRunning) diff --git a/protocol/v1/qbft/instance/change_round.go b/protocol/v1/qbft/instance/change_round.go index 40094da67c..8e021f7e9b 100644 --- a/protocol/v1/qbft/instance/change_round.go +++ b/protocol/v1/qbft/instance/change_round.go @@ -252,7 +252,7 @@ func (i *Instance) HighestPrepared(round message.Round) (notPrepared bool, highe } // compare to highest found - if candidateChangeData.GetPreparedValue() != nil { + if candidateChangeData.GetPreparedValue() != nil && len(candidateChangeData.GetPreparedValue()) > 0 { notPrepared = false if highestPrepared != nil { if candidateChangeData.GetPreparedRound() > highestPrepared.GetPreparedRound() { diff --git a/protocol/v1/qbft/instance/instance.go b/protocol/v1/qbft/instance/instance.go index 2a725063dc..426c2541cf 100644 --- a/protocol/v1/qbft/instance/instance.go +++ b/protocol/v1/qbft/instance/instance.go @@ -182,12 +182,12 @@ func (i *Instance) Start(inputValue []byte) error { msg, err := i.generatePrePrepareMessage(i.State().GetInputValue()) if err != nil { - i.Logger.Error("failed to generate pre-prepare message", zap.Error(err)) + i.Logger.Warn("failed to generate pre-prepare message", zap.Error(err)) return } if err := i.SignAndBroadcast(&msg); err != nil { - i.Logger.Fatal("could not broadcast pre-prepare", zap.Error(err)) + i.Logger.Error("could not broadcast pre-prepare", zap.Error(err)) } }() } diff --git a/protocol/v1/qbft/instance/prepare.go b/protocol/v1/qbft/instance/prepare.go index ce363be192..599d41d7e1 100644 --- a/protocol/v1/qbft/instance/prepare.go +++ b/protocol/v1/qbft/instance/prepare.go @@ -2,6 +2,7 @@ package instance import ( "bytes" + "encoding/hex" "github.com/bloxapp/ssv/protocol/v1/qbft/pipelines" "github.com/bloxapp/ssv/protocol/v1/message" @@ -54,6 +55,7 @@ func (i *Instance) PreparedAggregatedMsg() (*message.SignedMessage, error) { continue } if !bytes.Equal(p.Data, i.State().GetPreparedValue()) { + i.Logger.Warn("prepared value is not equal to prepare data", zap.String("stateValue", hex.EncodeToString(i.State().GetPreparedValue())), zap.String("prepareData", hex.EncodeToString(p.Data))) continue } if ret == nil { diff --git a/protocol/v1/qbft/instance/prepare_test.go b/protocol/v1/qbft/instance/prepare_test.go index bae66ec135..40ba378c03 100644 --- a/protocol/v1/qbft/instance/prepare_test.go +++ b/protocol/v1/qbft/instance/prepare_test.go @@ -1,6 +1,7 @@ package instance import ( + "go.uber.org/zap" "testing" "github.com/stretchr/testify/require" @@ -21,7 +22,8 @@ func TestPreparedAggregatedMsg(t *testing.T) { Committee: nodes, NodeID: 1, }, - state: &qbft.State{}, + state: &qbft.State{}, + Logger: zap.L(), } instance.state.Round.Store(message.Round(1)) diff --git a/protocol/v1/sync/commons.go b/protocol/v1/sync/commons.go index 7ec609bf1d..826941c06d 100644 --- a/protocol/v1/sync/commons.go +++ b/protocol/v1/sync/commons.go @@ -19,11 +19,9 @@ func GetHighest(logger *zap.Logger, remoteMsgs ...p2pprotocol.SyncResult) (highe continue } if sm == nil { - logger.Debug("sync message not found") continue } if len(sm.Data) == 0 { - logger.Warn("empty sync message") continue } signedMsg := sm.Data[0] diff --git a/protocol/v1/sync/handlers/last_change_round.go b/protocol/v1/sync/handlers/last_change_round.go index cdc75612ca..27fbda4a1b 100644 --- a/protocol/v1/sync/handlers/last_change_round.go +++ b/protocol/v1/sync/handlers/last_change_round.go @@ -1,10 +1,10 @@ package handlers import ( + "fmt" "github.com/bloxapp/ssv/protocol/v1/message" protocolp2p "github.com/bloxapp/ssv/protocol/v1/p2p" qbftstorage "github.com/bloxapp/ssv/protocol/v1/qbft/storage" - "github.com/bloxapp/ssv/utils/logex" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -14,7 +14,7 @@ import ( func LastChangeRoundHandler(plogger *zap.Logger, store qbftstorage.ChangeRoundStore, reporting protocolp2p.ValidationReporting) protocolp2p.RequestHandler { //plogger = plogger.With(zap.String("who", "last decided handler")) return func(msg *message.SSVMessage) (*message.SSVMessage, error) { - //logger := plogger.With(zap.String("msg_id_hex", fmt.Sprintf("%x", msg.ID))) + logger := plogger.With(zap.String("msg_id_hex", fmt.Sprintf("%x", msg.ID))) sm := &message.SyncMessage{} err := sm.Decode(msg.Data) if err != nil { @@ -27,9 +27,9 @@ func LastChangeRoundHandler(plogger *zap.Logger, store qbftstorage.ChangeRoundSt } else { res, err := store.GetLastChangeRoundMsg(msg.ID) if err != nil { - logex.GetLogger().Warn("change round sync msg error", zap.Error(err)) + logger.Warn("change round sync msg error", zap.Error(err)) } - plogger.Debug("last change round handler", zap.Any("msgs", res), zap.Error(err)) + logger.Debug("last change round handler", zap.Any("msgs", res), zap.Error(err)) sm.UpdateResults(err, res) } diff --git a/protocol/v1/sync/lastdecided/fetcher.go b/protocol/v1/sync/lastdecided/fetcher.go index 6fab9359fc..1933a7b18a 100644 --- a/protocol/v1/sync/lastdecided/fetcher.go +++ b/protocol/v1/sync/lastdecided/fetcher.go @@ -67,7 +67,6 @@ func (l *lastDecidedFetcher) GetLastDecided(pctx context.Context, identifier mes highest, sender = sync.GetHighest(l.logger, remoteMsgs...) if highest == nil { - logger.Debug("remote highest decided not found", zap.Int("retryNumber", retries)) continue } }