Skip to content

Commit

Permalink
test(batcher): add e2e test for concurrent altda requests
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf committed Aug 28, 2024
1 parent 60b7c2a commit 947b05c
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 50 deletions.
27 changes: 23 additions & 4 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package altda
import (
"fmt"
"net/url"
"time"

"github.com/urfave/cli/v2"
)
Expand All @@ -11,7 +12,9 @@ var (
EnabledFlagName = altDAFlags("enabled")
DaServerAddressFlagName = altDAFlags("da-server")
VerifyOnReadFlagName = altDAFlags("verify-on-read")
DaServiceFlag = altDAFlags("da-service")
DaServiceFlagName = altDAFlags("da-service")
PutTimeoutFlagName = altDAFlags("put-timeout")
GetTimeoutFlagName = altDAFlags("get-timeout")
)

// altDAFlags returns the flag names for altDA
Expand Down Expand Up @@ -46,12 +49,24 @@ func CLIFlags(envPrefix string, category string) []cli.Flag {
Category: category,
},
&cli.BoolFlag{
Name: DaServiceFlag,
Name: DaServiceFlagName,
Usage: "Use DA service type where commitments are generated by Alt-DA server",
Value: false,
EnvVars: altDAEnvs(envPrefix, "DA_SERVICE"),
Category: category,
},
&cli.DurationFlag{
Name: PutTimeoutFlagName,
Usage: "Timeout for put requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "PUT_TIMEOUT"),
},
&cli.DurationFlag{
Name: GetTimeoutFlagName,
Usage: "Timeout for get requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "GET_TIMEOUT"),
},
}
}

Expand All @@ -60,6 +75,8 @@ type CLIConfig struct {
DAServerURL string
VerifyOnRead bool
GenericDA bool
PutTimeout time.Duration
GetTimeout time.Duration
}

func (c CLIConfig) Check() error {
Expand All @@ -75,14 +92,16 @@ func (c CLIConfig) Check() error {
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
return CLIConfig{
Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName),
GenericDA: c.Bool(DaServiceFlag),
GenericDA: c.Bool(DaServiceFlagName),
PutTimeout: c.Duration(PutTimeoutFlagName),
GetTimeout: c.Duration(GetTimeoutFlagName),
}
}
18 changes: 14 additions & 4 deletions op-alt-da/daclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"time"
)

// ErrNotFound is returned when the server could not find the input.
Expand All @@ -23,10 +24,16 @@ type DAClient struct {
verify bool
// whether commitment is precomputable (only applicable to keccak256)
precompute bool
getTimeout time.Duration
putTimeout time.Duration
}

func NewDAClient(url string, verify bool, pc bool) *DAClient {
return &DAClient{url, verify, pc}
return &DAClient{
url: url,
verify: verify,
precompute: pc,
}
}

// GetInput returns the input data for the given encoded commitment bytes.
Expand All @@ -35,7 +42,8 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.getTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,7 +99,8 @@ func (c *DAClient) setInputWithCommit(ctx context.Context, comm CommitmentData,
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return err
}
Expand All @@ -116,7 +125,8 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down
38 changes: 2 additions & 36 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,14 @@ package altda

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}

func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}

// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()

if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}

// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()

s.db[string(key)] = common.CopyBytes(value)
return nil
}

func TestDAClientPrecomputed(t *testing.T) {
store := NewMemStore()
logger := testlog.Logger(t, log.LevelDebug)
Expand All @@ -56,7 +22,7 @@ func TestDAClientPrecomputed(t *testing.T) {

cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: true,
}
require.NoError(t, cfg.Check())
Expand Down Expand Up @@ -113,7 +79,7 @@ func TestDAClientService(t *testing.T) {

cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: false,
GenericDA: false,
}
Expand Down
86 changes: 86 additions & 0 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"context"
"errors"
"io"
"net/http"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -99,3 +103,85 @@ func (d *AltDADisabled) OnFinalizedHeadSignal(f HeadSignalFn) {
func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error {
return ErrNotEnabled
}

type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
store := NewMemStore()
fakeDAServer := &FakeDAServer{
DAServer: NewDAServer(host, port, store, log, true),
putRequestLatency: 0,
getRequestLatency: 0,
}
return fakeDAServer
}

func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
if s.getRequestLatency > 0 {
time.Sleep(s.getRequestLatency)
}
s.DAServer.HandleGet(w, r)
}

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
if s.putRequestLatency > 0 {
time.Sleep(s.putRequestLatency)
}
s.DAServer.HandlePut(w, r)
}

