Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redundant write on EigenDA failure #242

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ In the event that the EigenDA disperser or network is down, the proxy will retur

This behavior is turned on by default, but configurable via the `--eigenda.confirmation-timeout` flag (set to 15 mins by default currently). If a blob is not confirmed within this time, the proxy will return a 503 status code. This should be set long enough to accommodate for the disperser's batching interval (typically 10 minutes), signature gathering, and onchain submission.

This behavior can be modified to write to a secondary storage target when EigenDA write fails by setting the `--store.enable-write-on-eigenda-failure` flag to `true`. This flag only works with OP Stack-based rollups.

## Blob Lifecycle

> Warning: the below diagrams describe EigenDA V2 interactions. EigenDA V1 is very similar, but has slight discrepancies.
Expand Down
76 changes: 76 additions & 0 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,79 @@ func TestProxyReadFallback(t *testing.T) {
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.Standard)
}

// Ensures that when an EigenDA write fails and enable-write-on-eigenda-failure
// is true, the payload is redundantly written to the secondary storage target,
// so both the write and a subsequent read succeed via that target.
func TestProxyRedundantWriteOnEigenDAFailure(t *testing.T) {
	if !runIntegrationTests || runTestnetIntegrationTests {
		t.Skip("Skipping test as INTEGRATION env var not set")
	}

	t.Parallel()

	// Configure S3 as the secondary target, enable the write fallback, and
	// force EigenDA writes to fail.
	testCfg := e2e.TestConfig(useMemory())
	testCfg.UseS3Fallback = true
	testCfg.UseWriteFallback = true
	testCfg.SimulateEigenDAFailure = true

	ts, kill := e2e.CreateTestSuite(e2e.TestSuiteConfig(testCfg))
	defer kill()

	daClient := client.New(&client.Config{
		URL: ts.Address(),
	})

	// The write should succeed even though EigenDA is "failing", because the
	// payload is redirected to the secondary store.
	payload := e2e.RandBytes(1_000_000)
	t.Log("Setting input data on proxy server...")
	commit, err := daClient.SetData(ts.Ctx, payload)
	require.NoError(t, err)

	// The read should succeed as well, served from the secondary store.
	t.Log("Getting input data from proxy server...")
	got, err := daClient.GetData(ts.Ctx, commit)
	require.NoError(t, err)
	require.Equal(t, payload, got)

	// Both the write and the read must have been recorded against S3.
	requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
}

// Ensures that when an EigenDA write fails and enable-write-on-eigenda-failure
// is false, nothing is written to the secondary storage: the write errors out
// and a subsequent read fails as well.
func TestProxyRedundantWriteDisabled(t *testing.T) {
	if !runIntegrationTests || runTestnetIntegrationTests {
		t.Skip("Skipping test as INTEGRATION env var not set")
	}

	t.Parallel()

	// Configure S3 as the secondary target, force EigenDA writes to fail, and
	// leave the write fallback disabled.
	testCfg := e2e.TestConfig(useMemory())
	testCfg.UseS3Fallback = true
	testCfg.UseWriteFallback = false
	testCfg.SimulateEigenDAFailure = true

	ts, kill := e2e.CreateTestSuite(e2e.TestSuiteConfig(testCfg))
	defer kill()

	daClient := client.New(&client.Config{
		URL: ts.Address(),
	})

	// With the fallback disabled, the simulated EigenDA failure must surface
	// as a write error.
	payload := e2e.RandBytes(1_000_000)
	t.Log("Setting input data on proxy server...")
	commit, err := daClient.SetData(ts.Ctx, payload)
	require.Error(t, err)

	// Nothing was persisted anywhere, so the read must fail too.
	t.Log("Getting input data from proxy server...")
	_, err = daClient.GetData(ts.Ctx, commit)
	require.Error(t, err)
}
27 changes: 17 additions & 10 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,23 @@ type Cfg struct {
UseS3Caching bool
UseRedisCaching bool
UseS3Fallback bool
// enable writing to secondary storage on EigenDA failure
UseWriteFallback bool
// simulate EigenDA failure
SimulateEigenDAFailure bool
}

func TestConfig(useMemory bool) *Cfg {
return &Cfg{
UseMemory: useMemory,
Expiration: 14 * 24 * time.Hour,
UseKeccak256ModeS3: false,
UseS3Caching: false,
UseRedisCaching: false,
UseS3Fallback: false,
WriteThreadCount: 0,
UseMemory: useMemory,
Expiration: 14 * 24 * time.Hour,
UseKeccak256ModeS3: false,
UseS3Caching: false,
UseRedisCaching: false,
UseS3Fallback: false,
WriteThreadCount: 0,
UseWriteFallback: false,
SimulateEigenDAFailure: false,
}
}

Expand Down Expand Up @@ -208,10 +214,11 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
},
MemstoreEnabled: testCfg.UseMemory,
MemstoreConfig: memstore.Config{
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
SimulateEigenDAFailure: testCfg.SimulateEigenDAFailure,
},

UseWriteFallback: testCfg.UseWriteFallback,
StorageConfig: store.Config{
AsyncPutWorkers: testCfg.WriteThreadCount,
},
Expand Down
11 changes: 10 additions & 1 deletion flags/eigendaflags/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var (
EthRPCURLFlagName = withFlagPrefix("eth-rpc")
SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
// Flags that are proxy specific, and not used by the eigenda-client
PutRetriesFlagName = withFlagPrefix("put-retries")
PutRetriesFlagName = withFlagPrefix("put-retries")
EnableWriteFallbackFlagName = withFlagPrefix("enable-write-on-eigenda-failure")
)

