From f459eb5ea06c5bf720b30eeea14d38062a00b288 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Fri, 24 May 2024 22:49:13 -0700 Subject: [PATCH] feat: Data domain filtering --- cmd/daserver/entrypoint.go | 2 +- common/common.go | 29 +++++ eigenda/commitment.go | 6 + server.go | 197 -------------------------------- server/params.go | 28 +++++ server/server.go | 224 +++++++++++++++++++++++++++++++++++++ test/e2e_test.go | 7 +- 7 files changed, 291 insertions(+), 202 deletions(-) delete mode 100644 server.go create mode 100644 server/params.go create mode 100644 server/server.go diff --git a/cmd/daserver/entrypoint.go b/cmd/daserver/entrypoint.go index bf2dee6..d207ca1 100644 --- a/cmd/daserver/entrypoint.go +++ b/cmd/daserver/entrypoint.go @@ -11,7 +11,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" - proxy "github.com/Layr-Labs/op-plasma-eigenda" + proxy "github.com/Layr-Labs/op-plasma-eigenda/server" oplog "github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/opio" ) diff --git a/common/common.go b/common/common.go index 34829f0..1baaa26 100644 --- a/common/common.go +++ b/common/common.go @@ -1,5 +1,34 @@ package common +import "fmt" + +var ( + ErrInvalidDomainType = fmt.Errorf("invalid domain type") +) + +// DomainType is a enumeration type for the different data domains for which a +// blob can exist between +type DomainType uint8 + +const ( + BinaryDomain DomainType = iota + PolyDomain + UnknownDomain +) + +func StrToDomainType(s string) DomainType { + switch s { + case "binary": + return BinaryDomain + case "polynomial": + return PolyDomain + default: + return UnknownDomain + } +} + +// Helper utility functions // + func EqualSlices[P comparable](s1, s2 []P) bool { if len(s1) != len(s2) { return false diff --git a/eigenda/commitment.go b/eigenda/commitment.go index 7cd1a80..cec7328 100644 --- a/eigenda/commitment.go +++ b/eigenda/commitment.go @@ -4,6 +4,7 @@ import ( "errors" op_plasma "github.com/ethereum-optimism/optimism/op-plasma" + "github.com/ethereum/go-ethereum/common/hexutil" ) // ErrCommitmentLength is returned when the commitment length is invalid. @@ -35,6 +36,11 @@ func (c Commitment) Encode() []byte { return append([]byte{byte(EigenDA), byte(EigenV0)}, c...) } +func StringToCommit(key string) (Commitment, error) { + comm, _ := hexutil.Decode(key) + return DecodeCommitment(comm) +} + // DecodeCommitment verifies and decodes an EigenDACommit from raw encoded bytes. func DecodeCommitment(commitment []byte) (Commitment, error) { if len(commitment) <= 3 { diff --git a/server.go b/server.go deleted file mode 100644 index 8747c38..0000000 --- a/server.go +++ /dev/null @@ -1,197 +0,0 @@ -package proxy - -import ( - "context" - "errors" - "fmt" - "io" - "net" - "net/http" - "path" - "strconv" - "time" - - "github.com/Layr-Labs/op-plasma-eigenda/eigenda" - "github.com/Layr-Labs/op-plasma-eigenda/metrics" - "github.com/ethereum-optimism/optimism/op-service/rpc" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/log" -) - -var ( - ErrNotFound = errors.New("not found") -) - -type Store interface { - // Get retrieves the given key if it's present in the key-value data store. - Get(ctx context.Context, key []byte) ([]byte, error) - // Put inserts the given value into the key-value data store. - Put(ctx context.Context, value []byte) (key []byte, err error) -} - -type DAServer struct { - log log.Logger - endpoint string - store Store - m metrics.Metricer - tls *rpc.ServerTLSConfig - httpServer *http.Server - listener net.Listener -} - -func NewServer(host string, port int, store Store, log log.Logger, m metrics.Metricer) *DAServer { - endpoint := net.JoinHostPort(host, strconv.Itoa(port)) - return &DAServer{ - m: m, - log: log, - endpoint: endpoint, - store: store, - httpServer: &http.Server{ - Addr: endpoint, - ReadHeaderTimeout: 10 * time.Second, - // aligned with existing blob finalization times - WriteTimeout: 40 * time.Minute, - }, - } -} - -func (d *DAServer) Start() error { - mux := http.NewServeMux() - - mux.HandleFunc("/get/", d.HandleGet) - mux.HandleFunc("/put/", d.HandlePut) - mux.HandleFunc("/health", d.Health) - - d.httpServer.Handler = mux - - listener, err := net.Listen("tcp", d.endpoint) - if err != nil { - return fmt.Errorf("failed to listen: %w", err) - } - d.listener = listener - - d.endpoint = listener.Addr().String() - - d.log.Info("Starting DA server", "endpoint", d.endpoint) - errCh := make(chan error, 1) - go func() { - if d.tls != nil { - if err := d.httpServer.ServeTLS(d.listener, "", ""); err != nil { - errCh <- err - } - } else { - if err := d.httpServer.Serve(d.listener); err != nil { - errCh <- err - } - } - }() - - // verify that the server comes up - tick := time.NewTimer(10 * time.Millisecond) - defer tick.Stop() - - select { - case err := <-errCh: - return fmt.Errorf("http server failed: %w", err) - case <-tick.C: - return nil - } -} - -func (d *DAServer) Health(w http.ResponseWriter, r *http.Request) { - d.log.Info("GET", "url", r.URL) - recordDur := d.m.RecordRPCServerRequest("health") - defer recordDur() - - w.WriteHeader(http.StatusOK) -} - -func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) { - d.log.Info("GET", "url", r.URL) - recordDur := d.m.RecordRPCServerRequest("get") - defer recordDur() - - route := path.Dir(r.URL.Path) - if route != "/get" { - d.log.Info("invalid route", "route", route) - w.WriteHeader(http.StatusBadRequest) - return - } - - key := path.Base(r.URL.Path) - comm, err := hexutil.Decode(key) - if err != nil { - d.log.Info("failed to decode commitment bytes from hex", "err", err, "key", key) - w.WriteHeader(http.StatusBadRequest) - return - } - - decodedComm, err := eigenda.DecodeCommitment(comm) - if err != nil { - d.log.Info("failed to decode commitment", "err", err, "key", key) - w.WriteHeader(http.StatusBadRequest) - return - } - - input, err := d.store.Get(r.Context(), decodedComm) - if err != nil && errors.Is(err, ErrNotFound) { - d.log.Info("no entry found in DA store", "key", key) - w.WriteHeader(http.StatusNotFound) - return - } - if err != nil { - d.log.Error("internal server error", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - if _, err := w.Write(input); err != nil { - d.log.Error("failed to write response", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } -} - -func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) { - d.log.Info("PUT", "url", r.URL) - recordDur := d.m.RecordRPCServerRequest("put") - defer recordDur() - - route := path.Dir(r.URL.Path) - if route != "/put" { - w.WriteHeader(http.StatusBadRequest) - return - } - - input, err := io.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - var comm []byte - if comm, err = d.store.Put(r.Context(), input); err != nil { - d.log.Error("Failed to store commitment to the DA server", "err", err, "comm", comm) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // write out encoded commitment - if _, err := w.Write(eigenda.Commitment.Encode(comm)); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } -} - -func (b *DAServer) Endpoint() string { - return b.listener.Addr().String() -} - -func (b *DAServer) Stop() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := b.httpServer.Shutdown(ctx); err != nil { - b.log.Error("Failed to shutdown DA server", "err", err) - return err - } - return nil -} diff --git a/server/params.go b/server/params.go new file mode 100644 index 0000000..89d0b92 --- /dev/null +++ b/server/params.go @@ -0,0 +1,28 @@ +package proxy + +import ( + "net/http" + + "github.com/Layr-Labs/op-plasma-eigenda/common" +) + +const ( + GetRoute = "/get" + PutRoute = "/put" + + DomainFilterKey = "domain" +) + +func ReadDomainFilter(r *http.Request) (common.DomainType, error) { + query := r.URL.Query() + key := query.Get(DomainFilterKey) + if key == "" { // default + return common.BinaryDomain, nil + } + dt := common.StrToDomainType(key) + if dt == common.UnknownDomain { + return common.UnknownDomain, common.ErrInvalidDomainType + } + + return dt, nil +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..d9afe15 --- /dev/null +++ b/server/server.go @@ -0,0 +1,224 @@ +package proxy + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "path" + "strconv" + "time" + + "github.com/Layr-Labs/op-plasma-eigenda/eigenda" + "github.com/Layr-Labs/op-plasma-eigenda/metrics" + "github.com/ethereum-optimism/optimism/op-service/rpc" + "github.com/ethereum/go-ethereum/log" +) + +var ( + ErrNotFound = errors.New("not found") +) + +const ( + invalidDomain = "invalid domain type" +) + +type Store interface { + // Get retrieves the given key if it's present in the key-value data store. + Get(ctx context.Context, key []byte) ([]byte, error) + // Put inserts the given value into the key-value data store. + Put(ctx context.Context, value []byte) (key []byte, err error) +} + +type Server struct { + log log.Logger + endpoint string + store Store + m metrics.Metricer + tls *rpc.ServerTLSConfig + httpServer *http.Server + listener net.Listener +} + +func NewServer(host string, port int, store Store, log log.Logger, m metrics.Metricer) *Server { + endpoint := net.JoinHostPort(host, strconv.Itoa(port)) + return &Server{ + m: m, + log: log, + endpoint: endpoint, + store: store, + httpServer: &http.Server{ + Addr: endpoint, + ReadHeaderTimeout: 10 * time.Second, + // aligned with existing blob finalization times + WriteTimeout: 40 * time.Minute, + }, + } +} + +// WithVerify is a middleware that verifies the route path. +func WithVerify(handleFn func(http.ResponseWriter, *http.Request), path string) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + route := r.URL.Path + + if route != path { + w.WriteHeader(http.StatusBadRequest) + return + } + + handleFn(w, r) + } +} + +// WithMetrics is a middleware that records metrics for the route path. +func WithMetrics(handleFn func(http.ResponseWriter, *http.Request), m metrics.Metricer) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + recordDur := m.RecordRPCServerRequest(r.URL.Path) + defer recordDur() + + handleFn(w, r) + } +} + +func WithLogging(handleFn func(http.ResponseWriter, *http.Request), log log.Logger) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + log.Info("request", "method", r.Method, "url", r.URL) + handleFn(w, r) + } +} + +func (svr *Server) Start() error { + mux := http.NewServeMux() + + mux.HandleFunc(GetRoute, WithMetrics(WithVerify(svr.HandleGet, GetRoute), svr.m)) + mux.HandleFunc(PutRoute, WithMetrics(WithVerify(svr.HandlePut, PutRoute), svr.m)) + mux.HandleFunc("/health", WithVerify(svr.Health, "/health")) + + svr.httpServer.Handler = mux + + listener, err := net.Listen("tcp", svr.endpoint) + if err != nil { + return fmt.Errorf("failed to listen: %w", err) + } + svr.listener = listener + + svr.endpoint = listener.Addr().String() + + svr.log.Info("Starting DA server", "endpoint", svr.endpoint) + errCh := make(chan error, 1) + go func() { + if svr.tls != nil { + if err := svr.httpServer.ServeTLS(svr.listener, "", ""); err != nil { + errCh <- err + } + } else { + if err := svr.httpServer.Serve(svr.listener); err != nil { + errCh <- err + } + } + }() + + // verify that the server comes up + tick := time.NewTimer(10 * time.Millisecond) + defer tick.Stop() + + select { + case err := <-errCh: + return fmt.Errorf("http server failed: %w", err) + case <-tick.C: + return nil + } +} + +func (svr *Server) Endpoint() string { + return svr.listener.Addr().String() +} + +func (svr *Server) Stop() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := svr.httpServer.Shutdown(ctx); err != nil { + svr.log.Error("Failed to shutdown proxy server", "err", err) + return err + } + return nil +} +func (svr *Server) Health(w http.ResponseWriter, r *http.Request) { + svr.log.Info("GET", "url", r.URL) + + w.WriteHeader(http.StatusOK) +} + +func (svr *Server) HandleGet(w http.ResponseWriter, r *http.Request) { + svr.log.Info("GET", "url", r.URL) + + // TODO: Add domain filtering + _, err := ReadDomainFilter(r) + if err != nil { + svr.WriteBadRequest(w, invalidDomain) + return + } + + key := path.Base(r.URL.Path) + comm, err := eigenda.StringToCommit(key) + if err != nil { + svr.log.Info("failed to decode commitment", "err", err, "key", key) + w.WriteHeader(http.StatusBadRequest) + return + } + + input, err := svr.store.Get(r.Context(), comm) + if err != nil && errors.Is(err, ErrNotFound) { + svr.WriteNotFound(w, err.Error()) + return + } + + if err != nil { + svr.WriteInternalError(w, err) + return + } + + svr.WriteResponse(w, input) +} + +func (svr *Server) HandlePut(w http.ResponseWriter, r *http.Request) { + svr.log.Info("PUT", "url", r.URL) + + input, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + var comm []byte + if comm, err = svr.store.Put(r.Context(), input); err != nil { + svr.WriteInternalError(w, err) + return + } + + // write out encoded commitment + svr.WriteResponse(w, comm) +} + +func (svr *Server) WriteResponse(w http.ResponseWriter, data []byte) { + if _, err := w.Write(data); err != nil { + svr.WriteInternalError(w, err) + } +} + +func (svr *Server) WriteInternalError(w http.ResponseWriter, err error) { + svr.log.Error("internal server error", "err", err) + w.WriteHeader(http.StatusInternalServerError) +} + +func (svr *Server) WriteNotFound(w http.ResponseWriter, msg string) { + svr.log.Info("not found", "msg", msg) + w.WriteHeader(http.StatusNotFound) +} + +func (svr *Server) WriteBadRequest(w http.ResponseWriter, msg string) { + svr.log.Info("bad request", "msg", msg) + w.WriteHeader(http.StatusBadRequest) +} diff --git a/test/e2e_test.go b/test/e2e_test.go index 70460c4..f9fd728 100644 --- a/test/e2e_test.go +++ b/test/e2e_test.go @@ -9,10 +9,9 @@ import ( "time" "github.com/Layr-Labs/eigenda/encoding/kzg" - plasma "github.com/Layr-Labs/op-plasma-eigenda" - proxy "github.com/Layr-Labs/op-plasma-eigenda" "github.com/Layr-Labs/op-plasma-eigenda/eigenda" "github.com/Layr-Labs/op-plasma-eigenda/metrics" + proxy "github.com/Layr-Labs/op-plasma-eigenda/server" "github.com/Layr-Labs/op-plasma-eigenda/store" "github.com/Layr-Labs/op-plasma-eigenda/verify" op_plasma "github.com/ethereum-optimism/optimism/op-plasma" @@ -34,7 +33,7 @@ const ( type TestSuite struct { ctx context.Context log log.Logger - server *plasma.DAServer + server *proxy.Server } func createTestSuite(t *testing.T) (TestSuite, func()) { @@ -99,7 +98,7 @@ func createTestSuite(t *testing.T) (TestSuite, func()) { }, kill } -func TestE2EPutGetLogicForEigenDAStore(t *testing.T) { +func TestOptimismPlasmaE2E(t *testing.T) { ts, kill := createTestSuite(t) defer kill()