Skip to content

Commit

Permalink
feat: mem store - fix flags, test expiration, & renamed to EIGENDA_PR…
Browse files Browse the repository at this point in the history
…OXY env prefix
  • Loading branch information
epociask committed May 23, 2024
1 parent 541614a commit b128d06
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 49 deletions.
30 changes: 15 additions & 15 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
# Server listening address
EIGEN_PLASMA_SERVER_ADDR=127.0.0.1
EIGENDA_PROXY_ADDR=127.0.0.1

# Server listening port (default: 3100)
EIGEN_PLASMA_SERVER_PORT=3100
EIGENDA_PROXY_PORT=3100

# Directory path to SRS tables
EIGEN_PLASMA_SERVER_EIGENDA_CACHE_PATH=
EIGENDA_PROXY_EIGENDA_CACHE_PATH=

# Directory path to g1.point file
EIGEN_PLASMA_SERVER_EIGENDA_KZG_G1_PATH=
EIGENDA_PROXY_EIGENDA_KZG_G1_PATH=

# Directory path to g2.point.powerOf2 file
EIGEN_PLASMA_SERVER_EIGENDA_G2_TAU_PATH=
EIGENDA_PROXY_EIGENDA_G2_TAU_PATH=

# RPC endpoint of the EigenDA disperser
EIGEN_PLASMA_SERVER_EIGENDA_RPC=
EIGENDA_PROXY_EIGENDA_RPC=

# Wait time between retries of EigenDA blob status queries (default: 5s)
EIGEN_PLASMA_SERVER_EIGENDA_STATUS_QUERY_INTERVAL=5s
EIGENDA_PROXY_EIGENDA_STATUS_QUERY_INTERVAL=5s

# Timeout for aborting an EigenDA blob dispersal (default: 25m0s)
EIGEN_PLASMA_SERVER_EIGENDA_STATUS_QUERY_TIMEOUT=25m0s
EIGENDA_PROXY_EIGENDA_STATUS_QUERY_TIMEOUT=25m0s

# Use TLS when connecting to the EigenDA disperser (default: true)
EIGEN_PLASMA_SERVER_EIGENDA_GRPC_USE_TLS=true
EIGENDA_PROXY_EIGENDA_GRPC_USE_TLS=true

# Color the log output if in terminal mode (default: false)
EIGEN_PLASMA_SERVER_LOG_COLOR=false
EIGENDA_PROXY_LOG_COLOR=false

# Format the log output (default: text)
# Supported formats: 'text', 'terminal', 'logfmt', 'json', 'json-pretty'
EIGEN_PLASMA_SERVER_LOG_FORMAT=text
EIGENDA_PROXY_LOG_FORMAT=text

# The lowest log level that will be output (default: INFO)
EIGEN_PLASMA_SERVER_LOG_LEVEL=INFO
EIGENDA_PROXY_LOG_LEVEL=INFO

# Metrics listening address (default: 0.0.0.0)
EIGEN_PLASMA_SERVER_METRICS_ADDR=0.0.0.0
EIGENDA_PROXY_METRICS_ADDR=0.0.0.0

# Enable the metrics server (default: false)
EIGEN_PLASMA_SERVER_METRICS_ENABLED=false
EIGENDA_PROXY_METRICS_ENABLED=false

# Metrics listening port (default: 7300)
EIGEN_PLASMA_SERVER_METRICS_PORT=7300
EIGENDA_PROXY_METRICS_PORT=7300
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ Additional cli args are provided for targeting an EigenDA network backend:
- `--eigenda-status-query-timeout`: (default: 25m) Duration for which a client will wait for a blob to finalize after being sent for dispersal.
- `--eigenda-status-query-retry-interval`: (default: 5s) How often a client will attempt a retry when awaiting network blob finalization.
- `--eigenda-use-tls`: (default: true) Whether or not to use TLS for grpc communication with disperser.
- `eigenda-g1-path`: Directory path to g1.point file
- `eigenda-g2-power-of-tau`: Directory path to g2.point.powerOf2 file
- `eigenda-cache-path`: Directory path to dump cached SRS tables
- `--eigenda-g1-path`: Directory path to g1.point file
- `--eigenda-g2-power-of-tau`: Directory path to g2.point.powerOf2 file
- `--eigenda-cache-path`: Directory path to dump cached SRS tables

