Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Better abstract secondary storage #182

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ install-lint:
@echo "Installing golangci-lint..."
@sh -c $(GET_LINT_CMD)

gosec:
@echo "Running security scan with gosec..."
gosec ./...

submodules:
git submodule update --init --recursive

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ The `raw commitment` is an RLP-encoded [EigenDA certificate](https://github.com/

### Unit

Unit tests can be run by invoking `make test`.
Unit tests can be run by invoking `make test`. Before running them, please make sure all test container images are downloaded locally via:
```
docker pull redis
docker pull minio
```
Comment on lines +211 to +215
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really needed? I would think testcontainer would still pull the images when attempting to run them if they are not present locally?


### Holesky

Expand Down
5 changes: 3 additions & 2 deletions cmd/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ func StartProxySvr(cliCtx *cli.Context) error {
return fmt.Errorf("failed to pretty print config: %w", err)
}

m := metrics.NewMetrics("default")

ctx, ctxCancel := context.WithCancel(cliCtx.Context)
defer ctxCancel()

daRouter, err := server.LoadStoreRouter(ctx, cfg, log)
daRouter, err := server.LoadStoreRouter(ctx, cfg, log, m)
if err != nil {
return fmt.Errorf("failed to create store: %w", err)
}
m := metrics.NewMetrics("default")
server := server.NewServer(cliCtx.String(flags.ListenAddrFlagName), cliCtx.Int(flags.PortFlagName), daRouter, log, m)

if err := server.Start(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion commitments/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type CommitmentMeta struct {
Mode CommitmentMode
// CertVersion is shared for all modes and denotes version of the EigenDA certificate
CertVersion byte
CertVersion uint8
}

type CommitmentMode string
Expand Down
33 changes: 24 additions & 9 deletions e2e/optimism_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package e2e_test
import (
"testing"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/metrics"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-e2e/actions"
"github.com/ethereum-optimism/optimism/op-e2e/config"
Expand Down Expand Up @@ -166,11 +168,18 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that EigenDA proxy's was written and read from
stat := proxyTS.Server.GetS3Stats()
// assert that keccak256 primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismKeccak), "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)

require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}

func TestOptimismGenericCommitment(gt *testing.T) {
Expand Down Expand Up @@ -222,9 +231,15 @@ func TestOptimismGenericCommitment(gt *testing.T) {

// assert that EigenDA proxy's was written and read from

if useMemory() {
stat := proxyTS.Server.GetEigenDAStats()
require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}
// assert that EigenDA's primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismGeneric), "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)
}
37 changes: 25 additions & 12 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"time"

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/store"

"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/store"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -350,9 +351,13 @@ func TestProxyServerCaching(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)
labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
require.NoError(t, err)
require.Len(t, ms, 1)

require.True(t, ms[0].Count > 0)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
Expand Down Expand Up @@ -393,12 +398,14 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
redStats, err := ts.Server.GetStoreStats(store.RedisBackendType)
labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)
require.True(t, ms[0].Count >= 1)

require.Equal(t, 1, redStats.Reads)
require.Equal(t, 1, redStats.Entries)

// TODO: Add metrics for EigenDA dispersal/retrieval
if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 0, memStats.Reads)
Expand All @@ -420,6 +427,7 @@ func TestProxyServerReadFallback(t *testing.T) {

t.Parallel()

// setup server with S3 as a fallback option
testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.Expiration = time.Millisecond * 1
Expand Down Expand Up @@ -447,11 +455,16 @@ func TestProxyServerReadFallback(t *testing.T) {
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from fallback target location (i.e, S3 for this test)
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)
labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)

require.True(t, ms[0].Count > 0)

// TODO - remove this in favor of metrics sampling
if useMemory() { // ensure that an eigenda read was attempted with zero data available
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 1, memStats.Reads)
Expand Down
39 changes: 28 additions & 11 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"golang.org/x/exp/rand"

"github.com/ethereum-optimism/optimism/op-service/httputil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"

Expand Down Expand Up @@ -77,7 +78,6 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
createS3Bucket(bucketName)

eigendaCfg.S3Config = s3.Config{
Profiling: true,
Bucket: bucketName,
Path: "",
Endpoint: "localhost:4566",
Expand Down Expand Up @@ -175,9 +175,11 @@ func TestSuiteConfig(t *testing.T, testCfg *Cfg) server.CLIConfig {
}

type TestSuite struct {
Ctx context.Context
Log log.Logger
Server *server.Server
Ctx context.Context
Log log.Logger
Server *server.Server
MetricPoller *metrics.PollerClient
MetricSvr *httputil.HTTPServer
}

func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, func()) {
Expand All @@ -188,28 +190,43 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
}).New("role", svcName)

ctx := context.Background()
m := metrics.NewMetrics("default")
store, err := server.LoadStoreRouter(
ctx,
testSuiteCfg,
log,
m,
)

require.NoError(t, err)
server := server.NewServer(host, 0, store, log, metrics.NoopMetrics)
proxySvr := server.NewServer(host, 0, store, log, m)

t.Log("Starting proxy server...")
err = server.Start()
err = proxySvr.Start()
require.NoError(t, err)

metricsSvr, err := m.StartServer(host, 0)
t.Log("Starting metrics server...")

require.NoError(t, err)

