diff --git a/Makefile b/Makefile
index e4ab758..bb279eb 100644
--- a/Makefile
+++ b/Makefile
@@ -85,10 +85,6 @@ install-lint:
 	@echo "Installing golangci-lint..."
 	@sh -c $(GET_LINT_CMD)
 
-gosec:
-	@echo "Running security scan with gosec..."
-	gosec ./...
-
 submodules:
 	git submodule update --init --recursive
 
diff --git a/README.md b/README.md
index ef737be..b801625 100644
--- a/README.md
+++ b/README.md
@@ -208,7 +208,11 @@ The `raw commitment` is an RLP-encoded [EigenDA certificate](https://github.com/
 
 ### Unit
 
-Unit tests can be ran via invoking `make test`.
+Unit tests can be run by invoking `make test`. Please make sure to have all test containers pulled locally before running:
+```
+docker pull redis
+docker pull minio
+```
 
 ### Holesky
 
diff --git a/cmd/server/entrypoint.go b/cmd/server/entrypoint.go
index effd5d8..e086302 100644
--- a/cmd/server/entrypoint.go
+++ b/cmd/server/entrypoint.go
@@ -29,14 +29,15 @@ func StartProxySvr(cliCtx *cli.Context) error {
 		return fmt.Errorf("failed to pretty print config: %w", err)
 	}
 
+	m := metrics.NewMetrics("default")
+
 	ctx, ctxCancel := context.WithCancel(cliCtx.Context)
 	defer ctxCancel()
 
-	daRouter, err := server.LoadStoreRouter(ctx, cfg, log)
+	daRouter, err := server.LoadStoreRouter(ctx, cfg, log, m)
 	if err != nil {
 		return fmt.Errorf("failed to create store: %w", err)
 	}
 
-	m := metrics.NewMetrics("default")
 	server := server.NewServer(cliCtx.String(flags.ListenAddrFlagName), cliCtx.Int(flags.PortFlagName), daRouter, log, m)
 
 	if err := server.Start(); err != nil {
diff --git a/commitments/mode.go b/commitments/mode.go
index ef106ad..b195a1e 100644
--- a/commitments/mode.go
+++ b/commitments/mode.go
@@ -8,7 +8,7 @@ import (
 type CommitmentMeta struct {
 	Mode CommitmentMode
 	// CertVersion is shared for all modes and denotes version of the EigenDA certificate
-	CertVersion byte
+	CertVersion uint8
 }
 
 type CommitmentMode string
diff --git a/e2e/optimism_test.go b/e2e/optimism_test.go
index 270fd8b..29b8b30 100644
--- a/e2e/optimism_test.go
+++ b/e2e/optimism_test.go
@@ -3,7 +3,9 @@ package e2e_test
 import (
 	"testing"
 
+	"github.com/Layr-Labs/eigenda-proxy/commitments"
 	"github.com/Layr-Labs/eigenda-proxy/e2e"
+	"github.com/Layr-Labs/eigenda-proxy/metrics"
 	altda "github.com/ethereum-optimism/optimism/op-alt-da"
 	"github.com/ethereum-optimism/optimism/op-e2e/actions"
 	"github.com/ethereum-optimism/optimism/op-e2e/config"
@@ -166,11 +168,18 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
 	optimism.sequencer.ActL2PipelineFull(t)
 	optimism.ActL1Finalized(t)
 
-	// assert that EigenDA proxy's was written and read from
-	stat := proxyTS.Server.GetS3Stats()
+	// assert that keccak256 primary store was written and read from
+	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismKeccak), "0")
+	delete(labels, "method")
+
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
+	require.NoError(t, err)
+	require.NotEmpty(t, ms)
+	require.Len(t, ms, 2)
+
+	require.True(t, ms[0].Count > 0)
+	require.True(t, ms[1].Count > 0)
-	require.Equal(t, 1, stat.Entries)
-	require.Equal(t, 1, stat.Reads)
 }
 
 func TestOptimismGenericCommitment(gt *testing.T) {
@@ -222,9 +231,15 @@ func TestOptimismGenericCommitment(gt *testing.T) {
 
 	// assert that EigenDA proxy's was written and read from
-	if useMemory() {
-		stat := proxyTS.Server.GetEigenDAStats()
-		require.Equal(t, 1, stat.Entries)
-		require.Equal(t, 1, stat.Reads)
-	}
+	// assert that EigenDA's primary store was written and read from
+	labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismGeneric), "0")
+	delete(labels, "method")
+
+	ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
+	require.NoError(t, err)
+	require.NotEmpty(t, ms)
+	require.Len(t, ms, 2)
+
+	require.True(t, ms[0].Count > 0)
+	require.True(t, ms[1].Count > 0)
 }
diff --git a/e2e/server_test.go b/e2e/server_test.go
index f7a643e..a36afb1 100644
--- a/e2e/server_test.go
+++ b/e2e/server_test.go
@@ -8,9 +8,10 @@ import (
 	"time"
 
 	"github.com/Layr-Labs/eigenda-proxy/client"
+	"github.com/Layr-Labs/eigenda-proxy/metrics"
+	"github.com/Layr-Labs/eigenda-proxy/store"
 
 	"github.com/Layr-Labs/eigenda-proxy/e2e"
-	"github.com/Layr-Labs/eigenda-proxy/store"
 	altda "github.com/ethereum-optimism/optimism/op-alt-da"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -350,9 +351,13 @@ func TestProxyServerCaching(t *testing.T) {
 	require.Equal(t, testPreimage, preimage)
 
 	// ensure that read was from cache
-	s3Stats := ts.Server.GetS3Stats()
-	require.Equal(t, 1, s3Stats.Reads)
-	require.Equal(t, 1, s3Stats.Entries)
+	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
+
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
+	require.NoError(t, err)
+	require.Len(t, ms, 1)
+
+	require.True(t, ms[0].Count > 0)
 
 	if useMemory() { // ensure that eigenda was not read from
 		memStats := ts.Server.GetEigenDAStats()
@@ -393,12 +398,14 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
 	require.Equal(t, testPreimage, preimage)
 
 	// ensure that read was from cache
-	redStats, err := ts.Server.GetStoreStats(store.RedisBackendType)
+	labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
 	require.NoError(t, err)
+	require.NotEmpty(t, ms)
+	require.Len(t, ms, 1)
+	require.True(t, ms[0].Count >= 1)
-	require.Equal(t, 1, redStats.Reads)
-	require.Equal(t, 1, redStats.Entries)
-
+	// TODO: Add metrics for EigenDA dispersal/retrieval
 	if useMemory() { // ensure that eigenda was not read from
 		memStats := ts.Server.GetEigenDAStats()
 		require.Equal(t, 0, memStats.Reads)
@@ -420,6 +427,7 @@ func TestProxyServerReadFallback(t *testing.T) {
 	t.Parallel()
 
+	// setup server with S3 as a fallback option
 	testCfg := e2e.TestConfig(useMemory())
 	testCfg.UseS3Fallback = true
 	testCfg.Expiration = time.Millisecond * 1
@@ -447,11 +455,16 @@ func TestProxyServerReadFallback(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, testPreimage, preimage)
 
-	// ensure that read was from fallback target location (i.e, S3 for this test)
-	s3Stats := ts.Server.GetS3Stats()
-	require.Equal(t, 1, s3Stats.Reads)
-	require.Equal(t, 1, s3Stats.Entries)
+	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")
+
+	ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
+	require.NoError(t, err)
+	require.NotEmpty(t, ms)
+	require.Len(t, ms, 1)
+
+	require.True(t, ms[0].Count > 0)
 
+	// TODO - remove this in favor of metrics sampling
 	if useMemory() { // ensure that an eigenda read was attempted with zero data available
 		memStats := ts.Server.GetEigenDAStats()
 		require.Equal(t, 1, memStats.Reads)
diff --git a/e2e/setup.go b/e2e/setup.go
index 0510f98..d56ba13 100644
--- a/e2e/setup.go
+++ b/e2e/setup.go
@@ -22,6 +22,7 @@ import (
 	"github.com/minio/minio-go/v7/pkg/credentials"
 	"golang.org/x/exp/rand"
 
+	"github.com/ethereum-optimism/optimism/op-service/httputil"
 	oplog "github.com/ethereum-optimism/optimism/op-service/log"
 	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
@@ -77,7 +78,6 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
 	createS3Bucket(bucketName)
 
 	eigendaCfg.S3Config = s3.Config{
-		Profiling: true,
 		Bucket:    bucketName,
 		Path:      "",
 		Endpoint:  "localhost:4566",
@@ -175,9 +175,11 @@ func TestSuiteConfig(t *testing.T, testCfg *Cfg) server.CLIConfig {
 }
 
 type TestSuite struct {
-	Ctx    context.Context
-	Log    log.Logger
-	Server *server.Server
+	Ctx          context.Context
+	Log          log.Logger
+	Server       *server.Server
+	MetricPoller *metrics.PollerClient
+	MetricSvr    *httputil.HTTPServer
 }
 
 func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, func()) {
@@ -188,28 +190,43 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
 	}).New("role", svcName)
 	ctx := context.Background()
 
+	m := metrics.NewMetrics("default")
 	store, err := server.LoadStoreRouter(
 		ctx,
 		testSuiteCfg,
 		log,
+		m,
 	)
+	require.NoError(t, err)
 
-	server := server.NewServer(host, 0, store, log, metrics.NoopMetrics)
+	proxySvr := server.NewServer(host, 0, store, log, m)
 
 	t.Log("Starting proxy server...")
-	err = server.Start()
+	err = proxySvr.Start()
+	require.NoError(t, err)
+
+	metricsSvr, err := m.StartServer(host, 0)
+	t.Log("Starting metrics server...")
+	require.NoError(t, err)
 
 	kill := func() {
-		if err := server.Stop(); err != nil {
-			panic(err)
+		if err := proxySvr.Stop(); err != nil {
+			log.Error("failed to stop proxy server", "err", err)
+		}
+
+		if err := metricsSvr.Stop(context.Background()); err != nil {
+			log.Error("failed to stop metrics server", "err", err)
+		}
 	}
 
+	log.Info("started metrics server", "addr", metricsSvr.Addr())
 	return TestSuite{
-		Ctx:    ctx,
-		Log:    log,
-		Server: server,
+		Ctx:          ctx,
+		Log:          log,
+		Server:       proxySvr,
+		MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", metricsSvr.Addr().String())),
+		MetricSvr:    metricsSvr,
 	}, kill
 }
diff --git a/metrics/metrics.go b/metrics/metrics.go
index a115826..a9855e2 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"fmt"
 	"net"
 	"strconv"
 
@@ -15,6 +16,7 @@ import (
 const (
 	namespace           = "eigenda_proxy"
 	httpServerSubsystem = "http_server"
+	secondarySubsystem  = "secondary"
 )
 
 // Config ... Metrics server configuration
@@ -29,7 +31,9 @@ type Config struct {
 	RecordInfo(version string)
 	RecordUp()
-	RecordRPCServerRequest(method string) func(status string, commitmentMode string, version string)
+
+	RecordRPCServerRequest(method string) func(status string, mode string, ver string)
+	RecordSecondaryRequest(bt string, method string) func(status string)
 
 	Document() []metrics.DocumentedMetric
 }
@@ -39,10 +43,15 @@ type Metrics struct {
 	Info *prometheus.GaugeVec
 	Up   prometheus.Gauge
 
+	// server metrics
 	HTTPServerRequestsTotal          *prometheus.CounterVec
 	HTTPServerBadRequestHeader       *prometheus.CounterVec
 	HTTPServerRequestDurationSeconds *prometheus.HistogramVec
 
+	// secondary metrics
+	SecondaryRequestsTotal      *prometheus.CounterVec
+	SecondaryRequestDurationSec *prometheus.HistogramVec
+
 	registry *prometheus.Registry
 	factory  metrics.Factory
 }
@@ -80,7 +89,7 @@ func NewMetrics(subsystem string) *Metrics {
 			Name:      "requests_total",
 			Help:      "Total requests to the HTTP server",
 		}, []string{
-			"method", "status", "commitment_mode", "DA_cert_version",
+			"method", "status", "commitment_mode", "cert_version",
 		}),
 		HTTPServerBadRequestHeader: factory.NewCounterVec(prometheus.CounterOpts{
 			Namespace: namespace,
@@ -101,6 +110,23 @@ func NewMetrics(subsystem string) *Metrics {
 		}, []string{
 			"method", // no status on histograms because those are very expensive
 		}),
+		SecondaryRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: secondarySubsystem,
+			Name:      "requests_total",
+			Help:      "Total requests to the secondary storage",
+		}, []string{
+			"backend_type", "method", "status",
+		}),
+		SecondaryRequestDurationSec: factory.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: secondarySubsystem,
+			Name:      "request_duration_seconds",
+			Buckets:   prometheus.ExponentialBucketsRange(0.05, 1200, 20),
+			Help:      "Histogram of secondary storage request durations",
+		}, []string{
+			"backend_type",
+		}),
 		registry: registry,
 		factory:  factory,
 	}
@@ -121,7 +147,7 @@ func (m *Metrics) RecordUp() {
 // RecordRPCServerRequest is a helper method to record an incoming HTTP request.
 // It bumps the requests metric, and tracks how long it takes to serve a response,
 // including the HTTP status code.
-func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode string, ver string) {
+func (m *Metrics) RecordRPCServerRequest(method string) func(status, mode, ver string) {
 	// we don't want to track the status code on the histogram because that would
 	// create a huge number of labels, and cost a lot on cloud hosted services
 	timer := prometheus.NewTimer(m.HTTPServerRequestDurationSeconds.WithLabelValues(method))
@@ -131,13 +157,31 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode
 	}
 }
 
+// RecordSecondaryRequest records a secondary put/get operation.
+func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status string) {
+	timer := prometheus.NewTimer(m.SecondaryRequestDurationSec.WithLabelValues(bt))
+
+	return func(status string) {
+		m.SecondaryRequestsTotal.WithLabelValues(bt, method, status).Inc()
+		timer.ObserveDuration()
+	}
+}
+
 // StartServer starts the metrics server on the given hostname and port.
+// If port is 0, it automatically assigns an available port and returns the actual port.
 func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
-	addr := net.JoinHostPort(hostname, strconv.Itoa(port))
+	address := net.JoinHostPort(hostname, strconv.Itoa(port))
+
 	h := promhttp.InstrumentMetricHandler(
 		m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
 	)
-	return ophttp.StartHTTPServer(addr, h)
+
+	server, err := ophttp.StartHTTPServer(address, h)
+	if err != nil {
+		return nil, fmt.Errorf("failed to start HTTP server: %w", err)
+	}
+
+	return server, nil
 }
 
 func (m *Metrics) Document() []metrics.DocumentedMetric {
@@ -162,3 +206,7 @@ func (n *noopMetricer) RecordUp() {
 func (n *noopMetricer) RecordRPCServerRequest(string) func(status, mode, ver string) {
 	return func(string, string, string) {}
 }
+
+func (n *noopMetricer) RecordSecondaryRequest(string, string) func(status string) {
+	return func(string) {}
+}
diff --git a/metrics/poller.go b/metrics/poller.go
new file mode 100644
index 0000000..020d51a
--- /dev/null
+++ b/metrics/poller.go
@@ -0,0 +1,189 @@
+package metrics
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+)
+
+/*
+	NOTE: This poller is only used for E2E testing and is not recommended for general application usage within EigenDA proxy
+*/
+
+type MetricKey string
+
+const (
+	ServerRPCStatuses        MetricKey = "eigenda_proxy_http_server_requests_total"
+	SecondaryRequestStatuses MetricKey = "eigenda_proxy_secondary_requests_total"
+)
+
+// MetricWithCount represents a metric with labels (key-value pairs) and a count
+type MetricWithCount struct {
+	Name   string            `json:"name"`
+	Labels map[string]string `json:"labels"` // used for filtering
+	Count  int               `json:"count"`
+}
+
+func parseCountMetric(input string) (MetricWithCount, error) {
+	// Regular expression to match the metric name, key-value pairs, and count
+	re := regexp.MustCompile(`^(\w+)\{([^}]*)\}\s+(\d+)$`)
+	match := re.FindStringSubmatch(input)
+
+	if len(match) != 4 {
+		return MetricWithCount{}, fmt.Errorf("invalid count metric format")
+	}
+
+	// Extract the name and count
+	name := match[1]
+	labelsString := match[2]
+	count, err := strconv.Atoi(match[3])
+	if err != nil {
+		return MetricWithCount{}, fmt.Errorf("invalid count value read from metric line: %w", err)
+	}
+
+	// Extract the labels (key-value pairs) from the second capture group
+	labelsRe := regexp.MustCompile(`(\w+)="([^"]+)"`)
+	labelsMatches := labelsRe.FindAllStringSubmatch(labelsString, -1)
+
+	labels := make(map[string]string)
+	for _, labelMatch := range labelsMatches {
+		key := labelMatch[1]
+		value := labelMatch[2]
+		labels[key] = value
+	}
+
+	// Return the parsed metric with labels and count
+	return MetricWithCount{
+		Name:   name,
+		Labels: labels,
+		Count:  count,
+	}, nil
+}
+
+// PollerClient ... used to poll metrics from server
+// used in E2E testing to assert client->server interactions
+type PollerClient struct {
+	address string
+	client  *http.Client
+}
+
+// NewPoller ... initializer
+func NewPoller(address string) *PollerClient {
+	return &PollerClient{
+		address: address,
+		client:  &http.Client{},
+	}
+}
+
+// BuildSecondaryCountLabels ... builds label mapping used to query for secondary storage count metrics
+func BuildSecondaryCountLabels(backendType, method, status string) map[string]string {
+	return map[string]string{
+		"backend_type": backendType,
+		"method":       method,
+		"status":       status,
+	}
+}
+
+// BuildServerRPCLabels ... builds label mapping used to query for standard http server count metrics
+func BuildServerRPCLabels(method, status, commitmentMode, certVersion string) map[string]string {
+	return map[string]string{
+		"method":          method,
+		"status":          status,
+		"commitment_mode": commitmentMode,
+		"cert_version":    certVersion,
+	}
+}
+
+type MetricSlice []*MetricWithCount
+
+func hasMetric(line string, labels map[string]string) bool {
+	for label, value := range labels {
+		if !strings.Contains(line, label) {
+			return false
+		}
+
+		if !strings.Contains(line, value) {
+			return false
+		}
+	}
+
+	return true
+}
+
+// PollCountMetricsWithRetry ... Polls for a Count Metric using a simple retry strategy of 1 second sleep x times
+// keeping this non-modular is ok since this is only used for testing
+func (m *PollerClient) PollCountMetricsWithRetry(name MetricKey, labels map[string]string, times int) (MetricSlice, error) {
+	var ms MetricSlice
+	var err error
+
+	for i := 0; i < times; i++ {
+		ms, err = m.PollCountMetrics(name, labels)
+		if err != nil {
+			time.Sleep(time.Second * 1)
+			continue
+		}
+
+		return ms, nil
+	}
+	return nil, err
+}
+
+// PollCountMetrics ... polls metrics from the given address and does a linear search
+// provided the metric name
+// assumes 1 metric to key mapping
+func (m *PollerClient) PollCountMetrics(name MetricKey, labels map[string]string) (MetricSlice, error) {
+	str, err := m.fetchMetrics()
+	if err != nil {
+		return nil, err
+	}
+
+	entries := []*MetricWithCount{}
+
+	lines := strings.Split(str, "\n")
+	for _, line := range lines {
+		if strings.HasPrefix(line, string(name)) && hasMetric(line, labels) {
+			mc, err := parseCountMetric(line)
+			if err != nil {
+				return nil, err
+			}
+
+			entries = append(entries, &mc)
+		}
+	}
+
+	if len(entries) == 0 {
+		return nil, fmt.Errorf("no entries found for metric: %s", name)
+	}
+
+	return entries, nil
+}
+
+// fetchMetrics ... reads metrics server endpoint contents into string
+func (m *PollerClient) fetchMetrics() (string, error) {
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, m.address, nil)
+	if err != nil {
+		return "", fmt.Errorf("error creating request: %w", err)
+	}
+
+	resp, err := m.client.Do(req)
+	if err != nil {
+		return "", fmt.Errorf("error polling metrics: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("received non-200 status code: %d", resp.StatusCode)
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", fmt.Errorf("error reading response body: %w", err)
+	}
+
+	return string(body), nil
+}
diff --git a/server/load_store.go b/server/load_store.go
index e126843..e9e7029 100644
--- a/server/load_store.go
+++ b/server/load_store.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/Layr-Labs/eigenda-proxy/metrics"
 	"github.com/Layr-Labs/eigenda-proxy/store"
 	"github.com/Layr-Labs/eigenda-proxy/store/generated_key/eigenda"
 	"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
@@ -49,7 +50,7 @@ func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redi
 }
 
 // LoadStoreRouter ... creates storage backend clients and instruments them into a storage routing abstraction
-func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.IRouter, error) {
+func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger, m metrics.Metricer) (store.IRouter, error) {
 	// create S3 backend store (if enabled)
 	var err error
 	var s3Store store.PrecomputedKeyStore
@@ -90,7 +91,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
 	// create EigenDA backend store
 	var eigenDA store.GeneratedKeyStore
 	if cfg.EigenDAConfig.MemstoreEnabled {
-		log.Info("Using mem-store backend for EigenDA")
+		log.Info("Using memstore backend for EigenDA")
 		eigenDA, err = memstore.New(ctx, verifier, log, cfg.EigenDAConfig.MemstoreConfig)
 	} else {
 		var client *clients.EigenDAClient
@@ -116,10 +117,20 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
 		return nil, err
 	}
 
-	// determine read fallbacks
+	// create secondary storage router
 	fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore)
 	caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore)
+	secondary := store.NewSecondaryRouter(log, m, caches, fallbacks)
+
+	if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled
+		// NOTE: in the future the number of threads could be made configurable via env
+		log.Debug("Starting secondary write loop")
+
+		for i := 0; i < 5; i++ {
+			go secondary.WriteSubscriptionLoop(ctx)
+		}
+	}
 
 	log.Info("Creating storage router", "eigenda backend type", eigenDA != nil, "s3 backend type", s3Store != nil)
-	return store.NewRouter(eigenDA, s3Store, log, caches, fallbacks)
+	return store.NewRouter(eigenDA, s3Store, log, secondary)
 }
diff --git a/server/server.go b/server/server.go
index 84662d1..8eda747 100644
--- a/server/server.go
+++ b/server/server.go
@@ -69,6 +69,7 @@ func WithMetrics(
 		if err != nil {
 			var metaErr MetaError
 			if errors.As(err, &metaErr) {
+				// TODO: Figure out why status is defaulting to ""
 				recordDur(w.Header().Get("status"), string(metaErr.Meta.Mode), string(metaErr.Meta.CertVersion))
 			} else {
 				recordDur(w.Header().Get("status"), string("NoCommitmentMode"), string("NoCertVersion"))
@@ -76,7 +77,7 @@ func WithMetrics(
 			return err
 		}
 		// we assume that every route will set the status header
-		recordDur(w.Header().Get("status"), string(meta.Mode), string(meta.CertVersion))
+		recordDur(w.Header().Get("status"), string(meta.Mode), strconv.Itoa(int(meta.CertVersion)))
 		return nil
 	}
 }
@@ -378,25 +379,3 @@ func ReadCommitmentVersion(r *http.Request, mode commitments.CommitmentMode) (by
 func (svr *Server) GetEigenDAStats() *store.Stats {
 	return svr.router.GetEigenDAStore().Stats()
 }
-
-func (svr *Server) GetS3Stats() *store.Stats {
-	return svr.router.GetS3Store().Stats()
-}
-
-func (svr *Server) GetStoreStats(bt store.BackendType) (*store.Stats, error) {
-	// first check if the store is a cache
-	for _, cache := range svr.router.Caches() {
-		if cache.BackendType() == bt {
-			return cache.Stats(), nil
-		}
-	}
-
-	// then check if the store is a fallback
-	for _, fallback := range svr.router.Fallbacks() {
-		if fallback.BackendType() == bt {
-			return fallback.Stats(), nil
-		}
-	}
-
-	return nil, fmt.Errorf("store not found")
-}
diff --git a/store/generated_key/memstore/memstore.go b/store/generated_key/memstore/memstore.go
index a25eac6..b0dd7ab 100644
--- a/store/generated_key/memstore/memstore.go
+++ b/store/generated_key/memstore/memstore.go
@@ -134,18 +134,18 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
 // Put inserts a value into the store.
 func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
 	time.Sleep(e.config.PutLatency)
-	if uint64(len(value)) > e.config.MaxBlobSizeBytes {
+	encodedVal, err := e.codec.EncodeBlob(value)
+	if err != nil {
+		return nil, err
+	}
+
+	if uint64(len(encodedVal)) > e.config.MaxBlobSizeBytes {
 		return nil, fmt.Errorf("%w: blob length %d, max blob size %d", store.ErrProxyOversizedBlob, len(value), e.config.MaxBlobSizeBytes)
 	}
 
 	e.Lock()
 	defer e.Unlock()
 
-	encodedVal, err := e.codec.EncodeBlob(value)
-	if err != nil {
-		return nil, err
-	}
-
 	commitment, err := e.verifier.Commit(encodedVal)
 	if err != nil {
 		return nil, err
diff --git a/store/precomputed_key/redis/redis.go b/store/precomputed_key/redis/redis.go
index 6b2975a..a90b202 100644
--- a/store/precomputed_key/redis/redis.go
+++ b/store/precomputed_key/redis/redis.go
@@ -19,15 +19,12 @@ type Config struct {
 	Profile  bool
 }
 
-// Store ... Redis storage backend implementation (This not safe for concurrent usage)
+// Store ... Redis storage backend implementation
+// go-redis client is safe for concurrent usage: https://github.com/redis/go-redis/blob/v8.11.5/redis.go#L535-L544
 type Store struct {
 	eviction time.Duration
 
 	client *redis.Client
-
-	profile bool
-	reads   int
-	entries int
 }
 
 var _ store.PrecomputedKeyStore = (*Store)(nil)
@@ -52,8 +49,6 @@ func NewStore(cfg *Config) (*Store, error) {
 	return &Store{
 		eviction: cfg.Eviction,
 		client:   client,
-		profile:  cfg.Profile,
-		reads:    0,
 	}, nil
 }
 
@@ -67,22 +62,13 @@ func (r *Store) Get(ctx context.Context, key []byte) ([]byte, error) {
 		return nil, err
 	}
 
-	if r.profile {
-		r.reads++
-	}
-
 	// cast value to byte slice
 	return []byte(value), nil
 }
 
 // Put ... inserts a value into the Redis store
 func (r *Store) Put(ctx context.Context, key []byte, value []byte) error {
-	err := r.client.Set(ctx, string(key), string(value), r.eviction).Err()
-	if err == nil && r.profile {
-		r.entries++
-	}
-
-	return err
+	return r.client.Set(ctx, string(key), string(value), r.eviction).Err()
 }
 
 func (r *Store) Verify(_ []byte, _ []byte) error {
@@ -92,10 +78,3 @@ func (r *Store) Verify(_ []byte, _ []byte) error {
 func (r *Store) BackendType() store.BackendType {
 	return store.RedisBackendType
 }
-
-func (r *Store) Stats() *store.Stats {
-	return &store.Stats{
-		Entries: r.entries,
-		Reads:   r.reads,
-	}
-}
diff --git a/store/precomputed_key/s3/cli.go b/store/precomputed_key/s3/cli.go
index 1a42dd1..7f120d5 100644
--- a/store/precomputed_key/s3/cli.go
+++ b/store/precomputed_key/s3/cli.go
@@ -1,8 +1,6 @@
 package s3
 
 import (
-	"time"
-
 	"github.com/urfave/cli/v2"
 )
 
@@ -80,13 +78,13 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
 			EnvVars:  withEnvPrefix(envPrefix, "BACKUP"),
 			Category: category,
 		},
-		&cli.DurationFlag{
-			Name:     TimeoutFlagName,
-			Usage:    "timeout for S3 storage operations (e.g. get, put)",
-			Value:    5 * time.Second,
-			EnvVars:  withEnvPrefix(envPrefix, "TIMEOUT"),
-			Category: category,
-		},
+		// &cli.DurationFlag{
+		// 	Name:     TimeoutFlagName,
+		// 	Usage:    "timeout for S3 storage operations (e.g. get, put)",
+		// 	Value:    5 * time.Second,
+		// 	EnvVars:  withEnvPrefix(envPrefix, "TIMEOUT"),
+		// 	Category: category,
+		// },
 	}
 }
 
@@ -100,6 +98,6 @@ func ReadConfig(ctx *cli.Context) Config {
 		Bucket:   ctx.String(BucketFlagName),
 		Path:     ctx.String(PathFlagName),
 		Backup:   ctx.Bool(BackupFlagName),
-		Timeout:  ctx.Duration(TimeoutFlagName),
+		// Timeout: ctx.Duration(TimeoutFlagName),
 	}
 }
diff --git a/store/precomputed_key/s3/s3.go b/store/precomputed_key/s3/s3.go
index f694910..20fe276 100644
--- a/store/precomputed_key/s3/s3.go
+++ b/store/precomputed_key/s3/s3.go
@@ -5,10 +5,10 @@ import (
 	"context"
 	"encoding/hex"
 	"errors"
+	"fmt"
 	"io"
 	"path"
 	"strings"
-	"time"
 
 	"github.com/Layr-Labs/eigenda-proxy/store"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -46,15 +46,14 @@ type Config struct {
 	Bucket    string
 	Path      string
 	Backup    bool
-	Timeout   time.Duration
-	Profiling bool
 }
 
+// Store ... S3 store
+// client safe for concurrent use: https://github.com/minio/minio-go/issues/598#issuecomment-569457863
type Store struct {
 	cfg              Config
 	client           *minio.Client
 	putObjectOptions minio.PutObjectOptions
-	stats            *store.Stats
 }
 
 func isGoogleEndpoint(endpoint string) bool {
@@ -79,10 +78,6 @@ func NewS3(cfg Config) (*Store, error) {
 		cfg:              cfg,
 		client:           client,
 		putObjectOptions: putObjectOptions,
-		stats: &store.Stats{
-			Entries: 0,
-			Reads:   0,
-		},
 	}, nil
 }
 
@@ -101,10 +96,6 @@ func (s *Store) Get(ctx context.Context, key []byte) ([]byte, error) {
 		return nil, err
 	}
 
-	if s.cfg.Profiling {
-		s.stats.Reads++
-	}
-
 	return data, nil
 }
 
@@ -114,26 +105,18 @@ func (s *Store) Put(ctx context.Context, key []byte, value []byte) error {
 		return err
 	}
 
-	if s.cfg.Profiling {
-		s.stats.Entries++
-	}
-
 	return nil
 }
 
 func (s *Store) Verify(key []byte, value []byte) error {
 	h := crypto.Keccak256Hash(value)
 	if !bytes.Equal(h[:], key) {
-		return errors.New("key does not match value")
+		return fmt.Errorf("key does not match value, expected: %s got: %s", hex.EncodeToString(key), h.Hex())
 	}
 
 	return nil
 }
 
-func (s *Store) Stats() *store.Stats {
-	return s.stats
-}
-
 func (s *Store) BackendType() store.BackendType {
 	return store.S3BackendType
 }
diff --git a/store/router.go b/store/router.go
index c0e2436..e2a537d 100644
--- a/store/router.go
+++ b/store/router.go
@@ -6,10 +6,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 
 	"github.com/Layr-Labs/eigenda-proxy/commitments"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -18,34 +16,27 @@ type IRouter interface {
 	Put(ctx context.Context, cm commitments.CommitmentMode, key, value []byte) ([]byte, error)
 
 	GetEigenDAStore() GeneratedKeyStore
-	GetS3Store() PrecomputedKeyStore
-	Caches() []PrecomputedKeyStore
-	Fallbacks() []PrecomputedKeyStore
 }
 
 // Router ... storage backend routing layer
 type Router struct {
-	log     log.Logger
-	eigenda GeneratedKeyStore
-	s3      PrecomputedKeyStore
+	log log.Logger
+	// primary storage backends
+	eigenda GeneratedKeyStore   // ALT DA commitment type for OP mode && simple commitment mode for standard /client
+	s3      PrecomputedKeyStore // OP commitment mode && keccak256 commitment type
 
-	caches    []PrecomputedKeyStore
-	cacheLock sync.RWMutex
-
-	fallbacks    []PrecomputedKeyStore
-	fallbackLock sync.RWMutex
+	// secondary storage backends (caching and fallbacks)
+	secondary ISecondary
 }
 
+// NewRouter ... Init
 func NewRouter(eigenda GeneratedKeyStore, s3 PrecomputedKeyStore, l log.Logger,
-	caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) (IRouter, error) {
+	secondary ISecondary) (IRouter, error) {
 	return &Router{
-		log:          l,
-		eigenda:      eigenda,
-		s3:           s3,
-		caches:       caches,
-		cacheLock:    sync.RWMutex{},
-		fallbacks:    fallbacks,
-		fallbackLock: sync.RWMutex{},
+		log:       l,
+		eigenda:   eigenda,
+		s3:        s3,
+		secondary: secondary,
 	}, nil
 }
 
@@ -76,9 +67,9 @@ func (r *Router) Get(ctx context.Context, key []byte, cm commitments.CommitmentM
 	}
 
 	// 1 - read blob from cache if enabled
-	if r.cacheEnabled() {
+	if r.secondary.CachingEnabled() {
 		r.log.Debug("Retrieving data from cached backends")
-		data, err := r.multiSourceRead(ctx, key, false)
+		data, err := r.secondary.MultiSourceRead(ctx, key, false, r.eigenda.Verify)
 		if err == nil {
 			return data, nil
 		}
@@ -98,8 +89,8 @@ func (r *Router) Get(ctx context.Context, key []byte, cm commitments.CommitmentM
 	}
 
 	// 3 - read blob from fallbacks if enabled and data is non-retrievable from EigenDA
-	if r.fallbackEnabled() {
-		data, err = r.multiSourceRead(ctx, key, true)
+	if r.secondary.FallbackEnabled() {
+		data, err = r.secondary.MultiSourceRead(ctx, key, true, r.eigenda.Verify)
 		if err != nil {
 			r.log.Error("Failed to read from fallback targets", "err", err)
 			return nil, err
@@ -133,92 +124,16 @@ func (r *Router) Put(ctx context.Context, cm commitments.CommitmentMode, key, va
 		return nil, err
 	}
 
-	if r.cacheEnabled() || r.fallbackEnabled() {
-		err = r.handleRedundantWrites(ctx, commit, value)
-		if err != nil {
-			log.Error("Failed to write to redundant backends", "err", err)
+	if r.secondary.Enabled() { // publish put notification to secondary's subscription on PutNotification topic
+		r.secondary.Topic() <- PutNotify{
+			Commitment: commit,
+			Value:      value,
 		}
 	}
 
 	return commit, nil
 }
 
-// handleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
-// and returns an error if NONE of them succeed
-// NOTE: multi-target set writes are done at once to avoid re-invocation of the same write function at the same
-// caller step for different target sets vs. reading which is done conditionally to segment between a cached read type
-// vs a fallback read type
-func (r *Router) handleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
-	r.cacheLock.RLock()
-	r.fallbackLock.RLock()
-
-	defer func() {
-		r.cacheLock.RUnlock()
-		r.fallbackLock.RUnlock()
-	}()
-
-	sources := r.caches
-	sources = append(sources, r.fallbacks...)
-
-	key := crypto.Keccak256(commitment)
-	successes := 0
-
-	for _, src := range sources {
-		err := src.Put(ctx, key, value)
-		if err != nil {
-			r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err)
-		} else {
-			successes++
-		}
-	}
-
-	if successes == 0 {
-		return errors.New("failed to write blob to any redundant targets")
-	}
-
-	return nil
-}
-
-// multiSourceRead ... reads from a set of backends and returns the first successfully read blob
-func (r *Router) multiSourceRead(ctx context.Context, commitment []byte, fallback bool) ([]byte, error) {
-	var sources []PrecomputedKeyStore
-	if fallback {
-		r.fallbackLock.RLock()
-		defer r.fallbackLock.RUnlock()
-
-		sources = r.fallbacks
-	} else {
-		r.cacheLock.RLock()
-		defer r.cacheLock.RUnlock()
-
-		sources = r.caches
-	}
-
-	key := crypto.Keccak256(commitment)
-	for _, src := range sources {
-		data, err := src.Get(ctx, key)
-		if err != nil {
-			r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err)
-			continue
-		}
-
-		if data == nil {
-			r.log.Debug("No data found in redundant target", "backend", src.BackendType())
-			continue
-		}
-
-		// verify cert:data using EigenDA verification checks
-		err = r.eigenda.Verify(commitment, data)
-		if err != nil {
-			log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
-			continue
-		}
-
-		return data, nil
-	}
-	return nil, errors.New("no data found in any redundant backend")
-}
-
 // putWithoutKey ... inserts a value into a storage backend that computes the key on-demand (i.e, EigenDA)
 func (r *Router) putWithoutKey(ctx context.Context, value []byte) ([]byte, error) {
 	if r.eigenda != nil {
@@ -243,30 +158,7 @@ func (r *Router) putWithKey(ctx context.Context, key []byte, value []byte) ([]by
 	return key, r.s3.Put(ctx, key, value)
 }
 
-func (r *Router) fallbackEnabled() bool {
-	return len(r.fallbacks) > 0
-}
-
-func (r *Router) cacheEnabled() bool {
-	return len(r.caches) > 0
-}
-
 // GetEigenDAStore ...
 func (r *Router) GetEigenDAStore() GeneratedKeyStore {
 	return r.eigenda
 }
-
-// GetS3Store ...
-func (r *Router) GetS3Store() PrecomputedKeyStore {
-	return r.s3
-}
-
-// Caches ...
-func (r *Router) Caches() []PrecomputedKeyStore {
-	return r.caches
-}
-
-// Fallbacks ...
-func (r *Router) Fallbacks() []PrecomputedKeyStore {
-	return r.fallbacks
-}
diff --git a/store/secondary.go b/store/secondary.go
new file mode 100644
index 0000000..3e24d1f
--- /dev/null
+++ b/store/secondary.go
@@ -0,0 +1,173 @@
+package store
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"sync"
+
+	"github.com/Layr-Labs/eigenda-proxy/metrics"
+	"github.com/ethereum-optimism/optimism/op-service/retry"
+	"github.com/ethereum/go-ethereum/crypto"
+
+	"github.com/ethereum/go-ethereum/log"
+)
+
+type MetricExpression = string
+
+const (
+	Miss    MetricExpression = "miss"
+	Success MetricExpression = "success"
+	Failed  MetricExpression = "failed"
+)
+
+type ISecondary interface {
+	Enabled() bool
+	Topic() chan<- PutNotify
+	CachingEnabled() bool
+	FallbackEnabled() bool
+	HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error
+	MultiSourceRead(context.Context, []byte, bool, func([]byte, []byte) error) ([]byte, error)
+	WriteSubscriptionLoop(ctx context.Context)
+}
+
+// PutNotify ... notification received by primary router to perform insertion across
+// secondary storage backends
+type PutNotify struct {
+	Commitment []byte
+	Value      []byte
+}
+
+// SecondaryRouter ... routing abstraction for secondary storage backends
+type SecondaryRouter struct {
+	log log.Logger
+	m   metrics.Metricer
+
+	caches    []PrecomputedKeyStore
+	fallbacks []PrecomputedKeyStore
+
+	verifyLock sync.RWMutex
+	topic      chan PutNotify
+}
+
+// NewSecondaryRouter ... creates a new secondary storage router
+func NewSecondaryRouter(log log.Logger, m metrics.Metricer, caches []PrecomputedKeyStore, fallbacks []PrecomputedKeyStore) ISecondary {
+	return &SecondaryRouter{
+		topic:      make(chan PutNotify), // the channel is un-buffered; dispersing consumption across multiple routines helps alleviate blocking
+		log:        log,
+		m:          m,
+		caches:     caches,
+		fallbacks:  fallbacks,
+		verifyLock: sync.RWMutex{},
+	}
+}
+
+// Topic ...
+func (r *SecondaryRouter) Topic() chan<- PutNotify {
+	return r.topic
+}
+
+func (r *SecondaryRouter) Enabled() bool {
+	return r.CachingEnabled() || r.FallbackEnabled()
+}
+
+func (r *SecondaryRouter) CachingEnabled() bool {
+	return len(r.caches) > 0
+}
+
+func (r *SecondaryRouter) FallbackEnabled() bool {
+	return len(r.fallbacks) > 0
+}
+
+// HandleRedundantWrites ... writes to both sets of backends (i.e, fallback, cache)
+// and returns an error if NONE of them succeed
+func (r *SecondaryRouter) HandleRedundantWrites(ctx context.Context, commitment []byte, value []byte) error {
+	sources := r.caches
+	sources = append(sources, r.fallbacks...)
+
+	key := crypto.Keccak256(commitment)
+	successes := 0
+
+	for _, src := range sources {
+		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodPut)
+
+		// for added safety - we retry the insertion 10 times using an exponential backoff
+		_, err := retry.Do[any](ctx, 10, retry.Exponential(),
+			func() (any, error) {
+				return 0, src.Put(ctx, key, value) // this implementation assumes that all secondary clients are thread safe
+			})
+		if err != nil {
+			r.log.Warn("Failed to write to redundant target", "backend", src.BackendType(), "err", err)
+			cb(Failed)
+		} else {
+			successes++
+			cb(Success)
+		}
+	}
+
+	if successes == 0 {
+		return errors.New("failed to write blob to any redundant targets")
+	}
+
+	return nil
+}
+
+// WriteSubscriptionLoop ... subscribes to put notifications posted to shared topic with primary router
+func (r *SecondaryRouter) WriteSubscriptionLoop(ctx context.Context) {
+	for {
+		select {
+		case notif := <-r.topic:
+			err := r.HandleRedundantWrites(context.Background(), notif.Commitment, notif.Value)
+			if err != nil {
+				r.log.Error("Failed to write to redundant targets", "err", err)
+			}
+
+		case <-ctx.Done():
+			r.log.Debug("Terminating secondary event loop")
+			return
+		}
+	}
+}
+
+// MultiSourceRead ... reads from a set of backends and returns the first successfully read blob
+// NOTE: - this can also be parallelized when reading from multiple sources and discarding connections that fail
+//       - for complete optimization we can profile secondary storage backends to determine the fastest / most reliable and always route to it first
+func (r *SecondaryRouter) MultiSourceRead(ctx context.Context, commitment []byte, fallback bool, verify func([]byte, []byte) error) ([]byte, error) {
+	var sources []PrecomputedKeyStore
+	if fallback {
+		sources = r.fallbacks
+	} else {
+		sources = r.caches
+	}
+
+	key := crypto.Keccak256(commitment)
+	for _, src := range sources {
+		cb := r.m.RecordSecondaryRequest(src.BackendType().String(), http.MethodGet)
+		data, err := src.Get(ctx, key)
+		if err != nil {
+			cb(Failed)
+			r.log.Warn("Failed to read from redundant target", "backend", src.BackendType(), "err", err)
+			continue
+		}
+
+		if data == nil {
+			cb(Miss)
+			r.log.Debug("No data found in redundant target", "backend", src.BackendType())
+			continue
+		}
+
+		// verify cert:data using provided verification function
+		r.verifyLock.Lock()
+		err = verify(commitment, data)
+		if err != nil {
+			cb(Failed)
+			log.Warn("Failed to verify blob", "err", err, "backend", src.BackendType())
+			r.verifyLock.Unlock()
+			continue
+		}
+		r.verifyLock.Unlock()
+		cb(Success)
+		return data, nil
+	}
+	return nil, errors.New("no data found in any redundant backend")
+}
diff --git a/store/store.go b/store/store.go
index 0b75789..d779edb 100644
--- a/store/store.go
+++ b/store/store.go
@@ -65,8 +65,6 @@ type Stats struct {
 }
 
 type Store interface {
-	// Stats returns the current usage metrics of the key-value data store.
-	Stats() *Stats
 	// Backend returns the backend type provider of the store.
 	BackendType() BackendType
 	// Verify verifies the given key-value pair.
@@ -75,6 +73,8 @@ type Store interface {
 type GeneratedKeyStore interface {
 	Store
+	// Stats returns the current usage metrics of the key-value data store.
+	Stats() *Stats
 	// Get retrieves the given key if it's present in the key-value data store.
 	Get(ctx context.Context, key []byte) ([]byte, error)
 	// Put inserts the given value into the key-value data store.
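Editor's note: the secondary-router changes above hinge on one design choice — `Router.Put` publishes to an un-buffered `topic` channel that is drained by the fixed pool of `WriteSubscriptionLoop` goroutines started in `LoadStoreRouter`, so a publish blocks only until any one worker is free. Below is a minimal, standalone sketch of that hand-off pattern, assuming nothing beyond the Go standard library; the `notification` type and worker names are illustrative stand-ins, not identifiers from this PR.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// notification mirrors the role of PutNotify: the payload handed from the
// primary router to the secondary write workers.
type notification struct {
	commitment []byte
	value      []byte
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	topic := make(chan notification) // un-buffered, like SecondaryRouter.topic

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ { // analogous to the 5 write-loop goroutines in LoadStoreRouter
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case n := <-topic:
					// stand-in for HandleRedundantWrites(ctx, n.commitment, n.value)
					fmt.Printf("worker %d handled %d bytes\n", id, len(n.value))
				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	// producer side: analogous to Router.Put sending on r.secondary.Topic()
	topic <- notification{commitment: []byte("commit"), value: []byte("blob")}

	cancel()
	wg.Wait()
}
```

One consequence of the un-buffered channel, visible in the sketch, is that a put request's latency includes the wait for a free worker when all five are busy; buffering the channel would decouple that at the cost of losing backpressure.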
diff --git a/utils/atomic.go b/utils/atomic.go
new file mode 100644
index 0000000..295736b
--- /dev/null
+++ b/utils/atomic.go
@@ -0,0 +1,29 @@
+package utils
+
+import (
+	"sync"
+)
+
+type AtomicRef[T any] struct {
+	value   T
+	rwMutex *sync.RWMutex
+}
+
+func NewAtomicRef[T any](v T) *AtomicRef[T] {
+	return &AtomicRef[T]{
+		value:   v,
+		rwMutex: &sync.RWMutex{},
+	}
+}
+
+func (ar *AtomicRef[T]) Update(newValue T) {
+	ar.rwMutex.Lock()
+	ar.value = newValue
+	ar.rwMutex.Unlock()
+}
+
+func (ar *AtomicRef[T]) Value() T {
+	ar.rwMutex.RLock()
+	defer ar.rwMutex.RUnlock()
+	return ar.value
+}
diff --git a/utils/utils.go b/utils/utils.go
index aa83aa9..ab45810 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -2,10 +2,29 @@ package utils
 
 import (
 	"fmt"
+	"net"
 	"strconv"
 	"strings"
 )
 
+// FindRandomOpenPort returns a random open port
+func FindRandomOpenPort() (int, error) {
+	// Listen on a random port
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return 0, fmt.Errorf("failed to find open port: %w", err)
+	}
+	defer listener.Close()
+
+	// Get the assigned address, which includes the port
+	addr, ok := listener.Addr().(*net.TCPAddr)
+	if !ok {
+		return 0, fmt.Errorf("failed to cast listener address to TCPAddr")
+	}
+
+	return addr.Port, nil
+}
+
 // Helper utility functions //
 
 func ContainsDuplicates[P comparable](s []P) bool {
diff --git a/utils/utils_test.go b/utils/utils_test.go
index 0217758..53ce416 100644
--- a/utils/utils_test.go
+++ b/utils/utils_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/Layr-Labs/eigenda-proxy/utils"
+	"github.com/stretchr/testify/require"
 )
 
 func TestParseByteAmount(t *testing.T) {
@@ -56,3 +57,12 @@ func TestParseByteAmount(t *testing.T) {
 		})
 	}
 }
+
+func TestAtomicRefWithInt(t *testing.T) {
+	expected := 69
+
+	ref := utils.NewAtomicRef[int](expected)
+	actual := ref.Value()
+
+	require.Equal(t, expected, actual)
+}
diff --git a/verify/cli.go b/verify/cli.go
index 848654b..f41135f 100644
--- a/verify/cli.go
+++ b/verify/cli.go
@@ -133,8 +133,8 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
 	}
 }
 
+// MaxBlobLengthBytes ... there's def a better way to deal with this... perhaps a generic flag that can parse the string into a uint64?
 // this var is set by the action in the MaxBlobLengthFlagName flag
-// TODO: there's def a better way to deal with this... perhaps a generic flag that can parse the string into a uint64?
 var MaxBlobLengthBytes uint64
 
 func ReadConfig(ctx *cli.Context) Config {
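Editor's note: since the PR replaces the old `Stats()` counters with scraping the Prometheus endpoint, it may help to see the exposition format the new poller parses and how a test is expected to drive it. The sketch below uses only identifiers introduced in this diff (`metrics.NewPoller`, `BuildSecondaryCountLabels`, `PollCountMetricsWithRetry`, `SecondaryRequestStatuses`); the metric line in the comment and the label values are illustrative examples, not guaranteed output of a running proxy.

```go
package e2e_example_test // hypothetical package name, for illustration only

import (
	"fmt"
	"net/http"
	"testing"

	"github.com/Layr-Labs/eigenda-proxy/metrics"
	"github.com/Layr-Labs/eigenda-proxy/store"
	"github.com/stretchr/testify/require"
)

// parseCountMetric matches Prometheus text-exposition lines of the shape:
//   eigenda_proxy_secondary_requests_total{backend_type="S3",method="PUT",status="success"} 2
// i.e. name, a {label="value",...} set, and an integer sample value.
func assertSecondaryWrite(t *testing.T, metricsAddr string) {
	// metricsAddr is assumed to be the host:port returned by Metrics.StartServer,
	// mirroring how e2e/setup.go constructs the poller.
	poller := metrics.NewPoller(fmt.Sprintf("http://%s", metricsAddr))

	// filter on the backend type, HTTP method, and status labels added in this PR
	labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodPut, "success")

	// retry up to 20 times (1s sleep between attempts) while the async write loop catches up
	ms, err := poller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
	require.NoError(t, err)
	require.NotEmpty(t, ms)
	require.True(t, ms[0].Count > 0)
}
```

The retry loop matters because secondary writes are now asynchronous (published to the topic channel and handled by background goroutines), so the counter may lag the HTTP response that triggered it.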