From ced2a952462d3334fc1547f661ea6c40f33c895f Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Sat, 12 Oct 2024 04:19:33 -0400
Subject: [PATCH 1/9] chore: Better abstract secondary storage

---
 Makefile             |   4 --
 README.md            |   6 +-
 server/load_store.go |   8 ++-
 store/router.go      | 129 ++++-------------------------
 store/secondary.go   | 152 +++++++++++++++++++++++++++++++++++++++++++
 verify/cli.go        |   2 +-
 6 files changed, 183 insertions(+), 118 deletions(-)
 create mode 100644 store/secondary.go

diff --git a/Makefile b/Makefile
index e4ab758..bb279eb 100644
--- a/Makefile
+++ b/Makefile
@@ -85,10 +85,6 @@ install-lint:
 	@echo "Installing golangci-lint..."
 	@sh -c $(GET_LINT_CMD)
 
-gosec:
-	@echo "Running security scan with gosec..."
-	gosec ./...
-
 submodules:
 	git submodule update --init --recursive
 
diff --git a/README.md b/README.md
index ef737be..b801625 100644
--- a/README.md
+++ b/README.md
@@ -208,7 +208,11 @@ The `raw commitment` is an RLP-encoded [EigenDA certificate](https://github.com/
 
 ### Unit
 
-Unit tests can be ran via invoking `make test`.
+Unit tests can be run by invoking `make test`. Make sure the required test containers have been pulled locally before running:
+```
+docker pull redis
+docker pull minio
+```
 
 ### Holesky
 
diff --git a/server/load_store.go b/server/load_store.go
index e126843..c45bd42 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -116,10 +116,14 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
 		return nil, err
 	}
 
-	// determine read fallbacks
+	// create secondary storage router
 	fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
 	caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
+	secondary, err := store.NewSecondaryRouter(log, caches, fallbacks)
+	if err != nil {
+		return nil, err
+	}
 
 	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
-	return store.NewRouter(eigenDA, s3Store, log, caches, fallbacks)
+	return store.NewRouter(eigenDA, s3Store, log, secondary)
 }
diff --git a/store/router.go b/store/router.go
index c0e2436..6c9157c 100644
--- a/store/router.go
+++ b/store/router.go
@@ -6,10 +6,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 
 	"github.com/Layr-Labs/eigenda-proxy/commitments"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -25,27 +23,22 @@ type IRouter interface {
 
 // Router ...
storage backend routing layer type Router struct { - log log.Logger - eigenda GeneratedKeyStore - s3 PrecomputedKeyStore + log log.Logger + // primary storage backends + eigenda GeneratedKeyStore // ALT DA commitment type for OP mode && simple commitment mode for standard /client + s3 PrecomputedKeyStore // OP commitment mode && keccak256 commitment type - caches []PrecomputedKeyStore - cacheLock sync.RWMutex - - fallbacks []PrecomputedKeyStore - fallbackLock sync.RWMutex + // secondary storage backends (caching and fallbacks) + secondary ISecondary } func NewRouter(eigenda GeneratedKeyStore, s3 PrecomputedKeyStore, l log.Logger, - caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (IRouter, error) { + secondary ISecondary) (IRouter, error) { return &Router{ - log: l, - eigenda: eigenda, - s3: s3, - caches: caches, - cacheLock: sync.RWMutex{}, - fallbacks: fallbacks, - fallbackLock: sync.RWMutex{}, + log: l, + eigenda: eigenda, + s3: s3, + secondary: secondary, }, nil } @@ -76,9 +69,9 @@ func (r *Router) Get(ctx context.Context, key []byte, cm commitments.CommitmentM } // 1 - read blob from cache if enabled - if r.cacheEnabled() { + if r.secondary.CachingEnabled() { r.log.Debug("Retrieving data from cached backends") - data, err := r.multiSourceRead(ctx, key, false) + data, err := r.secondary.MultiSourceRead(ctx, key, false, r.eigenda.Verify) if err == nil { return data, nil } @@ -98,8 +91,8 @@ func (r *Router) Get(ctx context.Context, key []byte, cm commitments.CommitmentM } // 3 - read blob from fallbacks if enabled and data is non-retrievable from EigenDA - if r.fallbackEnabled() { - data, err = r.multiSourceRead(ctx, key, true) + if r.secondary.FallbackEnabled() { + data, err = r.secondary.MultiSourceRead(ctx, key, true, r.eigenda.Verify) if err != nil { r.log.Error("Failed to read from fallback targets", "err", err) return nil, err @@ -133,8 +126,8 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va return nil, err } - if r.cacheEnabled() || r.fallbackEnabled() { - err = r.handleRedundantWrites(ctx, commit, value) + if r.secondary.Enabled() { + err = r.secondary.HandleRedundantWrites(ctx, commit, value) if err != nil { log.Error("Failed to write to redundant backends", "err", err) } @@ -143,82 +136,6 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va return commit, nil } -// handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache) -// and returns an error if NONE of them succeed -// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same -// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type -// vs a fallback read type -func (r *Router) handleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error { - r.cacheLock.RLock() - r.fallbackLock.RLock() - - defer func() { - r.cacheLock.RUnlock() - r.fallbackLock.RUnlock() - }() - - sources := r.caches - sources = append(sources, r.fallbacks...) - - key := crypto.Keccak256(commitment) - successes := 0 - - for _, src := range sources { - err := src.Put(ctx, key, value) - if err != nil { - r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err) - } else { - successes++ - } - } - - if successes == 0 { - return errors.New("failed to write blob to any redundant targets") - } - - return nil -} - -// multiSourceRead ... 
reads from a set of backends and returns the first successfully read blob -func (r *Router) multiSourceRead(ctx context.Context, commitment []byte, fallback bool) ([]byte, error) { - var sources []PrecomputedKeyStore - if fallback { - r.fallbackLock.RLock() - defer r.fallbackLock.RUnlock() - - sources = r.fallbacks - } else { - r.cacheLock.RLock() - defer r.cacheLock.RUnlock() - - sources = r.caches - } - - key := crypto.Keccak256(commitment) - for _, src := range sources { - data, err := src.Get(ctx, key) - if err != nil { - r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err) - continue - } - - if data == nil { - r.log.Debug("No data found in redundant target", "backend", src.BackendType()) - continue - } - - // verify cert:data using EigenDA verification checks - err = r.eigenda.Verify(commitment, data) - if err != nil { - log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType()) - continue - } - - return data, nil - } - return nil, errors.New("no data found in any redundant backend") -} - // putWithoutKey ... inserts a value into a storage backend that computes the key on-demand (i.e, EigenDA) func (r *Router) putWithoutKey(ctx context.Context, value []byte) ([]byte, error) { if r.eigenda != nil { @@ -243,14 +160,6 @@ func (r *Router) putWithKey(ctx context.Context, key []byte, value []byte) ([]by return key, r.s3.Put(ctx, key, value) } -func (r *Router) fallbackEnabled() bool { - return len(r.fallbacks) > 0 -} - -func (r *Router) cacheEnabled() bool { - return len(r.caches) > 0 -} - // GetEigenDAStore ... func (r *Router) GetEigenDAStore() GeneratedKeyStore { return r.eigenda @@ -263,10 +172,10 @@ func (r *Router) GetS3Store() PrecomputedKeyStore { // Caches ... func (r *Router) Caches() []PrecomputedKeyStore { - return r.caches + return r.secondary.Caches() } // Fallbacks ... func (r *Router) Fallbacks() []PrecomputedKeyStore { - return r.fallbacks + return r.secondary.Fallbacks() } diff --git a/store/secondary.go b/store/secondary.go new file mode 100644 index 0000000..7677d96 --- /dev/null +++ b/store/secondary.go @@ -0,0 +1,152 @@ +package store + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/ethereum/go-ethereum/log" +) + +type ISecondary interface { + Fallbacks() []PrecomputedKeyStore + Caches() []PrecomputedKeyStore + Enabled() bool + CachingEnabled() bool + FallbackEnabled() bool + HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error + MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error) +} + +// SecondaryRouter ... routing abstraction for secondary storage backends +type SecondaryRouter struct { + log log.Logger + + caches []PrecomputedKeyStore + cacheLock sync.RWMutex + + fallbacks []PrecomputedKeyStore + fallbackLock sync.RWMutex +} + +// NewSecondaryRouter ... 
creates a new secondary storage router +func NewSecondaryRouter(log log.Logger, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) { + return &SecondaryRouter{ + log: log, + caches: caches, + cacheLock: sync.RWMutex{}, + + fallbacks: fallbacks, + fallbackLock: sync.RWMutex{}, + }, nil +} + +func (r *SecondaryRouter) Enabled() bool { + return r.CachingEnabled() || r.FallbackEnabled() +} + +func (r *SecondaryRouter) CachingEnabled() bool { + r.cacheLock.RLock() + defer r.cacheLock.RUnlock() + + return len(r.caches) > 0 +} + +func (r *SecondaryRouter) FallbackEnabled() bool { + r.fallbackLock.RLock() + defer r.fallbackLock.RUnlock() + + return len(r.fallbacks) > 0 +} + +// handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache) +// and returns an error if NONE of them succeed +// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same +// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type +// vs a fallback read type +func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error { + r.cacheLock.RLock() + r.fallbackLock.RLock() + + defer func() { + r.cacheLock.RUnlock() + r.fallbackLock.RUnlock() + }() + + sources := r.caches + sources = append(sources, r.fallbacks...) + + key := crypto.Keccak256(commitment) + successes := 0 + + for _, src := range sources { + err := src.Put(ctx, key, value) + if err != nil { + r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err) + } else { + successes++ + } + } + + if successes == 0 { + return errors.New("failed to write blob to any redundant targets") + } + + return nil +} + +// MultiSourceRead ... reads from a set of backends and returns the first successfully read blob +func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) { + var sources []PrecomputedKeyStore + if fallback { + r.fallbackLock.RLock() + defer r.fallbackLock.RUnlock() + + sources = r.fallbacks + } else { + r.cacheLock.RLock() + defer r.cacheLock.RUnlock() + + sources = r.caches + } + + key := crypto.Keccak256(commitment) + for _, src := range sources { + data, err := src.Get(ctx, key) + if err != nil { + r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err) + continue + } + + if data == nil { + r.log.Debug("No data found in redundant target", "backend", src.BackendType()) + continue + } + + // verify cert:data using provided verification function + err = verify(commitment, data) + if err != nil { + log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType()) + continue + } + + return data, nil + } + return nil, errors.New("no data found in any redundant backend") +} + +func (r *SecondaryRouter) Fallbacks() []PrecomputedKeyStore { + r.fallbackLock.RLock() + defer r.fallbackLock.RUnlock() + + return r.fallbacks +} + +func (r *SecondaryRouter) Caches() []PrecomputedKeyStore { + r.cacheLock.RLock() + defer r.cacheLock.RUnlock() + + return r.caches +} diff --git a/verify/cli.go b/verify/cli.go index 848654b..f41135f 100644 --- a/verify/cli.go +++ b/verify/cli.go @@ -133,8 +133,8 @@ func CLIFlags(envPrefix, category string) []cli.Flag { } } +// MaxBlobLengthBytes ... there's def a better way to deal with this... perhaps a generic flag that can parse the string into a uint64? 
// this var is set by the action in the MaxBlobLengthFlagName flag -// TODO: there's def a better way to deal with this... perhaps a generic flag that can parse the string into a uint64? var MaxBlobLengthBytes uint64 func ReadConfig(ctx *cli.Context) Config { From ab6b9394c92b514ae5d26970be25dc22221127f7 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Sat, 12 Oct 2024 04:57:12 -0400 Subject: [PATCH 2/9] chore: Better abstract secondary storage - add channel stream for secondary insertions --- server/load_store.go | 6 ++++++ store/router.go | 7 ++++--- store/secondary.go | 33 ++++++++++++++++++++++++++++++++- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/server/load_store.go b/server/load_store.go index c45bd42..00d3bc2 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -124,6 +124,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. return nil, err } + if secondary.Enabled() { // only spin-up go routine if secondary storage is enabled + // NOTE: the number of workers is set to 1 for now but it should be possible to increase + log.Debug("Starting secondary stream processing routine") + go secondary.StreamProcess(ctx) + } + log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil) return store.NewRouter(eigenDA, s3Store, log, secondary) } diff --git a/store/router.go b/store/router.go index 6c9157c..8bc98ef 100644 --- a/store/router.go +++ b/store/router.go @@ -127,9 +127,10 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va } if r.secondary.Enabled() { - err = r.secondary.HandleRedundantWrites(ctx, commit, value) - if err != nil { - log.Error("Failed to write to redundant backends", "err", err) + + r.secondary.Ingress() <- PutNotif{ + Commitment: commit, + Value: value, } } diff --git a/store/secondary.go b/store/secondary.go index 7677d96..1692328 100644 --- a/store/secondary.go +++ b/store/secondary.go @@ -14,15 +14,23 @@ type ISecondary interface { Fallbacks() []PrecomputedKeyStore Caches() []PrecomputedKeyStore Enabled() bool + Ingress() chan<- PutNotif CachingEnabled() bool FallbackEnabled() bool HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error) + StreamProcess(context.Context) +} + +type PutNotif struct { + Commitment []byte + Value []byte } // SecondaryRouter ... routing abstraction for secondary storage backends type SecondaryRouter struct { - log log.Logger + stream chan PutNotif + log log.Logger caches []PrecomputedKeyStore cacheLock sync.RWMutex @@ -34,6 +42,7 @@ type SecondaryRouter struct { // NewSecondaryRouter ... 
creates a new secondary storage router
 func NewSecondaryRouter(log log.Logger, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) {
 	return &SecondaryRouter{
+		stream:    make(chan PutNotif),
 		log:       log,
 		caches:    caches,
 		cacheLock: sync.RWMutex{},
@@ -43,6 +52,10 @@ func NewSecondaryRouter(log log.Logger, caches []PrecomputedKeyStore, fallbacks
 	}, nil
 }
 
+func (r *SecondaryRouter) Ingress() chan<- PutNotif {
+	return r.stream
+}
+
 func (r *SecondaryRouter) Enabled() bool {
 	return r.CachingEnabled() || r.FallbackEnabled()
 }
@@ -97,7 +110,25 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	return nil
 }
 
+func (r *SecondaryRouter) StreamProcess(ctx context.Context) {
+	for {
+		select {
+		case notif := <-r.stream:
+			err := r.HandleRedundantWrites(context.Background(), notif.Commitment, notif.Value)
+			if err != nil {
+				r.log.Error("Failed to write to redundant targets", "err", err)
+			}
+
+		case <-ctx.Done():
+			return
+		}
+	}
+
+}
+
 // MultiSourceRead ... reads from a set of backends and returns the first successfully read blob
+// NOTE: - this can also be parallelized when reading from multiple sources and discarding connections that fail
+//       - for complete optimization we can profile secondary storage backends to determine the fastest / most reliable and always route to it first
 func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) {
 	var sources []PrecomputedKeyStore
 	if fallback {

From a59879190943d2f142e7d1ce600204a66d7131d1 Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Sat, 12 Oct 2024 05:02:49 -0400
Subject: [PATCH 3/9] chore: Better abstract secondary storage - add channel stream for secondary insertions

---
 server/load_store.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/server/load_store.go b/server/load_store.go
index 00d3bc2..36dbf2b 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -124,10 +124,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
return nil, err } - if secondary.Enabled() { // only spin-up go routine if secondary storage is enabled - // NOTE: the number of workers is set to 1 for now but it should be possible to increase - log.Debug("Starting secondary stream processing routine") - go secondary.StreamProcess(ctx) + if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled + log.Debug("Starting secondary stream processing routines") + + for i := 0; i < 10; i++ { + go secondary.StreamProcess(ctx) + } } log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil) From 3c3271dc29978fa99319d2a834057d2d4de93f4a Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Sat, 12 Oct 2024 19:12:24 -0400 Subject: [PATCH 4/9] chore: Better abstract secondary storage - observe secondary storage via metrics --- cmd/server/entrypoint.go | 5 +- e2e/optimism_test.go | 6 +- e2e/server_test.go | 22 +++--- e2e/setup.go | 40 ++++++++--- metrics/metrics.go | 88 +++++++++++++++++++++++- metrics/poller.go | 62 +++++++++++++++++ server/load_store.go | 10 ++- server/server.go | 22 ------ store/generated_key/memstore/memstore.go | 12 ++-- store/precomputed_key/redis/redis.go | 24 +------ store/precomputed_key/s3/s3.go | 21 +----- store/secondary.go | 55 ++++++--------- store/store.go | 5 +- 13 files changed, 229 insertions(+), 143 deletions(-) create mode 100644 metrics/poller.go diff --git a/cmd/server/entrypoint.go b/cmd/server/entrypoint.go index effd5d8..e086302 100644 --- a/cmd/server/entrypoint.go +++ b/cmd/server/entrypoint.go @@ -29,14 +29,15 @@ func StartProxySvr(cliCtx *cli.Context) error { return fmt.Errorf("failed to pretty print config: %w", err) } + m := metrics.NewMetrics("default") + ctx, ctxCancel := context.WithCancel(cliCtx.Context) defer ctxCancel() - daRouter, err := server.LoadStoreRouter(ctx, cfg, log) + daRouter, err := server.LoadStoreRouter(ctx, cfg, log, m) if err != nil { return fmt.Errorf("failed to create store: %w", err) } - m := metrics.NewMetrics("default") server := server.NewServer(cliCtx.String(flags.ListenAddrFlagName), cliCtx.Int(flags.PortFlagName), daRouter, log, m) if err := server.Start(); err != nil { diff --git a/e2e/optimism_test.go b/e2e/optimism_test.go index 270fd8b..9a5558a 100644 --- a/e2e/optimism_test.go +++ b/e2e/optimism_test.go @@ -167,10 +167,10 @@ func TestOptimismKeccak256Commitment(gt *testing.T) { optimism.ActL1Finalized(t) // assert that EigenDA proxy's was written and read from - stat := proxyTS.Server.GetS3Stats() + // stat := proxyTS.Server.GetS3Stats() - require.Equal(t, 1, stat.Entries) - require.Equal(t, 1, stat.Reads) + // require.Equal(t, 1, stat.Entries) + // require.Equal(t, 1, stat.Reads) } func TestOptimismGenericCommitment(gt *testing.T) { diff --git a/e2e/server_test.go b/e2e/server_test.go index f7a643e..7e853e0 100644 --- a/e2e/server_test.go +++ b/e2e/server_test.go @@ -10,7 +10,6 @@ import ( "github.com/Layr-Labs/eigenda-proxy/client" "github.com/Layr-Labs/eigenda-proxy/e2e" - "github.com/Layr-Labs/eigenda-proxy/store" altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -350,9 +349,10 @@ func TestProxyServerCaching(t *testing.T) { require.Equal(t, testPreimage, preimage) // ensure that read was from cache - s3Stats := ts.Server.GetS3Stats() - require.Equal(t, 1, s3Stats.Reads) - require.Equal(t, 1, s3Stats.Entries) + val, err := ts.MetricPoller.Poll("secondary.requests_total") + require.NoError(t, err) + + 
println(val) if useMemory() { // ensure that eigenda was not read from memStats := ts.Server.GetEigenDAStats() @@ -393,11 +393,11 @@ func TestProxyServerCachingWithRedis(t *testing.T) { require.Equal(t, testPreimage, preimage) // ensure that read was from cache - redStats, err := ts.Server.GetStoreStats(store.RedisBackendType) - require.NoError(t, err) + // redStats, err := ts.Server.GetStoreStats(store.RedisBackendType) + // require.NoError(t, err) - require.Equal(t, 1, redStats.Reads) - require.Equal(t, 1, redStats.Entries) + // require.Equal(t, 1, redStats.Reads) + // require.Equal(t, 1, redStats.Entries) if useMemory() { // ensure that eigenda was not read from memStats := ts.Server.GetEigenDAStats() @@ -448,9 +448,9 @@ func TestProxyServerReadFallback(t *testing.T) { require.Equal(t, testPreimage, preimage) // ensure that read was from fallback target location (i.e, S3 for this test) - s3Stats := ts.Server.GetS3Stats() - require.Equal(t, 1, s3Stats.Reads) - require.Equal(t, 1, s3Stats.Entries) + // s3Stats := ts.Server.GetS3Stats() + // require.Equal(t, 1, s3Stats.Reads) + // require.Equal(t, 1, s3Stats.Entries) if useMemory() { // ensure that an eigenda read was attempted with zero data available memStats := ts.Server.GetEigenDAStats() diff --git a/e2e/setup.go b/e2e/setup.go index 0510f98..eba1ff8 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -22,6 +22,7 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "golang.org/x/exp/rand" + "github.com/ethereum-optimism/optimism/op-service/httputil" oplog "github.com/ethereum-optimism/optimism/op-service/log" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -77,7 +78,6 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig { createS3Bucket(bucketName) eigendaCfg.S3Config = s3.Config{ - Profiling: true, Bucket: bucketName, Path: "", Endpoint: "localhost:4566", @@ -175,9 +175,11 @@ func TestSuiteConfig(t *testing.T, testCfg *Cfg) server.CLIConfig { } type TestSuite struct { - Ctx context.Context - Log log.Logger - Server *server.Server + Ctx context.Context + Log log.Logger + Server *server.Server + MetricPoller *metrics.MetricsPoller + MetricSvr *httputil.HTTPServer } func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, func()) { @@ -188,28 +190,44 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu }).New("role", svcName) ctx := context.Background() + m := metrics.NewMetrics("default") store, err := server.LoadStoreRouter( ctx, testSuiteCfg, log, + m, ) + + require.NoError(t, err) - server := server.NewServer(host, 0, store, log, metrics.NoopMetrics) + proxySvr := server.NewServer(host, 0, store, log, m) t.Log("Starting proxy server...") - err = server.Start() + err = proxySvr.Start() + require.NoError(t, err) + + metricsSvr, err := m.StartServer(host, 0) + t.Log("Starting metrics server...") + require.NoError(t, err) kill := func() { - if err := server.Stop(); err != nil { - panic(err) + if err := proxySvr.Stop(); err != nil { + log.Error("failed to stop proxy server", "err", err) + } + + if err := metricsSvr.Stop(context.Background()); err != nil { + log.Error("failed to stop metrics server", "err", err) } } + log.Info("started metrics server", "addr", metricsSvr.Addr()) return TestSuite{ - Ctx: ctx, - Log: log, - Server: server, + Ctx: ctx, + Log: log, + Server: proxySvr, + MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", m.Address())), + MetricSvr: metricsSvr, }, kill } diff --git a/metrics/metrics.go b/metrics/metrics.go index 
a115826..deae671 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"fmt"
 	"net"
 	"strconv"
 
@@ -15,6 +16,7 @@ import (
 const (
 	namespace           = "eigenda_proxy"
 	httpServerSubsystem = "http_server"
+	secondarySubsystem  = "secondary"
 )
 
 // Config ... Metrics server configuration
@@ -29,7 +31,9 @@ type Config struct {
 type Metricer interface {
 	RecordInfo(version string)
 	RecordUp()
-	RecordRPCServerRequest(method string) func(status string, commitmentMode string, version string)
+
+	RecordRPCServerRequest(method string) func(status string, mode string, ver string)
+	RecordSecondaryRequest(bt string, method string) func(status string)
 
 	Document() []metrics.DocumentedMetric
 }
@@ -39,12 +43,19 @@ type Metrics struct {
 	Info *prometheus.GaugeVec
 	Up   prometheus.Gauge
 
+	// server metrics
 	HTTPServerRequestsTotal          *prometheus.CounterVec
 	HTTPServerBadRequestHeader       *prometheus.CounterVec
 	HTTPServerRequestDurationSeconds *prometheus.HistogramVec
 
+	// secondary metrics
+	SecondaryRequestsTotal      *prometheus.CounterVec
+	SecondaryRequestDurationSec *prometheus.HistogramVec
+
 	registry *prometheus.Registry
 	factory  metrics.Factory
+
+	address string
 }
 
 var _ Metricer = (*Metrics)(nil)
@@ -101,6 +112,23 @@ func NewMetrics(subsystem string) *Metrics {
 		}, []string{
 			"method", // no status on histograms because those are very expensive
 		}),
+		SecondaryRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: secondarySubsystem,
+			Name:      "requests_total",
+			Help:      "Total requests to the secondary storage",
+		}, []string{
+			"backend_type", "method", "status",
+		}),
+		SecondaryRequestDurationSec: factory.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: secondarySubsystem,
+			Name:      "request_duration_seconds",
+			Buckets:   prometheus.ExponentialBucketsRange(0.05, 1200, 20),
+			Help:      "Histogram of secondary storage request durations",
+		}, []string{
+			"backend_type",
+		}),
 		registry: registry,
 		factory:  factory,
 	}
@@ -131,13 +159,63 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode
 	}
 }
 
+func (m *Metrics) Address() string {
+	return m.address
+}
+
+// RecordSecondaryRequest records a secondary put/get operation.
+func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status string) {
+	timer := prometheus.NewTimer(m.SecondaryRequestDurationSec.WithLabelValues(bt))
+
+	return func(status string) {
+		m.SecondaryRequestsTotal.WithLabelValues(bt, method, status).Inc()
+		timer.ObserveDuration()
+	}
+
+}
+
+// FindRandomOpenPort returns a random open port
+func FindRandomOpenPort() (int, error) {
+	// Listen on a random port
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return 0, fmt.Errorf("failed to find open port: %w", err)
+	}
+	defer listener.Close()
+
+	// Get the assigned address, which includes the port
+	addr := listener.Addr().(*net.TCPAddr)
+	return addr.Port, nil
+}
+
+// StartServer starts the metrics server on the given hostname and port.
 // StartServer starts the metrics server on the given hostname and port.
+// If port is 0, it automatically assigns an available port and returns the actual port.
 func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
-	addr := net.JoinHostPort(hostname, strconv.Itoa(port))
+	// Create a listener with the provided host and port. If port is 0, the system will assign one.
+ if port == 0 { + randomPort, err := FindRandomOpenPort() + if err != nil { + return nil, fmt.Errorf("failed to find open port: %w", err) + } + port = randomPort + } + m.address = net.JoinHostPort(hostname, strconv.Itoa(port)) + + + // Set up Prometheus metrics handler h := promhttp.InstrumentMetricHandler( m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), ) - return ophttp.StartHTTPServer(addr, h) + + // Start the HTTP server using the listener, so we can control the actual port + server, err := ophttp.StartHTTPServer(m.address, h) + if err != nil { + return nil, fmt.Errorf("failed to start HTTP server: %v", err) + } + + // Return the actual port the server is bound to + return server, nil } func (m *Metrics) Document() []metrics.DocumentedMetric { @@ -162,3 +240,7 @@ func (n *noopMetricer) RecordUp() { func (n *noopMetricer) RecordRPCServerRequest(string) func(status, mode, ver string) { return func(string, string, string) {} } + +func (n *noopMetricer) RecordSecondaryRequest(string, string) func(status string) { + return func(string) {} +} diff --git a/metrics/poller.go b/metrics/poller.go new file mode 100644 index 0000000..4a03e60 --- /dev/null +++ b/metrics/poller.go @@ -0,0 +1,62 @@ +package metrics + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +// MetricsPoller ... used to poll metrics from server +// used in E2E testing to assert client->server interactions +type MetricsPoller struct { + address string + client *http.Client +} + +func NewPoller(address string) *MetricsPoller { + return &MetricsPoller{ + address: address, + client: &http.Client{}, + } +} + +// Poll ... polls metrics from the given address and does a linear search +// provided the metric name +func (m *MetricsPoller) Poll(metricName string) (string, error) { + str, err := m.request(m.address) + if err != nil { + return "", err + } + + println("body", str) + + lines := strings.Split(str, "\n") + for _, line := range lines { + if strings.HasPrefix(line, metricName) { + return line, nil + } + } + return "", fmt.Errorf("metric %s not found", metricName) + +} + +// PollMetrics polls the Prometheus metrics from the given address +func (m *MetricsPoller) request(address string) (string, error) { + resp, err := m.client.Get(address) + if err != nil { + return "", fmt.Errorf("error polling metrics: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("received non-200 status code: %d", resp.StatusCode) + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("error reading response body: %v", err) + } + + return string(body), nil +} diff --git a/server/load_store.go b/server/load_store.go index 36dbf2b..3ef1575 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/Layr-Labs/eigenda-proxy/metrics" "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/eigenda" "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" @@ -49,7 +50,7 @@ func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redi } // LoadStoreRouter ... 
creates storage backend clients and instruments them into a storage routing abstraction -func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.IRouter, error) { +func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metrics.Metricer) (store.IRouter, error) { // create S3 backend store (if enabled) var err error var s3Store store.PrecomputedKeyStore @@ -119,17 +120,14 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. // create secondary storage router fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore) caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore) - secondary, err := store.NewSecondaryRouter(log, caches, fallbacks) + secondary, err := store.NewSecondaryRouter(log, m, caches, fallbacks) if err != nil { return nil, err } if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled log.Debug("Starting secondary stream processing routines") - - for i := 0; i < 10; i++ { - go secondary.StreamProcess(ctx) - } + go secondary.StreamProcess(ctx) } log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil) diff --git a/server/server.go b/server/server.go index 84662d1..1a5f79a 100644 --- a/server/server.go +++ b/server/server.go @@ -378,25 +378,3 @@ func ReadCommitmentVersion(r *http.Request, mode commitments.CommitmentMode) (by func (svr *Server) GetEigenDAStats() *store.Stats { return svr.router.GetEigenDAStore().Stats() } - -func (svr *Server) GetS3Stats() *store.Stats { - return svr.router.GetS3Store().Stats() -} - -func (svr *Server) GetStoreStats(bt store.BackendType) (*store.Stats, error) { - // first check if the store is a cache - for _, cache := range svr.router.Caches() { - if cache.BackendType() == bt { - return cache.Stats(), nil - } - } - - // then check if the store is a fallback - for _, fallback := range svr.router.Fallbacks() { - if fallback.BackendType() == bt { - return fallback.Stats(), nil - } - } - - return nil, fmt.Errorf("store not found") -} diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go index a25eac6..b0dd7ab 100644 --- a/store/generated_key/memstore/memstore.go +++ b/store/generated_key/memstore/memstore.go @@ -134,18 +134,18 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { // Put inserts a value into the store. 
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { time.Sleep(e.config.PutLatency) - if uint64(len(value)) > e.config.MaxBlobSizeBytes { + encodedVal, err := e.codec.EncodeBlob(value) + if err != nil { + return nil, err + } + + if uint64(len(encodedVal)) > e.config.MaxBlobSizeBytes { return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes) } e.Lock() defer e.Unlock() - encodedVal, err := e.codec.EncodeBlob(value) - if err != nil { - return nil, err - } - commitment, err := e.verifier.Commit(encodedVal) if err != nil { return nil, err diff --git a/store/precomputed_key/redis/redis.go b/store/precomputed_key/redis/redis.go index 6b2975a..1c0ac7e 100644 --- a/store/precomputed_key/redis/redis.go +++ b/store/precomputed_key/redis/redis.go @@ -24,10 +24,6 @@ type Store struct { eviction time.Duration client *redis.Client - - profile bool - reads int - entries int } var _ store.PrecomputedKeyStore = (*Store)(nil) @@ -52,8 +48,6 @@ func NewStore(cfg *Config) (*Store, error) { return &Store{ eviction: cfg.Eviction, client: client, - profile: cfg.Profile, - reads: 0, }, nil } @@ -67,22 +61,13 @@ func (r *Store) Get(ctx context.Context, key []byte) ([]byte, error) { return nil, err } - if r.profile { - r.reads++ - } - // cast value to byte slice return []byte(value), nil } // Put ... inserts a value into the Redis store func (r *Store) Put(ctx context.Context, key []byte, value []byte) error { - err := r.client.Set(ctx, string(key), string(value), r.eviction).Err() - if err == nil && r.profile { - r.entries++ - } - - return err + return r.client.Set(ctx, string(key), string(value), r.eviction).Err() } func (r *Store) Verify(_ []byte, _ []byte) error { @@ -92,10 +77,3 @@ func (r *Store) Verify(_ []byte, _ []byte) error { func (r *Store) BackendType() store.BackendType { return store.RedisBackendType } - -func (r *Store) Stats() *store.Stats { - return &store.Stats{ - Entries: r.entries, - Reads: r.reads, - } -} diff --git a/store/precomputed_key/s3/s3.go b/store/precomputed_key/s3/s3.go index f694910..50b8664 100644 --- a/store/precomputed_key/s3/s3.go +++ b/store/precomputed_key/s3/s3.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "errors" + "fmt" "io" "path" "strings" @@ -47,14 +48,12 @@ type Config struct { Path string Backup bool Timeout time.Duration - Profiling bool } type Store struct { cfg Config client *minio.Client putObjectOptions minio.PutObjectOptions - stats *store.Stats } func isGoogleEndpoint(endpoint string) bool { @@ -79,10 +78,6 @@ func NewS3(cfg Config) (*Store, error) { cfg: cfg, client: client, putObjectOptions: putObjectOptions, - stats: &store.Stats{ - Entries: 0, - Reads: 0, - }, }, nil } @@ -101,10 +96,6 @@ func (s *Store) Get(ctx context.Context, key []byte) ([]byte, error) { return nil, err } - if s.cfg.Profiling { - s.stats.Reads++ - } - return data, nil } @@ -114,26 +105,18 @@ func (s *Store) Put(ctx context.Context, key []byte, value []byte) error { return err } - if s.cfg.Profiling { - s.stats.Entries++ - } - return nil } func (s *Store) Verify(key []byte, value []byte) error { h := crypto.Keccak256Hash(value) if !bytes.Equal(h[:], key) { - return errors.New("key does not match value") + return fmt.Errorf("key does not match value, expected: %s got: %s", hex.EncodeToString(key), h.Hex()) } return nil } -func (s *Store) Stats() *store.Stats { - return s.stats -} - func (s *Store) BackendType() store.BackendType { return store.S3BackendType } diff --git 
a/store/secondary.go b/store/secondary.go index 1692328..0ff8317 100644 --- a/store/secondary.go +++ b/store/secondary.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/Layr-Labs/eigenda-proxy/metrics" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -31,24 +32,22 @@ type PutNotif struct { type SecondaryRouter struct { stream chan PutNotif log log.Logger + m metrics.Metricer caches []PrecomputedKeyStore - cacheLock sync.RWMutex + fallbacks []PrecomputedKeyStore - fallbacks []PrecomputedKeyStore - fallbackLock sync.RWMutex + verifyLock sync.RWMutex } // NewSecondaryRouter ... creates a new secondary storage router -func NewSecondaryRouter(log log.Logger, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) { +func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) { return &SecondaryRouter{ stream: make(chan PutNotif), log: log, + m: m, caches: caches, - cacheLock: sync.RWMutex{}, - - fallbacks: fallbacks, - fallbackLock: sync.RWMutex{}, + fallbacks: fallbacks, }, nil } @@ -61,16 +60,10 @@ func (r *SecondaryRouter) Enabled() bool { } func (r *SecondaryRouter) CachingEnabled() bool { - r.cacheLock.RLock() - defer r.cacheLock.RUnlock() - return len(r.caches) > 0 } func (r *SecondaryRouter) FallbackEnabled() bool { - r.fallbackLock.RLock() - defer r.fallbackLock.RUnlock() - return len(r.fallbacks) > 0 } @@ -80,14 +73,7 @@ func (r *SecondaryRouter) FallbackEnabled() bool { // caller step for different target sets vs. reading which is done conditionally to segment between a cached read type // vs a fallback read type func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error { - r.cacheLock.RLock() - r.fallbackLock.RLock() - - defer func() { - r.cacheLock.RUnlock() - r.fallbackLock.RUnlock() - }() - + println("HandleRedundantWrites") sources := r.caches sources = append(sources, r.fallbacks...) 
@@ -95,11 +81,15 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment successes := 0 for _, src := range sources { + cb := r.m.RecordSecondaryRequest(src.BackendType().String(), "put") + err := src.Put(ctx, key, value) if err != nil { r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err) + cb("failure") } else { successes++ + cb("success") } } @@ -132,52 +122,47 @@ func (r *SecondaryRouter) StreamProcess(ctx context.Context) { func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) { var sources []PrecomputedKeyStore if fallback { - r.fallbackLock.RLock() - defer r.fallbackLock.RUnlock() - sources = r.fallbacks } else { - r.cacheLock.RLock() - defer r.cacheLock.RUnlock() - sources = r.caches } key := crypto.Keccak256(commitment) for _, src := range sources { + cb := r.m.RecordSecondaryRequest(src.BackendType().String(), "get") data, err := src.Get(ctx, key) if err != nil { + cb("failure") r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err) continue } if data == nil { + cb("miss") r.log.Debug("No data found in redundant target", "backend", src.BackendType()) continue } // verify cert:data using provided verification function + r.verifyLock.Lock() err = verify(commitment, data) if err != nil { + cb("failure") log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType()) + r.verifyLock.Unlock() continue } - + r.verifyLock.Unlock() + cb("success") return data, nil } return nil, errors.New("no data found in any redundant backend") } func (r *SecondaryRouter) Fallbacks() []PrecomputedKeyStore { - r.fallbackLock.RLock() - defer r.fallbackLock.RUnlock() - return r.fallbacks } func (r *SecondaryRouter) Caches() []PrecomputedKeyStore { - r.cacheLock.RLock() - defer r.cacheLock.RUnlock() - return r.caches } diff --git a/store/store.go b/store/store.go index 0b75789..74de0da 100644 --- a/store/store.go +++ b/store/store.go @@ -65,8 +65,7 @@ type Stats struct { } type Store interface { - // Stats returns the current usage metrics of the key-value data store. - Stats() *Stats + // Backend returns the backend type provider of the store. BackendType() BackendType // Verify verifies the given key-value pair. @@ -75,6 +74,8 @@ type Store interface { type GeneratedKeyStore interface { Store + // Stats returns the current usage metrics of the key-value data store. + Stats() *Stats // Get retrieves the given key if it's present in the key-value data store. Get(ctx context.Context, key []byte) ([]byte, error) // Put inserts the given value into the key-value data store. 
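The secondary-storage instrumentation introduced in PATCH 4 above follows a start-timer/finish-callback idiom: `RecordSecondaryRequest` starts a Prometheus timer the moment a read or write against a cache/fallback backend begins and hands back a closure; the caller invokes that closure exactly once with the terminal status ("success", "failure", or "miss"), which increments the labeled counter and observes the elapsed duration on the histogram. Below is a minimal, self-contained sketch of the same idiom — the `requestRecorder` type and its names are illustrative stand-ins, not the proxy's actual API:

```
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// requestRecorder bundles the two collectors the callback pattern needs:
// a counter stamped with the terminal status and a duration histogram.
type requestRecorder struct {
	total    *prometheus.CounterVec
	duration *prometheus.HistogramVec
}

func newRequestRecorder(reg prometheus.Registerer) *requestRecorder {
	r := &requestRecorder{
		total: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "eigenda_proxy",
			Subsystem: "secondary",
			Name:      "requests_total",
			Help:      "Total requests to the secondary storage",
		}, []string{"backend_type", "method", "status"}),
		duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "eigenda_proxy",
			Subsystem: "secondary",
			Name:      "request_duration_seconds",
			Help:      "Histogram of secondary storage request durations",
		}, []string{"backend_type"}),
	}
	reg.MustRegister(r.total, r.duration)
	return r
}

// recordRequest starts the timer immediately and returns the callback that
// the storage routine calls once the operation's outcome is known.
func (r *requestRecorder) recordRequest(backendType, method string) func(status string) {
	timer := prometheus.NewTimer(r.duration.WithLabelValues(backendType))
	return func(status string) {
		r.total.WithLabelValues(backendType, method, status).Inc()
		timer.ObserveDuration()
	}
}

func main() {
	rec := newRequestRecorder(prometheus.NewRegistry())

	cb := rec.recordRequest("S3", "PUT")
	time.Sleep(10 * time.Millisecond) // stand-in for the actual backend write
	cb("success")

	fmt.Println("recorded one secondary PUT")
}
```

Returning a closure keeps the hot path to a single call at each exit point, which is why the later patches can tag distinct statuses ("failure", "miss", "success") from the different branches of `MultiSourceRead` without re-reading the clock.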
From bb9b433f4b7e70a306d93d5aec542efb5e97123c Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Sun, 13 Oct 2024 06:52:49 -0400 Subject: [PATCH 5/9] chore: Better abstract secondary storage - observe secondary storage via metrics - cleanups --- commitments/mode.go | 2 +- e2e/optimism_test.go | 32 ++++++--- e2e/server_test.go | 35 ++++++---- e2e/setup.go | 5 +- metrics/metrics.go | 46 ++----------- metrics/poller.go | 151 ++++++++++++++++++++++++++++++++++++++----- server/load_store.go | 2 +- server/server.go | 3 +- store/router.go | 21 +----- store/secondary.go | 21 +++--- utils/utils.go | 19 ++++++ 11 files changed, 224 insertions(+), 113 deletions(-) diff --git a/commitments/mode.go b/commitments/mode.go index ef106ad..b195a1e 100644 --- a/commitments/mode.go +++ b/commitments/mode.go @@ -8,7 +8,7 @@ import ( type CommitmentMeta struct { Mode CommitmentMode // CertVersion is shared for all modes and denotes version of the EigenDA certificate - CertVersion byte + CertVersion uint8 } type CommitmentMode string diff --git a/e2e/optimism_test.go b/e2e/optimism_test.go index 9a5558a..1b2c710 100644 --- a/e2e/optimism_test.go +++ b/e2e/optimism_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/Layr-Labs/eigenda-proxy/e2e" + "github.com/Layr-Labs/eigenda-proxy/metrics" altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-e2e/actions" "github.com/ethereum-optimism/optimism/op-e2e/config" @@ -166,11 +167,18 @@ func TestOptimismKeccak256Commitment(gt *testing.T) { optimism.sequencer.ActL2PipelineFull(t) optimism.ActL1Finalized(t) - // assert that EigenDA proxy's was written and read from - // stat := proxyTS.Server.GetS3Stats() + // assert that keccak256 primary store was written and read from + labels := metrics.BuildServerRPCLabels("put", "", "optimism_keccak256", "0") + delete(labels, "method") + + ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5) + require.NoError(t, err) + require.NotEmpty(t, ms) + require.Len(t, ms, 2) + + require.True(t, ms[0].Count > 0) + require.True(t, ms[1].Count > 0) - // require.Equal(t, 1, stat.Entries) - // require.Equal(t, 1, stat.Reads) } func TestOptimismGenericCommitment(gt *testing.T) { @@ -222,9 +230,15 @@ func TestOptimismGenericCommitment(gt *testing.T) { // assert that EigenDA proxy's was written and read from - if useMemory() { - stat := proxyTS.Server.GetEigenDAStats() - require.Equal(t, 1, stat.Entries) - require.Equal(t, 1, stat.Reads) - } + // assert that EigenDA's primary store was written and read from + labels := metrics.BuildServerRPCLabels("put", "", "optimism_generic", "0") + delete(labels, "method") + + ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5) + require.NoError(t, err) + require.NotEmpty(t, ms) + require.Len(t, ms, 2) + + require.True(t, ms[0].Count > 0) + require.True(t, ms[1].Count > 0) } diff --git a/e2e/server_test.go b/e2e/server_test.go index 7e853e0..b11eadc 100644 --- a/e2e/server_test.go +++ b/e2e/server_test.go @@ -8,6 +8,8 @@ import ( "time" "github.com/Layr-Labs/eigenda-proxy/client" + "github.com/Layr-Labs/eigenda-proxy/metrics" + "github.com/Layr-Labs/eigenda-proxy/store" "github.com/Layr-Labs/eigenda-proxy/e2e" altda "github.com/ethereum-optimism/optimism/op-alt-da" @@ -349,10 +351,13 @@ func TestProxyServerCaching(t *testing.T) { require.Equal(t, testPreimage, preimage) // ensure that read was from cache - val, err := ts.MetricPoller.Poll("secondary.requests_total") + labels := 
metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success") + + ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5) require.NoError(t, err) + require.Len(t, ms, 1) - println(val) + require.True(t, ms[0].Count > 0) if useMemory() { // ensure that eigenda was not read from memStats := ts.Server.GetEigenDAStats() @@ -393,12 +398,14 @@ func TestProxyServerCachingWithRedis(t *testing.T) { require.Equal(t, testPreimage, preimage) // ensure that read was from cache - // redStats, err := ts.Server.GetStoreStats(store.RedisBackendType) - // require.NoError(t, err) - - // require.Equal(t, 1, redStats.Reads) - // require.Equal(t, 1, redStats.Entries) + labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success") + ms, err := ts.MetricPoller.PollMetrics(metrics.SecondaryRequestStatuses, labels) + require.NoError(t, err) + require.NotEmpty(t, ms) + require.Len(t, ms, 1) + require.True(t, ms[0].Count >= 1) + // TODO: Add metrics for EigenDA dispersal/retrieval if useMemory() { // ensure that eigenda was not read from memStats := ts.Server.GetEigenDAStats() require.Equal(t, 0, memStats.Reads) @@ -420,6 +427,7 @@ func TestProxyServerReadFallback(t *testing.T) { t.Parallel() + // setup server with S3 as a fallback option testCfg := e2e.TestConfig(useMemory()) testCfg.UseS3Fallback = true testCfg.Expiration = time.Millisecond * 1 @@ -447,11 +455,16 @@ func TestProxyServerReadFallback(t *testing.T) { require.NoError(t, err) require.Equal(t, testPreimage, preimage) - // ensure that read was from fallback target location (i.e, S3 for this test) - // s3Stats := ts.Server.GetS3Stats() - // require.Equal(t, 1, s3Stats.Reads) - // require.Equal(t, 1, s3Stats.Entries) + labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success") + + ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5) + require.NoError(t, err) + require.NotEmpty(t, ms) + require.Len(t, ms, 1) + + require.True(t, ms[0].Count > 0) + // TODO - remove this in favor of metrics sampling if useMemory() { // ensure that an eigenda read was attempted with zero data available memStats := ts.Server.GetEigenDAStats() require.Equal(t, 1, memStats.Reads) diff --git a/e2e/setup.go b/e2e/setup.go index eba1ff8..d56ba13 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -178,7 +178,7 @@ type TestSuite struct { Ctx context.Context Log log.Logger Server *server.Server - MetricPoller *metrics.MetricsPoller + MetricPoller *metrics.PollerClient MetricSvr *httputil.HTTPServer } @@ -198,7 +198,6 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu m, ) - require.NoError(t, err) proxySvr := server.NewServer(host, 0, store, log, m) @@ -226,7 +225,7 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu Ctx: ctx, Log: log, Server: proxySvr, - MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", m.Address())), + MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", metricsSvr.Addr().String())), MetricSvr: metricsSvr, }, kill } diff --git a/metrics/metrics.go b/metrics/metrics.go index deae671..a9855e2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -54,8 +54,6 @@ type Metrics struct { registry *prometheus.Registry factory metrics.Factory - - address string } var _ Metricer = (*Metrics)(nil) @@ -91,7 +89,7 @@ func NewMetrics(subsystem string) *Metrics { Name: "requests_total", Help: "Total requests to the HTTP 
server", }, []string{ - "method", "status", "commitment_mode", "DA_cert_version", + "method", "status", "commitment_mode", "cert_version", }), HTTPServerBadRequestHeader: factory.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, @@ -149,7 +147,7 @@ func (m *Metrics) RecordUp() { // RecordRPCServerRequest is a helper method to record an incoming HTTP request. // It bumps the requests metric, and tracks how long it takes to serve a response, // including the HTTP status code. -func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode string, ver string) { +func (m *Metrics) RecordRPCServerRequest(method string) func(status, mode, ver string) { // we don't want to track the status code on the histogram because that would // create a huge number of labels, and cost a lot on cloud hosted services timer := prometheus.NewTimer(m.HTTPServerRequestDurationSeconds.WithLabelValues(method)) @@ -159,10 +157,6 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode } } -func (m *Metrics) Address() string { - return m.address -} - // RecordSecondaryPut records a secondary put/get operation. func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status string) { timer := prometheus.NewTimer(m.SecondaryRequestDurationSec.WithLabelValues(bt)) @@ -171,50 +165,22 @@ func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status s m.SecondaryRequestsTotal.WithLabelValues(bt, method, status).Inc() timer.ObserveDuration() } - } -// FindRandomOpenPort returns a random open port -func FindRandomOpenPort() (int, error) { - // Listen on a random port - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, fmt.Errorf("failed to find open port: %w", err) - } - defer listener.Close() - - // Get the assigned address, which includes the port - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, nil -} - -// StartServer starts the metrics server on the given hostname and port. // StartServer starts the metrics server on the given hostname and port. // If port is 0, it automatically assigns an available port and returns the actual port. func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) { - // Create a listener with the provided host and port. If port is 0, the system will assign one. - if port == 0 { - randomPort, err := FindRandomOpenPort() - if err != nil { - return nil, fmt.Errorf("failed to find open port: %w", err) - } - port = randomPort - } - m.address = net.JoinHostPort(hostname, strconv.Itoa(port)) - + address := net.JoinHostPort(hostname, strconv.Itoa(port)) - // Set up Prometheus metrics handler h := promhttp.InstrumentMetricHandler( m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), ) - - // Start the HTTP server using the listener, so we can control the actual port - server, err := ophttp.StartHTTPServer(m.address, h) + + server, err := ophttp.StartHTTPServer(address, h) if err != nil { - return nil, fmt.Errorf("failed to start HTTP server: %v", err) + return nil, fmt.Errorf("failed to start HTTP server: %w", err) } - // Return the actual port the server is bound to return server, nil } diff --git a/metrics/poller.go b/metrics/poller.go index 4a03e60..3466c48 100644 --- a/metrics/poller.go +++ b/metrics/poller.go @@ -2,47 +2,166 @@ package metrics import ( "fmt" - "io/ioutil" + "io" "net/http" + "regexp" + "strconv" "strings" + "time" ) -// MetricsPoller ... 
used to poll metrics from server +type MetricKey string + +const ( + ServerRPCStatuses MetricKey = "eigenda_proxy_http_server_requests_total" + SecondaryRequestStatuses MetricKey = "eigenda_proxy_secondary_requests_total" +) + +// MetricWithCount represents a metric with labels (key-value pairs) and a count +type MetricWithCount struct { + Name string `json:"name"` + Labels map[string]string `json:"labels"` + Count int `json:"count"` +} + +func parseMetric(input string) (MetricWithCount, error) { + // Regular expression to match the metric name, key-value pairs, and count + re := regexp.MustCompile(`^(\w+)\{([^}]*)\}\s+(\d+)$`) + match := re.FindStringSubmatch(input) + + if len(match) != 4 { + return MetricWithCount{}, fmt.Errorf("invalid metric format") + } + + // Extract the name and count + name := match[1] + labelsString := match[2] + count, err := strconv.Atoi(match[3]) + if err != nil { + return MetricWithCount{}, fmt.Errorf("invalid count value: %v", err) + } + + // Extract the labels (key-value pairs) from the second capture group + labelsRe := regexp.MustCompile(`(\w+)="([^"]+)"`) + labelsMatches := labelsRe.FindAllStringSubmatch(labelsString, -1) + + labels := make(map[string]string) + for _, labelMatch := range labelsMatches { + key := labelMatch[1] + value := labelMatch[2] + labels[key] = value + } + + // Return the parsed metric with labels and count + return MetricWithCount{ + Name: name, + Labels: labels, + Count: count, + }, nil +} + +// PollerClient ... used to poll metrics from server // used in E2E testing to assert client->server interactions -type MetricsPoller struct { +type PollerClient struct { address string client *http.Client } -func NewPoller(address string) *MetricsPoller { - return &MetricsPoller{ +func NewPoller(address string) *PollerClient { + return &PollerClient{ address: address, client: &http.Client{}, } } -// Poll ... polls metrics from the given address and does a linear search +func BuildSecondaryCountLabels(backendType, method, status string) map[string]string { + return map[string]string{ + "backend_type": backendType, + "method": method, + "status": status, + } +} + +// "method", "status", "commitment_mode", "cert_version" +func BuildServerRPCLabels(method, status, commitmentMode, certVersion string) map[string]string { + return map[string]string{ + "method": method, + "status": status, + "commitment_mode": commitmentMode, + "cert_version": certVersion, + } + +} + +type MetricSlice []*MetricWithCount + +func hasMetric(line string, labels map[string]string) bool { + for label, value := range labels { + if !strings.Contains(line, label) { + return false + } + + if !strings.Contains(line, value) { + return false + } + } + + return true +} + +// PollMetricsWithRetry ... Polls for a Count Metric using a simple retry strategy of 1 second sleep x times +// keeping this non-modular is ok since this is only used for testing +func (m *PollerClient) PollMetricsWithRetry(name MetricKey, labels map[string]string, times int) (MetricSlice, error) { + var ms MetricSlice + var err error + + for i := 0; i < times; i++ { + ms, err = m.PollMetrics(name, labels) + if err != nil { + time.Sleep(time.Second * 1) + continue + } + + return ms, nil + + } + + return nil, err +} + +// PollMetrics ... 
polls metrics from the given address and does a linear search // provided the metric name -func (m *MetricsPoller) Poll(metricName string) (string, error) { - str, err := m.request(m.address) +// assumes 1 metric to key mapping +func (m *PollerClient) PollMetrics(name MetricKey, labels map[string]string) (MetricSlice, error) { + str, err := m.fetchMetrics(m.address) if err != nil { - return "", err + return nil, err } - println("body", str) + entries := []*MetricWithCount{} lines := strings.Split(str, "\n") for _, line := range lines { - if strings.HasPrefix(line, metricName) { - return line, nil + if strings.HasPrefix(line, string(name)) && hasMetric(line, labels) { + mc, err := parseMetric(line) + if err != nil { + return nil, err + } + + entries = append(entries, &mc) } } - return "", fmt.Errorf("metric %s not found", metricName) + + if len(entries) == 0 { + return nil, fmt.Errorf("no entries found for metric: %s", name) + } + + return entries, nil } -// PollMetrics polls the Prometheus metrics from the given address -func (m *MetricsPoller) request(address string) (string, error) { +// fetchMetrics ... reads metrics server endpoint contents into string +func (m *PollerClient) fetchMetrics(address string) (string, error) { resp, err := m.client.Get(address) if err != nil { return "", fmt.Errorf("error polling metrics: %v", err) @@ -53,7 +172,7 @@ func (m *MetricsPoller) request(address string) (string, error) { return "", fmt.Errorf("received non-200 status code: %d", resp.StatusCode) } - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return "", fmt.Errorf("error reading response body: %v", err) } diff --git a/server/load_store.go b/server/load_store.go index 3ef1575..32bdb7e 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -127,7 +127,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled log.Debug("Starting secondary stream processing routines") - go secondary.StreamProcess(ctx) + go secondary.SubscribeToPutNotif(ctx) } log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil) diff --git a/server/server.go b/server/server.go index 1a5f79a..8eda747 100644 --- a/server/server.go +++ b/server/server.go @@ -69,6 +69,7 @@ func WithMetrics( if err != nil { var metaErr MetaError if errors.As(err, &metaErr) { + // TODO: Figure out why status is defaulting to "" recordDur(w.Header().Get("status"), string(metaErr.Meta.Mode), string(metaErr.Meta.CertVersion)) } else { recordDur(w.Header().Get("status"), string("NoCommitmentMode"), string("NoCertVersion")) @@ -76,7 +77,7 @@ func WithMetrics( return err } // we assume that every route will set the status header - recordDur(w.Header().Get("status"), string(meta.Mode), string(meta.CertVersion)) + recordDur(w.Header().Get("status"), string(meta.Mode), strconv.Itoa(int(meta.CertVersion))) return nil } } diff --git a/store/router.go b/store/router.go index 8bc98ef..dc120f6 100644 --- a/store/router.go +++ b/store/router.go @@ -16,9 +16,6 @@ type IRouter interface { Put(ctx context.Context, cm commitments.CommitmentMode, key, value []byte) ([]byte, error) GetEigenDAStore() GeneratedKeyStore - GetS3Store() PrecomputedKeyStore - Caches() []PrecomputedKeyStore - Fallbacks() []PrecomputedKeyStore } // Router ... 
 // Router ... storage backend routing layer
@@ -127,8 +124,7 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va
 	}
 
 	if r.secondary.Enabled() {
-
-		r.secondary.Ingress() <- PutNotif{
+		r.secondary.Topic() <- PutNotify{
 			Commitment: commit,
 			Value:      value,
 		}
@@ -165,18 +161,3 @@ func (r *Router) putWithKey(ctx context.Context, key []byte, value []byte) ([]by
 func (r *Router) GetEigenDAStore() GeneratedKeyStore {
 	return r.eigenda
 }
-
-// GetS3Store ...
-func (r *Router) GetS3Store() PrecomputedKeyStore {
-	return r.s3
-}
-
-// Caches ...
-func (r *Router) Caches() []PrecomputedKeyStore {
-	return r.secondary.Caches()
-}
-
-// Fallbacks ...
-func (r *Router) Fallbacks() []PrecomputedKeyStore {
-	return r.secondary.Fallbacks()
-}
diff --git a/store/secondary.go b/store/secondary.go
index 0ff8317..b49277f 100644
--- a/store/secondary.go
+++ b/store/secondary.go
@@ -3,6 +3,7 @@ package store
 import (
 	"context"
 	"errors"
+	"net/http"
 	"sync"
 
 	"github.com/Layr-Labs/eigenda-proxy/metrics"
@@ -15,22 +16,22 @@ type ISecondary interface {
 	Fallbacks() []PrecomputedKeyStore
 	Caches() []PrecomputedKeyStore
 	Enabled() bool
-	Ingress() chan<- PutNotif
+	Topic() chan<- PutNotify
 	CachingEnabled() bool
 	FallbackEnabled() bool
 	HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
 	MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
-	StreamProcess(context.Context)
+	SubscribeToPutNotif(context.Context)
 }
 
-type PutNotif struct {
+type PutNotify struct {
 	Commitment []byte
 	Value      []byte
 }
 
 // SecondaryRouter ... routing abstraction for secondary storage backends
 type SecondaryRouter struct {
-	stream chan PutNotif
+	stream chan PutNotify
 	log    log.Logger
 	m      metrics.Metricer
 
@@ -43,7 +44,7 @@ type SecondaryRouter struct {
 // NewSecondaryRouter ... creates a new secondary storage router
 func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) {
 	return &SecondaryRouter{
-		stream:    make(chan PutNotif),
+		stream:    make(chan PutNotify),
 		log:       log,
 		m:         m,
 		caches:    caches,
@@ -51,7 +52,7 @@ func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []Precomputed
 	}, nil
 }
 
-func (r *SecondaryRouter) Ingress() chan<- PutNotif {
+func (r *SecondaryRouter) Topic() chan<- PutNotify {
 	return r.stream
 }
 
@@ -73,7 +74,6 @@ func (r *SecondaryRouter) FallbackEnabled() bool {
 // caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
 // vs a fallback read type
 func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
-	println("HandleRedundantWrites")
 	sources := r.caches
 	sources = append(sources, r.fallbacks...)
 
@@ -81,7 +81,7 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	successes := 0
 
 	for _, src := range sources {
-		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), "put")
+		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)
 
 		err := src.Put(ctx, key, value)
 		if err != nil {
@@ -100,7 +100,7 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	return nil
 }
 
-func (r *SecondaryRouter) StreamProcess(ctx context.Context) {
+func (r *SecondaryRouter) SubscribeToPutNotif(ctx context.Context) {
 	for {
 		select {
 		case notif := <-r.stream:
@@ -113,7 +113,6 @@
 			return
 		}
 	}
-
 }
 
 // MultiSourceRead ... reads from a set of backends and returns the first successfully read blob
@@ -129,7 +128,7 @@ func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte
 	key := crypto.Keccak256(commitment)
 
 	for _, src := range sources {
-		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), "get")
+		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
 		data, err := src.Get(ctx, key)
 		if err != nil {
 			cb("failure")
diff --git a/utils/utils.go b/utils/utils.go
index aa83aa9..ab45810 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -2,10 +2,29 @@ package utils
 
 import (
 	"fmt"
+	"net"
 	"strconv"
 	"strings"
 )
 
+// FindRandomOpenPort returns a random open port
+func FindRandomOpenPort() (int, error) {
+	// Listen on a random port
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return 0, fmt.Errorf("failed to find open port: %w", err)
+	}
+	defer listener.Close()
+
+	// Get the assigned address, which includes the port
+	addr, ok := listener.Addr().(*net.TCPAddr)
+	if !ok {
+		return 0, fmt.Errorf("failed to cast listener address to TCPAddr")
+	}
+
+	return addr.Port, nil
+}
+
 // Helper utility functions //
 
 func ContainsDuplicates[P comparable](s []P) bool {

From 4b9b0e2539dbba2af4b99e7c7c7b3d9781ea7104 Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Sun, 13 Oct 2024 16:29:06 -0400
Subject: [PATCH 6/9] chore: Better abstract secondary storage

- observe secondary storage via metrics
- refactors and lints
---
 e2e/optimism_test.go |  9 +++++----
 e2e/server_test.go   |  6 +++---
 metrics/poller.go    | 46 ++++++++++++++++++++++++++------------------
 out.txt              | 16 +++++++++++++++
 out.xt               |  0
 5 files changed, 51 insertions(+), 26 deletions(-)
 create mode 100644 out.txt
 create mode 100644 out.xt

diff --git a/e2e/optimism_test.go b/e2e/optimism_test.go
index 1b2c710..c0d898c 100644
--- a/e2e/optimism_test.go
+++ b/e2e/optimism_test.go
@@ -3,6 +3,7 @@ package e2e_test
 import (
 	"testing"
 
+	"github.com/Layr-Labs/eigenda-proxy/commitments"
 	"github.com/Layr-Labs/eigenda-proxy/e2e"
 	"github.com/Layr-Labs/eigenda-proxy/metrics"
 	altda "github.com/ethereum-optimism/optimism/op-alt-da"
@@ -168,10 +169,10 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
 	optimism.ActL1Finalized(t)
 
 	// assert that keccak256 primary store was written and read from
-	labels := metrics.BuildServerRPCLabels("put", "", "optimism_keccak256", "0")
+	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismKeccak), "0")
 	delete(labels, "method")
 
-	ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 2)
@@ -231,10 +232,10 @@ func TestOptimismGenericCommitment(gt *testing.T) {
 	// assert that EigenDA proxy's was written and read from
 	// assert that EigenDA's primary store was written and read from
-	labels := metrics.BuildServerRPCLabels("put", "", "optimism_generic", "0")
+	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismGeneric), "0")
 	delete(labels, "method")
 
-	ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 2)
diff --git a/e2e/server_test.go b/e2e/server_test.go
index b11eadc..07729f3 100644
--- a/e2e/server_test.go
+++ b/e2e/server_test.go
@@ -353,7 +353,7 @@ func TestProxyServerCaching(t *testing.T) {
 
 	// ensure that read was from cache
 	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
 	require.NoError(t, err)
 	require.Len(t, ms, 1)
 
@@ -399,7 +399,7 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
 
 	// ensure that read was from cache
 	labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollMetrics(metrics.SecondaryRequestStatuses, labels)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 1)
@@ -457,7 +457,7 @@ func TestProxyServerReadFallback(t *testing.T) {
 
 	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 1)
diff --git a/metrics/poller.go b/metrics/poller.go
index 3466c48..020d51a 100644
--- a/metrics/poller.go
+++ b/metrics/poller.go
@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"net/http"
@@ -10,6 +11,10 @@ import (
 	"time"
 )
 
+/*
+	NOTE: This poller is only used for E2E testing and is not recommended for general application usage within EigenDA proxy
+*/
+
 type MetricKey string
 
 const (
@@ -20,17 +25,17 @@ const (
 // MetricWithCount represents a metric with labels (key-value pairs) and a count
 type MetricWithCount struct {
 	Name   string            `json:"name"`
-	Labels map[string]string `json:"labels"`
+	Labels map[string]string `json:"labels"` // used for filtering
 	Count  int               `json:"count"`
 }
 
-func parseMetric(input string) (MetricWithCount, error) {
+func parseCountMetric(input string) (MetricWithCount, error) {
 	// Regular expression to match the metric name, key-value pairs, and count
 	re := regexp.MustCompile(`^(\w+)\{([^}]*)\}\s+(\d+)$`)
 	match := re.FindStringSubmatch(input)
 
 	if len(match) != 4 {
-		return MetricWithCount{}, fmt.Errorf("invalid metric format")
+		return MetricWithCount{}, fmt.Errorf("invalid count metric format")
 	}
 
 	// Extract the name and count
@@ -38,7 +43,7 @@
 	name := match[1]
 	labelsString := match[2]
 	count, err := strconv.Atoi(match[3])
 	if err != nil {
-		return MetricWithCount{}, fmt.Errorf("invalid count value: %v", err)
+		return MetricWithCount{}, fmt.Errorf("invalid count value read from metric line: %w", err)
 	}
 
 	// Extract the labels (key-value pairs) from the second capture group
@@ -67,6 +72,7 @@ type PollerClient struct {
 	client *http.Client
 }
 
+// NewPoller ... initializer
 func NewPoller(address string) *PollerClient {
 	return &PollerClient{
 		address: address,
@@ -74,6 +80,7 @@ func NewPoller(address string) *PollerClient {
 	}
 }
 
+// BuildSecondaryCountLabels ... builds label mapping used to query for secondary storage count metrics
 func BuildSecondaryCountLabels(backendType, method, status string) map[string]string {
 	return map[string]string{
 		"backend_type": backendType,
@@ -82,7 +89,7 @@ func BuildSecondaryCountLabels(backendType, method, status string) map[string]st
 	}
 }
 
-// "method", "status", "commitment_mode", "cert_version"
+// BuildServerRPCLabels ... builds label mapping used to query for standard http server count metrics
 func BuildServerRPCLabels(method, status, commitmentMode, certVersion string) map[string]string {
 	return map[string]string{
 		"method":          method,
@@ -90,7 +97,6 @@ func BuildServerRPCLabels(method, status, commitmentMode, certVersion string) ma
 		"commitment_mode": commitmentMode,
 		"cert_version":    certVersion,
 	}
-
 }
 
 type MetricSlice []*MetricWithCount
 
@@ -109,31 +115,29 @@ func hasMetric(line string, labels map[string]string) bool {
 	return true
 }
 
-// PollMetricsWithRetry ... Polls for a Count Metric using a simple retry strategy of a 1 second sleep between attempts, up to `times` attempts
+// PollCountMetricsWithRetry ... Polls for a Count Metric using a simple retry strategy of a 1 second sleep between attempts, up to `times` attempts
 // keeping this non-modular is ok since this is only used for testing
-func (m *PollerClient) PollMetricsWithRetry(name MetricKey, labels map[string]string, times int) (MetricSlice, error) {
+func (m *PollerClient) PollCountMetricsWithRetry(name MetricKey, labels map[string]string, times int) (MetricSlice, error) {
 	var ms MetricSlice
 	var err error
 
 	for i := 0; i < times; i++ {
-		ms, err = m.PollMetrics(name, labels)
+		ms, err = m.PollCountMetrics(name, labels)
 		if err != nil {
 			time.Sleep(time.Second * 1)
 			continue
 		}
 
 		return ms, nil
-
 	}
-
 	return nil, err
 }
 
 // PollMetrics ... polls metrics from the given address and does a linear search
 // provided the metric name
 // assumes 1 metric to key mapping
-func (m *PollerClient) PollMetrics(name MetricKey, labels map[string]string) (MetricSlice, error) {
-	str, err := m.fetchMetrics(m.address)
+func (m *PollerClient) PollCountMetrics(name MetricKey, labels map[string]string) (MetricSlice, error) {
+	str, err := m.fetchMetrics()
 	if err != nil {
 		return nil, err
 	}
@@ -143,7 +147,7 @@ func (m *PollerClient) PollMetrics(name MetricKey, labels map[string]string) (Me
 	lines := strings.Split(str, "\n")
 	for _, line := range lines {
 		if strings.HasPrefix(line, string(name)) && hasMetric(line, labels) {
-			mc, err := parseMetric(line)
+			mc, err := parseCountMetric(line)
 			if err != nil {
 				return nil, err
 			}
@@ -157,14 +161,18 @@
 	}
 
 	return entries, nil
-
 }
 
 // fetchMetrics ... reads metrics server endpoint contents into string
-func (m *PollerClient) fetchMetrics(address string) (string, error) {
-	resp, err := m.client.Get(address)
+func (m *PollerClient) fetchMetrics() (string, error) {
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, m.address, nil)
+	if err != nil {
+		return "", fmt.Errorf("error creating request: %w", err)
+	}
+
+	resp, err := m.client.Do(req)
 	if err != nil {
-		return "", fmt.Errorf("error polling metrics: %v", err)
+		return "", fmt.Errorf("error polling metrics: %w", err)
 	}
 	defer resp.Body.Close()
 
@@ -174,7 +182,7 @@
 	body, err := io.ReadAll(resp.Body)
 	if err != nil {
-		return "", fmt.Errorf("error reading response body: %v", err)
+		return "", fmt.Errorf("error reading response body: %w", err)
 	}
 
 	return string(body), nil
diff --git a/out.txt b/out.txt
new file mode 100644
index 0000000..fbce3ee
--- /dev/null
+++ b/out.txt
@@ -0,0 +1,16 @@
+minio
+minio
+redis
+redis
+docker run -p 4566:9000 -d -e "MINIO_ROOT_USER=minioadmin" -e "MINIO_ROOT_PASSWORD=minioadmin" --name minio minio/minio server /data
+5d5edd8752b6219317cc2bfbfb94d479775efc0df1d8a68515e0fbf5c6443fbe
+docker run -p 9001:6379 -d --name redis redis
+f5bdc424d3b8853d386017b93bd0d6822bc017e89b202ff35e8a194d4aedcdb4
+INTEGRATION=true go test -timeout 1m ./e2e -parallel 4 -deploy-config ../.devnet/devnetL1.json && \
+	make stop-minio && \
+	make stop-redis
+ok github.com/Layr-Labs/eigenda-proxy/e2e 10.399s
+minio
+minio
+redis
+redis
diff --git a/out.xt b/out.xt
new file mode 100644
index 0000000..e69de29

From 13f221bc35e3db2fbcafd067f695a0b00c4cbe91 Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Sun, 13 Oct 2024 16:29:15 -0400
Subject: [PATCH 7/9] chore: Better abstract secondary storage

- observe secondary storage via metrics
- refactors and lints
---
 out.txt | 16 ----------------
 out.xt  |  0
 2 files changed, 16 deletions(-)
 delete mode 100644 out.txt
 delete mode 100644 out.xt

diff --git a/out.txt b/out.txt
deleted file mode 100644
index fbce3ee..0000000
--- a/out.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-minio
-minio
-redis
-redis
-docker run -p 4566:9000 -d -e "MINIO_ROOT_USER=minioadmin" -e "MINIO_ROOT_PASSWORD=minioadmin" --name minio minio/minio server /data
-5d5edd8752b6219317cc2bfbfb94d479775efc0df1d8a68515e0fbf5c6443fbe
-docker run -p 9001:6379 -d --name redis redis
-f5bdc424d3b8853d386017b93bd0d6822bc017e89b202ff35e8a194d4aedcdb4
-INTEGRATION=true go test -timeout 1m ./e2e -parallel 4 -deploy-config ../.devnet/devnetL1.json && \
-	make stop-minio && \
-	make stop-redis
-ok github.com/Layr-Labs/eigenda-proxy/e2e 10.399s
-minio
-minio
-redis
-redis
diff --git a/out.xt b/out.xt
deleted file mode 100644
index e69de29..0000000

From 95790f2e5eeffa45aafcc79175b4c6ea3a19abdf Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Sun, 13 Oct 2024 16:43:33 -0400
Subject: [PATCH 8/9] chore: Better abstract secondary storage

- observe secondary storage via metrics
- refactors and lints
---
 server/load_store.go | 12 +++++------
 store/router.go      |  4 ++--
 store/secondary.go   | 49 +++++++++++++++++---------------------------
 3 files changed, 26 insertions(+), 39 deletions(-)

diff --git a/server/load_store.go b/server/load_store.go
index 32bdb7e..de65078 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -91,7 +91,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri
 	// create EigenDA backend store
 	var eigenDA store.GeneratedKeyStore
 	if cfg.EigenDAConfig.MemstoreEnabled {
-		log.Info("Using mem-store backend for EigenDA")
+		log.Info("Using memstore backend for EigenDA")
 		eigenDA, err = memstore.New(ctx, verifier, log, cfg.EigenDAConfig.MemstoreConfig)
 	} else {
 		var client *clients.EigenDAClient
@@ -120,14 +120,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri
 	// create secondary storage router
 	fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
 	caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
-	secondary, err := store.NewSecondaryRouter(log, m, caches, fallbacks)
-	if err != nil {
-		return nil, err
-	}
+	secondary := store.NewSecondaryRouter(log, m, caches, fallbacks)
 
 	if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
-		log.Debug("Starting secondary stream processing routines")
-		go secondary.SubscribeToPutNotif(ctx)
+		// NOTE: in the future the number of threads could be made configurable via env
+		log.Debug("Starting secondary write loop")
+		go secondary.WriteLoop(ctx)
 	}
 
 	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
diff --git a/store/router.go b/store/router.go
index dc120f6..67898ca 100644
--- a/store/router.go
+++ b/store/router.go
@@ -123,8 +123,8 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va
 	}
 
-	if r.secondary.Enabled() {
-		r.secondary.Topic() <- PutNotify{
+	if r.secondary.Enabled() { // publish put notification to secondary's subscription on PutNotification topic
+		r.secondary.Subscription() <- PutNotify{
 			Commitment: commit,
 			Value:      value,
 		}
diff --git a/store/secondary.go b/store/secondary.go
index b49277f..8f70b0e 100644
--- a/store/secondary.go
+++ b/store/secondary.go
@@ -13,15 +13,13 @@
 type ISecondary interface {
-	Fallbacks() []PrecomputedKeyStore
-	Caches() []PrecomputedKeyStore
 	Enabled() bool
-	Topic() chan<- PutNotify
+	Subscription() chan<- PutNotify
 	CachingEnabled() bool
 	FallbackEnabled() bool
 	HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
 	MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
-	SubscribeToPutNotif(context.Context)
+	WriteLoop(context.Context)
 }
 
 type PutNotify struct {
@@ -31,29 +29,29 @@ type PutNotify struct {
 // SecondaryRouter ... routing abstraction for secondary storage backends
 type SecondaryRouter struct {
-	stream chan PutNotify
-	log    log.Logger
-	m      metrics.Metricer
+	log log.Logger
+	m   metrics.Metricer
 
 	caches    []PrecomputedKeyStore
 	fallbacks []PrecomputedKeyStore
 
-	verifyLock sync.RWMutex
+	verifyLock   sync.RWMutex
+	subscription chan PutNotify
 }
 
 // NewSecondaryRouter ... creates a new secondary storage router
-func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (ISecondary, error) {
+func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) ISecondary {
 	return &SecondaryRouter{
-		stream:    make(chan PutNotify),
-		log:       log,
-		m:         m,
-		caches:    caches,
-		fallbacks: fallbacks,
-	}, nil
+		subscription: make(chan PutNotify), // an unbuffered channel is critical to ensure that secondary bottlenecks don't impact /put latency for eigenda blob dispersals
+		log:          log,
+		m:            m,
+		caches:       caches,
+		fallbacks:    fallbacks,
+	}
 }
 
-func (r *SecondaryRouter) Topic() chan<- PutNotify {
-	return r.stream
+func (r *SecondaryRouter) Subscription() chan<- PutNotify {
+	return r.subscription
 }
 
 func (r *SecondaryRouter) Enabled() bool {
@@ -70,9 +68,6 @@ func (r *SecondaryRouter) FallbackEnabled() bool {
 
 // handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
 // and returns an error if NONE of them succeed
-// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
-// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
-// vs a fallback read type
 func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
 	sources := r.caches
 	sources = append(sources, r.fallbacks...)
@@ -100,16 +95,18 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	return nil
 }
 
-func (r *SecondaryRouter) SubscribeToPutNotif(ctx context.Context) {
+// WriteLoop ... waits for notifications published to subscription channel to make backend writes
+func (r *SecondaryRouter) WriteLoop(ctx context.Context) {
 	for {
 		select {
-		case notif := <-r.stream:
+		case notif := <-r.subscription:
 			err := r.HandleRedundantWrites(context.Background(), notif.Commitment, notif.Value)
 			if err != nil {
 				r.log.Error("Failed to write to redundant targets", "err", err)
 			}
 
 		case <-ctx.Done():
+			r.log.Debug("Terminating secondary event loop")
 			return
 		}
 	}
@@ -157,11 +154,3 @@ func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte
 	}
 	return nil, errors.New("no data found in any redundant backend")
 }
-
-func (r *SecondaryRouter) Fallbacks() []PrecomputedKeyStore {
-	return r.fallbacks
-}
-
-func (r *SecondaryRouter) Caches() []PrecomputedKeyStore {
-	return r.caches
-}

From 8ef8108063f2b744baa26cdd6c6a9843f8beab7d Mon Sep 17 00:00:00 2001
From: Ethen Pociask
Date: Wed, 16 Oct 2024 05:54:49 -0400
Subject: [PATCH 9/9] chore: Better abstract secondary storage

- observe secondary storage via metrics
- ensure thread safety for secondary stores
---
 e2e/optimism_test.go                 |  4 +-
 e2e/server_test.go                   |  6 +--
 server/load_store.go                 |  5 ++-
 store/precomputed_key/redis/redis.go |  3 +-
 store/precomputed_key/s3/cli.go      | 18 ++++-----
 store/precomputed_key/s3/s3.go       |  4 +-
 store/router.go                      |  3 +-
 store/secondary.go                   | 59 ++++++++++++++++++----------
 store/store.go                       |  1 -
 utils/atomic.go                      | 29 ++++++++++++++
 utils/utils_test.go                  | 10 +++++
 11 files changed, 100 insertions(+), 42 deletions(-)
 create mode 100644 utils/atomic.go

diff --git a/e2e/optimism_test.go b/e2e/optimism_test.go
index c0d898c..29b8b30 100644
--- a/e2e/optimism_test.go
+++ b/e2e/optimism_test.go
@@ -172,7 +172,7 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
 	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismKeccak), "0")
 	delete(labels, "method")
 
-	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 2)
@@ -235,7 +235,7 @@ func TestOptimismGenericCommitment(gt *testing.T) {
 	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismGeneric), "0")
 	delete(labels, "method")
 
-	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 2)
diff --git a/e2e/server_test.go b/e2e/server_test.go
index 07729f3..a36afb1 100644
--- a/e2e/server_test.go
+++ b/e2e/server_test.go
@@ -353,7 +353,7 @@ func TestProxyServerCaching(t *testing.T) {
 
 	// ensure that read was from cache
 	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
 	require.NoError(t, err)
 	require.Len(t, ms, 1)
 
@@ -399,7 +399,7 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
 
 	// ensure that read was from cache
 	labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 1)
@@ -457,7 +457,7 @@ func TestProxyServerReadFallback(t *testing.T) {
 
 	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
-	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
 	require.NoError(t, err)
 	require.NotEmpty(t, ms)
 	require.Len(t, ms, 1)
diff --git a/server/load_store.go b/server/load_store.go
index de65078..e9e7029 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -125,7 +125,10 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metri
 	if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
 		// NOTE: in the future the number of threads could be made configurable via env
 		log.Debug("Starting secondary write loop")
-		go secondary.WriteLoop(ctx)
+
+		for i := 0; i < 5; i++ {
+			go secondary.WriteSubscriptionLoop(ctx)
+		}
 	}
 
 	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
diff --git a/store/precomputed_key/redis/redis.go b/store/precomputed_key/redis/redis.go
index 1c0ac7e..a90b202 100644
--- a/store/precomputed_key/redis/redis.go
+++ b/store/precomputed_key/redis/redis.go
@@ -19,7 +19,8 @@ type Config struct {
 	Profile bool
 }
 
-// Store ... Redis storage backend implementation (This not safe for concurrent usage)
+// Store ... Redis storage backend implementation
+// go-redis client is safe for concurrent usage: https://github.com/redis/go-redis/blob/v8.11.5/redis.go#L535-L544
 type Store struct {
 	eviction time.Duration
diff --git a/store/precomputed_key/s3/cli.go b/store/precomputed_key/s3/cli.go
index 1a42dd1..7f120d5 100644
--- a/store/precomputed_key/s3/cli.go
+++ b/store/precomputed_key/s3/cli.go
@@ -1,8 +1,6 @@
 package s3
 
 import (
-	"time"
-
 	"github.com/urfave/cli/v2"
 )
 
@@ -80,13 +78,13 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
 			EnvVars:  withEnvPrefix(envPrefix, "BACKUP"),
 			Category: category,
 		},
-		&cli.DurationFlag{
-			Name:     TimeoutFlagName,
-			Usage:    "timeout for S3 storage operations (e.g. get, put)",
-			Value:    5 * time.Second,
-			EnvVars:  withEnvPrefix(envPrefix, "TIMEOUT"),
-			Category: category,
-		},
+		// &cli.DurationFlag{
+		// 	Name:     TimeoutFlagName,
+		// 	Usage:    "timeout for S3 storage operations (e.g. get, put)",
+		// 	Value:    5 * time.Second,
+		// 	EnvVars:  withEnvPrefix(envPrefix, "TIMEOUT"),
+		// 	Category: category,
+		// },
 	}
 }
 
@@ -100,6 +98,6 @@ func ReadConfig(ctx *cli.Context) Config {
 		Bucket: ctx.String(BucketFlagName),
 		Path:   ctx.String(PathFlagName),
 		Backup: ctx.Bool(BackupFlagName),
-		Timeout: ctx.Duration(TimeoutFlagName),
+		// Timeout: ctx.Duration(TimeoutFlagName),
 	}
 }
diff --git a/store/precomputed_key/s3/s3.go b/store/precomputed_key/s3/s3.go
index 50b8664..20fe276 100644
--- a/store/precomputed_key/s3/s3.go
+++ b/store/precomputed_key/s3/s3.go
@@ -9,7 +9,6 @@ import (
 	"io"
 	"path"
 	"strings"
-	"time"
 
 	"github.com/Layr-Labs/eigenda-proxy/store"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -47,9 +46,10 @@ type Config struct {
 	Bucket  string
 	Path    string
 	Backup  bool
-	Timeout time.Duration
 }
 
+// Store ... S3 store
+// client safe for concurrent use: https://github.com/minio/minio-go/issues/598#issuecomment-569457863
 type Store struct {
 	cfg    Config
 	client *minio.Client
diff --git a/store/router.go b/store/router.go
index 67898ca..e2a537d 100644
--- a/store/router.go
+++ b/store/router.go
@@ -29,6 +29,7 @@ type Router struct {
 	secondary ISecondary
 }
 
+// NewRouter ... Init
 func NewRouter(eigenda GeneratedKeyStore, s3 PrecomputedKeyStore, l log.Logger,
 	secondary ISecondary) (IRouter, error) {
 	return &Router{
@@ -124,7 +125,7 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va
 	}
 
 	if r.secondary.Enabled() { // publish put notification to secondary's subscription on PutNotification topic
-		r.secondary.Subscription() <- PutNotify{
+		r.secondary.Topic() <- PutNotify{
 			Commitment: commit,
 			Value:      value,
 		}
diff --git a/store/secondary.go b/store/secondary.go
index 8f70b0e..3e24d1f 100644
--- a/store/secondary.go
+++ b/store/secondary.go
@@ -7,21 +7,32 @@ import (
 	"sync"
 
 	"github.com/Layr-Labs/eigenda-proxy/metrics"
+	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 )
 
+type MetricExpression = string
+
+const (
+	Miss    MetricExpression = "miss"
+	Success MetricExpression = "success"
+	Failed  MetricExpression = "failed"
+)
+
 type ISecondary interface {
 	Enabled() bool
-	Subscription() chan<- PutNotify
+	Topic() chan<- PutNotify
 	CachingEnabled() bool
 	FallbackEnabled() bool
 	HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
 	MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
-	WriteLoop(context.Context)
+	WriteSubscriptionLoop(ctx context.Context)
 }
 
+// PutNotify ... notification published by the primary router to trigger insertion across
+// secondary storage backends
 type PutNotify struct {
 	Commitment []byte
 	Value      []byte
@@ -35,23 +46,25 @@ type SecondaryRouter struct {
 	caches    []PrecomputedKeyStore
 	fallbacks []PrecomputedKeyStore
 
-	verifyLock   sync.RWMutex
-	subscription chan PutNotify
+	verifyLock sync.RWMutex
+	topic      chan PutNotify
 }
 
 // NewSecondaryRouter ... creates a new secondary storage router
 func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) ISecondary {
 	return &SecondaryRouter{
-		subscription: make(chan PutNotify), // an unbuffered channel is critical to ensure that secondary bottlenecks don't impact /put latency for eigenda blob dispersals
-		log:          log,
-		m:            m,
-		caches:       caches,
-		fallbacks:    fallbacks,
+		topic:      make(chan PutNotify), // channel is un-buffered; dispersing consumption across multiple routines helps alleviate secondary write bottlenecks
+		log:        log,
+		m:          m,
+		caches:     caches,
+		fallbacks:  fallbacks,
+		verifyLock: sync.RWMutex{},
 	}
 }
 
-func (r *SecondaryRouter) Subscription() chan<- PutNotify {
-	return r.subscription
+// Topic ...
+func (r *SecondaryRouter) Topic() chan<- PutNotify {
+	return r.topic
 }
 
 func (r *SecondaryRouter) Enabled() bool {
@@ -78,13 +91,17 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	for _, src := range sources {
 		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)
 
-		err := src.Put(ctx, key, value)
+		// for added safety - we retry the insertion up to 10 times using an exponential backoff
+		_, err := retry.Do[any](ctx, 10, retry.Exponential(),
+			func() (any, error) {
+				return 0, src.Put(ctx, key, value) // this implementation assumes that all secondary clients are thread safe
+			})
 		if err != nil {
 			r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err)
-			cb("failure")
+			cb(Failed)
 		} else {
 			successes++
-			cb("success")
+			cb(Success)
 		}
 	}
 
@@ -95,11 +112,11 @@ func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment
 	return nil
 }
 
-// WriteLoop ... waits for notifications published to subscription channel to make backend writes
-func (r *SecondaryRouter) WriteLoop(ctx context.Context) {
+// WriteSubscriptionLoop ... subscribes to put notifications published by the primary router to the shared topic
+func (r *SecondaryRouter) WriteSubscriptionLoop(ctx context.Context) {
 	for {
 		select {
-		case notif := <-r.subscription:
+		case notif := <-r.topic:
 			err := r.HandleRedundantWrites(context.Background(), notif.Commitment, notif.Value)
 			if err != nil {
 				r.log.Error("Failed to write to redundant targets", "err", err)
@@ -128,13 +145,13 @@ func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte
 		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
 		data, err := src.Get(ctx, key)
 		if err != nil {
-			cb("failure")
+			cb(Failed)
 			r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err)
 			continue
 		}
 
 		if data == nil {
-			cb("miss")
+			cb(Miss)
 			r.log.Debug("No data found in redundant target", "backend", src.BackendType())
 			continue
 		}
@@ -143,13 +160,13 @@ func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte
 		r.verifyLock.Lock()
 		err = verify(commitment, data)
 		if err != nil {
-			cb("failure")
+			cb(Failed)
 			log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
 			r.verifyLock.Unlock()
 			continue
 		}
 		r.verifyLock.Unlock()
-		cb("success")
+		cb(Success)
 		return data, nil
 	}
 	return nil, errors.New("no data found in any redundant backend")
diff --git a/store/store.go b/store/store.go
index 74de0da..d779edb 100644
--- a/store/store.go
+++ b/store/store.go
@@ -65,7 +65,6 @@ type Stats struct {
 }
 
 type Store interface {
-	// Backend returns the backend type provider of the store.
 	BackendType() BackendType
 	// Verify verifies the given key-value pair.
diff --git a/utils/atomic.go b/utils/atomic.go
new file mode 100644
index 0000000..295736b
--- /dev/null
+++ b/utils/atomic.go
@@ -0,0 +1,29 @@
+package utils
+
+import (
+	"sync"
+)
+
+type AtomicRef[T any] struct {
+	value   T
+	rwMutex *sync.RWMutex
+}
+
+func NewAtomicRef[T any](v T) *AtomicRef[T] {
+	return &AtomicRef[T]{
+		value:   v,
+		rwMutex: &sync.RWMutex{},
+	}
+}
+
+func (ar *AtomicRef[T]) Update(newValue T) {
+	ar.rwMutex.Lock()
+	ar.value = newValue
+	ar.rwMutex.Unlock()
+}
+
+func (ar *AtomicRef[T]) Value() T {
+	ar.rwMutex.RLock()
+	defer ar.rwMutex.RUnlock()
+	return ar.value
+}
diff --git a/utils/utils_test.go b/utils/utils_test.go
index 0217758..53ce416 100644
--- a/utils/utils_test.go
+++ b/utils/utils_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/Layr-Labs/eigenda-proxy/utils"
+	"github.com/stretchr/testify/require"
 )
 
 func TestParseByteAmount(t *testing.T) {
@@ -56,3 +57,12 @@ func TestParseByteAmount(t *testing.T) {
 		})
 	}
 }
+
+func TestAtomicRefWithInt(t *testing.T) {
+	expected := 69
+
+	ref := utils.NewAtomicRef[int](expected)
+	actual := ref.Value()
+
+	require.Equal(t, expected, actual)
+}
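
For completeness, a minimal usage sketch of the `AtomicRef` helper introduced in utils/atomic.go above (illustrative only, not part of the patches; it assumes the module import path used in utils_test.go). Since the ref is backed by an RWMutex rather than atomic primitives, concurrent writers simply serialize on the lock:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/Layr-Labs/eigenda-proxy/utils"
)

func main() {
	// hold a shared value behind the RWMutex-backed ref
	ref := utils.NewAtomicRef(0)

	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			ref.Update(v) // each write takes the write lock
		}(i)
	}
	wg.Wait()

	fmt.Println(ref.Value()) // prints whichever concurrent update landed last
}
```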