Skip to content

Commit

Permalink
feat: support eigenda
Browse files Browse the repository at this point in the history
  • Loading branch information
renlulu committed Mar 10, 2024
1 parent 8f33fea commit 253e87d
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 55 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/docker-eigenda.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Build nitro-eigenda Docker Image
on:
push:
tags: ["*"]

jobs:
docker:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- name: build and push nitro-eigenda
image: nitro-eigenda
dockerfile: Dockerfile
context: .
buildargs: ''
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: "true"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Prepare Environment Variables
run: |
echo "SHORT_SHA=${GITHUB_SHA::7}" | tee -a $GITHUB_ENV
GIT_TAG=$(git tag --points-at HEAD)
echo "GIT_TAG=$GIT_TAG" | tee -a $GITHUB_ENV
echo "REF_NAME=$(echo ${GIT_TAG:-$GITHUB_REF_NAME} | sed 's/[^a-zA-Z0-9._]/-/g')" | tee -a $GITHUB_ENV
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.ECR_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.ECR_ACCESS_KEY }}
aws-region: us-west-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: ${{ matrix.name }}
uses: docker/build-push-action@v4
with:
context: ${{ matrix.context }}
push: true
tags: ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ github.sha }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.SHORT_SHA }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.REF_NAME }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.REF_NAME }}-${{ env.SHORT_SHA }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:latest
file: ${{ matrix.dockerfile }}
provenance: false
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ COPY ./blsSignatures ./blsSignatures
COPY ./cmd/chaininfo ./cmd/chaininfo
COPY ./cmd/replay ./cmd/replay
COPY ./das/dastree ./das/dastree
COPY ./das/eigenda ./das/eigenda
COPY ./precompiles ./precompiles
COPY ./statetransfer ./statetransfer
COPY ./util ./util
Expand Down
1 change: 1 addition & 0 deletions arbitrator/jit/src/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn get_field(env: &mut WasmEnv, source: u32, field: &[u8]) -> GoValue {
}
None => GoValue::Null,
},
(PROCESS_ID, b"pid") => GoValue::Number(1.),
_ => {
let field = String::from_utf8_lossy(field);
eprintln!("Go trying to access unimplemented unknown JS value {source} field {field}");
Expand Down
4 changes: 4 additions & 0 deletions arbitrator/wasm-libraries/go-stub/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ pub unsafe fn get_field(source: u32, field: &[u8]) -> GoValue {
return GoValue::Null;
}
}
} else if source == PROCESS_ID {
if field == b"pid" {
return GoValue::Number(1.);
}
}

