Skip to content

Commit

Permalink
feat: mem store
Browse files Browse the repository at this point in the history
  • Loading branch information
epociask committed May 22, 2024
1 parent 02fa3d8 commit 541614a
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 24 deletions.
55 changes: 35 additions & 20 deletions cmd/daserver/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,40 @@ import (
"context"
"fmt"

"github.com/Layr-Labs/op-plasma-eigenda/eigenda"
"github.com/Layr-Labs/op-plasma-eigenda/metrics"
"github.com/Layr-Labs/op-plasma-eigenda/store"
"github.com/Layr-Labs/op-plasma-eigenda/verify"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"

plasma "github.com/Layr-Labs/op-plasma-eigenda"
"github.com/Layr-Labs/op-plasma-eigenda/eigenda"
plasma_store "github.com/Layr-Labs/op-plasma-eigenda/store"
"github.com/Layr-Labs/op-plasma-eigenda/verify"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)

func LoadStore(cfg CLIConfig, ctx context.Context, log log.Logger) (plasma.PlasmaStore, error) {
if cfg.MemStoreCfg.Enabled {
return store.NewMemStore(ctx, &cfg.MemStoreCfg)
}

daCfg := cfg.EigenDAConfig

v, err := verify.NewVerifier(daCfg.KzgConfig())
if err != nil {
return nil, err
}

return store.NewEigenDAStore(
ctx,
eigenda.NewEigenDAClient(
log,
daCfg,
),
v,
)
}

func StartDAServer(cliCtx *cli.Context) error {
if err := CheckRequired(cliCtx); err != nil {
return err
Expand All @@ -23,32 +46,19 @@ func StartDAServer(cliCtx *cli.Context) error {
if err := cfg.Check(); err != nil {
return err
}
ctx, ctxCancel := context.WithCancel(cliCtx.Context)
m := metrics.NewMetrics("default")

log := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)).New("role", "eigenda_plasma_server")
oplog.SetGlobalLogHandler(log.Handler())

log.Info("Initializing EigenDA Plasma DA server...")

daCfg := cfg.EigenDAConfig

v, err := verify.NewVerifier(daCfg.KzgConfig())
da, err := LoadStore(cfg, ctx, log)
if err != nil {
return err
return fmt.Errorf("failed to create store: %w", err)
}

store, err := plasma_store.NewEigenDAStore(
cliCtx.Context,
eigenda.NewEigenDAClient(
log,
daCfg,
),
v,
)
if err != nil {
return fmt.Errorf("failed to create EigenDA store: %w", err)
}
server := plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, log, m)
server := plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), da, log, m)

if err := server.Start(); err != nil {
return fmt.Errorf("failed to start the DA server")
Expand All @@ -57,9 +67,14 @@ func StartDAServer(cliCtx *cli.Context) error {
}

defer func() {
// shutdown dependency routines
ctxCancel()

if err := server.Stop(); err != nil {
log.Error("failed to stop DA server", "err", err)
}

log.Info("successfully shutdown API server")
}()

if cfg.MetricsCfg.Enabled {
Expand Down
8 changes: 4 additions & 4 deletions cmd/daserver/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/Layr-Labs/op-plasma-eigenda/eigenda"
"github.com/Layr-Labs/op-plasma-eigenda/store"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
Expand Down Expand Up @@ -55,10 +56,9 @@ func init() {
var Flags []cli.Flag

type CLIConfig struct {
FileStoreDirPath string
S3Bucket string
EigenDAConfig eigenda.Config
MetricsCfg opmetrics.CLIConfig
MemStoreCfg store.MemStoreConfig
EigenDAConfig eigenda.Config
MetricsCfg opmetrics.CLIConfig
}

func ReadCLIConfig(ctx *cli.Context) CLIConfig {
Expand Down
130 changes: 130 additions & 0 deletions store/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package store

import (
"context"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli"
)

const (
MemStoreName = "memstore"
MemStoreFlagName = "enable"
ExpirationFlagName = "expiration"

DefaultPruneInterval = 1 * time.Second
)

type MemStoreConfig struct {
Enabled bool
BlobExpiration time.Duration
}

// MemStore is a simple in-memory store for blobs which uses an expiration
// time to evict blobs to best emulate the ephemeral nature of blobs dispersed to
// EigenDA operators.
type MemStore struct {
sync.RWMutex

cfg *MemStoreConfig
keyStarts map[string]time.Time
store map[string][]byte
}

func NewMemStore(ctx context.Context, cfg *MemStoreConfig) (*MemStore, error) {
store := &MemStore{
cfg: cfg,
keyStarts: make(map[string]time.Time),
store: make(map[string][]byte),
}

if cfg.BlobExpiration != 0 {
go store.EventLoop(ctx)
}

return store, nil
}

func (e *MemStore) EventLoop(ctx context.Context) {

timer := time.NewTicker(DefaultPruneInterval)

select {
case <-ctx.Done():
return

case <-timer.C:
e.pruneExpired()
}

}

func (e *MemStore) pruneExpired() {
e.RLock()
defer e.RUnlock()

for commit, dur := range e.keyStarts {
if time.Since(dur) >= e.cfg.BlobExpiration {
// prune expired blobs
e.Lock()
delete(e.keyStarts, commit)
delete(e.store, commit)
e.Unlock()
}
}

}

func (e *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
e.RLock()
defer e.RUnlock()

if _, exists := e.store[common.Bytes2Hex(key)]; !exists {
return nil, fmt.Errorf("commitment key not found")
}

return e.store[string(key)], nil
}

func (e *MemStore) Put(ctx context.Context, value []byte) ([]byte, error) {
e.Lock()
defer e.Unlock()

commit := crypto.Keccak256Hash(value)

if _, exists := e.store[commit.String()]; !exists {
return nil, fmt.Errorf("commitment key not found")
}

return commit.Bytes(), nil
}

func ReadConfig(ctx *cli.Context) MemStoreConfig {
cfg := MemStoreConfig{
/* Required Flags */
Enabled: ctx.Bool(MemStoreName),
BlobExpiration: ctx.Duration(ExpirationFlagName),
}
return cfg
}

func CLIFlags(envPrefix string) []cli.Flag {

return []cli.Flag{
&cli.BoolFlag{
Name: MemStoreFlagName,
Usage: "Whether to use mem-store for DA logic.",
EnvVar: "MEMSTORE_ENABLED",
},
&cli.DurationFlag{
Name: ExpirationFlagName,
Usage: "Duration that a blob/commitment pair are allowed to live.",
Value: 25 * time.Minute,
EnvVar: "MEMSTORE_EXPIRATION",
},
}
}

0 comments on commit 541614a

Please sign in to comment.