func (s *FakeDAServer) Start() error {
err := s.DAServer.Start()
if err != nil {
return err
}
// Override the HandleGet/Put method registrations
mux := http.NewServeMux()
mux.HandleFunc("/get/", s.HandleGet)
mux.HandleFunc("/put/", s.HandlePut)
s.httpServer.Handler = mux
return nil
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.getRequestLatency = latency
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}

func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}

// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()

if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}

// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()

s.db[string(key)] = common.CopyBytes(value)
return nil
}
4 changes: 2 additions & 2 deletions op-alt-da/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
}
}

func (b *DAServer) Endpoint() string {
return b.listener.Addr().String()
func (b *DAServer) HttpEndpoint() string {
return fmt.Sprintf("http://%s", b.listener.Addr().String())
}

func (b *DAServer) Stop() error {
Expand Down
38 changes: 34 additions & 4 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
Expand Down Expand Up @@ -164,10 +165,11 @@ func DefaultSystemConfig(t testing.TB) SystemConfig {
},
},
Loggers: map[string]log.Logger{
RoleVerif: testlog.Logger(t, log.LevelInfo).New("role", RoleVerif),
RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq),
"batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"),
"proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"),
RoleVerif: testlog.Logger(t, log.LevelInfo).New("role", RoleVerif),
RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq),
"batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"),
"proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"),
"da-server": testlog.Logger(t, log.LevelInfo).New("role", "da-server"),
},
GethOptions: map[string][]geth.GethOption{},
P2PTopology: nil, // no P2P connectivity by default
Expand Down Expand Up @@ -356,6 +358,7 @@ type System struct {
L2OutputSubmitter *l2os.ProposerService
BatchSubmitter *bss.BatcherService
Mocknet mocknet.Mocknet
FakeAltDAServer *altda.FakeDAServer

L1BeaconAPIAddr string

Expand Down Expand Up @@ -588,6 +591,16 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
}
}

var rollupAltDAConfig *rollup.AltDAConfig
if cfg.DeployConfig.UseAltDA {
rollupAltDAConfig = &rollup.AltDAConfig{
DAChallengeAddress: cfg.L1Deployments.DataAvailabilityChallengeProxy,
DAChallengeWindow: cfg.DeployConfig.DAChallengeWindow,
DAResolveWindow: cfg.DeployConfig.DAResolveWindow,
CommitmentType: altda.GenericCommitmentString,
}
}

makeRollupConfig := func() rollup.Config {
return rollup.Config{
Genesis: rollup.Genesis{
Expand Down Expand Up @@ -619,6 +632,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
GraniteTime: cfg.DeployConfig.GraniteTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)),
InteropTime: cfg.DeployConfig.InteropTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)),
ProtocolVersionsAddress: cfg.L1Deployments.ProtocolVersionsProxy,
AltDAConfig: rollupAltDAConfig,
}
}
defaultConfig := makeRollupConfig()
Expand Down Expand Up @@ -906,6 +920,21 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
compressionAlgo = derive.Brotli10
}

var batcherAltDACLIConfig altda.CLIConfig
if cfg.DeployConfig.UseAltDA {
fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"])
if err := fakeAltDAServer.Start(); err != nil {
return nil, fmt.Errorf("failed to start fake altDA server: %w", err)
}
sys.FakeAltDAServer = fakeAltDAServer

batcherAltDACLIConfig = altda.CLIConfig{
Enabled: cfg.DeployConfig.UseAltDA,
DAServerURL: fakeAltDAServer.HttpEndpoint(),
VerifyOnRead: true,
GenericDA: true,
}
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances[RoleL1].WSEndpoint(),
L2EthRpc: sys.EthInstances[RoleSeq].WSEndpoint(),
Expand All @@ -928,6 +957,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: compressionAlgo,
AltDA: batcherAltDACLIConfig,
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
Expand Down
Loading

0 comments on commit 947b05c

Please sign in to comment.