if let Some(source) = DynamicObjectPool::singleton().get(source).cloned() {
Expand Down
51 changes: 39 additions & 12 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
Expand Down Expand Up @@ -71,6 +72,7 @@ type BatchPoster struct {
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
eigenDAWriter eigenda.EigenDAWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
Expand Down Expand Up @@ -100,8 +102,9 @@ const (
)

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableEigenDAFallbackStoreDataOnChain bool `koanf:"disable-eigenda-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Max batch post delay.
Expand Down Expand Up @@ -219,15 +222,16 @@ var TestBatchPosterConfig = BatchPosterConfig{
}

type BatchPosterOpts struct {
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
EigenDAWriter eigenda.EigenDAWriter
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -272,6 +276,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
eigenDAWriter: opts.EigenDAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -1035,6 +1040,28 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
}

if b.daWriter == nil && b.eigenDAWriter != nil {
log.Info("Start to write data to eigenda: ", "data", hex.EncodeToString(sequencerMsg))
daRef, err := b.eigenDAWriter.Store(ctx, sequencerMsg)
if errors.Is(err, eigenda.ErrBatchToEigenDA) {
if config.DisableEigenDAFallbackStoreDataOnChain {
log.Warn("Falling back to storing data on chain", "err", err)
return false, errors.New("unable to post batch to EigenDA and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain")
} else if err != nil {
return false, err
} else {
pointer, err := b.eigenDAWriter.Serialize(daRef)
if err != nil {
log.Warn("DaRef serialization failed", "err", err)
return false, errors.New("DaRef serialization failed")
}
log.Info("EigenDA transaction receipt(data pointer): ", "hash", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex)
sequencerMsg = pointer
}
}

data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
if err != nil {
return false, err
Expand Down Expand Up @@ -1222,4 +1249,4 @@ func (b *BatchPoster) StopAndWait() {
b.StopWaiter.StopAndWait()
b.dataPoster.StopAndWait()
b.redisLock.StopAndWait()
}
}
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil)
tracker, err := NewInboxTracker(db, streamer, nil, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
7 changes: 5 additions & 2 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
)
Expand All @@ -38,12 +39,13 @@ type InboxTracker struct {
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
eigenDA eigenda.EigenDAReader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
Expand All @@ -52,6 +54,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb
db: db,
txStreamer: txStreamer,
das: das,
eigenDA: eigenDAReader,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -603,7 +606,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.eigenDA, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
34 changes: 24 additions & 10 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Config struct {
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
EigenDA eigenda.EigenDAConfig `koanf:"eigen-da"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Expand Down Expand Up @@ -142,6 +144,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
staker.L1ValidatorConfigAddOptions(prefix+".staker", f)
SeqCoordinatorConfigAddOptions(prefix+".seq-coordinator", f)
das.DataAvailabilityConfigAddNodeOptions(prefix+".data-availability", f)
eigenda.EigenDAConfigAddOptions(prefix+".eigen-da", f)
SyncMonitorConfigAddOptions(prefix+".sync-monitor", f)
DangerousConfigAddOptions(prefix+".dangerous", f)
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
Expand Down Expand Up @@ -484,6 +487,8 @@ func createNodeImpl(
var daWriter das.DataAvailabilityServiceWriter
var daReader das.DataAvailabilityServiceReader
var dasLifecycleManager *das.LifecycleManager
var eigenDAReader eigenda.EigenDAReader
var eigenDAWriter eigenda.EigenDAWriter
if config.DataAvailability.Enable {
if config.BatchPoster.Enable {
daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
Expand All @@ -507,9 +512,16 @@ func createNodeImpl(
}
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
} else if config.EigenDA.Enable {
eigenDAService, err := eigenda.NewEigenDA(config.EigenDA.Rpc)
if err != nil {
return nil, err
}
eigenDAReader = eigenDAService
eigenDAWriter = eigenDAService
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, eigenDAReader)
if err != nil {
return nil, err
}
Expand All @@ -528,6 +540,7 @@ func createNodeImpl(
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
eigenDAReader,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -634,15 +647,16 @@ func createNodeImpl(
return nil, errors.New("batchposter, but no TxOpts")
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
EigenDAWriter: eigenDAWriter,
})
if err != nil {
return nil, err
Expand Down
26 changes: 23 additions & 3 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"io"
"math/big"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/offchainlabs/nitro/arbos/l1pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/zeroheavy"
)

Expand Down Expand Up @@ -50,7 +52,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
const MaxSegmentsPerSequencerMessage = 100 * 1024
const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week

func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
if len(data) < 40 {
return nil, errors.New("sequencer message missing L1 header")
}
Expand All @@ -63,6 +65,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
segments: [][]byte{},
}
payload := data[40:]
log.Info("Inbox parse sequencer message: ", "payload", hex.EncodeToString(payload))

if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
Expand All @@ -79,6 +82,21 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
}
}

if len(payload) > 0 && eigenda.IsEigenDAMessageHeaderByte(payload[0]) {
if eigenDAReader == nil {
log.Error("No EigenDA Reader configured, but sequencer message found with EigenDA header")
} else {
var err error
payload, err = eigenda.RecoverPayloadFromEigenDABatch(ctx, batchNum, payload[1:], eigenDAReader, nil)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
}
}
}

if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
if err != nil {
Expand Down Expand Up @@ -242,6 +260,7 @@ type inboxMultiplexer struct {
backend InboxBackend
delayedMessagesRead uint64
dasReader DataAvailabilityReader
eigenDAReader eigenda.EigenDAReader
cachedSequencerMessage *sequencerMessage
cachedSequencerMessageNum uint64
cachedSegmentNum uint64
Expand All @@ -251,11 +270,12 @@ type inboxMultiplexer struct {
keysetValidationMode KeysetValidationMode
}

func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
return &inboxMultiplexer{
backend: backend,
delayedMessagesRead: delayedMessagesRead,
dasReader: dasReader,
eigenDAReader: eigenDAReader,
keysetValidationMode: keysetValidationMode,
}
}
Expand All @@ -276,7 +296,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta
}
r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
var err error
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.keysetValidationMode)
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.eigenDAReader, r.keysetValidationMode)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 253e87d

Please sign in to comment.