kill := func() {
if err := server.Stop(); err != nil {
panic(err)
if err := proxySvr.Stop(); err != nil {
log.Error("failed to stop proxy server", "err", err)
}

if err := metricsSvr.Stop(context.Background()); err != nil {
log.Error("failed to stop metrics server", "err", err)
}
}
log.Info("started metrics server", "addr", metricsSvr.Addr())

return TestSuite{
Ctx: ctx,
Log: log,
Server: server,
Ctx: ctx,
Log: log,
Server: proxySvr,
MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", metricsSvr.Addr().String())),
MetricSvr: metricsSvr,
}, kill
}

Expand Down
58 changes: 53 additions & 5 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"fmt"
"net"
"strconv"

Expand All @@ -15,6 +16,7 @@ import (
const (
namespace = "eigenda_proxy"
httpServerSubsystem = "http_server"
secondarySubsystem = "secondary"
)

// Config ... Metrics server configuration
Expand All @@ -29,7 +31,9 @@ type Config struct {
type Metricer interface {
RecordInfo(version string)
RecordUp()
RecordRPCServerRequest(method string) func(status string, commitmentMode string, version string)

RecordRPCServerRequest(method string) func(status string, mode string, ver string)
RecordSecondaryRequest(bt string, method string) func(status string)

Document() []metrics.DocumentedMetric
}
Expand All @@ -39,10 +43,15 @@ type Metrics struct {
Info *prometheus.GaugeVec
Up prometheus.Gauge

// server metrics
HTTPServerRequestsTotal *prometheus.CounterVec
HTTPServerBadRequestHeader *prometheus.CounterVec
HTTPServerRequestDurationSeconds *prometheus.HistogramVec

// secondary metrics
SecondaryRequestsTotal *prometheus.CounterVec
SecondaryRequestDurationSec *prometheus.HistogramVec

registry *prometheus.Registry
factory metrics.Factory
}
Expand Down Expand Up @@ -80,7 +89,7 @@ func NewMetrics(subsystem string) *Metrics {
Name: "requests_total",
Help: "Total requests to the HTTP server",
}, []string{
"method", "status", "commitment_mode", "DA_cert_version",
"method", "status", "commitment_mode", "cert_version",
}),
HTTPServerBadRequestHeader: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Expand All @@ -101,6 +110,23 @@ func NewMetrics(subsystem string) *Metrics {
}, []string{
"method", // no status on histograms because those are very expensive
}),
SecondaryRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: secondarySubsystem,
Name: "requests_total",
Help: "Total requests to the secondary storage",
}, []string{
"backend_type", "method", "status",
}),
SecondaryRequestDurationSec: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: secondarySubsystem,
Name: "request_duration_seconds",
Buckets: prometheus.ExponentialBucketsRange(0.05, 1200, 20),
Help: "Histogram of secondary storage request durations",
}, []string{
"backend_type",
}),
registry: registry,
factory: factory,
}
Expand All @@ -121,7 +147,7 @@ func (m *Metrics) RecordUp() {
// RecordRPCServerRequest is a helper method to record an incoming HTTP request.
// It bumps the requests metric, and tracks how long it takes to serve a response,
// including the HTTP status code.
func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode string, ver string) {
func (m *Metrics) RecordRPCServerRequest(method string) func(status, mode, ver string) {
// we don't want to track the status code on the histogram because that would
// create a huge number of labels, and cost a lot on cloud hosted services
timer := prometheus.NewTimer(m.HTTPServerRequestDurationSeconds.WithLabelValues(method))
Expand All @@ -131,13 +157,31 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode
}
}

// RecordSecondaryRequest is a helper method to record a request made to a
// secondary storage backend.
//
// bt is used as the "backend_type" label and method as the "method" label.
// Calling RecordSecondaryRequest starts a timer against the secondary request
// duration histogram (labelled by backend type only). The returned callback
// must be invoked once the request has finished: it bumps the secondary
// requests counter for (bt, method, status) and observes the elapsed duration.
func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status string) {
	// the timer starts now, so the observed duration covers everything up to
	// the moment the returned callback is called
	timer := prometheus.NewTimer(m.SecondaryRequestDurationSec.WithLabelValues(bt))

	return func(status string) {
		m.SecondaryRequestsTotal.WithLabelValues(bt, method, status).Inc()
		timer.ObserveDuration()
	}
}

// StartServer starts the metrics server on the given hostname and port.
// If port is 0, it automatically assigns an available port and returns the actual port.
func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
addr := net.JoinHostPort(hostname, strconv.Itoa(port))
address := net.JoinHostPort(hostname, strconv.Itoa(port))

h := promhttp.InstrumentMetricHandler(
m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
)
return ophttp.StartHTTPServer(addr, h)

server, err := ophttp.StartHTTPServer(address, h)
if err != nil {
return nil, fmt.Errorf("failed to start HTTP server: %w", err)
}

return server, nil
}

func (m *Metrics) Document() []metrics.DocumentedMetric {
Expand All @@ -162,3 +206,7 @@ func (n *noopMetricer) RecordUp() {
// RecordRPCServerRequest discards the method and hands back a callback that
// records nothing, whatever status, mode and version it is given.
func (n *noopMetricer) RecordRPCServerRequest(string) func(status, mode, ver string) {
	discard := func(_, _, _ string) {}
	return discard
}

// RecordSecondaryRequest discards the backend type and method and hands back
// a callback that records nothing for any status.
func (n *noopMetricer) RecordSecondaryRequest(string, string) func(status string) {
	discard := func(_ string) {}
	return discard
}
Loading
Loading