From 44da14fa7045daac9f892c04113558fdfb7cdb08 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Fri, 2 Aug 2024 10:16:33 -0700 Subject: [PATCH] Supports ACP-118 messages (code sync from subnet-evm) --- go.mod | 4 +- go.sum | 8 +- plugin/evm/vm.go | 12 +- warp/backend.go | 2 + warp/handlers/signature_request.go | 1 + warp/handlers/signature_request_p2p.go | 160 ++++++++++++++ warp/handlers/signature_request_p2p_test.go | 232 ++++++++++++++++++++ 7 files changed, 409 insertions(+), 10 deletions(-) create mode 100644 warp/handlers/signature_request_p2p.go create mode 100644 warp/handlers/signature_request_p2p_test.go diff --git a/go.mod b/go.mod index 2487d92562..8adef45ff1 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.17 github.com/olekukonko/tablewriter v0.0.5 - github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.3.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/cast v1.5.0 @@ -119,7 +119,7 @@ require ( go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/trace v1.22.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect - go.uber.org/multierr v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/term v0.18.0 // indirect diff --git a/go.sum b/go.sum index d7ba29072d..a2b2586ea3 100644 --- a/go.sum +++ b/go.sum @@ -453,8 +453,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= -github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= @@ -584,8 +584,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index cca7605c99..df8474c395 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -47,6 +47,7 @@ import ( "github.com/ava-labs/coreth/trie" "github.com/ava-labs/coreth/utils" "github.com/ava-labs/coreth/warp" + "github.com/ava-labs/coreth/warp/handlers" warpValidators "github.com/ava-labs/coreth/warp/validators" // Force-load tracer engine to trigger registration @@ -645,7 +646,6 @@ func (vm *VM) Initialize( go vm.ctx.Log.RecoverAndPanic(vm.startContinuousProfiler) - // The Codec explicitly registers the types it requires from the secp256k1fx // so [vm.baseCodec] is a dummy codec use to fulfill the secp256k1fx VM // interface. The fx will register all of its types, which can be safely // ignored by the VM's codec. @@ -655,7 +655,7 @@ func (vm *VM) Initialize( return err } - vm.initializeStateSyncServer() + vm.initializeHandlers() return vm.initializeStateSyncClient(lastAcceptedHeight) } @@ -793,14 +793,18 @@ func (vm *VM) initializeStateSyncClient(lastAcceptedHeight uint64) error { return nil } -// initializeStateSyncServer should be called after [vm.chain] is initialized. -func (vm *VM) initializeStateSyncServer() { +// initializeHandlers should be called after [vm.chain] is initialized. +func (vm *VM) initializeHandlers() { vm.StateSyncServer = NewStateSyncServer(&stateSyncServerConfig{ Chain: vm.blockChain, AtomicTrie: vm.atomicTrie, SyncableInterval: vm.config.StateSyncCommitInterval, }) + // Add p2p warp message warpHandler + warpHandler := handlers.NewSignatureRequestHandlerP2P(vm.warpBackend, vm.networkCodec) + vm.Network.AddHandler(p2p.SignatureRequestHandlerID, warpHandler) + vm.setAppRequestHandlers() vm.setCrossChainAppRequestHandler() } diff --git a/warp/backend.go b/warp/backend.go index 3bb19c1a4f..7e7377ad57 100644 --- a/warp/backend.go +++ b/warp/backend.go @@ -43,6 +43,8 @@ type Backend interface { GetBlockSignature(blockID ids.ID) ([bls.SignatureLen]byte, error) // GetMessage retrieves the [unsignedMessage] from the warp backend database if available + // TODO: After E-Upgrade, the backend no longer needs to store the mapping from messageHash + // to unsignedMessage (and this method can be removed). GetMessage(messageHash ids.ID) (*avalancheWarp.UnsignedMessage, error) // Clear clears the entire db diff --git a/warp/handlers/signature_request.go b/warp/handlers/signature_request.go index 7b03fc5296..8a8b4e4e1e 100644 --- a/warp/handlers/signature_request.go +++ b/warp/handlers/signature_request.go @@ -16,6 +16,7 @@ import ( ) // SignatureRequestHandler serves warp signature requests. It is a peer.RequestHandler for message.MessageSignatureRequest. +// TODO: After E-Upgrade, this handler can be removed and SignatureRequestHandlerP2P is sufficient. type SignatureRequestHandler struct { backend warp.Backend codec codec.Manager diff --git a/warp/handlers/signature_request_p2p.go b/warp/handlers/signature_request_p2p.go new file mode 100644 index 0000000000..a299a06079 --- /dev/null +++ b/warp/handlers/signature_request_p2p.go @@ -0,0 +1,160 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package handlers + +import ( + "context" + "fmt" + "time" + + "github.com/ava-labs/avalanchego/codec" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" + "github.com/ava-labs/coreth/warp" + "google.golang.org/protobuf/proto" +) + +var _ p2p.Handler = (*SignatureRequestHandlerP2P)(nil) + +const ( + ErrFailedToParse = iota + ErrFailedToGetSig + ErrFailedToMarshal +) + +// SignatureRequestHandlerP2P serves warp signature requests using the p2p +// framework from avalanchego. It is a peer.RequestHandler for +// message.MessageSignatureRequest. +type SignatureRequestHandlerP2P struct { + backend warp.Backend + codec codec.Manager + stats *handlerStats +} + +func NewSignatureRequestHandlerP2P(backend warp.Backend, codec codec.Manager) *SignatureRequestHandlerP2P { + return &SignatureRequestHandlerP2P{ + backend: backend, + codec: codec, + stats: newStats(), + } +} + +func (s *SignatureRequestHandlerP2P) AppRequest( + ctx context.Context, + nodeID ids.NodeID, + deadline time.Time, + requestBytes []byte, +) ([]byte, *common.AppError) { + // Per ACP-118, the requestBytes are the serialized form of + // sdk.SignatureRequest. + req := new(sdk.SignatureRequest) + if err := proto.Unmarshal(requestBytes, req); err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to unmarshal request: " + err.Error(), + } + } + + unsignedMessage, err := avalancheWarp.ParseUnsignedMessage(req.Message) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to parse unsigned message: " + err.Error(), + } + } + parsed, err := payload.Parse(unsignedMessage.Payload) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: "failed to parse payload: " + err.Error(), + } + } + + var sig [bls.SignatureLen]byte + switch p := parsed.(type) { + case *payload.AddressedCall: + // Note we pass the unsigned message ID to GetMessageSignature since + // that is what the backend expects. + // However, we verify the types and format of the payload to ensure + // the message conforms to the ACP-118 spec. + sig, err = s.GetMessageSignature(unsignedMessage.ID()) + if err != nil { + s.stats.IncMessageSignatureMiss() + } else { + s.stats.IncMessageSignatureHit() + } + case *payload.Hash: + sig, err = s.GetBlockSignature(p.Hash) + if err != nil { + s.stats.IncBlockSignatureMiss() + } else { + s.stats.IncBlockSignatureHit() + } + default: + return nil, &common.AppError{ + Code: ErrFailedToParse, + Message: fmt.Sprintf("unknown payload type: %T", p), + } + } + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToGetSig, + Message: "failed to get signature: " + err.Error(), + } + } + + // Per ACP-118, the responseBytes are the serialized form of + // sdk.SignatureResponse. + resp := &sdk.SignatureResponse{Signature: sig[:]} + respBytes, err := proto.Marshal(resp) + if err != nil { + return nil, &common.AppError{ + Code: ErrFailedToMarshal, + Message: "failed to marshal response: " + err.Error(), + } + } + return respBytes, nil +} + +func (s *SignatureRequestHandlerP2P) GetMessageSignature(messageID ids.ID) ([bls.SignatureLen]byte, error) { + startTime := time.Now() + s.stats.IncMessageSignatureRequest() + + // Always report signature request time + defer func() { + s.stats.UpdateMessageSignatureRequestTime(time.Since(startTime)) + }() + + return s.backend.GetMessageSignature(messageID) +} + +func (s *SignatureRequestHandlerP2P) GetBlockSignature(blockID ids.ID) ([bls.SignatureLen]byte, error) { + startTime := time.Now() + s.stats.IncBlockSignatureRequest() + + // Always report signature request time + defer func() { + s.stats.UpdateBlockSignatureRequestTime(time.Since(startTime)) + }() + + return s.backend.GetBlockSignature(blockID) +} + +func (s *SignatureRequestHandlerP2P) CrossChainAppRequest( + ctx context.Context, + chainID ids.ID, + deadline time.Time, + requestBytes []byte, +) ([]byte, error) { + return nil, nil +} + +func (s *SignatureRequestHandlerP2P) AppGossip( + ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { +} diff --git a/warp/handlers/signature_request_p2p_test.go b/warp/handlers/signature_request_p2p_test.go new file mode 100644 index 0000000000..36d6507771 --- /dev/null +++ b/warp/handlers/signature_request_p2p_test.go @@ -0,0 +1,232 @@ +// (c) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package handlers + +import ( + "context" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" + "github.com/ava-labs/coreth/plugin/evm/message" + "github.com/ava-labs/coreth/utils" + "github.com/ava-labs/coreth/warp" + "github.com/ava-labs/coreth/warp/warptest" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestMessageSignatureHandlerP2P(t *testing.T) { + database := memdb.New() + snowCtx := utils.TestSnowContext() + blsSecretKey, err := bls.NewSecretKey() + require.NoError(t, err) + warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID) + + addressedPayload, err := payload.NewAddressedCall([]byte{1, 2, 3}, []byte{1, 2, 3}) + require.NoError(t, err) + offchainMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, addressedPayload.Bytes()) + require.NoError(t, err) + + backend, err := warp.NewBackend(snowCtx.NetworkID, snowCtx.ChainID, warpSigner, warptest.EmptyBlockClient, database, 100, [][]byte{offchainMessage.Bytes()}) + require.NoError(t, err) + + offchainPayload, err := payload.NewAddressedCall([]byte{0, 0, 0}, []byte("test")) + require.NoError(t, err) + msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, offchainPayload.Bytes()) + require.NoError(t, err) + messageID := msg.ID() + require.NoError(t, backend.AddMessage(msg)) + signature, err := backend.GetMessageSignature(messageID) + require.NoError(t, err) + offchainSignature, err := backend.GetMessageSignature(offchainMessage.ID()) + require.NoError(t, err) + + unknownPayload, err := payload.NewAddressedCall([]byte{0, 0, 0}, []byte("unknown message")) + require.NoError(t, err) + unknownMessage, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, unknownPayload.Bytes()) + require.NoError(t, err) + + tests := map[string]struct { + setup func() (request sdk.SignatureRequest, expectedResponse []byte) + verifyStats func(t *testing.T, stats *handlerStats) + err error + }{ + "known message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: msg.Bytes()}, signature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "offchain message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: offchainMessage.Bytes()}, offchainSignature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "unknown message": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: unknownMessage.Bytes()}, nil + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 1, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 1, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + err: &common.AppError{Code: ErrFailedToGetSig}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + handler := NewSignatureRequestHandlerP2P(backend, message.Codec) + handler.stats.Clear() + + request, expectedResponse := test.setup() + requestBytes, err := proto.Marshal(&request) + require.NoError(t, err) + responseBytes, appErr := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, requestBytes) + if test.err != nil { + require.ErrorIs(t, appErr, test.err) + } else { + require.Nil(t, appErr) + } + + test.verifyStats(t, handler.stats) + + // If the expected response is empty, assert that the handler returns an empty response and return early. + if len(expectedResponse) == 0 { + require.Len(t, responseBytes, 0, "expected response to be empty") + return + } + var response sdk.SignatureResponse + err = proto.Unmarshal(responseBytes, &response) + require.NoError(t, err, "error unmarshalling SignatureResponse") + + require.Equal(t, expectedResponse, response.Signature) + }) + } +} + +func TestBlockSignatureHandlerP2P(t *testing.T) { + database := memdb.New() + snowCtx := utils.TestSnowContext() + blsSecretKey, err := bls.NewSecretKey() + require.NoError(t, err) + + warpSigner := avalancheWarp.NewSigner(blsSecretKey, snowCtx.NetworkID, snowCtx.ChainID) + blkID := ids.GenerateTestID() + blockClient := warptest.MakeBlockClient(blkID) + backend, err := warp.NewBackend( + snowCtx.NetworkID, + snowCtx.ChainID, + warpSigner, + blockClient, + database, + 100, + nil, + ) + require.NoError(t, err) + + signature, err := backend.GetBlockSignature(blkID) + require.NoError(t, err) + unknownBlockID := ids.GenerateTestID() + + toMessageBytes := func(id ids.ID) []byte { + idPayload, err := payload.NewHash(id) + require.NoError(t, err) + + msg, err := avalancheWarp.NewUnsignedMessage(snowCtx.NetworkID, snowCtx.ChainID, idPayload.Bytes()) + require.NoError(t, err) + + return msg.Bytes() + } + + tests := map[string]struct { + setup func() (request sdk.SignatureRequest, expectedResponse []byte) + verifyStats func(t *testing.T, stats *handlerStats) + err error + }{ + "known block": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: toMessageBytes(blkID)}, signature[:] + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 0, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureMiss.Snapshot().Count()) + }, + }, + "unknown block": { + setup: func() (request sdk.SignatureRequest, expectedResponse []byte) { + return sdk.SignatureRequest{Message: toMessageBytes(unknownBlockID)}, nil + }, + verifyStats: func(t *testing.T, stats *handlerStats) { + require.EqualValues(t, 0, stats.messageSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureHit.Snapshot().Count()) + require.EqualValues(t, 0, stats.messageSignatureMiss.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureRequest.Snapshot().Count()) + require.EqualValues(t, 0, stats.blockSignatureHit.Snapshot().Count()) + require.EqualValues(t, 1, stats.blockSignatureMiss.Snapshot().Count()) + }, + err: &common.AppError{Code: ErrFailedToGetSig}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + handler := NewSignatureRequestHandlerP2P(backend, message.Codec) + handler.stats.Clear() + + request, expectedResponse := test.setup() + requestBytes, err := proto.Marshal(&request) + require.NoError(t, err) + responseBytes, appErr := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, requestBytes) + if test.err != nil { + require.ErrorIs(t, appErr, test.err) + } else { + require.Nil(t, appErr) + } + + test.verifyStats(t, handler.stats) + + // If the expected response is empty, assert that the handler returns an empty response and return early. + if len(expectedResponse) == 0 { + require.Len(t, responseBytes, 0, "expected response to be empty") + return + } + var response sdk.SignatureResponse + err = proto.Unmarshal(responseBytes, &response) + require.NoError(t, err, "error unmarshalling SignatureResponse") + + require.Equal(t, expectedResponse, response.Signature) + }) + } +}