Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non blocking da v3: altdasend -> txsend pipeline #8

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 66 additions & 48 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ const (
func (l *BatchSubmitter) loop() {
defer l.wg.Done()

txDataCh := make(chan txData)
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)

Expand Down Expand Up @@ -329,12 +330,41 @@ func (l *BatchSubmitter) loop() {
}
}
}()
go func() {
for {
select {
case txdata := <-txDataCh:
var candidate *txmgr.TxCandidate
var err error
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
l.state.TxFailed(txdata.ID())
l.Log.Error("could not create blob tx candidate: %w", err)
}
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = l.calldataTxCandidate(txdata.CallData())
}
l.queueTx(txdata, false, candidate, queue, receiptsCh)
case <-l.shutdownCtx.Done():
l.Log.Info("Shutting down tx sending loop")
return
}
}
}()

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.prepareStateToSubmitToL1(txDataCh)
if !l.Txmgr.IsClosed() {
queue.Wait()
} else {
Expand Down Expand Up @@ -369,7 +399,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.prepareStateToSubmitToL1(txDataCh)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -424,22 +454,24 @@ func (l *BatchSubmitter) waitNodeSync() error {
return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, l1TargetBlock, time.Second*12)
}

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// prepareStateToSubmitToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) prepareStateToSubmitToL1(txDataCh chan<- txData) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
txdata, err := l.prepareTxData(l.killCtx)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
}
return
}
// send txdata to be published by the tx sending loop
txDataCh <- txdata
}
}

Expand Down Expand Up @@ -482,13 +514,13 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
// prepareTxData prepares a single state tx to be sent to the L1
func (l *BatchSubmitter) prepareTxData(ctx context.Context) (txData, error) {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.Log.Error("Failed to query L1 tip", "err", err)
return err
return txData{}, err
}
l.recordL1Tip(l1tip)

Expand All @@ -497,16 +529,21 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t

if err == io.EOF {
l.Log.Trace("No transaction data available")
return err
return txData{}, err
} else if err != nil {
l.Log.Error("Unable to get tx data", "err", err)
return err
return txData{}, err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
// TODO: run this in a goroutine to avoid blocking the main loop
if err = l.publishToAltDAUpdateTxData(ctx, txdata); err != nil {
return txData{}, fmt.Errorf("BatchSubmitter.prepareAndQueueTxData failed: %w", err)
}
}
return nil

return txdata, nil
}

func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) {
Expand Down Expand Up @@ -548,44 +585,25 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// publishToAltDAUpdateTxData creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

var candidate *txmgr.TxCandidate
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
comm, err := l.AltDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return nil
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// signal AltDA commitment tx with TxDataVersion1
data = comm.TxData()
}
candidate = l.calldataTxCandidate(data)
func (l *BatchSubmitter) publishToAltDAUpdateTxData(ctx context.Context, txdata txData) error {
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
comm, err := l.AltDA.SetInput(ctx, txdata.CallData())
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return err
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// TODO: this is kind of weird... we're overwriting the frame's data with the commitment
// but keeping the same ID just so we can delete/requeue the frames on success/failure
txdata.frames[0].data = comm.TxData()

l.queueTx(txdata, false, candidate, queue, receiptsCh)
return nil
}

Expand Down