From d81cd6efac2c59f9ad05ae0580fa550d0bef5086 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 8 Jan 2025 12:44:24 +0100 Subject: [PATCH] Caplin: Avoid republishing in `GossipManager` (#13314) Basically libp2p handles that automatically and I did not know --- cl/beacon/handler/pool.go | 100 +++++++++++------- cl/beacon/handler/utils_test.go | 11 +- cl/cltypes/aggregate.go | 12 --- cl/cltypes/bls_to_execution_change.go | 8 -- cl/cltypes/contribution.go | 14 --- cl/cltypes/validator.go | 8 -- cl/phase1/network/gossip_manager.go | 63 +++++------ .../services/aggregate_and_proof_service.go | 20 +++- .../aggregate_and_proof_service_test.go | 5 +- .../network/services/attestation_service.go | 14 +-- .../services/attestation_service_test.go | 2 +- .../services/batch_signature_verification.go | 32 ++---- .../bls_to_execution_change_service.go | 18 +++- .../bls_to_execution_change_service_test.go | 5 +- cl/phase1/network/services/interface.go | 12 +-- .../aggregate_and_proof_service_mock.go | 15 ++- .../mock_services/attestation_service_mock.go | 13 ++- .../blob_sidecars_service_mock.go | 9 +- .../mock_services/block_service_mock.go | 9 +- .../bls_to_execution_change_service_mock.go | 15 ++- .../proposer_slashing_service_mock.go | 9 +- .../sync_committee_messages_service_mock.go | 15 ++- .../sync_contribution_service_mock.go | 15 ++- .../voluntary_exit_service_mock.go | 15 ++- .../sync_committee_messages_service.go | 17 ++- .../sync_committee_messages_service_test.go | 5 +- .../services/sync_contribution_service.go | 14 ++- .../sync_contribution_service_test.go | 5 +- .../services/voluntary_exit_service.go | 18 +++- .../services/voluntary_exit_service_test.go | 8 +- 30 files changed, 249 insertions(+), 257 deletions(-) diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go index ce8e793fdeb..368bc155f33 100644 --- a/cl/beacon/handler/pool.go +++ b/cl/beacon/handler/pool.go @@ -126,13 +126,8 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w) return } - attestationWithGossipData := &services.AttestationWithGossipData{ - Attestation: attestation, - GossipData: &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNamePrefixBeaconAttestation, - SubnetId: &subnet, - }, + attestationWithGossipData := &services.AttestationForGossip{ + Attestation: attestation, ImmediateProcess: true, // we want to process attestation immediately } @@ -144,6 +139,16 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h }) continue } + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNamePrefixBeaconAttestation, + SubnetId: &subnet, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } } if len(failures) > 0 { errResp := poolingError{ @@ -173,11 +178,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r return } - if err := a.voluntaryExitService.ProcessMessage(r.Context(), nil, &cltypes.SignedVoluntaryExitWithGossipData{ - GossipData: &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNameVoluntaryExit, - }, + if err := a.voluntaryExitService.ProcessMessage(r.Context(), nil, &services.SignedVoluntaryExitForGossip{ SignedVoluntaryExit: &req, ImmediateVerification: true, }); err != nil && !errors.Is(err, services.ErrIgnore) { @@ -185,7 +186,15 @@ func (a *ApiHandler) PostEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r return } a.operationsPool.VoluntaryExitsPool.Insert(req.VoluntaryExit.ValidatorIndex, &req) - + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNameVoluntaryExit, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } // Only write 200 w.WriteHeader(http.StatusOK) } @@ -275,16 +284,21 @@ func (a *ApiHandler) PostEthV1BeaconPoolBlsToExecutionChanges(w http.ResponseWri return } - if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, &cltypes.SignedBLSToExecutionChangeWithGossipData{ + if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, &services.SignedBLSToExecutionChangeForGossip{ SignedBLSToExecutionChange: v, - GossipData: &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNameBlsToExecutionChange, - }, }); err != nil && !errors.Is(err, services.ErrIgnore) { failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()}) continue } + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNameBlsToExecutionChange, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } } if len(failures) > 0 { @@ -312,21 +326,26 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter, log.Warn("[Beacon REST] failed to encode aggregate and proof", "err", err) return } - gossipData := &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNameBeaconAggregateAndProof, - } // for this service we are not publishing gossipData as the service does it internally, we just pass that data as a parameter. - if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, &cltypes.SignedAggregateAndProofData{ + if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, &services.SignedAggregateAndProofForGossip{ SignedAggregateAndProof: v, - GossipData: gossipData, ImmediateProcess: true, // we want to process aggregate and proof immediately }); err != nil && !errors.Is(err, services.ErrIgnore) { log.Warn("[Beacon REST] failed to process bls-change", "err", err) failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()}) continue } + + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNameBeaconAggregateAndProof, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } } } @@ -356,7 +375,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r for _, subnet := range publishingSubnets { - var syncCommitteeMessageWithGossipData cltypes.SyncCommitteeMessageWithGossipData + var syncCommitteeMessageWithGossipData services.SyncCommitteeMessageForGossip syncCommitteeMessageWithGossipData.SyncCommitteeMessage = v syncCommitteeMessageWithGossipData.ImmediateVerification = true @@ -367,17 +386,22 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r } subnetId := subnet - syncCommitteeMessageWithGossipData.GossipData = &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNamePrefixSyncCommittee, - SubnetId: &subnetId, - } if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, &syncCommitteeMessageWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { log.Warn("[Beacon REST] failed to process attestation in syncCommittee service", "err", err) failures = append(failures, poolingFailure{Index: idx, Message: err.Error()}) break } + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNamePrefixSyncCommittee, + SubnetId: &subnetId, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } } } if len(failures) > 0 { @@ -403,7 +427,7 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri continue // skip empty contributions } - var signedContributionAndProofWithGossipData cltypes.SignedContributionAndProofWithGossipData + var signedContributionAndProofWithGossipData services.SignedContributionAndProofForGossip signedContributionAndProofWithGossipData.SignedContributionAndProof = v signedContributionAndProofWithGossipData.ImmediateVerification = true @@ -414,16 +438,20 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri return } - signedContributionAndProofWithGossipData.GossipData = &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNameSyncCommitteeContributionAndProof, - } - if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, &signedContributionAndProofWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { log.Warn("[Beacon REST] failed to process sync contribution", "err", err) failures = append(failures, poolingFailure{Index: idx, Message: err.Error()}) continue } + if a.sentinel != nil { + if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNameSyncCommitteeContributionAndProof, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } } if len(failures) > 0 { diff --git a/cl/beacon/handler/utils_test.go b/cl/beacon/handler/utils_test.go index 16e403388c0..6198ab3cd61 100644 --- a/cl/beacon/handler/utils_test.go +++ b/cl/beacon/handler/utils_test.go @@ -45,6 +45,7 @@ import ( "github.com/erigontech/erigon/cl/persistence/state/historical_states_reader" "github.com/erigontech/erigon/cl/phase1/core/state" mock_services2 "github.com/erigontech/erigon/cl/phase1/forkchoice/mock_services" + "github.com/erigontech/erigon/cl/phase1/network/services" "github.com/erigontech/erigon/cl/phase1/network/services/mock_services" "github.com/erigontech/erigon/cl/pool" "github.com/erigontech/erigon/cl/utils/eth_clock" @@ -119,22 +120,22 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge mockValidatorMonitor := mockMonitor.NewMockValidatorMonitor(ctrl) // ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error - syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error { + syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SyncCommitteeMessageForGossip) error { return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg.SyncCommitteeMessage) }).AnyTimes() - syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProofWithGossipData) error { + syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedContributionAndProofForGossip) error { return h.syncMessagePool.AddSyncContribution(postState, msg.SignedContributionAndProof.Message.Contribution) }).AnyTimes() - aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedAggregateAndProofData) error { + aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedAggregateAndProofForGossip) error { opPool.AttestationsPool.Insert(msg.SignedAggregateAndProof.Message.Aggregate.Signature, msg.SignedAggregateAndProof.Message.Aggregate) return nil }).AnyTimes() - voluntaryExitService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedVoluntaryExitWithGossipData) error { + voluntaryExitService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedVoluntaryExitForGossip) error { opPool.VoluntaryExitsPool.Insert(msg.SignedVoluntaryExit.VoluntaryExit.ValidatorIndex, msg.SignedVoluntaryExit) return nil }).AnyTimes() - blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error { + blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *services.SignedBLSToExecutionChangeForGossip) error { opPool.BLSToExecutionChangesPool.Insert(msg.SignedBLSToExecutionChange.Signature, msg.SignedBLSToExecutionChange) return nil }).AnyTimes() diff --git a/cl/cltypes/aggregate.go b/cl/cltypes/aggregate.go index 29920e362a7..2c269cadda9 100644 --- a/cl/cltypes/aggregate.go +++ b/cl/cltypes/aggregate.go @@ -18,7 +18,6 @@ package cltypes import ( libcommon "github.com/erigontech/erigon-lib/common" - sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/merkle_tree" ssz2 "github.com/erigontech/erigon/cl/ssz" @@ -55,17 +54,6 @@ func (a *AggregateAndProof) HashSSZ() ([32]byte, error) { return merkle_tree.HashTreeRoot(a.AggregatorIndex, a.Aggregate, a.SelectionProof[:]) } -// SignedAggregateAndProofData is passed to SignedAggregateAndProof service. The service does the signature verification -// asynchronously. That's why we cannot wait for its ProcessMessage call to finish to check error. The service -// will do re-publishing of the gossip or banning the peer in case of invalid signature by itself. -// that's why we are passing sentinel.SentinelClient and *sentinel.GossipData to enable the service -// to do all of that by itself. -type SignedAggregateAndProofData struct { - SignedAggregateAndProof *SignedAggregateAndProof - GossipData *sentinel.GossipData - ImmediateProcess bool -} - type SignedAggregateAndProof struct { Message *AggregateAndProof `json:"message"` Signature libcommon.Bytes96 `json:"signature"` diff --git a/cl/cltypes/bls_to_execution_change.go b/cl/cltypes/bls_to_execution_change.go index ca6a87f7d9a..00bd1bb0fe4 100644 --- a/cl/cltypes/bls_to_execution_change.go +++ b/cl/cltypes/bls_to_execution_change.go @@ -20,7 +20,6 @@ import ( "fmt" libcommon "github.com/erigontech/erigon-lib/common" - sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/types/ssz" "github.com/erigontech/erigon/cl/merkle_tree" ssz2 "github.com/erigontech/erigon/cl/ssz" @@ -59,13 +58,6 @@ func (*BLSToExecutionChange) Static() bool { return true } -// SignedBLSToExecutionChangeWithGossipData type represents SignedBLSToExecutionChange with the gossip data where it's coming from. -type SignedBLSToExecutionChangeWithGossipData struct { - SignedBLSToExecutionChange *SignedBLSToExecutionChange - GossipData *sentinel.GossipData - ImmediateVerification bool -} - type SignedBLSToExecutionChange struct { Message *BLSToExecutionChange `json:"message"` Signature libcommon.Bytes96 `json:"signature"` diff --git a/cl/cltypes/contribution.go b/cl/cltypes/contribution.go index 319edc9a427..f1c1244c55d 100644 --- a/cl/cltypes/contribution.go +++ b/cl/cltypes/contribution.go @@ -20,7 +20,6 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/hexutility" "github.com/erigontech/erigon-lib/common/length" - sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/types/clonable" "github.com/erigontech/erigon/cl/merkle_tree" ssz2 "github.com/erigontech/erigon/cl/ssz" @@ -60,13 +59,6 @@ func (a *ContributionAndProof) HashSSZ() ([32]byte, error) { return merkle_tree.HashTreeRoot(a.AggregatorIndex, a.Contribution, a.SelectionProof[:]) } -// SignedContributionAndProofWithGossipData type represents SignedContributionAndProof with the gossip data where it's coming from. -type SignedContributionAndProofWithGossipData struct { - SignedContributionAndProof *SignedContributionAndProof - GossipData *sentinel.GossipData - ImmediateVerification bool -} - type SignedContributionAndProof struct { Message *ContributionAndProof `json:"message"` Signature libcommon.Bytes96 `json:"signature"` @@ -187,12 +179,6 @@ func (agg *SyncContribution) HashSSZ() ([32]byte, error) { } -type SyncCommitteeMessageWithGossipData struct { - SyncCommitteeMessage *SyncCommitteeMessage - GossipData *sentinel.GossipData - ImmediateVerification bool -} - type SyncCommitteeMessage struct { Slot uint64 `json:"slot,string"` BeaconBlockRoot libcommon.Hash `json:"beacon_block_root"` diff --git a/cl/cltypes/validator.go b/cl/cltypes/validator.go index 49c6d6e455a..e4989afd367 100644 --- a/cl/cltypes/validator.go +++ b/cl/cltypes/validator.go @@ -20,7 +20,6 @@ import ( "encoding/json" libcommon "github.com/erigontech/erigon-lib/common" - sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/types/clonable" "github.com/erigontech/erigon-lib/types/ssz" @@ -129,13 +128,6 @@ func (*VoluntaryExit) EncodingSizeSSZ() int { return 16 } -// SignedVoluntaryExitWithGossipData type represents SignedVoluntaryExit with the gossip data where it's coming from. -type SignedVoluntaryExitWithGossipData struct { - SignedVoluntaryExit *SignedVoluntaryExit - GossipData *sentinel.GossipData - ImmediateVerification bool -} - type SignedVoluntaryExit struct { VoluntaryExit *VoluntaryExit `json:"message"` Signature libcommon.Bytes96 `json:"signature"` diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 847d93c5a4b..baae4fb418b 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -135,9 +135,6 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l g.sentinel.BanPeer(ctx, data.Peer) return err } - if _, err := g.sentinel.PublishGossip(ctx, data); err != nil { - log.Debug("failed to publish gossip", "err", err) - } return nil } @@ -145,24 +142,18 @@ func (g *GossipManager) isReadyToProcessOperations() bool { return g.forkChoice.HighestSeen()+8 >= g.ethClock.GetCurrentSlot() } -func copyOfSentinelData(in *sentinel.GossipData) *sentinel.GossipData { - ret := &sentinel.GossipData{ - Data: common.Copy(in.Data), - Name: in.Name, - } - if in.SubnetId != nil { - ret.SubnetId = new(uint64) - *ret.SubnetId = *in.SubnetId - } - if in.Peer != nil { - ret.Peer = new(sentinel.Peer) - ret.Peer.State = in.Peer.State - ret.Peer.Pid = in.Peer.Pid - ret.Peer.Enr = in.Peer.Enr - ret.Peer.Direction = in.Peer.Direction - ret.Peer.AgentVersion = in.Peer.AgentVersion - ret.Peer.Address = in.Peer.Address +func copyOfPeerData(in *sentinel.GossipData) *sentinel.Peer { + if in == nil || in.Peer == nil { + return nil } + ret := new(sentinel.Peer) + ret.State = in.Peer.State + ret.Pid = in.Peer.Pid + ret.Enr = in.Peer.Enr + ret.Direction = in.Peer.Direction + ret.AgentVersion = in.Peer.AgentVersion + ret.Address = in.Peer.Address + return ret } @@ -183,8 +174,8 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss log.Debug("Received block via gossip", "slot", obj.Block.Slot) return g.blockService.ProcessMessage(ctx, data.SubnetId, obj) case gossip.TopicNameSyncCommitteeContributionAndProof: - obj := &cltypes.SignedContributionAndProofWithGossipData{ - GossipData: copyOfSentinelData(data), + obj := &services.SignedContributionAndProofForGossip{ + Receiver: copyOfPeerData(data), SignedContributionAndProof: &cltypes.SignedContributionAndProof{}, } if err := obj.SignedContributionAndProof.DecodeSSZ(data.Data, int(version)); err != nil { @@ -192,8 +183,8 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss } return g.syncContributionService.ProcessMessage(ctx, data.SubnetId, obj) case gossip.TopicNameVoluntaryExit: - obj := &cltypes.SignedVoluntaryExitWithGossipData{ - GossipData: copyOfSentinelData(data), + obj := &services.SignedVoluntaryExitForGossip{ + Receiver: copyOfPeerData(data), SignedVoluntaryExit: &cltypes.SignedVoluntaryExit{}, } if err := obj.SignedVoluntaryExit.DecodeSSZ(data.Data, int(version)); err != nil { @@ -217,13 +208,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss return err } - if _, err := g.sentinel.PublishGossip(ctx, data); err != nil { - log.Debug("failed to publish gossip", "err", err) - } return nil case gossip.TopicNameBlsToExecutionChange: - obj := &cltypes.SignedBLSToExecutionChangeWithGossipData{ - GossipData: copyOfSentinelData(data), + obj := &services.SignedBLSToExecutionChangeForGossip{ + Receiver: copyOfPeerData(data), SignedBLSToExecutionChange: &cltypes.SignedBLSToExecutionChange{}, } if err := obj.SignedBLSToExecutionChange.DecodeSSZ(data.Data, int(version)); err != nil { @@ -231,11 +219,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss } return g.blsToExecutionChangeService.ProcessMessage(ctx, data.SubnetId, obj) case gossip.TopicNameBeaconAggregateAndProof: - obj := &cltypes.SignedAggregateAndProofData{ - GossipData: copyOfSentinelData(data), + obj := &services.SignedAggregateAndProofForGossip{ + Receiver: copyOfPeerData(data), SignedAggregateAndProof: &cltypes.SignedAggregateAndProof{}, } - if err := obj.SignedAggregateAndProof.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil { return err } @@ -252,17 +239,17 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss // The background checks above are enough for now. return g.blobService.ProcessMessage(ctx, data.SubnetId, blobSideCar) case gossip.IsTopicSyncCommittee(data.Name): - msg := &cltypes.SyncCommitteeMessageWithGossipData{ - GossipData: copyOfSentinelData(data), + obj := &services.SyncCommitteeMessageForGossip{ + Receiver: copyOfPeerData(data), SyncCommitteeMessage: &cltypes.SyncCommitteeMessage{}, } - if err := msg.SyncCommitteeMessage.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil { + if err := obj.SyncCommitteeMessage.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil { return err } - return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg) + return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, obj) case gossip.IsTopicBeaconAttestation(data.Name): - obj := &services.AttestationWithGossipData{ - GossipData: copyOfSentinelData(data), + obj := &services.AttestationForGossip{ + Receiver: copyOfPeerData(data), Attestation: &solid.Attestation{}, SingleAttestation: &solid.SingleAttestation{}, ImmediateProcess: false, diff --git a/cl/phase1/network/services/aggregate_and_proof_service.go b/cl/phase1/network/services/aggregate_and_proof_service.go index 1e80db2a673..6307d554c00 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service.go +++ b/cl/phase1/network/services/aggregate_and_proof_service.go @@ -26,6 +26,7 @@ import ( "github.com/Giulio2002/bls" + sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cl/beacon/synced_data" @@ -42,8 +43,19 @@ import ( "github.com/erigontech/erigon/cl/utils" ) +// SignedAggregateAndProofData is passed to SignedAggregateAndProof service. The service does the signature verification +// asynchronously. That's why we cannot wait for its ProcessMessage call to finish to check error. The service +// will do re-publishing of the gossip or banning the peer in case of invalid signature by itself. +// that's why we are passing sentinel.SentinelClient and *sentinel.GossipData to enable the service +// to do all of that by itself. +type SignedAggregateAndProofForGossip struct { + SignedAggregateAndProof *cltypes.SignedAggregateAndProof + Receiver *sentinel.Peer + ImmediateProcess bool +} + type aggregateJob struct { - aggregate *cltypes.SignedAggregateAndProofData + aggregate *SignedAggregateAndProofForGossip creationTime time.Time } @@ -96,7 +108,7 @@ func NewAggregateAndProofService( func (a *aggregateAndProofServiceImpl) ProcessMessage( ctx context.Context, subnet *uint64, - aggregateAndProof *cltypes.SignedAggregateAndProofData, + aggregateAndProof *SignedAggregateAndProofForGossip, ) error { selectionProof := aggregateAndProof.SignedAggregateAndProof.Message.SelectionProof aggregateData := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Data @@ -232,7 +244,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage( a.seenAggreatorIndexes.Add(seenIndex, struct{}{}) } // for this specific request, collect data for potential peer banning or gossip publishing - aggregateVerificationData.GossipData = aggregateAndProof.GossipData + aggregateVerificationData.SendingPeer = aggregateAndProof.Receiver if aggregateAndProof.ImmediateProcess { return a.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData) @@ -361,7 +373,7 @@ func AggregateMessageSignature( } func (a *aggregateAndProofServiceImpl) scheduleAggregateForLaterProcessing( - aggregateAndProof *cltypes.SignedAggregateAndProofData, + aggregateAndProof *SignedAggregateAndProofForGossip, ) { key, err := aggregateAndProof.SignedAggregateAndProof.HashSSZ() if err != nil { diff --git a/cl/phase1/network/services/aggregate_and_proof_service_test.go b/cl/phase1/network/services/aggregate_and_proof_service_test.go index 6745a05a893..82633125207 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service_test.go +++ b/cl/phase1/network/services/aggregate_and_proof_service_test.go @@ -35,11 +35,11 @@ import ( "github.com/erigontech/erigon/cl/pool" ) -func getAggregateAndProofAndState(t *testing.T) (*cltypes.SignedAggregateAndProofData, *state.CachingBeaconState) { +func getAggregateAndProofAndState(t *testing.T) (*SignedAggregateAndProofForGossip, *state.CachingBeaconState) { _, _, s := tests.GetBellatrixRandom() br, _ := s.BlockRoot() checkpoint := s.CurrentJustifiedCheckpoint() - a := &cltypes.SignedAggregateAndProofData{ + a := &SignedAggregateAndProofForGossip{ SignedAggregateAndProof: &cltypes.SignedAggregateAndProof{ Message: &cltypes.AggregateAndProof{ AggregatorIndex: 141, @@ -62,7 +62,6 @@ func getAggregateAndProofAndState(t *testing.T) (*cltypes.SignedAggregateAndProo }, }, }, - GossipData: nil, } a.SignedAggregateAndProof.Message.Aggregate.Data.Target.Epoch = s.Slot() / 32 diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 6c7adfba1fa..b701a2533cc 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -65,10 +65,10 @@ type attestationService struct { } // AttestationWithGossipData type represents attestation with the gossip data where it's coming from. -type AttestationWithGossipData struct { +type AttestationForGossip struct { Attestation *solid.Attestation SingleAttestation *solid.SingleAttestation // New container after Electra - GossipData *sentinel.GossipData + Receiver *sentinel.Peer // ImmediateProcess indicates whether the attestation should be processed immediately or able to be scheduled for later processing. ImmediateProcess bool } @@ -103,7 +103,7 @@ func NewAttestationService( return a } -func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *AttestationWithGossipData) error { +func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *AttestationForGossip) error { var ( root libcommon.Hash slot uint64 @@ -276,10 +276,10 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, } aggregateVerificationData := &AggregateVerificationData{ - Signatures: [][]byte{signature[:]}, - SignRoots: [][]byte{signingRoot[:]}, - Pks: [][]byte{pubKey[:]}, - GossipData: att.GossipData, + Signatures: [][]byte{signature[:]}, + SignRoots: [][]byte{signingRoot[:]}, + Pks: [][]byte{pubKey[:]}, + SendingPeer: att.Receiver, F: func() { start := time.Now() defer monitor.ObserveAggregateAttestation(start) diff --git a/cl/phase1/network/services/attestation_service_test.go b/cl/phase1/network/services/attestation_service_test.go index 45d5a008435..b5d61bbf553 100644 --- a/cl/phase1/network/services/attestation_service_test.go +++ b/cl/phase1/network/services/attestation_service_test.go @@ -346,7 +346,7 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { log.Printf("test case: %s", tt.name) t.SetupTest() tt.mock() - err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, &AttestationWithGossipData{Attestation: tt.args.msg, GossipData: nil, ImmediateProcess: true}) + err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, &AttestationForGossip{Attestation: tt.args.msg, ImmediateProcess: true}) time.Sleep(time.Millisecond * 60) if tt.wantErr { t.Require().Error(err) diff --git a/cl/phase1/network/services/batch_signature_verification.go b/cl/phase1/network/services/batch_signature_verification.go index c7c7cef9ca8..b70db4b9efe 100644 --- a/cl/phase1/network/services/batch_signature_verification.go +++ b/cl/phase1/network/services/batch_signature_verification.go @@ -38,11 +38,11 @@ var ErrInvalidBlsSignature = errors.New("invalid bls signature") // to make sure that we can validate it separately and in case of failure we ban corresponding // GossipData.Peer or simply run F and publish GossipData in case signature verification succeeds. type AggregateVerificationData struct { - Signatures [][]byte - SignRoots [][]byte - Pks [][]byte - F func() - GossipData *sentinel.GossipData + Signatures [][]byte + SignRoots [][]byte + Pks [][]byte + F func() + SendingPeer *sentinel.Peer } func NewBatchSignatureVerifier(ctx context.Context, sentinel sentinel.SentinelClient) *BatchSignatureVerifier { @@ -151,11 +151,6 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat // Everything went well, run corresponding Fs and send all the gossip data to the network for _, v := range aggregateVerificationData { v.F() - if b.sentinel != nil && v.GossipData != nil { - if _, err := b.sentinel.PublishGossip(b.ctx, v.GossipData); err != nil { - log.Debug("failed to publish gossip", "err", err) - } - } } return nil } @@ -167,19 +162,19 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks) if err != nil { log.Crit("[BatchVerifier] signature verification failed with the error: " + err.Error()) - if b.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil { - b.sentinel.BanPeer(b.ctx, v.GossipData.Peer) + if b.sentinel != nil && v.SendingPeer != nil { + b.sentinel.BanPeer(b.ctx, v.SendingPeer) } continue } if !valid { - if v.GossipData == nil || alreadyBanned { + if v.SendingPeer == nil || alreadyBanned { continue } - log.Debug("[BatchVerifier] received invalid signature on the gossip", "topic", v.GossipData.Name) - if b.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil { - b.sentinel.BanPeer(b.ctx, v.GossipData.Peer) + log.Debug("[BatchVerifier] received invalid signature on the gossip", "peer", v.SendingPeer.Pid) + if b.sentinel != nil && v.SendingPeer != nil { + b.sentinel.BanPeer(b.ctx, v.SendingPeer) alreadyBanned = true } continue @@ -187,11 +182,6 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification // run corresponding function and publish the gossip into the network v.F() - if b.sentinel != nil && v.GossipData != nil { - if _, err := b.sentinel.PublishGossip(b.ctx, v.GossipData); err != nil { - log.Debug("failed to publish gossip", "err", err) - } - } } } diff --git a/cl/phase1/network/services/bls_to_execution_change_service.go b/cl/phase1/network/services/bls_to_execution_change_service.go index 2a9a8cfe347..4d8919c15db 100644 --- a/cl/phase1/network/services/bls_to_execution_change_service.go +++ b/cl/phase1/network/services/bls_to_execution_change_service.go @@ -24,6 +24,7 @@ import ( "github.com/Giulio2002/bls" "github.com/erigontech/erigon-lib/common" + sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon/cl/beacon/beaconevents" "github.com/erigontech/erigon/cl/beacon/synced_data" "github.com/erigontech/erigon/cl/clparams" @@ -38,6 +39,13 @@ var ( blsVerify = bls.Verify ) +// SignedBLSToExecutionChangeForGossip type represents SignedBLSToExecutionChange with the gossip data where it's coming from. +type SignedBLSToExecutionChangeForGossip struct { + SignedBLSToExecutionChange *cltypes.SignedBLSToExecutionChange + Receiver *sentinel.Peer + ImmediateVerification bool +} + type blsToExecutionChangeService struct { operationsPool pool.OperationsPool emitters *beaconevents.EventEmitter @@ -62,7 +70,7 @@ func NewBLSToExecutionChangeService( } } -func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error { +func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet *uint64, msg *SignedBLSToExecutionChangeForGossip) error { // https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/p2p-interface.md#bls_to_execution_change // [IGNORE] The signed_bls_to_execution_change is the first valid signed bls to execution change received // for the validator with index signed_bls_to_execution_change.message.validator_index. @@ -117,10 +125,10 @@ func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet } aggregateVerificationData := &AggregateVerificationData{ - Signatures: [][]byte{msg.SignedBLSToExecutionChange.Signature[:]}, - SignRoots: [][]byte{signedRoot[:]}, - Pks: [][]byte{change.From[:]}, - GossipData: msg.GossipData, + Signatures: [][]byte{msg.SignedBLSToExecutionChange.Signature[:]}, + SignRoots: [][]byte{signedRoot[:]}, + Pks: [][]byte{change.From[:]}, + SendingPeer: msg.Receiver, F: func() { s.emitters.Operation().SendBlsToExecution(msg.SignedBLSToExecutionChange) s.operationsPool.BLSToExecutionChangesPool.Insert(msg.SignedBLSToExecutionChange.Signature, msg.SignedBLSToExecutionChange) diff --git a/cl/phase1/network/services/bls_to_execution_change_service_test.go b/cl/phase1/network/services/bls_to_execution_change_service_test.go index 6f49fd827e9..1bd041ef3fb 100644 --- a/cl/phase1/network/services/bls_to_execution_change_service_test.go +++ b/cl/phase1/network/services/bls_to_execution_change_service_test.go @@ -78,7 +78,7 @@ func (t *blsToExecutionChangeTestSuite) TearDownTest() { } func (t *blsToExecutionChangeTestSuite) TestProcessMessage() { - mockMsg := &cltypes.SignedBLSToExecutionChangeWithGossipData{ + mockMsg := &SignedBLSToExecutionChangeForGossip{ SignedBLSToExecutionChange: &cltypes.SignedBLSToExecutionChange{ Message: &cltypes.BLSToExecutionChange{ ValidatorIndex: 1, @@ -87,14 +87,13 @@ func (t *blsToExecutionChangeTestSuite) TestProcessMessage() { }, Signature: [96]byte{1, 2, 3}, }, - GossipData: nil, ImmediateVerification: true, } tests := []struct { name string mock func() - msg *cltypes.SignedBLSToExecutionChangeWithGossipData + msg *SignedBLSToExecutionChangeForGossip wantErr bool specificErr error }{ diff --git a/cl/phase1/network/services/interface.go b/cl/phase1/network/services/interface.go index 02a3b26a8d2..fc03bd35af1 100644 --- a/cl/phase1/network/services/interface.go +++ b/cl/phase1/network/services/interface.go @@ -35,22 +35,22 @@ type BlockService Service[*cltypes.SignedBeaconBlock] type BlobSidecarsService Service[*cltypes.BlobSidecar] //go:generate mockgen -typed=true -destination=./mock_services/sync_committee_messages_service_mock.go -package=mock_services . SyncCommitteeMessagesService -type SyncCommitteeMessagesService Service[*cltypes.SyncCommitteeMessageWithGossipData] +type SyncCommitteeMessagesService Service[*SyncCommitteeMessageForGossip] //go:generate mockgen -typed=true -destination=./mock_services/sync_contribution_service_mock.go -package=mock_services . SyncContributionService -type SyncContributionService Service[*cltypes.SignedContributionAndProofWithGossipData] +type SyncContributionService Service[*SignedContributionAndProofForGossip] //go:generate mockgen -typed=true -destination=./mock_services/aggregate_and_proof_service_mock.go -package=mock_services . AggregateAndProofService -type AggregateAndProofService Service[*cltypes.SignedAggregateAndProofData] +type AggregateAndProofService Service[*SignedAggregateAndProofForGossip] //go:generate mockgen -typed=true -destination=./mock_services/attestation_service_mock.go -package=mock_services . AttestationService -type AttestationService Service[*AttestationWithGossipData] +type AttestationService Service[*AttestationForGossip] //go:generate mockgen -typed=true -destination=./mock_services/voluntary_exit_service_mock.go -package=mock_services . VoluntaryExitService -type VoluntaryExitService Service[*cltypes.SignedVoluntaryExitWithGossipData] +type VoluntaryExitService Service[*SignedVoluntaryExitForGossip] //go:generate mockgen -typed=true -destination=./mock_services/bls_to_execution_change_service_mock.go -package=mock_services . BLSToExecutionChangeService -type BLSToExecutionChangeService Service[*cltypes.SignedBLSToExecutionChangeWithGossipData] +type BLSToExecutionChangeService Service[*SignedBLSToExecutionChangeForGossip] //go:generate mockgen -typed=true -destination=./mock_services/proposer_slashing_service_mock.go -package=mock_services . ProposerSlashingService type ProposerSlashingService Service[*cltypes.ProposerSlashing] diff --git a/cl/phase1/network/services/mock_services/aggregate_and_proof_service_mock.go b/cl/phase1/network/services/mock_services/aggregate_and_proof_service_mock.go index 862b6c4043c..7eb06d711f5 100644 --- a/cl/phase1/network/services/mock_services/aggregate_and_proof_service_mock.go +++ b/cl/phase1/network/services/mock_services/aggregate_and_proof_service_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - cltypes "github.com/erigontech/erigon/cl/cltypes" + services "github.com/erigontech/erigon/cl/phase1/network/services" gomock "go.uber.org/mock/gomock" ) @@ -21,7 +21,6 @@ import ( type MockAggregateAndProofService struct { ctrl *gomock.Controller recorder *MockAggregateAndProofServiceMockRecorder - isgomock struct{} } // MockAggregateAndProofServiceMockRecorder is the mock recorder for MockAggregateAndProofService. @@ -42,17 +41,17 @@ func (m *MockAggregateAndProofService) EXPECT() *MockAggregateAndProofServiceMoc } // ProcessMessage mocks base method. -func (m *MockAggregateAndProofService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedAggregateAndProofData) error { +func (m *MockAggregateAndProofService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.SignedAggregateAndProofForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockAggregateAndProofServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockAggregateAndProofServiceProcessMessageCall { +func (mr *MockAggregateAndProofServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockAggregateAndProofServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockAggregateAndProofService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockAggregateAndProofService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockAggregateAndProofServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockAggregateAndProofServiceProcessMessageCall) Return(arg0 error) *Moc } // Do rewrite *gomock.Call.Do -func (c *MockAggregateAndProofServiceProcessMessageCall) Do(f func(context.Context, *uint64, *cltypes.SignedAggregateAndProofData) error) *MockAggregateAndProofServiceProcessMessageCall { +func (c *MockAggregateAndProofServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.SignedAggregateAndProofForGossip) error) *MockAggregateAndProofServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockAggregateAndProofServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *cltypes.SignedAggregateAndProofData) error) *MockAggregateAndProofServiceProcessMessageCall { +func (c *MockAggregateAndProofServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.SignedAggregateAndProofForGossip) error) *MockAggregateAndProofServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/mock_services/attestation_service_mock.go b/cl/phase1/network/services/mock_services/attestation_service_mock.go index 5220a08349a..87b15ca5232 100644 --- a/cl/phase1/network/services/mock_services/attestation_service_mock.go +++ b/cl/phase1/network/services/mock_services/attestation_service_mock.go @@ -21,7 +21,6 @@ import ( type MockAttestationService struct { ctrl *gomock.Controller recorder *MockAttestationServiceMockRecorder - isgomock struct{} } // MockAttestationServiceMockRecorder is the mock recorder for MockAttestationService. @@ -42,17 +41,17 @@ func (m *MockAttestationService) EXPECT() *MockAttestationServiceMockRecorder { } // ProcessMessage mocks base method. -func (m *MockAttestationService) ProcessMessage(ctx context.Context, subnet *uint64, msg *services.AttestationWithGossipData) error { +func (m *MockAttestationService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.AttestationForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockAttestationServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockAttestationServiceProcessMessageCall { +func (mr *MockAttestationServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockAttestationServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockAttestationService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockAttestationService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockAttestationServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockAttestationServiceProcessMessageCall) Return(arg0 error) *MockAttes } // Do rewrite *gomock.Call.Do -func (c *MockAttestationServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.AttestationWithGossipData) error) *MockAttestationServiceProcessMessageCall { +func (c *MockAttestationServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.AttestationForGossip) error) *MockAttestationServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockAttestationServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.AttestationWithGossipData) error) *MockAttestationServiceProcessMessageCall { +func (c *MockAttestationServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.AttestationForGossip) error) *MockAttestationServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/mock_services/blob_sidecars_service_mock.go b/cl/phase1/network/services/mock_services/blob_sidecars_service_mock.go index 1445222c060..ee82d59319a 100644 --- a/cl/phase1/network/services/mock_services/blob_sidecars_service_mock.go +++ b/cl/phase1/network/services/mock_services/blob_sidecars_service_mock.go @@ -21,7 +21,6 @@ import ( type MockBlobSidecarsService struct { ctrl *gomock.Controller recorder *MockBlobSidecarsServiceMockRecorder - isgomock struct{} } // MockBlobSidecarsServiceMockRecorder is the mock recorder for MockBlobSidecarsService. @@ -42,17 +41,17 @@ func (m *MockBlobSidecarsService) EXPECT() *MockBlobSidecarsServiceMockRecorder } // ProcessMessage mocks base method. -func (m *MockBlobSidecarsService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.BlobSidecar) error { +func (m *MockBlobSidecarsService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *cltypes.BlobSidecar) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockBlobSidecarsServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockBlobSidecarsServiceProcessMessageCall { +func (mr *MockBlobSidecarsServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockBlobSidecarsServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBlobSidecarsService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBlobSidecarsService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockBlobSidecarsServiceProcessMessageCall{Call: call} } diff --git a/cl/phase1/network/services/mock_services/block_service_mock.go b/cl/phase1/network/services/mock_services/block_service_mock.go index 52ae2370c10..056061874a9 100644 --- a/cl/phase1/network/services/mock_services/block_service_mock.go +++ b/cl/phase1/network/services/mock_services/block_service_mock.go @@ -21,7 +21,6 @@ import ( type MockBlockService struct { ctrl *gomock.Controller recorder *MockBlockServiceMockRecorder - isgomock struct{} } // MockBlockServiceMockRecorder is the mock recorder for MockBlockService. @@ -42,17 +41,17 @@ func (m *MockBlockService) EXPECT() *MockBlockServiceMockRecorder { } // ProcessMessage mocks base method. -func (m *MockBlockService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedBeaconBlock) error { +func (m *MockBlockService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *cltypes.SignedBeaconBlock) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockBlockServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockBlockServiceProcessMessageCall { +func (mr *MockBlockServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockBlockServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBlockService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBlockService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockBlockServiceProcessMessageCall{Call: call} } diff --git a/cl/phase1/network/services/mock_services/bls_to_execution_change_service_mock.go b/cl/phase1/network/services/mock_services/bls_to_execution_change_service_mock.go index fd4b3da1f37..e8f396b0878 100644 --- a/cl/phase1/network/services/mock_services/bls_to_execution_change_service_mock.go +++ b/cl/phase1/network/services/mock_services/bls_to_execution_change_service_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - cltypes "github.com/erigontech/erigon/cl/cltypes" + services "github.com/erigontech/erigon/cl/phase1/network/services" gomock "go.uber.org/mock/gomock" ) @@ -21,7 +21,6 @@ import ( type MockBLSToExecutionChangeService struct { ctrl *gomock.Controller recorder *MockBLSToExecutionChangeServiceMockRecorder - isgomock struct{} } // MockBLSToExecutionChangeServiceMockRecorder is the mock recorder for MockBLSToExecutionChangeService. @@ -42,17 +41,17 @@ func (m *MockBLSToExecutionChangeService) EXPECT() *MockBLSToExecutionChangeServ } // ProcessMessage mocks base method. -func (m *MockBLSToExecutionChangeService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error { +func (m *MockBLSToExecutionChangeService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.SignedBLSToExecutionChangeForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockBLSToExecutionChangeServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockBLSToExecutionChangeServiceProcessMessageCall { +func (mr *MockBLSToExecutionChangeServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockBLSToExecutionChangeServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBLSToExecutionChangeService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockBLSToExecutionChangeService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockBLSToExecutionChangeServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockBLSToExecutionChangeServiceProcessMessageCall) Return(arg0 error) * } // Do rewrite *gomock.Call.Do -func (c *MockBLSToExecutionChangeServiceProcessMessageCall) Do(f func(context.Context, *uint64, *cltypes.SignedBLSToExecutionChangeWithGossipData) error) *MockBLSToExecutionChangeServiceProcessMessageCall { +func (c *MockBLSToExecutionChangeServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.SignedBLSToExecutionChangeForGossip) error) *MockBLSToExecutionChangeServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockBLSToExecutionChangeServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *cltypes.SignedBLSToExecutionChangeWithGossipData) error) *MockBLSToExecutionChangeServiceProcessMessageCall { +func (c *MockBLSToExecutionChangeServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.SignedBLSToExecutionChangeForGossip) error) *MockBLSToExecutionChangeServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/mock_services/proposer_slashing_service_mock.go b/cl/phase1/network/services/mock_services/proposer_slashing_service_mock.go index 9be17ca5202..ee9c87ab65b 100644 --- a/cl/phase1/network/services/mock_services/proposer_slashing_service_mock.go +++ b/cl/phase1/network/services/mock_services/proposer_slashing_service_mock.go @@ -21,7 +21,6 @@ import ( type MockProposerSlashingService struct { ctrl *gomock.Controller recorder *MockProposerSlashingServiceMockRecorder - isgomock struct{} } // MockProposerSlashingServiceMockRecorder is the mock recorder for MockProposerSlashingService. @@ -42,17 +41,17 @@ func (m *MockProposerSlashingService) EXPECT() *MockProposerSlashingServiceMockR } // ProcessMessage mocks base method. -func (m *MockProposerSlashingService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.ProposerSlashing) error { +func (m *MockProposerSlashingService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *cltypes.ProposerSlashing) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockProposerSlashingServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockProposerSlashingServiceProcessMessageCall { +func (mr *MockProposerSlashingServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockProposerSlashingServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockProposerSlashingService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockProposerSlashingService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockProposerSlashingServiceProcessMessageCall{Call: call} } diff --git a/cl/phase1/network/services/mock_services/sync_committee_messages_service_mock.go b/cl/phase1/network/services/mock_services/sync_committee_messages_service_mock.go index 02383e81fa8..c2bb9a141a8 100644 --- a/cl/phase1/network/services/mock_services/sync_committee_messages_service_mock.go +++ b/cl/phase1/network/services/mock_services/sync_committee_messages_service_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - cltypes "github.com/erigontech/erigon/cl/cltypes" + services "github.com/erigontech/erigon/cl/phase1/network/services" gomock "go.uber.org/mock/gomock" ) @@ -21,7 +21,6 @@ import ( type MockSyncCommitteeMessagesService struct { ctrl *gomock.Controller recorder *MockSyncCommitteeMessagesServiceMockRecorder - isgomock struct{} } // MockSyncCommitteeMessagesServiceMockRecorder is the mock recorder for MockSyncCommitteeMessagesService. @@ -42,17 +41,17 @@ func (m *MockSyncCommitteeMessagesService) EXPECT() *MockSyncCommitteeMessagesSe } // ProcessMessage mocks base method. -func (m *MockSyncCommitteeMessagesService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error { +func (m *MockSyncCommitteeMessagesService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.SyncCommitteeMessageForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockSyncCommitteeMessagesServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockSyncCommitteeMessagesServiceProcessMessageCall { +func (mr *MockSyncCommitteeMessagesServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockSyncCommitteeMessagesServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockSyncCommitteeMessagesService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockSyncCommitteeMessagesService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockSyncCommitteeMessagesServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockSyncCommitteeMessagesServiceProcessMessageCall) Return(arg0 error) } // Do rewrite *gomock.Call.Do -func (c *MockSyncCommitteeMessagesServiceProcessMessageCall) Do(f func(context.Context, *uint64, *cltypes.SyncCommitteeMessageWithGossipData) error) *MockSyncCommitteeMessagesServiceProcessMessageCall { +func (c *MockSyncCommitteeMessagesServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.SyncCommitteeMessageForGossip) error) *MockSyncCommitteeMessagesServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockSyncCommitteeMessagesServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *cltypes.SyncCommitteeMessageWithGossipData) error) *MockSyncCommitteeMessagesServiceProcessMessageCall { +func (c *MockSyncCommitteeMessagesServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.SyncCommitteeMessageForGossip) error) *MockSyncCommitteeMessagesServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/mock_services/sync_contribution_service_mock.go b/cl/phase1/network/services/mock_services/sync_contribution_service_mock.go index cdc90e8b0c7..a33930a9bb0 100644 --- a/cl/phase1/network/services/mock_services/sync_contribution_service_mock.go +++ b/cl/phase1/network/services/mock_services/sync_contribution_service_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - cltypes "github.com/erigontech/erigon/cl/cltypes" + services "github.com/erigontech/erigon/cl/phase1/network/services" gomock "go.uber.org/mock/gomock" ) @@ -21,7 +21,6 @@ import ( type MockSyncContributionService struct { ctrl *gomock.Controller recorder *MockSyncContributionServiceMockRecorder - isgomock struct{} } // MockSyncContributionServiceMockRecorder is the mock recorder for MockSyncContributionService. @@ -42,17 +41,17 @@ func (m *MockSyncContributionService) EXPECT() *MockSyncContributionServiceMockR } // ProcessMessage mocks base method. -func (m *MockSyncContributionService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedContributionAndProofWithGossipData) error { +func (m *MockSyncContributionService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.SignedContributionAndProofForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockSyncContributionServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockSyncContributionServiceProcessMessageCall { +func (mr *MockSyncContributionServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockSyncContributionServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockSyncContributionService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockSyncContributionService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockSyncContributionServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockSyncContributionServiceProcessMessageCall) Return(arg0 error) *Mock } // Do rewrite *gomock.Call.Do -func (c *MockSyncContributionServiceProcessMessageCall) Do(f func(context.Context, *uint64, *cltypes.SignedContributionAndProofWithGossipData) error) *MockSyncContributionServiceProcessMessageCall { +func (c *MockSyncContributionServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.SignedContributionAndProofForGossip) error) *MockSyncContributionServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockSyncContributionServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *cltypes.SignedContributionAndProofWithGossipData) error) *MockSyncContributionServiceProcessMessageCall { +func (c *MockSyncContributionServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.SignedContributionAndProofForGossip) error) *MockSyncContributionServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/mock_services/voluntary_exit_service_mock.go b/cl/phase1/network/services/mock_services/voluntary_exit_service_mock.go index b2e94717f9a..5d8bc87194e 100644 --- a/cl/phase1/network/services/mock_services/voluntary_exit_service_mock.go +++ b/cl/phase1/network/services/mock_services/voluntary_exit_service_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - cltypes "github.com/erigontech/erigon/cl/cltypes" + services "github.com/erigontech/erigon/cl/phase1/network/services" gomock "go.uber.org/mock/gomock" ) @@ -21,7 +21,6 @@ import ( type MockVoluntaryExitService struct { ctrl *gomock.Controller recorder *MockVoluntaryExitServiceMockRecorder - isgomock struct{} } // MockVoluntaryExitServiceMockRecorder is the mock recorder for MockVoluntaryExitService. @@ -42,17 +41,17 @@ func (m *MockVoluntaryExitService) EXPECT() *MockVoluntaryExitServiceMockRecorde } // ProcessMessage mocks base method. -func (m *MockVoluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedVoluntaryExitWithGossipData) error { +func (m *MockVoluntaryExitService) ProcessMessage(arg0 context.Context, arg1 *uint64, arg2 *services.SignedVoluntaryExitForGossip) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessMessage", ctx, subnet, msg) + ret := m.ctrl.Call(m, "ProcessMessage", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // ProcessMessage indicates an expected call of ProcessMessage. -func (mr *MockVoluntaryExitServiceMockRecorder) ProcessMessage(ctx, subnet, msg any) *MockVoluntaryExitServiceProcessMessageCall { +func (mr *MockVoluntaryExitServiceMockRecorder) ProcessMessage(arg0, arg1, arg2 any) *MockVoluntaryExitServiceProcessMessageCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockVoluntaryExitService)(nil).ProcessMessage), ctx, subnet, msg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessMessage", reflect.TypeOf((*MockVoluntaryExitService)(nil).ProcessMessage), arg0, arg1, arg2) return &MockVoluntaryExitServiceProcessMessageCall{Call: call} } @@ -68,13 +67,13 @@ func (c *MockVoluntaryExitServiceProcessMessageCall) Return(arg0 error) *MockVol } // Do rewrite *gomock.Call.Do -func (c *MockVoluntaryExitServiceProcessMessageCall) Do(f func(context.Context, *uint64, *cltypes.SignedVoluntaryExitWithGossipData) error) *MockVoluntaryExitServiceProcessMessageCall { +func (c *MockVoluntaryExitServiceProcessMessageCall) Do(f func(context.Context, *uint64, *services.SignedVoluntaryExitForGossip) error) *MockVoluntaryExitServiceProcessMessageCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockVoluntaryExitServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *cltypes.SignedVoluntaryExitWithGossipData) error) *MockVoluntaryExitServiceProcessMessageCall { +func (c *MockVoluntaryExitServiceProcessMessageCall) DoAndReturn(f func(context.Context, *uint64, *services.SignedVoluntaryExitForGossip) error) *MockVoluntaryExitServiceProcessMessageCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/cl/phase1/network/services/sync_committee_messages_service.go b/cl/phase1/network/services/sync_committee_messages_service.go index 83c7c224c57..3ab1c645148 100644 --- a/cl/phase1/network/services/sync_committee_messages_service.go +++ b/cl/phase1/network/services/sync_committee_messages_service.go @@ -22,6 +22,7 @@ import ( "slices" "sync" + sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon/cl/beacon/synced_data" "github.com/erigontech/erigon/cl/clparams" "github.com/erigontech/erigon/cl/cltypes" @@ -50,6 +51,12 @@ type syncCommitteeMessagesService struct { mu sync.Mutex } +type SyncCommitteeMessageForGossip struct { + SyncCommitteeMessage *cltypes.SyncCommitteeMessage + Receiver *sentinel.Peer + ImmediateVerification bool +} + // NewSyncCommitteeMessagesService creates a new sync committee messages service func NewSyncCommitteeMessagesService( beaconChainCfg *clparams.BeaconChainConfig, @@ -70,7 +77,7 @@ func NewSyncCommitteeMessagesService( } // ProcessMessage processes a sync committee message -func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error { +func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subnet *uint64, msg *SyncCommitteeMessageForGossip) error { s.mu.Lock() defer s.mu.Unlock() @@ -105,10 +112,10 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne return err } aggregateVerificationData := &AggregateVerificationData{ - Signatures: [][]byte{signature}, - SignRoots: [][]byte{signingRoot}, - Pks: [][]byte{pubKey}, - GossipData: msg.GossipData, + Signatures: [][]byte{signature}, + SignRoots: [][]byte{signingRoot}, + Pks: [][]byte{pubKey}, + SendingPeer: msg.Receiver, F: func() { s.seenSyncCommitteeMessages.Store(seenSyncCommitteeMessageIdentifier, struct{}{}) s.cleanupOldSyncCommitteeMessages() // cleanup old messages diff --git a/cl/phase1/network/services/sync_committee_messages_service_test.go b/cl/phase1/network/services/sync_committee_messages_service_test.go index ca2f0a0fb84..436f7063c79 100644 --- a/cl/phase1/network/services/sync_committee_messages_service_test.go +++ b/cl/phase1/network/services/sync_committee_messages_service_test.go @@ -43,16 +43,15 @@ func setupSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (Sync return s, syncedDataManager, ethClock } -func getObjectsForSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *cltypes.SyncCommitteeMessageWithGossipData) { +func getObjectsForSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *SyncCommitteeMessageForGossip) { _, _, state := tests.GetBellatrixRandom() br, _ := state.BlockRoot() - msg := &cltypes.SyncCommitteeMessageWithGossipData{ + msg := &SyncCommitteeMessageForGossip{ SyncCommitteeMessage: &cltypes.SyncCommitteeMessage{ Slot: state.Slot(), BeaconBlockRoot: br, ValidatorIndex: 0, }, - GossipData: nil, ImmediateVerification: true, } return state, msg diff --git a/cl/phase1/network/services/sync_contribution_service.go b/cl/phase1/network/services/sync_contribution_service.go index 27da98964e1..542a533dac2 100644 --- a/cl/phase1/network/services/sync_contribution_service.go +++ b/cl/phase1/network/services/sync_contribution_service.go @@ -28,6 +28,7 @@ import ( "github.com/erigontech/erigon-lib/common" libcommon "github.com/erigontech/erigon-lib/common" + sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon/cl/beacon/beaconevents" "github.com/erigontech/erigon/cl/beacon/synced_data" "github.com/erigontech/erigon/cl/clparams" @@ -59,6 +60,13 @@ type syncContributionService struct { mu sync.Mutex } +// SignedContributionAndProofWithGossipData type represents SignedContributionAndProof with the gossip data where it's coming from. +type SignedContributionAndProofForGossip struct { + SignedContributionAndProof *cltypes.SignedContributionAndProof + Receiver *sentinel.Peer + ImmediateVerification bool +} + // NewSyncContributionService creates a new sync contribution service func NewSyncContributionService( syncedDataManager *synced_data.SyncedDataManager, @@ -82,7 +90,7 @@ func NewSyncContributionService( } // ProcessMessage processes a sync contribution message -func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *uint64, signedContribution *cltypes.SignedContributionAndProofWithGossipData) error { +func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *uint64, signedContribution *SignedContributionAndProofForGossip) error { s.mu.Lock() defer s.mu.Unlock() @@ -138,7 +146,7 @@ func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *ui return err } - aggregateVerificationData.GossipData = signedContribution.GossipData + aggregateVerificationData.SendingPeer = signedContribution.Receiver // further processing will be done after async signature verification aggregateVerificationData.F = func() { @@ -174,7 +182,7 @@ func (s *syncContributionService) ProcessMessage(ctx context.Context, subnet *ui func (s *syncContributionService) GetSignaturesOnContributionSignatures( headState *state.CachingBeaconState, contributionAndProof *cltypes.ContributionAndProof, - signedContribution *cltypes.SignedContributionAndProofWithGossipData, + signedContribution *SignedContributionAndProofForGossip, subcommiteePubsKeys []libcommon.Bytes48) (*AggregateVerificationData, error) { // [REJECT] The contribution_and_proof.selection_proof is a valid signature of the SyncAggregatorSelectionData derived from the contribution by the validator with index contribution_and_proof.aggregator_index. diff --git a/cl/phase1/network/services/sync_contribution_service_test.go b/cl/phase1/network/services/sync_contribution_service_test.go index 54835758d80..23194bbceb3 100644 --- a/cl/phase1/network/services/sync_contribution_service_test.go +++ b/cl/phase1/network/services/sync_contribution_service_test.go @@ -45,12 +45,12 @@ func setupSyncContributionServiceTest(t *testing.T, ctrl *gomock.Controller) (Sy return s, syncedDataManager, ethClock } -func getObjectsForSyncContributionServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *cltypes.SignedContributionAndProofWithGossipData) { +func getObjectsForSyncContributionServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *SignedContributionAndProofForGossip) { _, _, state := tests.GetBellatrixRandom() br, _ := state.BlockRoot() aggBits := make([]byte, 16) aggBits[0] = 1 - msg := &cltypes.SignedContributionAndProofWithGossipData{ + msg := &SignedContributionAndProofForGossip{ SignedContributionAndProof: &cltypes.SignedContributionAndProof{ Message: &cltypes.ContributionAndProof{ AggregatorIndex: 0, @@ -62,7 +62,6 @@ func getObjectsForSyncContributionServiceTest(t *testing.T, ctrl *gomock.Control }, }, }, - GossipData: nil, ImmediateVerification: true, } diff --git a/cl/phase1/network/services/voluntary_exit_service.go b/cl/phase1/network/services/voluntary_exit_service.go index da23aa8b313..bef603d8bdd 100644 --- a/cl/phase1/network/services/voluntary_exit_service.go +++ b/cl/phase1/network/services/voluntary_exit_service.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/erigontech/erigon-lib/common" + sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon/cl/beacon/beaconevents" "github.com/erigontech/erigon/cl/beacon/synced_data" "github.com/erigontech/erigon/cl/clparams" @@ -42,6 +43,13 @@ type voluntaryExitService struct { batchSignatureVerifier *BatchSignatureVerifier } +// SignedVoluntaryExitForGossip type represents SignedVoluntaryExit with the gossip data where it's coming from. +type SignedVoluntaryExitForGossip struct { + SignedVoluntaryExit *cltypes.SignedVoluntaryExit + Receiver *sentinel.Peer + ImmediateVerification bool +} + func NewVoluntaryExitService( operationsPool pool.OperationsPool, emitters *beaconevents.EventEmitter, @@ -60,7 +68,7 @@ func NewVoluntaryExitService( } } -func (s *voluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedVoluntaryExitWithGossipData) error { +func (s *voluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint64, msg *SignedVoluntaryExitForGossip) error { // ref: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#voluntary_exit voluntaryExit := msg.SignedVoluntaryExit.VoluntaryExit @@ -133,10 +141,10 @@ func (s *voluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint6 } aggregateVerificationData := &AggregateVerificationData{ - Signatures: [][]byte{msg.SignedVoluntaryExit.Signature[:]}, - SignRoots: [][]byte{signingRoot[:]}, - Pks: [][]byte{pk[:]}, - GossipData: msg.GossipData, + Signatures: [][]byte{msg.SignedVoluntaryExit.Signature[:]}, + SignRoots: [][]byte{signingRoot[:]}, + Pks: [][]byte{pk[:]}, + SendingPeer: msg.Receiver, F: func() { s.operationsPool.VoluntaryExitsPool.Insert(voluntaryExit.ValidatorIndex, msg.SignedVoluntaryExit) s.emitters.Operation().SendVoluntaryExit(msg.SignedVoluntaryExit) diff --git a/cl/phase1/network/services/voluntary_exit_service_test.go b/cl/phase1/network/services/voluntary_exit_service_test.go index bbb5a3a14a2..fba930fe2fe 100644 --- a/cl/phase1/network/services/voluntary_exit_service_test.go +++ b/cl/phase1/network/services/voluntary_exit_service_test.go @@ -80,7 +80,7 @@ func (t *voluntaryExitTestSuite) TearDownTest() { func (t *voluntaryExitTestSuite) TestProcessMessage() { curEpoch := uint64(100) mockValidatorIndex := uint64(10) - mockMsg := &cltypes.SignedVoluntaryExitWithGossipData{ + mockMsg := &SignedVoluntaryExitForGossip{ SignedVoluntaryExit: &cltypes.SignedVoluntaryExit{ VoluntaryExit: &cltypes.VoluntaryExit{ Epoch: 1, @@ -88,10 +88,9 @@ func (t *voluntaryExitTestSuite) TestProcessMessage() { }, Signature: [96]byte{}, }, - GossipData: nil, ImmediateVerification: true, } - mockMsg2 := &cltypes.SignedVoluntaryExitWithGossipData{ + mockMsg2 := &SignedVoluntaryExitForGossip{ SignedVoluntaryExit: &cltypes.SignedVoluntaryExit{ VoluntaryExit: &cltypes.VoluntaryExit{ Epoch: 1, @@ -99,7 +98,6 @@ func (t *voluntaryExitTestSuite) TestProcessMessage() { }, Signature: [96]byte{}, }, - GossipData: nil, ImmediateVerification: true, } @@ -108,7 +106,7 @@ func (t *voluntaryExitTestSuite) TestProcessMessage() { tests := []struct { name string mock func() - msg *cltypes.SignedVoluntaryExitWithGossipData + msg *SignedVoluntaryExitForGossip wantErr bool err error }{