This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

refactor(handler): Improve transaction broadcast retry logic #1249

Closed
wants to merge 7 commits into from
1 change: 1 addition & 0 deletions cosmos/miner/miner.go
@@ -83,6 +83,7 @@ func (m *Miner) buildBlock(ctx sdk.Context) ([]byte, error) {
if err := m.submitPayloadForBuilding(ctx); err != nil {
return nil, err
}

return m.resolveEnvelope(), nil
}

142 changes: 72 additions & 70 deletions cosmos/txpool/handler.go
@@ -34,14 +34,15 @@ import (

"pkg.berachain.dev/polaris/eth/core"
coretypes "pkg.berachain.dev/polaris/eth/core/types"
queuelib "pkg.berachain.dev/polaris/lib/queue"
)

// txChanSize is the size of the channel listening to NewTxsEvent. The number is referenced from
// the size of the tx pool.
const (
txChanSize = 4096
maxRetries = 5
retryDelay = 50 * time.Millisecond
txChanSize = 4096
retryDelay = 50 * time.Millisecond
emptyQueueBackoff = 250 * time.Millisecond
)

// SdkTx is used to generate mocks.
@@ -70,12 +71,6 @@ type Subscription interface {
event.Subscription
}

// failedTx represents a transaction that failed to broadcast.
type failedTx struct {
tx *coretypes.Transaction
retries int
}

// handler listens for new insertions into the geth txpool and broadcasts them to the CometBFT
// layer for p2p and ABCI.
type handler struct {
@@ -89,13 +84,12 @@ type handler struct {
txsCh chan core.NewTxsEvent
stopCh chan struct{}
txsSub Subscription
running atomic.Bool
running atomic.Bool // Running method returns true if the handler is running.

// Queue for failed transactions
failedTxs chan *failedTx
txQueue *queuelib.LockFreeQueue[[]byte]
}

// newHandler creates a new handler.
// newHandler function creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, logger log.Logger,
) *handler {
@@ -106,25 +100,30 @@
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
txQueue: queuelib.NewLockFreeQueue[[]byte](),
}
return h
}

// Start starts the handler.
// Running method returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
}

// Start method starts the handler.
func (h *handler) Start() error {
if h.running.Load() {
return errors.New("handler already started")
return errors.New("handler has already been started")
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
go h.queueLoop()
go h.broadcastLoop() // This starts the retry policy
return nil
}

// Stop stops the handler.
// Stop method stops the handler.
func (h *handler) Stop() error {
if !h.Running() {
return errors.New("handler already stopped")
return errors.New("handler has already been stopped")
}

// Push two stop signals to the stop channel to ensure that both loops stop.
@@ -133,12 +132,13 @@ func (h *handler) Stop() error {
return nil
}

// start handles the subscription to the txpool and broadcasts transactions.
func (h *handler) mainLoop() {
// queueLoop method handles the subscription to the txpool and broadcasts transactions.
func (h *handler) queueLoop() {
// Connect to the subscription.
h.txsSub = h.txPool.SubscribeNewTxsEvent(h.txsCh)
h.logger.With("module", "txpool-handler").Info("starting txpool handler")
h.logger.With("module", "txpool-handler").Info("txpool handler is starting")
h.running.Store(true)

// Handle events.
var err error
for {
@@ -149,94 +149,96 @@
case err = <-h.txsSub.Err():
h.stopCh <- struct{}{}
case event := <-h.txsCh:
h.broadcastTransactions(event.Txs)
h.queueTransactions(event.Txs)
}
}
}

// failedLoop will periodically attempt to re-broadcast failed transactions.
func (h *handler) failedLoop() {
// broadcastLoop method broadcasts transactions from the queue.
func (h *handler) broadcastLoop() {
txCh := make(chan []byte)
for {
txCh <- h.dequeueTxBytes()
select {
case <-h.stopCh:
return
case failed := <-h.failedTxs:
if failed.retries == 0 {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.broadcastTransaction(failed.tx, failed.retries-1)
case tx := <-txCh:
h.broadcastTx(tx)
}
}
}

// We slightly space out the retries in order to prioritize new transactions.
time.Sleep(retryDelay)
// broadcastTx method dequeues the next transaction off the queue and attempts to broadcast it.
func (h *handler) broadcastTx(txBytes []byte) {
if err := h.broadcastTransaction(txBytes); errors.Is(err, sdkerrors.ErrMempoolIsFull) {
// If the mempool is full, we need to re-enqueue this transaction to be broadcast.
h.enqueueTxBytes(txBytes)
} else if errors.Is(err, sdkerrors.ErrTxInMempoolCache) {
// Do nothing, since this transaction already exists at the CometBFT layer.
return
} else if err != nil {
h.logger.Error("An error occurred during the transaction broadcast", "err", err)
}
}

// Running returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
// enqueueTxBytes method adds a serialized transaction (as a byte slice) to the broadcast queue.
func (h *handler) enqueueTxBytes(txBytes []byte) {
h.txQueue.Enqueue(txBytes)
}

// dequeueTxBytes method removes and returns the next transaction from the queue, or nil if the
// queue is empty.
func (h *handler) dequeueTxBytes() []byte {
return h.txQueue.Dequeue()
}

// stop stops the handler.
// stop method stops the handler.
func (h *handler) stop(err error) {
// Mark as not running to prevent further events.
h.running.Store(false)

// If we are stopping because of an error, log it.
if err != nil {
h.logger.Error("txpool handler", "error", err)
h.logger.Error("An error occurred in the txpool handler", "error", err)
}

// Triggers txBroadcastLoop to quit.
// This triggers the txBroadcastLoop to quit.
h.txsSub.Unsubscribe()

// Close channels.
close(h.txsCh)
close(h.stopCh)
close(h.failedTxs)
}

// broadcastTransactions will propagate a batch of transactions to the CometBFT mempool.
func (h *handler) broadcastTransactions(txs coretypes.Transactions) {
h.logger.Debug("broadcasting transactions", "num_txs", len(txs))
// queueTransactions method will propagate a batch of transactions to the CometBFT mempool.
func (h *handler) queueTransactions(txs coretypes.Transactions) {
h.logger.Debug("The transactions are being broadcasted", "num_txs", len(txs))
for _, signedEthTx := range txs {
h.broadcastTransaction(signedEthTx, maxRetries)
txBytes, err := h.serializer.ToSdkTxBytes(signedEthTx, signedEthTx.Gas())
if err != nil {
h.logger.Error("Failed to serialize the transaction", "err", err)
return
}
h.enqueueTxBytes(txBytes)
}
}

// broadcastTransaction will propagate a transaction to the CometBFT mempool.
func (h *handler) broadcastTransaction(tx *coretypes.Transaction, retries int) {
txBytes, err := h.serializer.ToSdkTxBytes(tx, tx.Gas())
if err != nil {
h.logger.Error("failed to serialize transaction", "err", err)
return
}

// broadcastTransaction method will propagate a transaction to the CometBFT mempool.
func (h *handler) broadcastTransaction(txBytes []byte) error {
// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)

if err != nil {
h.logger.Error("error on transactions broadcast", "err", err)
h.failedTxs <- &failedTx{tx: tx, retries: retries}
return
}

if rsp == nil || rsp.Code == 0 {
return
return nil
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
if err != nil {
return err
}

h.failedTxs <- &failedTx{tx: tx, retries: retries}
return nil
}
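For reviewers, a minimal self-contained sketch (not part of the diff) of the re-enqueue-on-full-mempool policy that broadcastTx applies, built on the new lib/queue package; errMempoolFull and broadcast below are hypothetical stand-ins for sdkerrors.ErrMempoolIsFull and clientCtx.BroadcastTxSync:

```go
package main

import (
	"errors"
	"fmt"

	queuelib "pkg.berachain.dev/polaris/lib/queue"
)

// errMempoolFull stands in for sdkerrors.ErrMempoolIsFull in this sketch.
var errMempoolFull = errors.New("mempool is full")

// broadcast stands in for clientCtx.BroadcastTxSync; it fails once so that
// the re-enqueue path is exercised.
func broadcast(txBytes []byte, full *bool) error {
	if *full {
		*full = false
		return errMempoolFull
	}
	fmt.Printf("broadcast ok: %x\n", txBytes)
	return nil
}

func main() {
	q := queuelib.NewLockFreeQueue[[]byte]()
	q.Enqueue([]byte{0x01}) // queueTransactions enqueues each serialized tx like this

	full := true
	for !q.IsEmpty() {
		txBytes := q.Dequeue()
		if err := broadcast(txBytes, &full); errors.Is(err, errMempoolFull) {
			// Same policy as broadcastTx: a full mempool puts the tx back
			// on the queue so a later pass can retry it.
			q.Enqueue(txBytes)
		}
	}
}
```

Unlike the removed failedTx channel, this path has no retry cap: a transaction simply stays in the queue until a broadcast attempt stops reporting a full mempool.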
2 changes: 0 additions & 2 deletions cosmos/txpool/handler_test.go
@@ -83,7 +83,6 @@ var _ = Describe("", func() {
It("should handle 1 tx", func() {
defer GinkgoRecover()
serializer.On("ToSdkTxBytes", mock.Anything, mock.Anything).Return([]byte{123}, nil).Once()
broadcaster.On("BroadcastTxSync", []byte{123}).Return(nil, nil).Once()

h.txsCh <- core.NewTxsEvent{
Txs: []*coretypes.Transaction{coretypes.NewTx(&coretypes.LegacyTx{Nonce: 5, Gas: 100})},
@@ -93,7 +92,6 @@
It("should handle multiple tx", func() {
defer GinkgoRecover()
serializer.On("ToSdkTxBytes", mock.Anything, mock.Anything).Return([]byte{123}, nil).Twice()
broadcaster.On("BroadcastTxSync", []byte{123}).Return(nil, nil).Twice()

h.txsCh <- core.NewTxsEvent{Txs: []*coretypes.Transaction{
coretypes.NewTx(&coretypes.LegacyTx{Nonce: 5, Gas: 10}),
111 changes: 111 additions & 0 deletions lib/queue/queue.go
@@ -0,0 +1,111 @@
// SPDX-License-Identifier: Apache-2.0
//
// Copyright (c) 2023 Berachain Foundation
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use,
// copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.

package queue

import (
"sync/atomic"
)

// LockFreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head *atomic.Pointer[node[T]]
tail *atomic.Pointer[node[T]]
}

// node represents a node in the queue.
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}

// newNode creates and initializes a node.
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}

// NewLockFreeQueue creates and initializes a LockFreeQueue.
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
return &LockFreeQueue[T]{
head: &head,
tail: &tail,
}
}

// Enqueue adds a value to the tail of the queue.
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}

// Dequeue removes and returns the value at the head of the queue, or the zero value of T if the
// queue is empty.
//
//nolint:nestif // it's okay.
func (q *LockFreeQueue[T]) Dequeue() T {
var t T
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return t
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v
}
}
}
}
}

// IsEmpty returns true if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
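A short usage sketch (assuming the pkg.berachain.dev/polaris/lib/queue import path used in handler.go) showing concurrent enqueues from several goroutines followed by a single-goroutine drain:

```go
package main

import (
	"fmt"
	"sync"

	queuelib "pkg.berachain.dev/polaris/lib/queue"
)

func main() {
	q := queuelib.NewLockFreeQueue[int]()

	// Four goroutines enqueue concurrently; no locking is required.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				q.Enqueue(base*100 + j)
			}
		}(i)
	}
	wg.Wait()

	// Drain the queue on a single goroutine.
	n := 0
	for !q.IsEmpty() {
		q.Dequeue()
		n++
	}
	fmt.Println("dequeued", n, "items") // prints: dequeued 400 items
}
```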