From b128d06a2ae2735f836bfcd0ede011cbefea53f4 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 22 May 2024 23:44:09 -0700 Subject: [PATCH] feat: mem store - fix flags, test expiration, & renamed to EIGENDA_PROXY env prefix --- .env.example | 30 +++++++++--------- README.md | 13 +++++--- cmd/daserver/entrypoint.go | 7 +++-- cmd/daserver/flags.go | 4 ++- server.go | 4 +-- store/memory.go | 50 ++++++++++++++++-------------- store/memory_test.go | 63 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 122 insertions(+), 49 deletions(-) create mode 100644 store/memory_test.go diff --git a/.env.example b/.env.example index 39ba801..84f8252 100644 --- a/.env.example +++ b/.env.example @@ -1,45 +1,45 @@ # Server listening address -EIGEN_PLASMA_SERVER_ADDR=127.0.0.1 +EIGENDA_PROXY_ADDR=127.0.0.1 # Server listening port (default: 3100) -EIGEN_PLASMA_SERVER_PORT=3100 +EIGENDA_PROXY_PORT=3100 # Directory path to SRS tables -EIGEN_PLASMA_SERVER_EIGENDA_CACHE_PATH= +EIGENDA_PROXY_EIGENDA_CACHE_PATH= # Directory path to g1.point file -EIGEN_PLASMA_SERVER_EIGENDA_KZG_G1_PATH= +EIGENDA_PROXY_EIGENDA_KZG_G1_PATH= # Directory path to g2.point.powerOf2 file -EIGEN_PLASMA_SERVER_EIGENDA_G2_TAU_PATH= +EIGENDA_PROXY_EIGENDA_G2_TAU_PATH= # RPC endpoint of the EigenDA disperser -EIGEN_PLASMA_SERVER_EIGENDA_RPC= +EIGENDA_PROXY_EIGENDA_RPC= # Wait time between retries of EigenDA blob status queries (default: 5s) -EIGEN_PLASMA_SERVER_EIGENDA_STATUS_QUERY_INTERVAL=5s +EIGENDA_PROXY_EIGENDA_STATUS_QUERY_INTERVAL=5s # Timeout for aborting an EigenDA blob dispersal (default: 25m0s) -EIGEN_PLASMA_SERVER_EIGENDA_STATUS_QUERY_TIMEOUT=25m0s +EIGENDA_PROXY_EIGENDA_STATUS_QUERY_TIMEOUT=25m0s # Use TLS when connecting to the EigenDA disperser (default: true) -EIGEN_PLASMA_SERVER_EIGENDA_GRPC_USE_TLS=true +EIGENDA_PROXY_EIGENDA_GRPC_USE_TLS=true # Color the log output if in terminal mode (default: false) -EIGEN_PLASMA_SERVER_LOG_COLOR=false +EIGENDA_PROXY_LOG_COLOR=false # Format the log output (default: text) # Supported formats: 'text', 'terminal', 'logfmt', 'json', 'json-pretty' -EIGEN_PLASMA_SERVER_LOG_FORMAT=text +EIGENDA_PROXY_LOG_FORMAT=text # The lowest log level that will be output (default: INFO) -EIGEN_PLASMA_SERVER_LOG_LEVEL=INFO +EIGENDA_PROXY_LOG_LEVEL=INFO # Metrics listening address (default: 0.0.0.0) -EIGEN_PLASMA_SERVER_METRICS_ADDR=0.0.0.0 +EIGENDA_PROXY_METRICS_ADDR=0.0.0.0 # Enable the metrics server (default: false) -EIGEN_PLASMA_SERVER_METRICS_ENABLED=false +EIGENDA_PROXY_METRICS_ENABLED=false # Metrics listening port (default: 7300) -EIGEN_PLASMA_SERVER_METRICS_PORT=7300 +EIGENDA_PROXY_METRICS_PORT=7300 diff --git a/README.md b/README.md index 6c5ae25..aeb4561 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,14 @@ Additional cli args are provided for targeting an EigenDA network backend: - `--eigenda-status-query-timeout`: (default: 25m) Duration for which a client will wait for a blob to finalize after being sent for dispersal. - `--eigenda-status-query-retry-interval`: (default: 5s) How often a client will attempt a retry when awaiting network blob finalization. - `--eigenda-use-tls`: (default: true) Whether or not to use TLS for grpc communication with disperser. -- `eigenda-g1-path`: Directory path to g1.point file -- `eigenda-g2-power-of-tau`: Directory path to g2.point.powerOf2 file -- `eigenda-cache-path`: Directory path to dump cached SRS tables +- `--eigenda-g1-path`: Directory path to g1.point file +- `--eigenda-g2-power-of-tau`: Directory path to g2.point.powerOf2 file +- `--eigenda-cache-path`: Directory path to dump cached SRS tables + +### In-Memory Storage +An ephemeral memory store backend can be used for faster feedback testing when performing rollup integrations. The following cli args can be used to target the feature: +* `--memstore.enabled`: Boolean feature flag +* `--memstore.expiration`: Duration for which a blob will exist ## Running Locally 1. Compile binary: `make da-server` @@ -79,5 +84,3 @@ The following specs are recommended for running on a single production server: - [op-stack](https://github.com/ethereum-optimism/optimism) - [plasma spec](https://specs.optimism.io/experimental/plasma.html) - [eigen da](https://github.com/Layr-Labs/eigenda) - - diff --git a/cmd/daserver/entrypoint.go b/cmd/daserver/entrypoint.go index 014ea86..70feeef 100644 --- a/cmd/daserver/entrypoint.go +++ b/cmd/daserver/entrypoint.go @@ -18,9 +18,11 @@ import ( func LoadStore(cfg CLIConfig, ctx context.Context, log log.Logger) (plasma.PlasmaStore, error) { if cfg.MemStoreCfg.Enabled { + log.Info("Using memstore backend") return store.NewMemStore(ctx, &cfg.MemStoreCfg) } + log.Info("Using eigenda backend") daCfg := cfg.EigenDAConfig v, err := verify.NewVerifier(daCfg.KzgConfig()) @@ -47,6 +49,8 @@ func StartDAServer(cliCtx *cli.Context) error { return err } ctx, ctxCancel := context.WithCancel(cliCtx.Context) + defer ctxCancel() + m := metrics.NewMetrics("default") log := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)).New("role", "eigenda_plasma_server") @@ -67,9 +71,6 @@ func StartDAServer(cliCtx *cli.Context) error { } defer func() { - // shutdown dependency routines - ctxCancel() - if err := server.Stop(); err != nil { log.Error("failed to stop DA server", "err", err) } diff --git a/cmd/daserver/flags.go b/cmd/daserver/flags.go index a3d84a1..74e6272 100644 --- a/cmd/daserver/flags.go +++ b/cmd/daserver/flags.go @@ -17,7 +17,7 @@ const ( PortFlagName = "port" ) -const EnvVarPrefix = "EIGEN_PLASMA_SERVER" +const EnvVarPrefix = "EIGENDA_PROXY" func prefixEnvVars(name string) []string { return opservice.PrefixEnvVar(EnvVarPrefix, name) @@ -49,6 +49,7 @@ func init() { optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, eigenda.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...) + optionalFlags = append(optionalFlags, store.CLIFlags(EnvVarPrefix)...) Flags = append(requiredFlags, optionalFlags...) } @@ -65,6 +66,7 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig { return CLIConfig{ EigenDAConfig: eigenda.ReadConfig(ctx), MetricsCfg: opmetrics.ReadCLIConfig(ctx), + MemStoreCfg: store.ReadConfig(ctx), } } diff --git a/server.go b/server.go index bfa070a..1bc42f3 100644 --- a/server.go +++ b/server.go @@ -72,7 +72,7 @@ func (d *DAServer) Start() error { d.endpoint = listener.Addr().String() - d.log.Info("Starting DA server on", d.endpoint) + d.log.Info("Starting DA server", "endpoint", d.endpoint) errCh := make(chan error, 1) go func() { if d.tls != nil { @@ -108,7 +108,7 @@ func (d *DAServer) Health(w http.ResponseWriter, r *http.Request) { func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) { d.log.Info("GET", "url", r.URL) - recordDur := d.m.RecordRPCServerRequest("put") + recordDur := d.m.RecordRPCServerRequest("get") defer recordDur() route := path.Dir(r.URL.Path) diff --git a/store/memory.go b/store/memory.go index 2a3a311..0990baf 100644 --- a/store/memory.go +++ b/store/memory.go @@ -8,15 +8,14 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/urfave/cli" + "github.com/urfave/cli/v2" ) const ( - MemStoreName = "memstore" - MemStoreFlagName = "enable" - ExpirationFlagName = "expiration" + MemStoreFlagName = "memstore.enabled" + ExpirationFlagName = "memstore.expiration" - DefaultPruneInterval = 1 * time.Second + DefaultPruneInterval = 500 * time.Millisecond ) type MemStoreConfig struct { @@ -35,6 +34,7 @@ type MemStore struct { store map[string][]byte } +// NewMemStore ... constructor func NewMemStore(ctx context.Context, cfg *MemStoreConfig) (*MemStore, error) { store := &MemStore{ cfg: cfg, @@ -64,49 +64,53 @@ func (e *MemStore) EventLoop(ctx context.Context) { } func (e *MemStore) pruneExpired() { - e.RLock() - defer e.RUnlock() + e.Lock() + defer e.Unlock() for commit, dur := range e.keyStarts { if time.Since(dur) >= e.cfg.BlobExpiration { - // prune expired blobs - e.Lock() delete(e.keyStarts, commit) delete(e.store, commit) - e.Unlock() } } } -func (e *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) { +// Get fetches a value from the store. +func (e *MemStore) Get(ctx context.Context, commit []byte) ([]byte, error) { e.RLock() defer e.RUnlock() - if _, exists := e.store[common.Bytes2Hex(key)]; !exists { + key := "0x" + common.Bytes2Hex(commit) + if _, exists := e.store[key]; !exists { return nil, fmt.Errorf("commitment key not found") } - return e.store[string(key)], nil + return e.store[key], nil } +// Put inserts a value into the store. func (e *MemStore) Put(ctx context.Context, value []byte) ([]byte, error) { e.Lock() defer e.Unlock() commit := crypto.Keccak256Hash(value) - if _, exists := e.store[commit.String()]; !exists { - return nil, fmt.Errorf("commitment key not found") + if _, exists := e.store[commit.Hex()]; exists { + return nil, fmt.Errorf("commitment key already exists") } + e.store[commit.Hex()] = value + // add expiration + e.keyStarts[commit.Hex()] = time.Now() + return commit.Bytes(), nil } func ReadConfig(ctx *cli.Context) MemStoreConfig { cfg := MemStoreConfig{ /* Required Flags */ - Enabled: ctx.Bool(MemStoreName), + Enabled: ctx.Bool(MemStoreFlagName), BlobExpiration: ctx.Duration(ExpirationFlagName), } return cfg @@ -116,15 +120,15 @@ func CLIFlags(envPrefix string) []cli.Flag { return []cli.Flag{ &cli.BoolFlag{ - Name: MemStoreFlagName, - Usage: "Whether to use mem-store for DA logic.", - EnvVar: "MEMSTORE_ENABLED", + Name: MemStoreFlagName, + Usage: "Whether to use mem-store for DA logic.", + EnvVars: []string{"MEMSTORE_ENABLED"}, }, &cli.DurationFlag{ - Name: ExpirationFlagName, - Usage: "Duration that a blob/commitment pair are allowed to live.", - Value: 25 * time.Minute, - EnvVar: "MEMSTORE_EXPIRATION", + Name: ExpirationFlagName, + Usage: "Duration that a blob/commitment pair are allowed to live.", + Value: 25 * time.Minute, + EnvVars: []string{"MEMSTORE_EXPIRATION"}, }, } } diff --git a/store/memory_test.go b/store/memory_test.go new file mode 100644 index 0000000..65b7c6f --- /dev/null +++ b/store/memory_test.go @@ -0,0 +1,63 @@ +package store + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + testPreimage = "Four score and seven years ago" +) + +func TestGetSet(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ms, err := NewMemStore( + ctx, + &MemStoreConfig{ + Enabled: true, + BlobExpiration: time.Hour * 1000, + }, + ) + + assert.NoError(t, err) + + expected := []byte(testPreimage) + key, err := ms.Put(ctx, expected) + assert.NoError(t, err) + + actual, err := ms.Get(ctx, key) + assert.NoError(t, err) + + assert.Equal(t, actual, expected) +} + +func TestExpiration(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ms, err := NewMemStore( + ctx, + &MemStoreConfig{ + Enabled: true, + BlobExpiration: time.Millisecond * 10, + }, + ) + + assert.NoError(t, err) + + preimage := []byte(testPreimage) + key, err := ms.Put(ctx, preimage) + assert.NoError(t, err) + + // sleep 1ms and verify that older entries are removed + time.Sleep(time.Second * 1) + + _, err = ms.Get(ctx, key) + assert.Error(t, err) + +}