Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server/v2): Update prepare & process proposal (backport #21237) #21536

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,8 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("PrepareProposal called with invalid height")
}

decodedTxs := make([]T, len(req.Txs))
for i, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}

decodedTxs[i] = decTx
if c.prepareProposalHandler == nil {
return nil, errors.New("no prepare proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -345,7 +336,7 @@ func (c *Consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, decodedTxs, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
return nil, err
}
Expand All @@ -366,16 +357,12 @@ func (c *Consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
decodedTxs := make([]T, len(req.Txs))
for _, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs = append(decodedTxs, decTx)
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
}

if c.processProposalHandler == nil {
return nil, errors.New("no process proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -385,7 +372,7 @@ func (c *Consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, decodedTxs, req)
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down
68 changes: 43 additions & 25 deletions server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"fmt"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

consensusv1 "cosmossdk.io/api/cosmos/consensus/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/cometbft/mempool"
consensustypes "cosmossdk.io/x/consensus/types"
)

type AppManager[T transaction.Tx] interface {
Expand All @@ -33,28 +32,25 @@ func NewDefaultProposalHandler[T transaction.Tx](mp mempool.Mempool[T]) *Default
}

func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
abciReq, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return nil, fmt.Errorf("expected abci.PrepareProposalRequest, invalid request type: %T,", req)
}

return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
var maxBlockGas uint64

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return nil, err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

txs := decodeTxs(codec, req.Txs)

defer h.txSelector.Clear()

// If the mempool is nil or NoOp we simply return the transactions
Expand All @@ -64,7 +60,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
for _, tx := range txs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, tx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx)
if stop {
break
}
Expand All @@ -88,7 +84,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return nil, err
}
} else {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, memTx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx)
if stop {
break
}
Expand All @@ -102,34 +98,40 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
}

func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) error {
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.ProcessProposalRequest) error {
// If the mempool is nil we simply return ACCEPT,
// because PrepareProposal may have included txs that could fail verification.
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
return nil
}

_, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return fmt.Errorf("invalid request type: %T", req)
}

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

var maxBlockGas uint64
if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

// Decode request txs bytes
// If there an tx decoded fail, return err
var txs []T
for _, tx := range req.Txs {
decTx, err := codec.Decode(tx)
if err != nil {
return fmt.Errorf("failed to decode tx: %w", err)
}
txs = append(txs, decTx)
}

var totalTxGas uint64
for _, tx := range txs {
_, err := app.ValidateTx(ctx, tx)
Expand All @@ -153,18 +155,34 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
}
}

// decodeTxs decodes the txs bytes into a decoded txs
// If there a fail decoding tx, remove from the list
// Used for prepare proposal
func decodeTxs[T transaction.Tx](codec transaction.Codec[T], txsBz [][]byte) []T {
var txs []T
for _, tx := range txsBz {
decTx, err := codec.Decode(tx)
if err != nil {
continue
}

txs = append(txs, decTx)
}
return txs
}

// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always
// return the transactions sent by the client's request.
func NoOpPrepareProposal[T transaction.Tx]() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
return txs, nil
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
return decodeTxs(codec, req.Txs), nil
}
}

// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always
// return ACCEPT.
func NoOpProcessProposal[T transaction.Tx]() ProcessHandler[T] {
return func(context.Context, AppManager[T], []T, proto.Message) error {
return func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error {
return nil
}
}
Expand Down
5 changes: 2 additions & 3 deletions server/v2/cometbft/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
Expand All @@ -13,11 +12,11 @@ import (
type (
// PrepareHandler passes in the list of Txs that are being proposed. The app can then do stateful operations
// over the list of proposed transactions. It can return a modified list of txs to include in the proposal.
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) ([]T, error)
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.PrepareProposalRequest) ([]T, error)

// ProcessHandler is a function that takes a list of transactions and returns a boolean and an error.
// If the verification of a transaction fails, the boolean is false and the error is non-nil.
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) error
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error

// VerifyVoteExtensionhandler is a function type that handles the verification of a vote extension request.
// It takes a context, a store reader map, and a request to verify a vote extension.
Expand Down
Loading