op-batcher refactor: non blocking event loop #9

Closed · wants to merge 2 commits
111 changes: 74 additions & 37 deletions op-batcher/batcher/channel_manager.go
@@ -59,6 +59,37 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
}
}

// IsFull returns true if the channel manager has more than 10 pending blocks to be inserted
// into channels. The driving loop can use this signal to throttle block loading from the sequencer.
func (s *channelManager) IsFull() bool {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.blocks) > 10
}

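// PendingBlocks returns the number of L2 blocks currently queued and waiting to
// be inserted into a channel.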
func (s *channelManager) PendingBlocks() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.blocks)
}

// AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
// if the block does not extend the last block loaded into the state. If no
// blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}

s.metr.RecordL2BlockInPendingQueue(block)
s.blocks = append(s.blocks, block)
s.tip = block.Hash()

return nil
}

// Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
@@ -144,6 +175,42 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
return tx, nil
}

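// HasTxData reports whether any channel in the queue currently has tx data
// ready to be submitted to L1.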
func (s *channelManager) HasTxData() bool {
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.channelQueue {
if ch.HasTxData() {
return true
}
}
return false
}

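// ProcessBlocks moves all pending blocks into channels, registers the given L1
// head on the current channel, and outputs frames, so that a later TxData call
// has data ready to return. It returns io.EOF when there are no pending blocks.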
func (s *channelManager) ProcessBlocks(l1Head eth.BlockID) error {
s.mu.Lock()
defer s.mu.Unlock()

// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
return io.EOF
}

if err := s.ensureChannelWithSpace(eth.BlockID{}); err != nil {
return err
}

if err := s.processBlocks(); err != nil {
return err
}

// Register current L1 head only after all pending blocks have been
// processed. Even if a timeout will be triggered now, it is better to have
// all pending blocks be included in this channel for submission.
s.registerL1Block(l1Head)

return s.outputFrames()
}

// TxData returns the next tx data that should be submitted to L1.
//
// If the pending channel is
@@ -152,6 +219,7 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()

var firstWithTxData *channel
for _, ch := range s.channelQueue {
if ch.HasTxData() {
@@ -160,7 +228,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
}
}

dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

// Short circuit if there is pending tx data or the channel manager is closed.
@@ -173,26 +241,12 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
return txData{}, io.EOF
} else {
// Only added to help debug in case we missed a case while refactoring
// TODO(samlaf): change this to something appropriate after finishing the event loop refactor
s.log.Error("No pending tx data, but blocks are available. This should never happen. Did you forget to call processBlocks()?", "blocks_pending", len(s.blocks))
return txData{}, fmt.Errorf("fatal error")
}

if err := s.ensureChannelWithSpace(l1Head); err != nil {
return txData{}, err
}

if err := s.processBlocks(); err != nil {
return txData{}, err
}

// Register current L1 head only after all pending blocks have been
// processed. Even if a timeout will be triggered now, it is better to have
// all pending blocks be included in this channel for submission.
s.registerL1Block(l1Head)

if err := s.outputFrames(); err != nil {
return txData{}, err
}

return s.nextTxData(s.currentChannel)
}

// ensureChannelWithSpace ensures currentChannel is populated with a channel that has
@@ -333,23 +387,6 @@ func (s *channelManager) outputFrames() error {
return nil
}

// AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
// if the block does not extend the last block loaded into the state. If no
// blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}

s.metr.RecordL2BlockInPendingQueue(block)
s.blocks = append(s.blocks, block)
s.tip = block.Hash()

return nil
}

func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: block.Hash(),
118 changes: 80 additions & 38 deletions op-batcher/batcher/driver.go
@@ -227,7 +227,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
return nil, fmt.Errorf("adding L2 block to state: %w", err)
}

l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time(), "pending_blocks", l.state.PendingBlocks())
return block, nil
}

@@ -300,38 +300,11 @@ func (l *BatchSubmitter) loop() {
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

var (
txpoolState atomic.Int32
txpoolBlockedBlob bool
)
txpoolState.Store(TxpoolGood)
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-receiptLoopDone:
l.Log.Info("Receipt processing loop done")
return
}
}
}()

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
@@ -342,16 +315,50 @@
}
}

txpoolStateTick := time.NewTicker(l.Config.PollInterval)
loadBlocksTick := time.NewTicker(l.Config.PollInterval)

var pendingTxData *txData
txDataCh := make(chan *txData)

// tx publishing goroutine
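// It drains txDataCh and calls sendTransaction off the main select loop, so
// transaction submission cannot block event handling.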
go func() {
for txdata := range txDataCh {
if txdata == nil {
l.Log.Error("nil txdata received... shouldn't happen")
continue
}
if err := l.sendTransaction(l.killCtx, *txdata, queue, receiptsCh); err != nil {
l.Log.Error("Error sending transaction", "err", err)
}
}
}()

for {
select {
case <-ticker.C:
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
l.Log.Trace("BatchSubmitter loop iteration")

// maybeNilTxDataCh is nil when pendingTxData is nil,
// such that we don't send a txData to the channel when there is no data to send.
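// (A send on a nil channel blocks forever, so a nil maybeNilTxDataCh simply
// disables that select case until data is ready.)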
var maybeNilTxDataCh chan *txData
if pendingTxData != nil {
maybeNilTxDataCh = txDataCh
} else if !l.Txmgr.IsClosed() && l.state.HasTxData() {
if l1tip, err := l.l1Tip(l.killCtx); err == nil {
if pendingTxDataPtr, err := l.state.TxData(l1tip.ID()); err == nil {
pendingTxData = &pendingTxDataPtr
maybeNilTxDataCh = txDataCh
}
}
if txpoolState.Load() != TxpoolGood {
}

var processBlocks <-chan time.Time
if l.state.IsFull() {
processBlocks = time.After(0)
}
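// (time.After(0) fires immediately; while the manager is not full, processBlocks
// stays nil and its select case is disabled.)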

select {
case <-loadBlocksTick.C:
if l.state.IsFull() {
continue
}
if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
@@ -369,7 +376,43 @@
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
case <-processBlocks:
l.Log.Debug("Processing blocks")
l1tip, err := l.l1Tip(l.killCtx)
if err != nil {
l.Log.Error("Failed to query L1 tip", "err", err)
continue
}
err = l.state.ProcessBlocks(l1tip.ID())
if err != nil && !errors.Is(err, io.EOF) {
l.Log.Error("Error processing blocks", "err", err)
}
case maybeNilTxDataCh <- pendingTxData:
// sent pendingTxData to the tx publishing goroutine
pendingTxData = nil
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
// since the stuck transaction could have cleared while we were waiting.
l.Log.Info("txpool may no longer be blocked", "err", r.Err)
}
l.Log.Info("Handling receipt", "id", r.ID)
l.handleReceipt(r)
case <-txpoolStateTick.C:
l.Log.Trace("Checking txpool state")
if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
// TODO(samlaf): unblock this
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
}
if txpoolState.Load() != TxpoolGood {
continue
}
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -484,7 +527,6 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.Log.Error("Failed to query L1 tip", "err", err)
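
For reference, the select-on-a-nil-channel idiom that the refactored loop relies on for maybeNilTxDataCh and processBlocks can be shown in isolation. The sketch below is illustrative only, uses hypothetical names, and is not part of this diff:

package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan string)
	var maybeNilWork chan string // nil until there is something to send

	// Consumer goroutine, analogous to the tx publishing goroutine.
	go func() {
		for w := range work {
			fmt.Println("published:", w)
		}
	}()

	pending := "payload"
	for i := 0; i < 2; i++ {
		if pending != "" {
			// Enable the send case only when there is data, as with maybeNilTxDataCh.
			maybeNilWork = work
		}
		select {
		case maybeNilWork <- pending:
			// A send on a nil channel blocks forever, so this case is effectively
			// disabled whenever maybeNilWork is nil.
			pending = ""
			maybeNilWork = nil
		case <-time.After(10 * time.Millisecond):
			fmt.Println("tick with nothing to publish")
		}
	}
}

Disabling a select case by leaving its channel nil is what lets the loop stay responsive to receipts, ticks, and shutdown even when there is nothing to publish or process.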