diff --git a/cmd/daserver/entrypoint.go b/cmd/daserver/entrypoint.go index 9c8b951..014ea86 100644 --- a/cmd/daserver/entrypoint.go +++ b/cmd/daserver/entrypoint.go @@ -4,17 +4,40 @@ import ( "context" "fmt" + "github.com/Layr-Labs/op-plasma-eigenda/eigenda" "github.com/Layr-Labs/op-plasma-eigenda/metrics" + "github.com/Layr-Labs/op-plasma-eigenda/store" + "github.com/Layr-Labs/op-plasma-eigenda/verify" + "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" plasma "github.com/Layr-Labs/op-plasma-eigenda" - "github.com/Layr-Labs/op-plasma-eigenda/eigenda" - plasma_store "github.com/Layr-Labs/op-plasma-eigenda/store" - "github.com/Layr-Labs/op-plasma-eigenda/verify" oplog "github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/opio" ) +func LoadStore(cfg CLIConfig, ctx context.Context, log log.Logger) (plasma.PlasmaStore, error) { + if cfg.MemStoreCfg.Enabled { + return store.NewMemStore(ctx, &cfg.MemStoreCfg) + } + + daCfg := cfg.EigenDAConfig + + v, err := verify.NewVerifier(daCfg.KzgConfig()) + if err != nil { + return nil, err + } + + return store.NewEigenDAStore( + ctx, + eigenda.NewEigenDAClient( + log, + daCfg, + ), + v, + ) +} + func StartDAServer(cliCtx *cli.Context) error { if err := CheckRequired(cliCtx); err != nil { return err @@ -23,6 +46,7 @@ func StartDAServer(cliCtx *cli.Context) error { if err := cfg.Check(); err != nil { return err } + ctx, ctxCancel := context.WithCancel(cliCtx.Context) m := metrics.NewMetrics("default") log := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)).New("role", "eigenda_plasma_server") @@ -30,25 +54,11 @@ func StartDAServer(cliCtx *cli.Context) error { log.Info("Initializing EigenDA Plasma DA server...") - daCfg := cfg.EigenDAConfig - - v, err := verify.NewVerifier(daCfg.KzgConfig()) + da, err := LoadStore(cfg, ctx, log) if err != nil { - return err + return fmt.Errorf("failed to create store: %w", err) } - - store, err := plasma_store.NewEigenDAStore( - cliCtx.Context, - eigenda.NewEigenDAClient( - log, - daCfg, - ), - v, - ) - if err != nil { - return fmt.Errorf("failed to create EigenDA store: %w", err) - } - server := plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, log, m) + server := plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), da, log, m) if err := server.Start(); err != nil { return fmt.Errorf("failed to start the DA server") @@ -57,9 +67,14 @@ func StartDAServer(cliCtx *cli.Context) error { } defer func() { + // shutdown dependency routines + ctxCancel() + if err := server.Stop(); err != nil { log.Error("failed to stop DA server", "err", err) } + + log.Info("successfully shutdown API server") }() if cfg.MetricsCfg.Enabled { diff --git a/cmd/daserver/flags.go b/cmd/daserver/flags.go index 0dfdc72..a3d84a1 100644 --- a/cmd/daserver/flags.go +++ b/cmd/daserver/flags.go @@ -6,6 +6,7 @@ import ( "github.com/urfave/cli/v2" "github.com/Layr-Labs/op-plasma-eigenda/eigenda" + "github.com/Layr-Labs/op-plasma-eigenda/store" opservice "github.com/ethereum-optimism/optimism/op-service" oplog "github.com/ethereum-optimism/optimism/op-service/log" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -55,10 +56,9 @@ func init() { var Flags []cli.Flag type CLIConfig struct { - FileStoreDirPath string - S3Bucket string - EigenDAConfig eigenda.Config - MetricsCfg opmetrics.CLIConfig + MemStoreCfg store.MemStoreConfig + EigenDAConfig eigenda.Config + MetricsCfg opmetrics.CLIConfig } func ReadCLIConfig(ctx *cli.Context) CLIConfig { diff --git a/store/memory.go b/store/memory.go new file mode 100644 index 0000000..2a3a311 --- /dev/null +++ b/store/memory.go @@ -0,0 +1,130 @@ +package store + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/urfave/cli" +) + +const ( + MemStoreName = "memstore" + MemStoreFlagName = "enable" + ExpirationFlagName = "expiration" + + DefaultPruneInterval = 1 * time.Second +) + +type MemStoreConfig struct { + Enabled bool + BlobExpiration time.Duration +} + +// MemStore is a simple in-memory store for blobs which uses an expiration +// time to evict blobs to best emulate the ephemeral nature of blobs dispersed to +// EigenDA operators. +type MemStore struct { + sync.RWMutex + + cfg *MemStoreConfig + keyStarts map[string]time.Time + store map[string][]byte +} + +func NewMemStore(ctx context.Context, cfg *MemStoreConfig) (*MemStore, error) { + store := &MemStore{ + cfg: cfg, + keyStarts: make(map[string]time.Time), + store: make(map[string][]byte), + } + + if cfg.BlobExpiration != 0 { + go store.EventLoop(ctx) + } + + return store, nil +} + +func (e *MemStore) EventLoop(ctx context.Context) { + + timer := time.NewTicker(DefaultPruneInterval) + + select { + case <-ctx.Done(): + return + + case <-timer.C: + e.pruneExpired() + } + +} + +func (e *MemStore) pruneExpired() { + e.RLock() + defer e.RUnlock() + + for commit, dur := range e.keyStarts { + if time.Since(dur) >= e.cfg.BlobExpiration { + // prune expired blobs + e.Lock() + delete(e.keyStarts, commit) + delete(e.store, commit) + e.Unlock() + } + } + +} + +func (e *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) { + e.RLock() + defer e.RUnlock() + + if _, exists := e.store[common.Bytes2Hex(key)]; !exists { + return nil, fmt.Errorf("commitment key not found") + } + + return e.store[string(key)], nil +} + +func (e *MemStore) Put(ctx context.Context, value []byte) ([]byte, error) { + e.Lock() + defer e.Unlock() + + commit := crypto.Keccak256Hash(value) + + if _, exists := e.store[commit.String()]; !exists { + return nil, fmt.Errorf("commitment key not found") + } + + return commit.Bytes(), nil +} + +func ReadConfig(ctx *cli.Context) MemStoreConfig { + cfg := MemStoreConfig{ + /* Required Flags */ + Enabled: ctx.Bool(MemStoreName), + BlobExpiration: ctx.Duration(ExpirationFlagName), + } + return cfg +} + +func CLIFlags(envPrefix string) []cli.Flag { + + return []cli.Flag{ + &cli.BoolFlag{ + Name: MemStoreFlagName, + Usage: "Whether to use mem-store for DA logic.", + EnvVar: "MEMSTORE_ENABLED", + }, + &cli.DurationFlag{ + Name: ExpirationFlagName, + Usage: "Duration that a blob/commitment pair are allowed to live.", + Value: 25 * time.Minute, + EnvVar: "MEMSTORE_EXPIRATION", + }, + } +}