Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non blocking da v2: frame publisher wrapper #6

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 11 additions & 48 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type DriverSetup struct {
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr *txmgr.SimpleTxManager
Publisher *FramePublisher
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
Expand Down Expand Up @@ -112,6 +113,9 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.clearState(l.shutdownCtx)
// use same max number of goroutines for altda and eth txs for now.
// might want to use separate limits in the future, but can't think of a reason right now.
l.Publisher.Init(l.killCtx, l.Txmgr, l.state, l.Config.MaxPendingTransactions, l.Config.MaxPendingTransactions)
l.lastStoredBlock = eth.BlockID{}

if l.Config.WaitNodeSync {
Expand Down Expand Up @@ -334,9 +338,9 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1()
if !l.Txmgr.IsClosed() {
queue.Wait()
l.Publisher.Wait()
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
Expand Down Expand Up @@ -369,7 +373,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1()
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -426,14 +430,14 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) publishStateToL1() {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
err := l.publishTxToL1(l.killCtx)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -483,7 +487,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand All @@ -503,7 +507,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
if err = l.Publisher.Publish(ctx, txdata); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
return nil
Expand Down Expand Up @@ -548,47 +552,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

var candidate *txmgr.TxCandidate
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
comm, err := l.AltDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return nil
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// signal AltDA commitment tx with TxDataVersion1
data = comm.TxData()
}
candidate = l.calldataTxCandidate(data)
}

l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}

func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
Expand Down
155 changes: 155 additions & 0 deletions op-batcher/batcher/frame_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package batcher

import (
"context"
"fmt"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"
)

// FramePublisher transforms txData structs into txmgr.TxCandidate structs and queues them for sending.
// It is responsible for sending transactions to the L1 chain.
type FramePublisher struct {
log log.Logger
rollupConfig *rollup.Config
altDA *altda.DAClient

// below need to be initialized dynamically by Start()
daErrGroup errgroup.Group
state *channelManager
queue *txmgr.Queue[txRef]
receiptsCh chan txmgr.TxReceipt[txRef]
}

func NewFramePublisher(
log log.Logger,
rollupConfig *rollup.Config,
altDA *altda.DAClient,
) *FramePublisher {
return &FramePublisher{
log: log,
rollupConfig: rollupConfig,
altDA: altDA,
}
}

// TODO: this is super super ugly... must be a better way to construct/initialize this
// but for now just want to focus on the idea of the FramePublisher
func (fp *FramePublisher) Init(killCtx context.Context, txMgr txmgr.TxManager, state *channelManager, maxPendingDaPutRequests uint64, maxPendingEthTxs uint64) {
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](killCtx, txMgr, maxPendingEthTxs)
daErrGroup := errgroup.Group{}
daErrGroup.SetLimit(int(maxPendingDaPutRequests))
fp.receiptsCh = receiptsCh
fp.queue = queue
fp.state = state
}

// Publish creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (fp *FramePublisher) Publish(ctx context.Context, txdata txData) error {
var err error

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if fp.altDA != nil {
if txdata.asBlob {
return fmt.Errorf("AltDA with 4844 blob txs not supported")
}
// sanity check
if nf := len(txdata.frames); nf != 1 {
fp.log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while to post the txdata to the DA Provider.
fp.daErrGroup.Go(func() error {
comm, err := fp.altDA.SetInput(ctx, txdata.CallData())
if err != nil {
fp.log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
fp.recordFailedTx(txdata.ID(), err)
return err
}
fp.log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
// signal altda commitment tx with TxDataVersion1
candidate := fp.calldataTxCandidate(comm.TxData())
fp.queueTx(txdata, false, candidate)
return nil
})
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}

var candidate *txmgr.TxCandidate
if txdata.asBlob {
if candidate, err = fp.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
fp.log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = fp.calldataTxCandidate(txdata.CallData())
}

fp.queueTx(txdata, false, candidate)
return nil
}

func (fp *FramePublisher) calldataTxCandidate(data []byte) *txmgr.TxCandidate {
fp.log.Info("Building Calldata transaction candidate", "size", len(data))
return &txmgr.TxCandidate{
To: &fp.rollupConfig.BatchInboxAddress,
TxData: data,
}
}

func (fp *FramePublisher) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
blobs, err := data.Blobs()
if err != nil {
return nil, fmt.Errorf("generating blobs for tx data: %w", err)
}
size := data.Len()
lastSize := len(data.frames[len(data.frames)-1].data)
fp.log.Info("Building Blob transaction candidate",
"size", size, "last_size", lastSize, "num_blobs", len(blobs))
// TODO: move metric from driver to here
// fp.Metr.RecordBlobUsedBytes(lastSize)
return &txmgr.TxCandidate{
To: &fp.rollupConfig.BatchInboxAddress,
Blobs: blobs,
}, nil
}

func (fp *FramePublisher) queueTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate) {
intrinsicGas, err := core.IntrinsicGas(candidate.TxData, nil, false, true, true, false)
if err != nil {
// we log instead of return an error here because txmgr can do its own gas estimation
fp.log.Error("Failed to calculate intrinsic gas", "err", err)
} else {
candidate.GasLimit = intrinsicGas
}

fp.queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, fp.receiptsCh)
}

func (l *FramePublisher) recordFailedTx(id txID, err error) {
l.log.Warn("Transaction failed to send", logFields(id, err)...)
l.state.TxFailed(id)
}

func (fp *FramePublisher) Wait() {
fp.log.Info("Wait for pure DA writes, not L1 txs")
fp.daErrGroup.Wait()
fp.log.Info("Wait for L1 writes (blobs or DA commitments)")
fp.queue.Wait()
}
10 changes: 9 additions & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type BatcherService struct {
L1Client *ethclient.Client
EndpointProvider dial.L2EndpointProvider
TxManager *txmgr.SimpleTxManager
FramePublisher *FramePublisher
AltDA *altda.DAClient

BatcherConfig
Expand Down Expand Up @@ -115,10 +116,11 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
// init before driver
// init before driver and framePublisher
if err := bs.initAltDA(cfg); err != nil {
return fmt.Errorf("failed to init AltDA: %w", err)
}
bs.initFramePublisher(cfg)
bs.initDriver()
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
Expand Down Expand Up @@ -276,6 +278,11 @@ func (bs *BatcherService) initTxManager(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initFramePublisher(cfg *CLIConfig) {
fp := NewFramePublisher(bs.Log, bs.RollupConfig, bs.AltDA)
bs.FramePublisher = fp
}

func (bs *BatcherService) initPProf(cfg *CLIConfig) error {
bs.pprofService = oppprof.New(
cfg.PprofConfig.ListenEnabled,
Expand Down Expand Up @@ -319,6 +326,7 @@ func (bs *BatcherService) initDriver() {
RollupConfig: bs.RollupConfig,
Config: bs.BatcherConfig,
Txmgr: bs.TxManager,
Publisher: bs.FramePublisher,
L1Client: bs.L1Client,
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
Expand Down
6 changes: 3 additions & 3 deletions op-batcher/batcher/tx_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
// Note: The batcher currently sends transactions where all frames come from
// a single channel. This might change in the future to allow for
// multiple frames from possibly different channels.
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
Expand Down