Skip to content

Commit

Permalink
refactor(server/v2): Update prepare & process proposal (backport #21237
Browse files Browse the repository at this point in the history
…) (#21536)

Co-authored-by: Hieu Vu <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored Sep 4, 2024
1 parent 57c6f60 commit e9aec66
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 51 deletions.
33 changes: 10 additions & 23 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,8 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("PrepareProposal called with invalid height")
}

decodedTxs := make([]T, len(req.Txs))
for i, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}

decodedTxs[i] = decTx
if c.prepareProposalHandler == nil {
return nil, errors.New("no prepare proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -345,7 +336,7 @@ func (c *Consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, decodedTxs, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
return nil, err
}
Expand All @@ -366,16 +357,12 @@ func (c *Consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
decodedTxs := make([]T, len(req.Txs))
for _, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs = append(decodedTxs, decTx)
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
}

if c.processProposalHandler == nil {
return nil, errors.New("no process proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -385,7 +372,7 @@ func (c *Consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, decodedTxs, req)
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down
68 changes: 43 additions & 25 deletions server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"fmt"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

consensusv1 "cosmossdk.io/api/cosmos/consensus/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/cometbft/mempool"
consensustypes "cosmossdk.io/x/consensus/types"
)

type AppManager[T transaction.Tx] interface {
Expand All @@ -33,28 +32,25 @@ func NewDefaultProposalHandler[T transaction.Tx](mp mempool.Mempool[T]) *Default
}

func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
abciReq, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return nil, fmt.Errorf("expected abci.PrepareProposalRequest, invalid request type: %T,", req)
}

return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
var maxBlockGas uint64

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return nil, err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

txs := decodeTxs(codec, req.Txs)

defer h.txSelector.Clear()

// If the mempool is nil or NoOp we simply return the transactions
Expand All @@ -64,7 +60,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
for _, tx := range txs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, tx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx)
if stop {
break
}
Expand All @@ -88,7 +84,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return nil, err
}
} else {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, memTx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx)
if stop {
break
}
Expand All @@ -102,34 +98,40 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
}

func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) error {
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.ProcessProposalRequest) error {
// If the mempool is nil we simply return ACCEPT,
// because PrepareProposal may have included txs that could fail verification.
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
return nil
}

_, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return fmt.Errorf("invalid request type: %T", req)
}

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

var maxBlockGas uint64
if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

// Decode request txs bytes
// If there an tx decoded fail, return err
var txs []T
for _, tx := range req.Txs {
decTx, err := codec.Decode(tx)
if err != nil {
return fmt.Errorf("failed to decode tx: %w", err)
}
txs = append(txs, decTx)
}

var totalTxGas uint64
for _, tx := range txs {
_, err := app.ValidateTx(ctx, tx)
Expand All @@ -153,18 +155,34 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
}
}

// decodeTxs decodes the txs bytes into a decoded txs
// If there a fail decoding tx, remove from the list
// Used for prepare proposal
func decodeTxs[T transaction.Tx](codec transaction.Codec[T], txsBz [][]byte) []T {
var txs []T
for _, tx := range txsBz {
decTx, err := codec.Decode(tx)
if err != nil {
continue
}

txs = append(txs, decTx)
}
return txs
}

// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always
// return the transactions sent by the client's request.
func NoOpPrepareProposal[T transaction.Tx]() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
return txs, nil
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
return decodeTxs(codec, req.Txs), nil
}
}

// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always
// return ACCEPT.
func NoOpProcessProposal[T transaction.Tx]() ProcessHandler[T] {
return func(context.Context, AppManager[T], []T, proto.Message) error {
return func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error {
return nil
}
}
Expand Down
5 changes: 2 additions & 3 deletions server/v2/cometbft/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
Expand All @@ -13,11 +12,11 @@ import (
type (
// PrepareHandler passes in the list of Txs that are being proposed. The app can then do stateful operations
// over the list of proposed transactions. It can return a modified list of txs to include in the proposal.
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) ([]T, error)
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.PrepareProposalRequest) ([]T, error)

// ProcessHandler is a function that takes a list of transactions and returns a boolean and an error.
// If the verification of a transaction fails, the boolean is false and the error is non-nil.
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) error
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error

// VerifyVoteExtensionhandler is a function type that handles the verification of a vote extension request.
// It takes a context, a store reader map, and a request to verify a vote extension.
Expand Down

0 comments on commit e9aec66

Please sign in to comment.