func withFlagPrefix(s string) string {
Expand Down Expand Up @@ -163,6 +164,14 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")},
Category: category,
},
&cli.BoolFlag{
Name: EnableWriteFallbackFlagName,
Usage: "Enable writing to secondary storage when EigenDA write fails. " +
"DANGER: incompatible with secure rollup integrations (i.e, working fraud or validity proofs). Default is false.",
Value: false,
EnvVars: []string{withEnvPrefix(envPrefix, "WRITE_ON_EIGENDA_FAILURE")},
Category: category,
},
}
}

Expand Down
24 changes: 13 additions & 11 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type Config struct {
EdaClientConfig clients.EigenDAClientConfig
MemstoreConfig memstore.Config
StorageConfig store.Config
VerifierConfig verify.Config
PutRetries uint
EdaClientConfig clients.EigenDAClientConfig
MemstoreConfig memstore.Config
StorageConfig store.Config
VerifierConfig verify.Config
PutRetries uint
UseWriteFallback bool

MemstoreEnabled bool
}
Expand All @@ -27,12 +28,13 @@ type Config struct {
func ReadConfig(ctx *cli.Context) Config {
edaClientConfig := eigendaflags.ReadConfig(ctx)
return Config{
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
EdaClientConfig: edaClientConfig,
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),
PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
UseWriteFallback: ctx.Bool(eigendaflags.EnableWriteFallbackFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,5 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log logging.Logger, m
"s3", s3Store != nil,
"redis", redisStore != nil,
)
return store.NewManager(eigenDA, s3Store, log, secondary)
return store.NewManager(eigenDA, s3Store, log, secondary, cfg.EigenDAConfig.UseWriteFallback)
}
13 changes: 13 additions & 0 deletions store/generated_key/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
// artificial latency added for memstore backend to mimic eigenda's latency
PutLatency time.Duration
GetLatency time.Duration
// SimulateEigenDAFailure forces Put operations to fail, simulating EigenDA failures
SimulateEigenDAFailure bool
}

/*
Expand Down Expand Up @@ -107,6 +109,12 @@ func (e *MemStore) pruneExpired() {
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.config.GetLatency)
e.reads++

// Simulate EigenDA failure if configured
if e.config.SimulateEigenDAFailure {
return nil, fmt.Errorf("simulated EigenDA failure")
}

e.RLock()
defer e.RUnlock()

Expand Down Expand Up @@ -146,6 +154,11 @@ func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
e.Lock()
defer e.Unlock()

// Simulate EigenDA failure if configured
if e.config.SimulateEigenDAFailure {
return nil, fmt.Errorf("simulated EigenDA failure")
}

commitment, err := e.verifier.Commit(encodedVal)
if err != nil {
return nil, err
Expand Down
33 changes: 27 additions & 6 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/crypto"
)

// IManager ... read/write interface
Expand All @@ -27,16 +28,18 @@ type Manager struct {

// secondary storage backends (caching and fallbacks)
secondary ISecondary
// redundant write flag
useWriteFallback bool
}

// NewManager ... Init
func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l logging.Logger,
secondary ISecondary) (IManager, error) {
func NewManager(eigenda common.GeneratedKeyStore, s3 common.PrecomputedKeyStore, l logging.Logger, secondary ISecondary, useWriteFallback bool) (IManager, error) {
return &Manager{
log: l,
eigenda: eigenda,
s3: s3,
secondary: secondary,
log: l,
eigenda: eigenda,
s3: s3,
secondary: secondary,
useWriteFallback: useWriteFallback,
}, nil
}

Expand Down Expand Up @@ -124,6 +127,24 @@ func (m *Manager) Put(ctx context.Context, cm commitments.CommitmentMode, key, v
}

if err != nil {
m.log.Error("Failed to write to EigenDA backend", "err", err)

// don't do redundant write to hide the misuse/misconfiguration of the proxy
if errors.Is(err, common.ErrProxyOversizedBlob) {
return nil, err
}

// write to EigenDA failed, which shouldn't happen if the backend is functioning properly
// use the payload as the key to keep the batcher alive
if m.useWriteFallback && m.secondary.Enabled() && !m.secondary.AsyncWriteEntry() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the constraint you're imposing is that secondary writes can't be asynchronous when writeFallback mode enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I want to be sure that the write to the secondary storage backend succeeds before returning response to the batcher.

redundantErr := m.secondary.HandleRedundantWrites(ctx, value, value)
if redundantErr != nil {
m.log.Error("Failed to write to redundant backends", "err", redundantErr)
return nil, redundantErr
}

return crypto.Keccak256(value), nil
}
return nil, err
}

Expand Down
20 changes: 19 additions & 1 deletion store/secondary.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package store

import (
"bytes"
"context"
"errors"
"net/http"
"sync"

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/metrics"
verifypackage "github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"

"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -153,6 +156,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by
}

key := crypto.Keccak256(commitment)

// check if key is an RLP encoded certificate, if not, assume it's a cache key
var cert verifypackage.Certificate
err := rlp.DecodeBytes(commitment, &cert)
if err != nil {
key = commitment
}

for _, src := range sources {
cb := sm.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
data, err := src.Get(ctx, key)
Expand All @@ -170,7 +181,14 @@ func (sm *SecondaryManager) MultiSourceRead(ctx context.Context, commitment []by

// verify cert:data using provided verification function
sm.verifyLock.Lock()
err = verify(ctx, commitment, data)

if bytes.Equal(key, commitment) {
err = src.Verify(ctx, commitment, data)
} else {
// verify cert:data using EigenDA verification checks
err = verify(ctx, commitment, data)
}

if err != nil {
cb(Failed)
log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
Expand Down