### In-Memory Storage
An ephemeral memory store backend can be used for faster feedback testing when performing rollup integrations. The following cli args can be used to target the feature:
* `--memstore.enabled`: Boolean feature flag
* `--memstore.expiration`: Duration for which a blob will exist

## Running Locally
1. Compile binary: `make da-server`
Expand Down Expand Up @@ -79,5 +84,3 @@ The following specs are recommended for running on a single production server:
- [op-stack](https://github.com/ethereum-optimism/optimism)
- [plasma spec](https://specs.optimism.io/experimental/plasma.html)
- [eigen da](https://github.com/Layr-Labs/eigenda)


7 changes: 4 additions & 3 deletions cmd/daserver/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (

func LoadStore(cfg CLIConfig, ctx context.Context, log log.Logger) (plasma.PlasmaStore, error) {
if cfg.MemStoreCfg.Enabled {
log.Info("Using memstore backend")
return store.NewMemStore(ctx, &cfg.MemStoreCfg)
}

log.Info("Using eigenda backend")
daCfg := cfg.EigenDAConfig

v, err := verify.NewVerifier(daCfg.KzgConfig())
Expand All @@ -47,6 +49,8 @@ func StartDAServer(cliCtx *cli.Context) error {
return err
}
ctx, ctxCancel := context.WithCancel(cliCtx.Context)
defer ctxCancel()

m := metrics.NewMetrics("default")

log := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)).New("role", "eigenda_plasma_server")
Expand All @@ -67,9 +71,6 @@ func StartDAServer(cliCtx *cli.Context) error {
}

defer func() {
// shutdown dependency routines
ctxCancel()

if err := server.Stop(); err != nil {
log.Error("failed to stop DA server", "err", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/daserver/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
PortFlagName = "port"
)

const EnvVarPrefix = "EIGEN_PLASMA_SERVER"
const EnvVarPrefix = "EIGENDA_PROXY"

func prefixEnvVars(name string) []string {
return opservice.PrefixEnvVar(EnvVarPrefix, name)
Expand Down Expand Up @@ -49,6 +49,7 @@ func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, eigenda.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, store.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...)
}

Expand All @@ -65,6 +66,7 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
EigenDAConfig: eigenda.ReadConfig(ctx),
MetricsCfg: opmetrics.ReadCLIConfig(ctx),
MemStoreCfg: store.ReadConfig(ctx),
}
}

Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (d *DAServer) Start() error {

d.endpoint = listener.Addr().String()

d.log.Info("Starting DA server on", d.endpoint)
d.log.Info("Starting DA server", "endpoint", d.endpoint)
errCh := make(chan error, 1)
go func() {
if d.tls != nil {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (d *DAServer) Health(w http.ResponseWriter, r *http.Request) {

func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
d.log.Info("GET", "url", r.URL)
recordDur := d.m.RecordRPCServerRequest("put")
recordDur := d.m.RecordRPCServerRequest("get")
defer recordDur()

route := path.Dir(r.URL.Path)
Expand Down
50 changes: 27 additions & 23 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli"
"github.com/urfave/cli/v2"
)

const (
MemStoreName = "memstore"
MemStoreFlagName = "enable"
ExpirationFlagName = "expiration"
MemStoreFlagName = "memstore.enabled"
ExpirationFlagName = "memstore.expiration"

DefaultPruneInterval = 1 * time.Second
DefaultPruneInterval = 500 * time.Millisecond
)

type MemStoreConfig struct {
Expand All @@ -35,6 +34,7 @@ type MemStore struct {
store map[string][]byte
}

// NewMemStore ... constructor
func NewMemStore(ctx context.Context, cfg *MemStoreConfig) (*MemStore, error) {
store := &MemStore{
cfg: cfg,
Expand Down Expand Up @@ -64,49 +64,53 @@ func (e *MemStore) EventLoop(ctx context.Context) {
}

func (e *MemStore) pruneExpired() {
e.RLock()
defer e.RUnlock()
e.Lock()
defer e.Unlock()

for commit, dur := range e.keyStarts {
if time.Since(dur) >= e.cfg.BlobExpiration {
// prune expired blobs
e.Lock()
delete(e.keyStarts, commit)
delete(e.store, commit)
e.Unlock()
}
}

}

func (e *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
// Get fetches a value from the store.
func (e *MemStore) Get(ctx context.Context, commit []byte) ([]byte, error) {
e.RLock()
defer e.RUnlock()

if _, exists := e.store[common.Bytes2Hex(key)]; !exists {
key := "0x" + common.Bytes2Hex(commit)
if _, exists := e.store[key]; !exists {
return nil, fmt.Errorf("commitment key not found")
}

return e.store[string(key)], nil
return e.store[key], nil
}

// Put inserts a value into the store.
func (e *MemStore) Put(ctx context.Context, value []byte) ([]byte, error) {
e.Lock()
defer e.Unlock()

commit := crypto.Keccak256Hash(value)

if _, exists := e.store[commit.String()]; !exists {
return nil, fmt.Errorf("commitment key not found")
if _, exists := e.store[commit.Hex()]; exists {
return nil, fmt.Errorf("commitment key already exists")
}

e.store[commit.Hex()] = value
// add expiration
e.keyStarts[commit.Hex()] = time.Now()

return commit.Bytes(), nil
}

func ReadConfig(ctx *cli.Context) MemStoreConfig {
cfg := MemStoreConfig{
/* Required Flags */
Enabled: ctx.Bool(MemStoreName),
Enabled: ctx.Bool(MemStoreFlagName),
BlobExpiration: ctx.Duration(ExpirationFlagName),
}
return cfg
Expand All @@ -116,15 +120,15 @@ func CLIFlags(envPrefix string) []cli.Flag {

return []cli.Flag{
&cli.BoolFlag{
Name: MemStoreFlagName,
Usage: "Whether to use mem-store for DA logic.",
EnvVar: "MEMSTORE_ENABLED",
Name: MemStoreFlagName,
Usage: "Whether to use mem-store for DA logic.",
EnvVars: []string{"MEMSTORE_ENABLED"},
},
&cli.DurationFlag{
Name: ExpirationFlagName,
Usage: "Duration that a blob/commitment pair are allowed to live.",
Value: 25 * time.Minute,
EnvVar: "MEMSTORE_EXPIRATION",
Name: ExpirationFlagName,
Usage: "Duration that a blob/commitment pair are allowed to live.",
Value: 25 * time.Minute,
EnvVars: []string{"MEMSTORE_EXPIRATION"},
},
}
}
63 changes: 63 additions & 0 deletions store/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package store

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
testPreimage = "Four score and seven years ago"
)

func TestGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ms, err := NewMemStore(
ctx,
&MemStoreConfig{
Enabled: true,
BlobExpiration: time.Hour * 1000,
},
)

assert.NoError(t, err)

expected := []byte(testPreimage)
key, err := ms.Put(ctx, expected)
assert.NoError(t, err)

actual, err := ms.Get(ctx, key)
assert.NoError(t, err)

assert.Equal(t, actual, expected)
}

func TestExpiration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ms, err := NewMemStore(
ctx,
&MemStoreConfig{
Enabled: true,
BlobExpiration: time.Millisecond * 10,
},
)

assert.NoError(t, err)

preimage := []byte(testPreimage)
key, err := ms.Put(ctx, preimage)
assert.NoError(t, err)

// sleep 1ms and verify that older entries are removed
time.Sleep(time.Second * 1)

_, err = ms.Get(ctx, key)
assert.Error(t, err)

}

0 comments on commit b128d06

Please sign in to comment.