From bd065462d97e70dace155d3569cb29484aa668a5 Mon Sep 17 00:00:00 2001
From: Samuel Laferriere
Date: Wed, 28 Aug 2024 15:28:24 -0700
Subject: [PATCH 1/2] refactor(wip): move receipts goroutine into event loop

---
 op-batcher/batcher/driver.go | 35 +++++++++++------------------------
 1 file changed, 11 insertions(+), 24 deletions(-)

diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 39ebf2f25b24..5d3ebf281f51 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -300,35 +300,11 @@ func (l *BatchSubmitter) loop() {
 	receiptsCh := make(chan txmgr.TxReceipt[txRef])
 	queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
 
-	// start the receipt/result processing loop
-	receiptLoopDone := make(chan struct{})
-	defer close(receiptLoopDone) // shut down receipt loop
-
 	var (
 		txpoolState       atomic.Int32
 		txpoolBlockedBlob bool
 	)
 	txpoolState.Store(TxpoolGood)
-	go func() {
-		for {
-			select {
-			case r := <-receiptsCh:
-				if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
-					txpoolBlockedBlob = r.ID.isBlob
-					l.Log.Info("incompatible tx in txpool")
-				} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
-					// Set state to TxpoolGood even if the cancellation transaction ended in error
-					// since the stuck transaction could have cleared while we were waiting.
-					l.Log.Info("txpool may no longer be blocked", "err", r.Err)
-				}
-				l.Log.Info("Handling receipt", "id", r.ID)
-				l.handleReceipt(r)
-			case <-receiptLoopDone:
-				l.Log.Info("Receipt processing loop done")
-				return
-			}
-		}
-	}()
 
 	ticker := time.NewTicker(l.Config.PollInterval)
 	defer ticker.Stop()
@@ -370,6 +346,17 @@ func (l *BatchSubmitter) loop() {
 				continue
 			}
 			l.publishStateToL1(queue, receiptsCh)
+		case r := <-receiptsCh:
+			if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
+				txpoolBlockedBlob = r.ID.isBlob
+				l.Log.Info("incompatible tx in txpool")
+			} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
+				// Set state to TxpoolGood even if the cancellation transaction ended in error
+				// since the stuck transaction could have cleared while we were waiting.
+				l.Log.Info("txpool may no longer be blocked", "err", r.Err)
+			}
+			l.Log.Info("Handling receipt", "id", r.ID)
+			l.handleReceipt(r)
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
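Note on the change above: receipt handling moves from a dedicated goroutine (with its own receiptLoopDone shutdown channel) into the main select, so a single goroutine now owns the txpool state transitions. A minimal standalone sketch of the resulting shape; all names here are illustrative, not op-batcher code:

package main

import (
	"context"
	"fmt"
	"time"
)

// eventLoop multiplexes result handling and periodic work on one goroutine,
// the shape loop() has after this patch.
func eventLoop(ctx context.Context, resultCh <-chan string) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case r := <-resultCh: // was: drained by a dedicated goroutine with its own done channel
			fmt.Println("handled result:", r)
		case <-ticker.C:
			fmt.Println("periodic work")
		case <-ctx.Done(): // one shutdown path covers every case
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	resultCh := make(chan string, 1)
	resultCh <- "receipt-1"
	eventLoop(ctx, resultCh)
}

In particular, the plain bool txpoolBlockedBlob was previously written by the receipt goroutine and read by the loop; with both on one goroutine it needs no extra synchronization.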
From a2938274f86be7f4651a847e3fa042dfdd3253c8 Mon Sep 17 00:00:00 2001
From: Samuel Laferriere
Date: Wed, 28 Aug 2024 23:29:31 -0700
Subject: [PATCH 2/2] refactor(batcher): drive block processing and tx publishing from event loop

---
 op-batcher/batcher/channel_manager.go | 111 +++++++++++++++++---------
 op-batcher/batcher/driver.go          |  83 +++++++++++++++----
 2 files changed, 143 insertions(+), 51 deletions(-)

diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index 1f22565c94c5..dda526d236e5 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -59,6 +59,37 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
 	}
 }
 
+// IsFull returns true if the channel manager has more than 10 pending blocks to be inserted
+// into channels. The driving loop can use this signal to throttle block loading from the sequencer.
+func (s *channelManager) IsFull() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return len(s.blocks) > 10
+}
+
+func (s *channelManager) PendingBlocks() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return len(s.blocks)
+}
+
+// AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
+// if the block does not extend the last block loaded into the state. If no
+// blocks were added yet, the parent hash check is skipped.
+func (s *channelManager) AddL2Block(block *types.Block) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
+		return ErrReorg
+	}
+
+	s.metr.RecordL2BlockInPendingQueue(block)
+	s.blocks = append(s.blocks, block)
+	s.tip = block.Hash()
+
+	return nil
+}
+
 // Clear clears the entire state of the channel manager.
 // It is intended to be used before launching op-batcher and after an L2 reorg.
 func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
@@ -144,6 +175,42 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 	return tx, nil
 }
 
+func (s *channelManager) HasTxData() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, ch := range s.channelQueue {
+		if ch.HasTxData() {
+			return true
+		}
+	}
+	return false
+}
+
+func (s *channelManager) ProcessBlocks(l1Head eth.BlockID) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// If we have no saved blocks, we will not be able to create valid frames
+	if len(s.blocks) == 0 {
+		return io.EOF
+	}
+
+	if err := s.ensureChannelWithSpace(eth.BlockID{}); err != nil {
+		return err
+	}
+
+	if err := s.processBlocks(); err != nil {
+		return err
+	}
+
+	// Register current L1 head only after all pending blocks have been
+	// processed. Even if a timeout is triggered now, it is better to have
+	// all pending blocks be included in this channel for submission.
+	s.registerL1Block(l1Head)
+
+	return s.outputFrames()
+}
+
 // TxData returns the next tx data that should be submitted to L1.
 //
 // If the pending channel is
@@ -152,6 +219,7 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	var firstWithTxData *channel
 	for _, ch := range s.channelQueue {
 		if ch.HasTxData() {
@@ -160,7 +228,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		}
 	}
 
-	dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
+	dataPending := firstWithTxData != nil
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
 
 	// Short circuit if there is pending tx data or the channel manager is closed.
@@ -173,26 +241,12 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	// If we have no saved blocks, we will not be able to create valid frames
 	if len(s.blocks) == 0 {
 		return txData{}, io.EOF
+	} else {
+		// Only added to help debug in case we miss a case while refactoring.
+		// TODO(samlaf): change this to something appropriate after finishing the event loop refactor
+		s.log.Error("No pending tx data, but blocks are available. This should never happen. Did you forget to call ProcessBlocks()?", "blocks_pending", len(s.blocks))
+		return txData{}, fmt.Errorf("no pending tx data despite %d pending blocks", len(s.blocks))
 	}
-
-	if err := s.ensureChannelWithSpace(l1Head); err != nil {
-		return txData{}, err
-	}
-
-	if err := s.processBlocks(); err != nil {
-		return txData{}, err
-	}
-
-	// Register current L1 head only after all pending blocks have been
-	// processed. Even if a timeout will be triggered now, it is better to have
-	// all pending blocks be included in this channel for submission.
-	s.registerL1Block(l1Head)
-
-	if err := s.outputFrames(); err != nil {
-		return txData{}, err
-	}
-
-	return s.nextTxData(s.currentChannel)
 }
 
 // ensureChannelWithSpace ensures currentChannel is populated with a channel that has
@@ -333,23 +387,6 @@ func (s *channelManager) outputFrames() error {
 	return nil
 }
 
-// AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
-// if the block does not extend the last block loaded into the state. If no
-// blocks were added yet, the parent hash check is skipped.
-func (s *channelManager) AddL2Block(block *types.Block) error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
-		return ErrReorg
-	}
-
-	s.metr.RecordL2BlockInPendingQueue(block)
-	s.blocks = append(s.blocks, block)
-	s.tip = block.Hash()
-
-	return nil
-}
-
 func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo) eth.L2BlockRef {
 	return eth.L2BlockRef{
 		Hash:           block.Hash(),
diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 5d3ebf281f51..9f4986ec5f99 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -227,7 +227,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
 		return nil, fmt.Errorf("adding L2 block to state: %w", err)
 	}
 
-	l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
+	l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time(), "pending_blocks", l.state.PendingBlocks())
 	return block, nil
 }
 
@@ -306,9 +306,6 @@ func (l *BatchSubmitter) loop() {
 	)
 	txpoolState.Store(TxpoolGood)
 
-	ticker := time.NewTicker(l.Config.PollInterval)
-	defer ticker.Stop()
-
 	publishAndWait := func() {
 		l.publishStateToL1(queue, receiptsCh)
 		if !l.Txmgr.IsClosed() {
@@ -318,16 +315,50 @@ func (l *BatchSubmitter) loop() {
 		}
 	}
 
+	txpoolStateTick := time.NewTicker(l.Config.PollInterval)
+	loadBlocksTick := time.NewTicker(l.Config.PollInterval)
+
+	var pendingTxData *txData
+	txDataCh := make(chan *txData)
+
+	// tx publishing goroutine: sends txData handed off by the event loop
+	go func() {
+		for txdata := range txDataCh {
+			if txdata == nil {
+				l.Log.Error("Received nil txData; this should never happen")
+				continue
+			}
+			if err := l.sendTransaction(l.killCtx, *txdata, queue, receiptsCh); err != nil {
+				l.Log.Error("Error sending transaction", "err", err)
+			}
+		}
+	}()
+
 	for {
-		select {
-		case <-ticker.C:
-			if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
-				// txpoolState is set to Blocked only if Send() is returning
-				// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
-				// allowing us to send a cancellation transaction.
-				l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
+		l.Log.Trace("BatchSubmitter loop iteration")
+
+		// maybeNilTxDataCh is nil when pendingTxData is nil: a send on a nil channel
+		// blocks forever, so the send case below stays disabled until there is data to send.
+		var maybeNilTxDataCh chan *txData
+		if pendingTxData != nil {
+			maybeNilTxDataCh = txDataCh
+		} else if !l.Txmgr.IsClosed() && l.state.HasTxData() {
+			if l1tip, err := l.l1Tip(l.killCtx); err == nil {
+				if pendingTxDataPtr, err := l.state.TxData(l1tip.ID()); err == nil {
+					pendingTxData = &pendingTxDataPtr
+					maybeNilTxDataCh = txDataCh
+				}
 			}
-			if txpoolState.Load() != TxpoolGood {
+		}
+
+		var processBlocks <-chan time.Time
+		if l.state.IsFull() {
+			processBlocks = time.After(0)
+		}
+
+		select {
+		case <-loadBlocksTick.C:
+			if l.state.IsFull() {
 				continue
 			}
 			if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) {
@@ -345,7 +376,20 @@ func (l *BatchSubmitter) loop() {
 				l.clearState(l.shutdownCtx)
 				continue
 			}
-			l.publishStateToL1(queue, receiptsCh)
+		case <-processBlocks:
+			l.Log.Debug("Processing blocks")
+			l1tip, err := l.l1Tip(l.killCtx)
+			if err != nil {
+				l.Log.Error("Failed to query L1 tip", "err", err)
+				continue
+			}
+			err = l.state.ProcessBlocks(l1tip.ID())
+			if err != nil && !errors.Is(err, io.EOF) {
+				l.Log.Error("Error processing blocks", "err", err)
+			}
+		case maybeNilTxDataCh <- pendingTxData:
+			// sent pendingTxData to the tx publishing goroutine
+			pendingTxData = nil
 		case r := <-receiptsCh:
 			if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
 				txpoolBlockedBlob = r.ID.isBlob
@@ -357,6 +401,18 @@ func (l *BatchSubmitter) loop() {
 			}
 			l.Log.Info("Handling receipt", "id", r.ID)
 			l.handleReceipt(r)
+		case <-txpoolStateTick.C:
+			l.Log.Trace("Checking txpool state")
+			if txpoolState.CompareAndSwap(TxpoolBlocked, TxpoolCancelPending) {
+				// txpoolState is set to Blocked only if Send() is returning
+				// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
+				// allowing us to send a cancellation transaction.
+				// TODO(samlaf): unblock this
+				l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
+			}
+			if txpoolState.Load() != TxpoolGood {
+				continue
+			}
 		case <-l.shutdownCtx.Done():
 			if l.Txmgr.IsClosed() {
 				l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -471,7 +527,6 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 // publishTxToL1 submits a single state tx to the L1
 func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
-	// send all available transactions
 	l1tip, err := l.l1Tip(ctx)
 	if err != nil {
 		l.Log.Error("Failed to query L1 tip", "err", err)
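A note on the two select idioms the new loop leans on: maybeNilTxDataCh stays nil until there is a pendingTxData to hand off, and a send on a nil channel blocks forever, so select never chooses that case while it is nil; processBlocks is likewise a nil <-chan time.Time until l.state.IsFull(), when time.After(0) arms it to fire on the current iteration. Below is a minimal standalone sketch of both idioms; every identifier in it is invented for illustration and none of it is op-batcher code:

package main

import (
	"fmt"
	"time"
)

func main() {
	queue := []string{"tx-1", "tx-2", "tx-3"} // stand-in for the channel manager's pending data
	var pending *string
	sendCh := make(chan *string)
	done := make(chan struct{})

	// Consumer standing in for the tx publishing goroutine.
	go func() {
		defer close(done)
		for s := range sendCh {
			fmt.Println("published:", *s)
		}
	}()

	for i := 0; i < 5; i++ {
		// Idiom 1: a send on a nil channel blocks forever, so leaving
		// maybeNilSendCh nil disables the send case below for this iteration.
		var maybeNilSendCh chan *string
		if pending == nil && len(queue) > 0 {
			pending, queue = &queue[0], queue[1:]
		}
		if pending != nil {
			maybeNilSendCh = sendCh
		}

		// Idiom 2: a receive from a nil <-chan time.Time also blocks forever;
		// arming it with time.After(0) makes its case fire immediately.
		var process <-chan time.Time
		if len(queue) > 1 { // stand-in for channelManager.IsFull()
			process = time.After(0)
		}

		select {
		case maybeNilSendCh <- pending: // never chosen while maybeNilSendCh is nil
			pending = nil // handed off; the case disarms itself next iteration
		case <-process:
			fmt.Println("backlog full, process blocks now")
		case <-time.After(10 * time.Millisecond):
			// stands in for the ticker-driven cases of the real loop
		}
	}
	close(sendCh)
	<-done
}

Handing pendingTxData off to a separate publishing goroutine keeps the potentially slow sendTransaction call from stalling the loop, while every scheduling decision stays on the single event-loop goroutine.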