This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

refactor(handler): Improve transaction broadcast retry logic #1249

Closed · wants to merge 7 commits
Changes from 3 commits
1 change: 1 addition & 0 deletions cosmos/miner/miner.go
@@ -83,6 +83,7 @@ func (m *Miner) buildBlock(ctx sdk.Context) ([]byte, error) {
if err := m.submitPayloadForBuilding(ctx); err != nil {
return nil, err
}

return m.resolveEnvelope(), nil
}

141 changes: 72 additions & 69 deletions cosmos/txpool/handler.go
@@ -34,14 +34,15 @@ import (

"pkg.berachain.dev/polaris/eth/core"
coretypes "pkg.berachain.dev/polaris/eth/core/types"
queuelib "pkg.berachain.dev/polaris/lib/queue"
)

// txChanSize is the size of channel listening to NewTxsEvent. The number is referenced from the
// size of tx pool.
const (
txChanSize = 4096
maxRetries = 5
retryDelay = 50 * time.Millisecond
txChanSize = 4096
retryDelay = 50 * time.Millisecond
emptyQueueBackoff = 250 * time.Millisecond
)

// SdkTx is used to generate mocks.
@@ -70,12 +70,6 @@ type Subscription interface {
event.Subscription
}

// failedTx represents a transaction that failed to broadcast.
type failedTx struct {
tx *coretypes.Transaction
retries int
}

// handler listens for new insertions into the geth txpool and broadcasts them to the CometBFT
// layer for p2p and ABCI.
type handler struct {
@@ -89,13 +84,12 @@ type handler struct {
txsCh chan core.NewTxsEvent
stopCh chan struct{}
txsSub Subscription
running atomic.Bool
running atomic.Bool // set to true while the handler is running.

// Queue for failed transactions
failedTxs chan *failedTx
txQueue *queuelib.LockFreeQueue[[]byte]
}

// newHandler creates a new handler.
// newHandler function creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, logger log.Logger,
) *handler {
@@ -106,25 +100,30 @@ func newHandler(
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
txQueue: queuelib.NewLockFreeQueue[[]byte](),
}
return h
}

// Start starts the handler.
// Running method returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
}

// Start method starts the handler.
func (h *handler) Start() error {
if h.running.Load() {
return errors.New("handler already started")
return errors.New("handler has already been started")
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
go h.queueLoop()
go h.broadcastLoop() // This starts the retry policy
return nil
}

// Stop stops the handler.
// Stop method stops the handler.
func (h *handler) Stop() error {
if !h.Running() {
return errors.New("handler already stopped")
return errors.New("handler has already been stopped")
}

// Push two stop signals to the stop channel to ensure that both loops stop.
@@ -133,12 +132,13 @@ func (h *handler) Stop() error {
return nil
}

// start handles the subscription to the txpool and broadcasts transactions.
func (h *handler) mainLoop() {
// queueLoop method handles the subscription to the txpool and queues new transactions for broadcast.
func (h *handler) queueLoop() {
// Connect to the subscription.
h.txsSub = h.txPool.SubscribeNewTxsEvent(h.txsCh)
h.logger.With("module", "txpool-handler").Info("starting txpool handler")
h.logger.With("module", "txpool-handler").Info("txpool handler is starting")
h.running.Store(true)

// Handle events.
var err error
for {
@@ -149,94 +149,97 @@ func (h *handler) mainLoop() {
case err = <-h.txsSub.Err():
h.stopCh <- struct{}{}
case event := <-h.txsCh:
h.broadcastTransactions(event.Txs)
h.queueTransactions(event.Txs)
}
}
}

// failedLoop will periodically attempt to re-broadcast failed transactions.
func (h *handler) failedLoop() {
// broadcastLoop method continuously dequeues transactions from the queue and broadcasts them to
// the CometBFT mempool.
func (h *handler) broadcastLoop() {
for {
select {
case <-h.stopCh:
return
case failed := <-h.failedTxs:
if failed.retries == 0 {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
case <-time.After(emptyQueueBackoff):
if tx := h.dequeueTxBytes(); tx != nil {
h.broadcastTx(tx)
}
h.broadcastTransaction(failed.tx, failed.retries-1)
}
}
}

// We slightly space out the retries in order to prioritize new transactions.
time.Sleep(retryDelay)
// broadcastTx method attempts to broadcast the given transaction bytes, re-enqueuing them if the
// CometBFT mempool reports that it is full.
func (h *handler) broadcastTx(txBytes []byte) {
if err := h.broadcastTransaction(txBytes); errors.Is(err, sdkerrors.ErrMempoolIsFull) {
// If the mempool is full, we need to re-enqueue this transaction to be broadcast.
h.enqueueTxBytes(txBytes)
} else if errors.Is(err, sdkerrors.ErrTxInMempoolCache) {
// Do nothing, since this transaction already exists at the CometBFT layer.
return
} else if err != nil {
h.logger.Error("An error occurred during the transaction broadcast", "err", err)
}
}

// Running returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
// enqueueTxBytes method adds the given serialized transaction to the broadcast queue.
func (h *handler) enqueueTxBytes(txBytes []byte) {
h.txQueue.Enqueue(txBytes)
}

// stop stops the handler.
// dequeueTxBytes method removes and returns the next transaction from the queue, or the zero
// value (nil) if the queue is empty.
func (h *handler) dequeueTxBytes() []byte {
return h.txQueue.Dequeue()
}

// stop method stops the handler.
func (h *handler) stop(err error) {
// Mark as not running to prevent further events.
h.running.Store(false)

// If we are stopping because of an error, log it.
if err != nil {
h.logger.Error("txpool handler", "error", err)
h.logger.Error("An error occurred in the txpool handler", "error", err)
}

// Triggers txBroadcastLoop to quit.
// This triggers the txBroadcastLoop to quit.
h.txsSub.Unsubscribe()

// Close channels.
close(h.txsCh)
close(h.stopCh)
close(h.failedTxs)
close(h.stopCh)
}

// broadcastTransactions will propagate a batch of transactions to the CometBFT mempool.
func (h *handler) broadcastTransactions(txs coretypes.Transactions) {
h.logger.Debug("broadcasting transactions", "num_txs", len(txs))
// queueTransactions method serializes a batch of transactions and enqueues them for broadcast
// to the CometBFT mempool.
func (h *handler) queueTransactions(txs coretypes.Transactions) {
h.logger.Debug("queueing transactions for broadcast", "num_txs", len(txs))
for _, signedEthTx := range txs {
h.broadcastTransaction(signedEthTx, maxRetries)
txBytes, err := h.serializer.ToSdkTxBytes(signedEthTx, signedEthTx.Gas())
if err != nil {
h.logger.Error("Failed to serialize the transaction", "err", err)
return
}
h.enqueueTxBytes(txBytes)
}
}

// broadcastTransaction will propagate a transaction to the CometBFT mempool.
func (h *handler) broadcastTransaction(tx *coretypes.Transaction, retries int) {
txBytes, err := h.serializer.ToSdkTxBytes(tx, tx.Gas())
if err != nil {
h.logger.Error("failed to serialize transaction", "err", err)
return
}

// broadcastTransaction method will propagate a transaction to the CometBFT mempool.
func (h *handler) broadcastTransaction(txBytes []byte) error {
// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)

if err != nil {
h.logger.Error("error on transactions broadcast", "err", err)
h.failedTxs <- &failedTx{tx: tx, retries: retries}
return
}

if rsp == nil || rsp.Code == 0 {
return
return nil
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
if err != nil {
return err
}

h.failedTxs <- &failedTx{tx: tx, retries: retries}
return nil
}
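
Note: for readers skimming the diff, here is a minimal standalone sketch of the retry flow this change introduces: new transactions are serialized and enqueued, a broadcast loop polls the queue on an emptyQueueBackoff tick, and a transaction that hits a full CometBFT mempool is re-enqueued for a later tick. The broadcast function and its error value below are hypothetical stand-ins for clientCtx.BroadcastTxSync and sdkerrors.ErrMempoolIsFull; only the queue-and-backoff policy from broadcastLoop/broadcastTx is illustrated.

package main

import (
	"errors"
	"fmt"
	"time"

	queuelib "pkg.berachain.dev/polaris/lib/queue"
)

// errMempoolFull stands in for sdkerrors.ErrMempoolIsFull in this sketch.
var errMempoolFull = errors.New("mempool is full")

var attempts int

// broadcast is a hypothetical stand-in for clientCtx.BroadcastTxSync: the first
// attempt reports a full mempool, subsequent attempts succeed.
func broadcast(tx []byte) error {
	attempts++
	if attempts == 1 {
		return errMempoolFull
	}
	fmt.Printf("broadcast %s after %d attempts\n", tx, attempts)
	return nil
}

func main() {
	q := queuelib.NewLockFreeQueue[[]byte]()
	q.Enqueue([]byte("tx-1")) // queueTransactions does this on each NewTxsEvent

	deadline := time.After(2 * time.Second)
	for {
		select {
		case <-deadline:
			return
		case <-time.After(250 * time.Millisecond): // emptyQueueBackoff
			tx := q.Dequeue()
			if tx == nil {
				continue // queue is empty; back off and poll again
			}
			if err := broadcast(tx); errors.Is(err, errMempoolFull) {
				q.Enqueue(tx) // mempool is full: re-queue and retry on a later tick
			} else if err != nil {
				fmt.Println("dropping tx:", err)
			}
		}
	}
}
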
111 changes: 111 additions & 0 deletions lib/queue/queue.go
@@ -0,0 +1,111 @@
// SPDX-License-Identifier: Apache-2.0
//
// Copyright (c) 2023 Berachain Foundation
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use,
// copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.

package queue

import (
"sync/atomic"
)

// LockFreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head *atomic.Pointer[node[T]]
tail *atomic.Pointer[node[T]]
}

// node represents a node in the queue.
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}

// newNode creates and initializes a node.
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}

// NewLockFreeQueue creates and initializes a LockFreeQueue.
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
return &LockFreeQueue[T]{
head: &head,
tail: &tail,
}
}

// Enqueue adds a value to the tail of the queue.
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}

// Dequeue removes and returns the value at the head of the queue. If the queue is empty, the
// zero value of T is returned.
//
//nolint:nestif // it's okay.
func (q *LockFreeQueue[T]) Dequeue() T {
var t T
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return t
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v
}
}
}
}
}

// IsEmpty returns true if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
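
Note: a minimal usage sketch of the queue added here, under the assumption that the lib/queue package is importable as shown in handler.go. It highlights the two properties the handler relies on: enqueues are safe from multiple goroutines, and Dequeue returns the zero value of T when the queue is empty (hence the nil check on the dequeued []byte in broadcastLoop).

package main

import (
	"fmt"
	"sync"

	queuelib "pkg.berachain.dev/polaris/lib/queue"
)

func main() {
	q := queuelib.NewLockFreeQueue[int]()

	// Producers can enqueue concurrently without any additional locking.
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			q.Enqueue(v)
		}(i)
	}
	wg.Wait()

	// Dequeue returns the zero value of T (0 for int) when the queue is empty,
	// so callers that need to detect emptiness either store pointer/slice types
	// (as the txpool handler does with []byte) or check IsEmpty first.
	for !q.IsEmpty() {
		fmt.Println(q.Dequeue())
	}
}
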