diff --git a/go.mod b/go.mod index e996dad9ce..7956a35752 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21.12 require ( github.com/VictoriaMetrics/fastcache v1.12.1 - github.com/ava-labs/avalanchego v1.11.10 + github.com/ava-labs/avalanchego v1.11.11-0.20240729205337-a0f7e422bb84 github.com/cespare/cp v0.1.0 github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 github.com/davecgh/go-spew v1.1.1 diff --git a/go.sum b/go.sum index 9002e11eb2..7a6f111e4f 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/ava-labs/avalanchego v1.11.10 h1:QujciF5OEp5FwAoe/RciFF/i47rxU5rkEr6fVuUBS1Q= github.com/ava-labs/avalanchego v1.11.10/go.mod h1:POgZPryqe80OeHCDNrXrPOKoFre736iFuMgmUBeKaLc= +github.com/ava-labs/avalanchego v1.11.11-0.20240729205337-a0f7e422bb84 h1:AmPZLlnVREbJ/viK/hDTIVn1bqX8QTB2CFtrBxHwnsw= +github.com/ava-labs/avalanchego v1.11.11-0.20240729205337-a0f7e422bb84/go.mod h1:POgZPryqe80OeHCDNrXrPOKoFre736iFuMgmUBeKaLc= github.com/ava-labs/coreth v0.13.7 h1:k8T9u/ROifl8f7oXjHRc1KvSISRl9txvy7gGVmHEz6g= github.com/ava-labs/coreth v0.13.7/go.mod h1:tXDujonxXFOF6oK5HS2EmgtSXJK3Gy6RpZxb5WzR9rM= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 3654055f93..eaca77a71c 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -42,6 +42,7 @@ import ( "github.com/ava-labs/subnet-evm/sync/client/stats" "github.com/ava-labs/subnet-evm/trie" "github.com/ava-labs/subnet-evm/warp" + "github.com/ava-labs/subnet-evm/warp/handlers" warpValidators "github.com/ava-labs/subnet-evm/warp/validators" // Force-load tracer engine to trigger registration @@ -513,7 +514,7 @@ func (vm *VM) Initialize( go vm.ctx.Log.RecoverAndPanic(vm.startContinuousProfiler) - vm.initializeStateSyncServer() + vm.initializeHandlers() return vm.initializeStateSyncClient(lastAcceptedHeight) } @@ -616,13 +617,17 @@ func (vm *VM) initializeStateSyncClient(lastAcceptedHeight uint64) error { return nil } -// initializeStateSyncServer should be called after [vm.chain] is initialized. -func (vm *VM) initializeStateSyncServer() { +// initializeHandlers should be called after [vm.chain] is initialized. +func (vm *VM) initializeHandlers() { vm.StateSyncServer = NewStateSyncServer(&stateSyncServerConfig{ Chain: vm.blockChain, SyncableInterval: vm.config.StateSyncCommitInterval, }) + // Add p2p warp message warpHandler + warpHandler := handlers.NewSignatureRequestHandlerP2P(vm.warpBackend, vm.networkCodec) + vm.Network.AddHandler(p2p.SignatureRequestHandlerID, warpHandler) + vm.setAppRequestHandlers() vm.setCrossChainAppRequestHandler() } diff --git a/warp/backend.go b/warp/backend.go index 3bb19c1a4f..7e7377ad57 100644 --- a/warp/backend.go +++ b/warp/backend.go @@ -43,6 +43,8 @@ type Backend interface { GetBlockSignature(blockID ids.ID) ([bls.SignatureLen]byte, error) // GetMessage retrieves the [unsignedMessage] from the warp backend database if available + // TODO: After E-Upgrade, the backend no longer needs to store the mapping from messageHash + // to unsignedMessage (and this method can be removed). GetMessage(messageHash ids.ID) (*avalancheWarp.UnsignedMessage, error) // Clear clears the entire db diff --git a/warp/handlers/signature_request.go b/warp/handlers/signature_request.go index c307d284d1..cab7914243 100644 --- a/warp/handlers/signature_request.go +++ b/warp/handlers/signature_request.go @@ -16,6 +16,7 @@ import ( ) // SignatureRequestHandler serves warp signature requests. It is a peer.RequestHandler for message.MessageSignatureRequest. +// TODO: After E-Upgrade, this handler can be removed and SignatureRequestHandlerP2P is sufficient. type SignatureRequestHandler struct { backend warp.Backend codec codec.Manager diff --git a/warp/handlers/signature_request_p2p.go b/warp/handlers/signature_request_p2p.go new file mode 100644 index 0000000000..ecc72305af --- /dev/null +++ b/warp/handlers/signature_request_p2p.go @@ -0,0 +1,160 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package handlers + +import ( + "context" + "fmt" + "time" + + "github.com/ava-labs/avalanchego/codec" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" + "github.com/ava-labs/subnet-evm/warp" + "google.golang.org/protobuf/proto" +) + +var _ p2p.Handler = (*SignatureRequestHandlerP2P)(nil) + +const ( + ErrFailedToParse = iota + ErrFailedToGetSig + ErrFailedToMarshal +) + +// SignatureRequestHandlerP2P serves warp signature requests using the p2p +// framework from avalanchego. It is a peer.RequestHandler for +// message.MessageSignatureRequest. +type SignatureRequestHandlerP2P struct { + backend warp.Backend + codec codec.Manager + stats *handlerStats +} + +func NewSignatureRequestHandlerP2P(backend warp.Backend, codec codec.Manager) *SignatureRequestHandlerP2P { + return &SignatureRequestHandlerP2P{ + backend: backend, + codec: codec, + stats: newStats(), + } +} + +func (s *SignatureRequestHandlerP2P) AppRequest( + ctx context.Context, + nodeID ids.NodeID, + deadline time.Time, + requestBytes []byte, +) ([]byte, *common.AppError) { + // Per ACP-118, the requestBytes are the serialized form of + // sdk.SignatureRequest. + req := new(sdk.SignatureRequest) + if err := proto.Unmarshal(requestBytes, req); err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to unmarshal request: " + err.Error(), + } + } + + unsignedMessage, err := avalancheWarp.ParseUnsignedMessage(req.Message) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to parse unsigned message: " + err.Error(), + } + } + parsed, err := payload.Parse(unsignedMessage.Payload) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to parse payload: " + err.Error(), + } + } + + var sig [bls.SignatureLen]byte + switch p := parsed.(type) { + case *payload.AddressedCall: + // Note we pass the unsigned message ID to GetMessageSignature since + // that is what the backend expects. + // However, we verify the types and format of the payload to ensure + // the message conforms to the ACP-118 spec. + sig, err = s.GetMessageSignature(unsignedMessage.ID()) + if err != nil { + s.stats.IncMessageSignatureMiss() + } else { + s.stats.IncMessageSignatureHit() + } + case *payload.Hash: + sig, err = s.GetBlockSignature(p.Hash) + if err != nil { + s.stats.IncBlockSignatureMiss() + } else { + s.stats.IncBlockSignatureHit() + } + default: + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: fmt.Sprintf("unknown payload type: %T", p), + } + } + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToGetSig, + Message: "failed to get signature: " + err.Error(), + } + } + + // Per ACP-118, the responseBytes are the serialized form of + // sdk.SignatureResponse. + resp := &sdk.SignatureResponse{Signature: sig[:]} + respBytes, err := proto.Marshal(resp) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToMarshal, + Message: "failed to marshal response: " + err.Error(), + } + } + return respBytes, nil +} + +func (s *SignatureRequestHandlerP2P) GetMessageSignature(messageID ids.ID) ([bls.SignatureLen]byte, error) { + startTime := time.Now() + s.stats.IncMessageSignatureRequest() + + // Always report signature request time + defer func() { + s.stats.UpdateMessageSignatureRequestTime(time.Since(startTime)) + }() + + return s.backend.GetMessageSignature(messageID) +} + +func (s *SignatureRequestHandlerP2P) GetBlockSignature(blockID ids.ID) ([bls.SignatureLen]byte, error) { + startTime := time.Now() + s.stats.IncBlockSignatureRequest() + + // Always report signature request time + defer func() { + s.stats.UpdateBlockSignatureRequestTime(time.Since(startTime)) + }() + + return s.backend.GetBlockSignature(blockID) +} + +func (s *SignatureRequestHandlerP2P) CrossChainAppRequest( + ctx context.Context, + chainID ids.ID, + deadline time.Time, + requestBytes []byte, +) ([]byte, error) { + return nil, nil +} + +func (s *SignatureRequestHandlerP2P) AppGossip( + ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { +} diff --git a/warp/handlers/signature_request_p2p_test.go b/warp/handlers/signature_request_p2p_test.go new file mode 100644 index 0000000000..1f8f9530cb --- /dev/null +++ b/warp/handlers/signature_request_p2p_test.go @@ -0,0 +1,232 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package handlers + +import ( + "context" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" + "github.com/ava-labs/subnet-evm/plugin/evm/message" + "github.com/ava-labs/subnet-evm/utils" + "github.com/ava-labs/subnet-evm/warp" + "github.com/ava-labs/subnet-evm/warp/warptest" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestMessageSignatureHandlerP2P(t *testing.T) { + database := memdb.New() + snowCtx := utils.TestSnowContext() + blsSecretKey, err := bls.NewSecretKey() + require.NoError(t, err) + warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID) + + addressedPayload, err := payload.NewAddressedCall([]byte{1, 2, 3}, []byte{1, 2, 3}) + require.NoError(t, err) + offchainMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, addressedPayload.Bytes()) + require.NoError(t, err) + + backend, err := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, warptest.EmptyBlockClient, database, 100, [][]byte{offchainMessage.Bytes()}) + require.NoError(t, err) + + offchainPayload, err := payload.NewAddressedCall([]byte{0, 0, 0}, []byte("test")) + require.NoError(t, err) + msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, offchainPayload.Bytes()) + require.NoError(t, err) + messageID := msg.ID() + require.NoError(t, backend.AddMessage(msg)) + signature, err := backend.GetMessageSignature(messageID) + require.NoError(t, err) + offchainSignature, err := backend.GetMessageSignature(offchainMessage.ID()) + require.NoError(t, err) + + unknownPayload, err := payload.NewAddressedCall([]byte{0, 0, 0}, []byte("unknown message")) + require.NoError(t, err) + unknownMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, unknownPayload.Bytes()) + require.NoError(t, err) + + tests := map[string]struct { + setup func() (request sdk.SignatureRequest, expectedResponse []byte) + verifyStats func(t *testing.T, stats *handlerStats) + err error + }{ + "known message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: msg.Bytes()}, signature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "offchain message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: offchainMessage.Bytes()}, offchainSignature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "unknown message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: unknownMessage.Bytes()}, nil + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + err: &common.AppError{Code: ErrFailedToGetSig}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + handler := NewSignatureRequestHandlerP2P(backend, message.Codec) + handler.stats.Clear() + + request, expectedResponse := test.setup() + requestBytes, err := proto.Marshal(&request) + require.NoError(t, err) + responseBytes, appErr := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, requestBytes) + if test.err != nil { + require.ErrorIs(t, appErr, test.err) + } else { + require.Nil(t, appErr) + } + + test.verifyStats(t, handler.stats) + + // If the expected response is empty, assert that the handler returns an empty response and return early. + if len(expectedResponse) == 0 { + require.Len(t, responseBytes, 0, "expected response to be empty") + return + } + var response sdk.SignatureResponse + err = proto.Unmarshal(responseBytes, &response) + require.NoError(t, err, "error unmarshalling SignatureResponse") + + require.Equal(t, expectedResponse, response.Signature) + }) + } +} + +func TestBlockSignatureHandlerP2P(t *testing.T) { + database := memdb.New() + snowCtx := utils.TestSnowContext() + blsSecretKey, err := bls.NewSecretKey() + require.NoError(t, err) + + warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID) + blkID := ids.GenerateTestID() + blockClient := warptest.MakeBlockClient(blkID) + backend, err := warp.NewBackend( + snowCtx.NetworkID, + snowCtx.ChainID, + warpSigner, + blockClient, + database, + 100, + nil, + ) + require.NoError(t, err) + + signature, err := backend.GetBlockSignature(blkID) + require.NoError(t, err) + unknownBlockID := ids.GenerateTestID() + + toMessageBytes := func(id ids.ID) []byte { + idPayload, err := payload.NewHash(id) + require.NoError(t, err) + + msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, idPayload.Bytes()) + require.NoError(t, err) + + return msg.Bytes() + } + + tests := map[string]struct { + setup func() (request sdk.SignatureRequest, expectedResponse []byte) + verifyStats func(t *testing.T, stats *handlerStats) + err error + }{ + "known block": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: toMessageBytes(blkID)}, signature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 0, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "unknown block": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: toMessageBytes(unknownBlockID)}, nil + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 0, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureMiss.Snapshot().Count()) + }, + err: &common.AppError{Code: ErrFailedToGetSig}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + handler := NewSignatureRequestHandlerP2P(backend, message.Codec) + handler.stats.Clear() + + request, expectedResponse := test.setup() + requestBytes, err := proto.Marshal(&request) + require.NoError(t, err) + responseBytes, appErr := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, requestBytes) + if test.err != nil { + require.ErrorIs(t, appErr, test.err) + } else { + require.Nil(t, appErr) + } + + test.verifyStats(t, handler.stats) + + // If the expected response is empty, assert that the handler returns an empty response and return early. + if len(expectedResponse) == 0 { + require.Len(t, responseBytes, 0, "expected response to be empty") + return + } + var response sdk.SignatureResponse + err = proto.Unmarshal(responseBytes, &response) + require.NoError(t, err, "error unmarshalling SignatureResponse") + + require.Equal(t, expectedResponse, response.Signature) + }) + } +}