diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 39ebf2f25b24..33ecf0e45851 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -62,6 +62,7 @@ type DriverSetup struct { RollupConfig *rollup.Config Config BatcherConfig Txmgr *txmgr.SimpleTxManager + Publisher *FramePublisher L1Client L1Client EndpointProvider dial.L2EndpointProvider ChannelConfig ChannelConfigProvider @@ -112,6 +113,9 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.clearState(l.shutdownCtx) + // use same max number of goroutines for altda and eth txs for now. + // might want to use separate limits in the future, but can't think of a reason right now. + l.Publisher.Init(l.killCtx, l.Txmgr, l.state, l.Config.MaxPendingTransactions, l.Config.MaxPendingTransactions) l.lastStoredBlock = eth.BlockID{} if l.Config.WaitNodeSync { @@ -334,9 +338,9 @@ func (l *BatchSubmitter) loop() { defer ticker.Stop() publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh) + l.publishStateToL1() if !l.Txmgr.IsClosed() { - queue.Wait() + l.Publisher.Wait() } else { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") } @@ -369,7 +373,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) continue } - l.publishStateToL1(queue, receiptsCh) + l.publishStateToL1() case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -426,14 +430,14 @@ func (l *BatchSubmitter) waitNodeSync() error { // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is // no more data to queue for publishing or if there was an error queing the data. 
-func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) { +func (l *BatchSubmitter) publishStateToL1() { for { // if the txmgr is closed, we stop the transaction sending if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, aborting state publishing") return } - err := l.publishTxToL1(l.killCtx, queue, receiptsCh) + err := l.publishTxToL1(l.killCtx) if err != nil { if err != io.EOF { l.Log.Error("Error publishing tx to l1", "err", err) @@ -483,7 +487,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { } // publishTxToL1 submits a single state tx to the L1 -func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { +func (l *BatchSubmitter) publishTxToL1(ctx context.Context) error { // send all available transactions l1tip, err := l.l1Tip(ctx) if err != nil { @@ -503,7 +507,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t return err } - if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil { + if err = l.Publisher.Publish(ctx, txdata); err != nil { return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err) } return nil @@ -548,47 +552,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh l.queueTx(txData{}, true, candidate, queue, receiptsCh) } -// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. -// The method will block if the queue's MaxPendingTransactions is exceeded. -func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { - var err error - // Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit. 
- - var candidate *txmgr.TxCandidate - if txdata.asBlob { - if candidate, err = l.blobTxCandidate(txdata); err != nil { - // We could potentially fall through and try a calldata tx instead, but this would - // likely result in the chain spending more in gas fees than it is tuned for, so best - // to just fail. We do not expect this error to trigger unless there is a serious bug - // or configuration issue. - return fmt.Errorf("could not create blob tx candidate: %w", err) - } - } else { - // sanity check - if nf := len(txdata.frames); nf != 1 { - l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) - } - data := txdata.CallData() - // if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment. - if l.Config.UseAltDA { - comm, err := l.AltDA.SetInput(ctx, data) - if err != nil { - l.Log.Error("Failed to post input to Alt DA", "error", err) - // requeue frame if we fail to post to the DA Provider so it can be retried - l.recordFailedTx(txdata.ID(), err) - return nil - } - l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID()) - // signal AltDA commitment tx with TxDataVersion1 - data = comm.TxData() - } - candidate = l.calldataTxCandidate(data) - } - - l.queueTx(txdata, false, candidate, queue, receiptsCh) - return nil -} - func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) { intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false) if err != nil { diff --git a/op-batcher/batcher/frame_publisher.go b/op-batcher/batcher/frame_publisher.go new file mode 100644 index 000000000000..f0aa301e9ebf --- /dev/null +++ b/op-batcher/batcher/frame_publisher.go @@ -0,0 +1,155 @@ +package batcher + +import ( + "context" + "fmt" + + altda "github.com/ethereum-optimism/optimism/op-alt-da" + "github.com/ethereum-optimism/optimism/op-node/rollup" + 
"github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/log" + "golang.org/x/sync/errgroup" +) + +// FramePublisher transforms txData structs into txmgr.TxCandidate structs and queues them for sending. +// It is responsible for sending transactions to the L1 chain. +type FramePublisher struct { + log log.Logger + rollupConfig *rollup.Config + altDA *altda.DAClient + + // below need to be initialized dynamically by Init() + daErrGroup errgroup.Group + state *channelManager + queue *txmgr.Queue[txRef] + receiptsCh chan txmgr.TxReceipt[txRef] +} + +func NewFramePublisher( + log log.Logger, + rollupConfig *rollup.Config, + altDA *altda.DAClient, +) *FramePublisher { + return &FramePublisher{ + log: log, + rollupConfig: rollupConfig, + altDA: altDA, + } +} + +// TODO: this is super super ugly... must be a better way to construct/initialize this +// but for now just want to focus on the idea of the FramePublisher +func (fp *FramePublisher) Init(killCtx context.Context, txMgr txmgr.TxManager, state *channelManager, maxPendingDaPutRequests uint64, maxPendingEthTxs uint64) { + fp.receiptsCh = make(chan txmgr.TxReceipt[txRef]) + fp.queue = txmgr.NewQueue[txRef](killCtx, txMgr, maxPendingEthTxs) + fp.state = state + fp.daErrGroup = errgroup.Group{} // reset the struct field itself; a local group would be discarded when Init returns + if maxPendingDaPutRequests > 0 { // errgroup treats SetLimit(0) as "no goroutines may start", so 0 is left unlimited + fp.daErrGroup.SetLimit(int(maxPendingDaPutRequests)) + } +} + +// Publish creates & queues for sending a transaction to the batch inbox address with the given `txData`. +// The method will block if the queue's MaxPendingTransactions is exceeded. +func (fp *FramePublisher) Publish(ctx context.Context, txdata txData) error { + var err error + + // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
+ if fp.altDA != nil { + if txdata.asBlob { + return fmt.Errorf("AltDA with 4844 blob txs not supported") + } + // sanity check + if nf := len(txdata.frames); nf != 1 { + fp.log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) + } + // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop + // since it may take a while to post the txdata to the DA Provider. + fp.daErrGroup.Go(func() error { + comm, err := fp.altDA.SetInput(ctx, txdata.CallData()) + if err != nil { + fp.log.Error("Failed to post input to Alt DA", "error", err) + // requeue frame if we fail to post to the DA Provider so it can be retried + fp.recordFailedTx(txdata.ID(), err) + return err + } + fp.log.Info("Set altda input", "commitment", comm, "tx", txdata.ID()) + // signal altda commitment tx with TxDataVersion1 + candidate := fp.calldataTxCandidate(comm.TxData()) + fp.queueTx(txdata, false, candidate) + return nil + }) + // we return nil to allow publishStateToL1 to keep processing the next txdata + return nil + } + + var candidate *txmgr.TxCandidate + if txdata.asBlob { + if candidate, err = fp.blobTxCandidate(txdata); err != nil { + // We could potentially fall through and try a calldata tx instead, but this would + // likely result in the chain spending more in gas fees than it is tuned for, so best + // to just fail. We do not expect this error to trigger unless there is a serious bug + // or configuration issue. 
+ return fmt.Errorf("could not create blob tx candidate: %w", err) + } + } else { + // sanity check + if nf := len(txdata.frames); nf != 1 { + fp.log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) + } + candidate = fp.calldataTxCandidate(txdata.CallData()) + } + + fp.queueTx(txdata, false, candidate) + return nil +} + +func (fp *FramePublisher) calldataTxCandidate(data []byte) *txmgr.TxCandidate { + fp.log.Info("Building Calldata transaction candidate", "size", len(data)) + return &txmgr.TxCandidate{ + To: &fp.rollupConfig.BatchInboxAddress, + TxData: data, + } +} + +func (fp *FramePublisher) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) { + blobs, err := data.Blobs() + if err != nil { + return nil, fmt.Errorf("generating blobs for tx data: %w", err) + } + size := data.Len() + lastSize := len(data.frames[len(data.frames)-1].data) + fp.log.Info("Building Blob transaction candidate", + "size", size, "last_size", lastSize, "num_blobs", len(blobs)) + // TODO: move metric from driver to here + // fp.Metr.RecordBlobUsedBytes(lastSize) + return &txmgr.TxCandidate{ + To: &fp.rollupConfig.BatchInboxAddress, + Blobs: blobs, + }, nil +} + +func (fp *FramePublisher) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate) { + intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false) + if err != nil { + // we log instead of return an error here because txmgr can do its own gas estimation + fp.log.Error("Failed to calculate intrinsic gas", "err", err) + } else { + candidate.GasLimit = intrinsicGas + } + + fp.queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, fp.receiptsCh) +} + +func (l *FramePublisher) recordFailedTx(id txID, err error) { + l.log.Warn("Transaction failed to send", logFields(id, err)...) 
+ l.state.TxFailed(id) +} + +func (fp *FramePublisher) Wait() { + fp.log.Info("Wait for pure DA writes, not L1 txs") + fp.daErrGroup.Wait() + fp.log.Info("Wait for L1 writes (blobs or DA commitments)") + fp.queue.Wait() +} diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 00d3d32071f7..dd20c24470e9 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -52,6 +52,7 @@ type BatcherService struct { L1Client *ethclient.Client EndpointProvider dial.L2EndpointProvider TxManager *txmgr.SimpleTxManager + FramePublisher *FramePublisher AltDA *altda.DAClient BatcherConfig @@ -115,10 +116,11 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, if err := bs.initPProf(cfg); err != nil { return fmt.Errorf("failed to init profiling: %w", err) } - // init before driver + // init before driver and framePublisher if err := bs.initAltDA(cfg); err != nil { return fmt.Errorf("failed to init AltDA: %w", err) } + bs.initFramePublisher(cfg) bs.initDriver() if err := bs.initRPCServer(cfg); err != nil { return fmt.Errorf("failed to start RPC server: %w", err) @@ -276,6 +278,11 @@ func (bs *BatcherService) initTxManager(cfg *CLIConfig) error { return nil } +func (bs *BatcherService) initFramePublisher(cfg *CLIConfig) { + fp := NewFramePublisher(bs.Log, bs.RollupConfig, bs.AltDA) + bs.FramePublisher = fp +} + func (bs *BatcherService) initPProf(cfg *CLIConfig) error { bs.pprofService = oppprof.New( cfg.PprofConfig.ListenEnabled, @@ -319,6 +326,7 @@ func (bs *BatcherService) initDriver() { RollupConfig: bs.RollupConfig, Config: bs.BatcherConfig, Txmgr: bs.TxManager, + Publisher: bs.FramePublisher, L1Client: bs.L1Client, EndpointProvider: bs.EndpointProvider, ChannelConfig: bs.ChannelConfig, diff --git a/op-batcher/batcher/tx_data.go b/op-batcher/batcher/tx_data.go index d0f5474fd5f2..d8834dd3a334 100644 --- a/op-batcher/batcher/tx_data.go +++ b/op-batcher/batcher/tx_data.go @@ -10,9 +10,9 @@ import ( // 
txData represents the data for a single transaction. // -// Note: The batcher currently sends exactly one frame per transaction. This -// might change in the future to allow for multiple frames from possibly -// different channels. +// Note: The batcher currently sends transactions where all frames come from +// a single channel. This might change in the future to allow for +// multiple frames from possibly different channels. type txData struct { frames []frameData asBlob bool // indicates whether this should be sent as blob