diff --git a/blxr/blxerr/errors.go b/blxr/blxerr/errors.go new file mode 100644 index 0000000000..93df0abe04 --- /dev/null +++ b/blxr/blxerr/errors.go @@ -0,0 +1,17 @@ +package blxerr + +import ( + "fmt" + "math/big" +) + +// ProposedBlockLessProfitableErr happens when a proposed block cannot be accepted because of the block is less profitable +// than a cached block. +type ProposedBlockLessProfitableErr struct { + RewardThreshold *big.Int +} + +func (err ProposedBlockLessProfitableErr) Error() string { + // do not change error message as it's sent to validator's clients and builders parse it! + return fmt.Sprintf("block cannot be accepted. Reward threshold is %d", err.RewardThreshold) +} diff --git a/blxr/blxerr/errors_test.go b/blxr/blxerr/errors_test.go new file mode 100644 index 0000000000..7d85b4ae57 --- /dev/null +++ b/blxr/blxerr/errors_test.go @@ -0,0 +1,39 @@ +package blxerr + +import ( + "math/big" + "testing" +) + +func TestProposedBlockLessProfitableErr_Error(t *testing.T) { + type fields struct { + RewardThreshold *big.Int + } + tests := []struct { + name string + fields fields + want string + }{ + { + "with reward value", + fields{big.NewInt(10_000)}, + "block cannot be accepted. Reward threshold is 10000", + }, + + { + "without reward value", + fields{nil}, + "block cannot be accepted. Reward threshold is ", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ProposedBlockLessProfitableErr{ + RewardThreshold: tt.fields.RewardThreshold, + } + if got := err.Error(); got != tt.want { + t.Errorf("Error() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/blxr/version/version.go b/blxr/version/version.go new file mode 100644 index 0000000000..0e6b83a188 --- /dev/null +++ b/blxr/version/version.go @@ -0,0 +1,19 @@ +package version + +import ( + "runtime/debug" +) + +var commitHash = func() string { + if info, ok := debug.ReadBuildInfo(); ok { + for _, setting := range info.Settings { + if setting.Key == "vcs.revision" { + return setting.Value + } + } + } + return "" +}() + +// CommitHash returns the commit hash of the current build +func CommitHash() string { return commitHash } diff --git a/build/ci.go b/build/ci.go index b1dd476b17..3a5bee7662 100644 --- a/build/ci.go +++ b/build/ci.go @@ -354,13 +354,14 @@ func doLint(cmdline []string) { linter := downloadLinter(*cachedir) lflags := []string{"run", "--config", ".golangci.yml"} + build.MustRunCommand("./build/goimports.sh") // BX: add run of default goimports build.MustRunCommand(linter, append(lflags, packages...)...) fmt.Println("You have achieved perfection.") } // downloadLinter downloads and unpacks golangci-lint. func downloadLinter(cachedir string) string { - const version = "1.52.2" + const version = "1.55.2" csdb := build.MustLoadChecksums("build/checksums.txt") arch := runtime.GOARCH diff --git a/build/goimports.sh b/build/goimports.sh index 1fcace6a4b..b4da6a8cc2 100755 --- a/build/goimports.sh +++ b/build/goimports.sh @@ -9,7 +9,7 @@ find_files() { -o -path './crypto/bn256' \ -o -path '*/vendor/*' \ \) -prune \ - \) -name '*.go' + \) -name '*.go' -not -name '*.pb.go' -not -name '*_rlp.go' } GOFMT="gofmt -s -w" diff --git a/cmd/clef/main.go b/cmd/clef/main.go index 06a8cd7ab7..ddf23c95bd 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -738,7 +738,7 @@ func signer(c *cli.Context) error { if err != nil { utils.Fatalf("Could not register API: %w", err) } - handler := node.NewHTTPHandlerStack(srv, cors, vhosts, nil) + handler := node.NewHTTPHandlerStack(srv, cors, vhosts, nil, nil) // set port port := c.Int(rpcPortFlag.Name) diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index 58872a71c9..4ffc2c1759 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -30,7 +30,7 @@ import ( ) const ( - ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 miner:1.0 net:1.0 parlia:1.0 rpc:1.0 txpool:1.0 web3:1.0" + ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 mev:1.0 miner:1.0 net:1.0 parlia:1.0 rpc:1.0 txpool:1.0 web3:1.0" httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0" ) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f11d56d53a..825520f6fe 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -170,7 +170,7 @@ var ( utils.BLSPasswordFileFlag, utils.BLSWalletDirFlag, utils.VoteJournalDirFlag, - }, utils.NetworkFlags, utils.DatabasePathFlags) + }, utils.NetworkFlags, utils.DatabasePathFlags, utils.HTTPSecuredFlags, utils.MinerMEVFlags) rpcFlags = []cli.Flag{ utils.HTTPEnabledFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e7dac8e4ce..3c2dc45192 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1522,6 +1522,7 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { SetP2PConfig(ctx, &cfg.P2P) setIPC(ctx, cfg) setHTTP(ctx, cfg) + setHTTPSecuredIP(ctx, cfg) setGraphQL(ctx, cfg) setWS(ctx, cfg) setNodeUserIdent(ctx, cfg) @@ -1800,6 +1801,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { setMiner(ctx, &cfg.Miner) setRequiredBlocks(ctx, cfg) setLes(ctx, cfg) + setMEV(ctx, stack, &cfg.Miner) // Cap the cache allowance and tune the garbage collector mem, err := gopsutil.VirtualMemory() diff --git a/cmd/utils/flags_blxr.go b/cmd/utils/flags_blxr.go new file mode 100644 index 0000000000..64ac3ebe10 --- /dev/null +++ b/cmd/utils/flags_blxr.go @@ -0,0 +1,195 @@ +package utils + +import ( + "strings" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/internal/flags" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/node" + "github.com/urfave/cli/v2" +) + +var ( + HTTPSecuredIPPortFlag = &cli.IntFlag{ + Name: "http.securedipport", + Usage: "HTTP-RPC server secured by IP listening port", + Value: node.DefaultHTTPSecuredIPPort, + Category: flags.APICategory, + } + HTTPSecuredIPAllowedIPsFlag = &cli.StringFlag{ + Name: "http.securedipvallowedipss", + Usage: "Comma separated list of IPs from which to accept requests (server enforced). Accepts '*' wildcard.", + Value: strings.Join(node.DefaultConfig.HTTPSecuredIPAllowedIPs, ","), + Category: flags.APICategory, + } + HTTPSecuredIPApiFlag = &cli.StringFlag{ + Name: "http.securedipapi", + Usage: "Comma separated list of API's offered over the HTTP-RPC secured by IP interface", + Value: "", + Category: flags.APICategory, + } + + HTTPSecuredFlags = []cli.Flag{ + HTTPSecuredIPPortFlag, + HTTPSecuredIPAllowedIPsFlag, + HTTPSecuredIPApiFlag, + } +) + +// setHTTPSecuredIP creates the HTTP MEV RPC listener interface string from the set +// command line flags, returning empty if the HTTP endpoint is disabled. +func setHTTPSecuredIP(ctx *cli.Context, cfg *node.Config) { + if ctx.Bool(HTTPEnabledFlag.Name) { + if cfg.HTTPHost == "" { + cfg.HTTPHost = "127.0.0.1" + } + if ctx.IsSet(HTTPListenAddrFlag.Name) { + cfg.HTTPHost = ctx.String(HTTPListenAddrFlag.Name) + } + } + + if ctx.IsSet(HTTPSecuredIPPortFlag.Name) { + cfg.HTTPSecuredIPPort = ctx.Int(HTTPSecuredIPPortFlag.Name) + } + + if ctx.IsSet(HTTPCORSDomainFlag.Name) { + cfg.HTTPCors = SplitAndTrim(ctx.String(HTTPCORSDomainFlag.Name)) + } + + if ctx.IsSet(HTTPSecuredIPApiFlag.Name) { + cfg.HTTPSecuredIPModules = SplitAndTrim(ctx.String(HTTPSecuredIPApiFlag.Name)) + } + + if ctx.IsSet(HTTPSecuredIPAllowedIPsFlag.Name) { + cfg.HTTPSecuredIPAllowedIPs = SplitAndTrim(ctx.String(HTTPSecuredIPAllowedIPsFlag.Name)) + } + + if ctx.IsSet(HTTPPathPrefixFlag.Name) { + cfg.HTTPPathPrefix = ctx.String(HTTPPathPrefixFlag.Name) + } + + if ctx.IsSet(AllowUnprotectedTxs.Name) { + cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name) + } +} + +var ( + MinerMEVRelaysFlag = &cli.StringSliceFlag{ + Name: "miner.mevrelays", + Usage: "Destinations to register the validator each epoch. The miner will accept proposed blocks from these urls, if they are profitable.", + Category: flags.MinerCategory, + } + MinerMEVProposedBlockUriFlag = &cli.StringFlag{ + Name: "miner.mevproposedblockuri", + Usage: "The uri MEV relays should send the proposedBlock to.", + Category: flags.MinerCategory, + } + MinerMEVProposedBlockNamespaceFlag = &cli.StringFlag{ + Name: "miner.mevproposedblocknamespace", + Usage: "The namespace implements the proposedBlock function (default = eth). ", + Value: "eth", + Category: flags.MinerCategory, + } + MinerPreferMEVRelays = &cli.StringSliceFlag{ + Name: "miner.prefer-mev-relays", + Usage: "Enable preferring specific mev-relays", + Category: flags.MinerCategory, + } + + MinerMEVFlags = []cli.Flag{ + MinerMEVRelaysFlag, + MinerMEVProposedBlockUriFlag, + MinerMEVProposedBlockNamespaceFlag, + MinerPreferMEVRelays, + } +) + +func setMEV(ctx *cli.Context, stack *node.Node, cfg *miner.Config) { + if ctx.IsSet(MinerMEVRelaysFlag.Name) { + cfg.MEVRelays = ctx.StringSlice(MinerMEVRelaysFlag.Name) + } + + if len(cfg.MEVRelays) != 0 { + if ctx.IsSet(MinerPreferMEVRelays.Name) { + cfg.PreferMEVRelays = miner.NewAcceptRelayMap(ctx.StringSlice(MinerPreferMEVRelays.Name)...) + } else { + cfg.PreferMEVRelays = miner.NewAcceptRelayMap() + } + } + + if len(cfg.MEVRelays) == 0 { + return + } + + keystores := stack.AccountManager().Backends(keystore.KeyStoreType) + if len(keystores) == 0 { + return + } + + ks, ok := keystores[0].(*keystore.KeyStore) + if !ok { + return + } + + if ctx.IsSet(MinerMEVProposedBlockUriFlag.Name) { + cfg.ProposedBlockUri = ctx.String(MinerMEVProposedBlockUriFlag.Name) + } + + if cfg.ProposedBlockNamespace == "" && ctx.IsSet(MinerMEVProposedBlockNamespaceFlag.Name) { + cfg.ProposedBlockNamespace = ctx.String(MinerMEVProposedBlockNamespaceFlag.Name) + } + + account, err := ks.Find(accounts.Account{Address: cfg.Etherbase}) + if err != nil { + Fatalf("Could not find the validator public address %v to sign the registerValidator message, %v", cfg.Etherbase, err) + } + + registerHash := accounts.TextHash([]byte(cfg.ProposedBlockUri)) + passwordList := MakePasswordList(ctx) + if passwordList == nil { + cfg.RegisterValidatorSignedHash, err = ks.SignHash(account, registerHash) + if err != nil { + Fatalf("Failed sign registerValidator message unlocked with error: %v", err) + } + } else { + passwordFound := false + for _, password := range passwordList { + cfg.RegisterValidatorSignedHash, err = ks.SignHashWithPassphrase(account, password, registerHash) + if err == nil { + passwordFound = true + break + } + } + if !passwordFound { + Fatalf("Failed sign registerValidator message with passphrase with error") + } + } + + signature := make([]byte, crypto.SignatureLength) + copy(signature, cfg.RegisterValidatorSignedHash) + // verify the validator public address used to sign the registerValidator message + if len(signature) != crypto.SignatureLength { + Fatalf("signature used to sign registerValidator must be %d bytes long", crypto.SignatureLength) + } + + if signature[crypto.RecoveryIDOffset] == 27 || signature[crypto.RecoveryIDOffset] == 28 { + signature[crypto.RecoveryIDOffset] -= 27 // Transform yellow paper V from 27/28 to 0/1 + } + + if signature[crypto.RecoveryIDOffset] != 0 && signature[crypto.RecoveryIDOffset] != 1 { + Fatalf("invalid Ethereum signature of the registerValidator (V is not 0, or 1, or 27 or 28), it is %v", cfg.RegisterValidatorSignedHash[crypto.RecoveryIDOffset]) + } + + rpk, err := crypto.SigToPub(registerHash, signature) + if err != nil { + Fatalf("Failed to get validator public address from the registerValidator signed message %v", err) + } + + addr := crypto.PubkeyToAddress(*rpk) + if addr != account.Address { + Fatalf("Validator public Etherbase %v was not used to sign the registerValidator %v", account.Address, addr) + } +} diff --git a/eth/api_backend_blxr.go b/eth/api_backend_blxr.go new file mode 100644 index 0000000000..416ba878d7 --- /dev/null +++ b/eth/api_backend_blxr.go @@ -0,0 +1,38 @@ +package eth + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +func (b *EthAPIBackend) ProposedBlock(ctx context.Context, mevRelay string, blockNumber *big.Int, prevBlockHash common.Hash, reward *big.Int, gasLimit uint64, gasUsed uint64, txs types.Transactions, unRevertedHashes map[common.Hash]struct{}) (simDuration time.Duration, err error) { + return b.eth.miner.ProposedBlock(ctx, mevRelay, blockNumber, prevBlockHash, reward, gasLimit, gasUsed, txs, unRevertedHashes) +} + +func (b *EthAPIBackend) AddRelay(ctx context.Context, mevRelay string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return b.eth.miner.AddRelay(ctx, mevRelay) + } +} + +func (b *EthAPIBackend) RemoveRelay(ctx context.Context, mevRelay string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return b.eth.miner.RemoveRelay(mevRelay) + } +} + +func (b *EthAPIBackend) BlockNumber(ctx context.Context) uint64 { + header, _ := b.HeaderByNumber(ctx, rpc.LatestBlockNumber) // latest header should always be available + return header.Number.Uint64() +} diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 23fb21975b..134d7530b1 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -833,7 +833,7 @@ func TestVoteSubscription(t *testing.T) { backend, sys = newTestFilterSystem(t, db, Config{Timeout: 5 * time.Minute}) api = NewFilterAPI(sys, false) votes = []*types.VoteEnvelope{ - &types.VoteEnvelope{ + { VoteAddress: types.BLSPublicKey{}, Signature: types.BLSSignature{}, Data: &types.VoteData{ @@ -843,7 +843,7 @@ func TestVoteSubscription(t *testing.T) { TargetHash: common.BytesToHash(common.Hex2Bytes(string(rune(1)))), }, }, - &types.VoteEnvelope{ + { VoteAddress: types.BLSPublicKey{}, Signature: types.BLSSignature{}, Data: &types.VoteData{ @@ -853,7 +853,7 @@ func TestVoteSubscription(t *testing.T) { TargetHash: common.BytesToHash(common.Hex2Bytes(string(rune(2)))), }, }, - &types.VoteEnvelope{ + { VoteAddress: types.BLSPublicKey{}, Signature: types.BLSSignature{}, Data: &types.VoteData{ @@ -863,7 +863,7 @@ func TestVoteSubscription(t *testing.T) { TargetHash: common.BytesToHash(common.Hex2Bytes(string(rune(3)))), }, }, - &types.VoteEnvelope{ + { VoteAddress: types.BLSPublicKey{}, Signature: types.BLSSignature{}, Data: &types.VoteData{ diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 45e0e9f413..f5d24218f3 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -738,7 +738,7 @@ func sendTransactionConditional(ec *Client) error { root := common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") return ec.SendTransactionConditional(context.Background(), tx, ethapi.TransactionOpts{ KnownAccounts: map[common.Address]ethapi.AccountStorage{ - testAddr: ethapi.AccountStorage{ + testAddr: { StorageRoot: &root, }, }, diff --git a/go.mod b/go.mod index ed760753e6..3ff2cdd282 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,11 @@ module github.com/ethereum/go-ethereum -go 1.21 +go 1.21.4 toolchain go1.21.5 +require github.com/bloXroute-Labs/bx-mev-tools/pkg/maputil v0.0.0-20231218204341-427df0534ddf + require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 github.com/VictoriaMetrics/fastcache v1.12.1 diff --git a/go.sum b/go.sum index f7987ab3ec..6e78cf92f3 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo= github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/bloXroute-Labs/bx-mev-tools/pkg/maputil v0.0.0-20231218204341-427df0534ddf h1:oMuFsU2VcWfPNBgg+YBxkSL0SXMxmFAeezoDDNUGf10= +github.com/bloXroute-Labs/bx-mev-tools/pkg/maputil v0.0.0-20231218204341-427df0534ddf/go.mod h1:KIa7ny4RSMejp5S0cPlORMkPJXnyu2jDiXb0ElQdy18= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/bnb-chain/greenfield-tendermint v0.0.0-20230417032003-4cda1f296fb2 h1:jubavYCs/mCFj/g6Utl+l4SfpykdBdWJFPsvb9FcEXU= github.com/bnb-chain/greenfield-tendermint v0.0.0-20230417032003-4cda1f296fb2/go.mod h1:9q11eHNRY9FDwFH+4pompzPNGv//Z3VcfvkELaHJPMs= diff --git a/graphql/service.go b/graphql/service.go index 4ca427658a..37cd64ec08 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -119,7 +119,7 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters. return nil, err } h := handler{Schema: s} - handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil) + handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil, nil) stack.RegisterHandler("GraphQL UI", "/graphql/ui", GraphiQL{}) stack.RegisterHandler("GraphQL UI", "/graphql/ui/", GraphiQL{}) diff --git a/internal/ethapi/api_blxr.go b/internal/ethapi/api_blxr.go new file mode 100644 index 0000000000..9e7774002a --- /dev/null +++ b/internal/ethapi/api_blxr.go @@ -0,0 +1,128 @@ +package ethapi + +import ( + "context" + "errors" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +const timestampFormat = "2006-01-02 15:04:05.000000" + +// RegisterValidatorArgs has the epoch and the uri to send the proposedBlock to the validator +type RegisterValidatorArgs struct { + Data hexutil.Bytes `json:"data"` // bytes of string with callback ProposedBlockUri + Signature hexutil.Bytes `json:"signature"` + Namespace string `json:"namespace"` + CommitHash string `json:"commitHash"` + GasCeil uint64 `json:"gasCeil"` +} + +// MevAPI provides an API to MEV endpoints. +type MevAPI struct{ b Backend } + +// NewMevAPI creates a new MEV protocol API. +func NewMevAPI(b Backend) *MevAPI { return &MevAPI{b} } + +type ProposedBlockArgs struct { + MEVRelay string `json:"mevRelay,omitempty"` + BlockNumber rpc.BlockNumber `json:"blockNumber"` + PrevBlockHash common.Hash `json:"prevBlockHash"` + BlockReward *big.Int `json:"blockReward"` + GasLimit uint64 `json:"gasLimit"` + GasUsed uint64 `json:"gasUsed"` + Payload []hexutil.Bytes `json:"payload"` + UnRevertedHashes []common.Hash `json:"unRevertedHashes,omitempty"` +} + +type ProposedBlockResponse struct { + ReceivedAt string `json:"receivedAt"` + SimulatedDuration time.Duration `json:"simulatedDuration"` + ResponseSentAt string `json:"responseSentAt"` +} + +// ProposedBlock will submit the block to the miner worker +func (s *MevAPI) ProposedBlock(ctx context.Context, args ProposedBlockArgs) (*ProposedBlockResponse, error) { + progress := s.b.SyncProgress() + if progress.CurrentBlock < progress.HighestBlock { + return nil, fmt.Errorf( + "syncing in the process. Current block: %d, highest block: %d", + progress.CurrentBlock, progress.HighestBlock) + } + + var ( + receivedAt = time.Now().UTC() + txs types.Transactions + ) + + if len(args.Payload) == 0 { + return nil, errors.New("block missing txs") + } + + if args.BlockNumber == 0 { + return nil, errors.New("block missing blockNumber") + } + + currentBlock := s.b.CurrentBlock() + blockOnChain := currentBlock.Number + proposedBlockNumber := big.NewInt(args.BlockNumber.Int64()) + + if proposedBlockNumber.Cmp(blockOnChain) < 1 { + log.Info(log.MEVPrefix+"Validating ProposedBlock failed", "blockNumber", args.BlockNumber.String(), "onChainBlockNumber", blockOnChain.String(), "onChainBlockHash", currentBlock.Hash().String(), "prevBlockHash", args.PrevBlockHash.String(), "mevRelay", args.MEVRelay) + return nil, fmt.Errorf("blockNumber is incorrect. proposedBlockNumber: %v onChainBlockNumber: %v onChainBlockHash %v", args.BlockNumber, blockOnChain, currentBlock.Hash().String()) + } + + for _, encodedTx := range args.Payload { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return nil, err + } + txs = append(txs, tx) + } + + unRevertedHashes := make(map[common.Hash]struct{}, len(args.UnRevertedHashes)) + for _, hash := range args.UnRevertedHashes { + unRevertedHashes[hash] = struct{}{} + } + + simDuration, err := s.b.ProposedBlock(ctx, args.MEVRelay, proposedBlockNumber, args.PrevBlockHash, args.BlockReward, args.GasLimit, args.GasUsed, txs, unRevertedHashes) + if err != nil { + return nil, err + } + + return &ProposedBlockResponse{ + ReceivedAt: receivedAt.Format(timestampFormat), + SimulatedDuration: simDuration, + ResponseSentAt: time.Now().UTC().Format(timestampFormat), + }, nil +} + +type AddRelayArgs struct { + MEVRelay string `json:"mevRelay"` +} + +// AddRelay will submit the block to the miner worker +func (s *MevAPI) AddRelay(ctx context.Context, args AddRelayArgs) error { + return s.b.AddRelay(ctx, args.MEVRelay) +} + +type RemoveRelayArgs struct { + MEVRelay string `json:"mevRelay"` +} + +// RemoveRelay will submit the block to the miner worker +func (s *MevAPI) RemoveRelay(ctx context.Context, args RemoveRelayArgs) error { + return s.b.RemoveRelay(ctx, args.MEVRelay) +} + +// BlockNumber returns the block number of the chain head. +func (s *MevAPI) BlockNumber(ctx context.Context) (hexutil.Uint64, error) { + return hexutil.Uint64(s.b.BlockNumber(ctx)), nil +} diff --git a/internal/ethapi/api_blxr_test.go b/internal/ethapi/api_blxr_test.go new file mode 100644 index 0000000000..49671535ac --- /dev/null +++ b/internal/ethapi/api_blxr_test.go @@ -0,0 +1,18 @@ +package ethapi + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func (b testBackend) ProposedBlock(context.Context, string, *big.Int, common.Hash, *big.Int, uint64, uint64, types.Transactions, map[common.Hash]struct{}) (time.Duration, error) { + return 0, nil +} + +func (b testBackend) AddRelay(context.Context, string) error { return nil } +func (b testBackend) RemoveRelay(context.Context, string) error { return nil } +func (b testBackend) BlockNumber(_ context.Context) uint64 { return 0 } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d71d7e8eba..44b404d00c 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -40,6 +40,8 @@ import ( // Backend interface provides the common API services (that are provided by // both full and light clients) with access to necessary functions. type Backend interface { + MEVBackend + // General Ethereum API SyncProgress() ethereum.SyncProgress @@ -105,7 +107,7 @@ type Backend interface { func GetAPIs(apiBackend Backend) []rpc.API { nonceLock := new(AddrLocker) - return []rpc.API{ + return append([]rpc.API{ { Namespace: "eth", Service: NewEthereumAPI(apiBackend), @@ -128,5 +130,5 @@ func GetAPIs(apiBackend Backend) []rpc.API { Namespace: "personal", Service: NewPersonalAccountAPI(apiBackend, nonceLock), }, - } + }, getMEVAPIs(apiBackend)...) } diff --git a/internal/ethapi/backend_blxr.go b/internal/ethapi/backend_blxr.go new file mode 100644 index 0000000000..b39a06eafc --- /dev/null +++ b/internal/ethapi/backend_blxr.go @@ -0,0 +1,28 @@ +package ethapi + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// MEVBackend interface provides the common API services (that are provided by +// both full and light clients) with access to MEV functions. +type MEVBackend interface { + ProposedBlock(ctx context.Context, mevRelay string, blockNumber *big.Int, prevBlockHash common.Hash, reward *big.Int, gasLimit uint64, gasUsed uint64, txs types.Transactions, unRevertedHashes map[common.Hash]struct{}) (simDuration time.Duration, err error) + AddRelay(ctx context.Context, mevRelay string) error + RemoveRelay(ctx context.Context, mevRelay string) error + BlockNumber(ctx context.Context) uint64 +} + +func getMEVAPIs(apiBackend Backend) []rpc.API { + mevAPI := NewMevAPI(apiBackend) + return []rpc.API{ + {Namespace: "mev", Service: mevAPI}, + {Namespace: "eth", Service: mevAPI}, + } +} diff --git a/internal/ethapi/transaction_args_blxr_test.go b/internal/ethapi/transaction_args_blxr_test.go new file mode 100644 index 0000000000..2ac275e20e --- /dev/null +++ b/internal/ethapi/transaction_args_blxr_test.go @@ -0,0 +1,18 @@ +package ethapi + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func (b *backendMock) ProposedBlock(context.Context, string, *big.Int, common.Hash, *big.Int, uint64, uint64, types.Transactions, map[common.Hash]struct{}) (time.Duration, error) { + return 0, nil +} + +func (b *backendMock) AddRelay(context.Context, string) error { return nil } +func (b *backendMock) RemoveRelay(context.Context, string) error { return nil } +func (b *backendMock) BlockNumber(_ context.Context) uint64 { return 0 } diff --git a/internal/ethapi/transaction_options_test.go b/internal/ethapi/transaction_options_test.go index 479f46a145..1483a7c01a 100644 --- a/internal/ethapi/transaction_options_test.go +++ b/internal/ethapi/transaction_options_test.go @@ -31,7 +31,7 @@ func TestTransactionOptsJSONUnMarshalTrip(t *testing.T) { false, ethapi.TransactionOpts{ KnownAccounts: map[common.Address]ethapi.AccountStorage{ - common.HexToAddress("0x6b3A8798E5Fb9fC5603F3aB5eA2e8136694e55d0"): ethapi.AccountStorage{ + common.HexToAddress("0x6b3A8798E5Fb9fC5603F3aB5eA2e8136694e55d0"): { StorageRoot: ptr(common.HexToHash("0x290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563")), }, }, @@ -43,7 +43,7 @@ func TestTransactionOptsJSONUnMarshalTrip(t *testing.T) { false, ethapi.TransactionOpts{ KnownAccounts: map[common.Address]ethapi.AccountStorage{ - common.HexToAddress("0x6b3A8798E5Fb9fC5603F3aB5eA2e8136694e55d0"): ethapi.AccountStorage{ + common.HexToAddress("0x6b3A8798E5Fb9fC5603F3aB5eA2e8136694e55d0"): { StorageRoot: nil, StorageSlots: map[common.Hash]common.Hash{ common.HexToHash("0xc65a7bb8d6351c1cf70c95a316cc6a92839c986682d98bc35f958f4883f9d2a8"): common.HexToHash("0x"), diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index ae76af4090..dee74ba7c8 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -29,6 +29,7 @@ var Modules = map[string]string{ "rpc": RpcJs, "txpool": TxpoolJs, "dev": DevJs, + "mev": MEVJs, } const CliqueJs = ` @@ -614,6 +615,21 @@ web3._extend({ call: 'eth_getBlockReceipts', params: 1, }), + new web3._extend.Method({ + name: 'proposedBlock', + call: 'eth_proposedBlock', + params: 1, + }), + new web3._extend.Method({ + name: 'addRelay', + call: 'eth_addRelay', + params: 1, + }), + new web3._extend.Method({ + name: 'removeRelay', + call: 'eth_removeRelay', + params: 1, + }), ], properties: [ new web3._extend.Property({ diff --git a/internal/web3ext/web3ext_blxr.go b/internal/web3ext/web3ext_blxr.go new file mode 100644 index 0000000000..fecd3f6d00 --- /dev/null +++ b/internal/web3ext/web3ext_blxr.go @@ -0,0 +1,32 @@ +package web3ext + +const MEVJs = ` +web3._extend({ + property: 'mev', + methods: [ + new web3._extend.Method({ + name: 'proposedBlock', + call: 'mev_proposedBlock', + params: 1, + }), + new web3._extend.Method({ + name: 'addRelay', + call: 'mev_addRelay', + params: 1, + }), + new web3._extend.Method({ + name: 'removeRelay', + call: 'mev_removeRelay', + params: 1, + }), + ], + + properties: [ + new web3._extend.Property({ + name: 'blockNumber', + getter: 'mev_blockNumber', + outputFormatter: web3._extend.utils.toDecimal + }), + ], +}); +` diff --git a/log/prefixes_blxr.go b/log/prefixes_blxr.go new file mode 100644 index 0000000000..32a6c17ac1 --- /dev/null +++ b/log/prefixes_blxr.go @@ -0,0 +1,6 @@ +package log + +const ( + // MEVPrefix is the prefix for all MEV-related logs. + MEVPrefix = "[MEV] " +) diff --git a/miner/miner.go b/miner/miner.go index 4db6140803..fab4c612ae 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -56,6 +56,12 @@ type Config struct { NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload DisableVoteAttestation bool // Whether to skip assembling vote attestation + + MEVRelays []string `toml:",omitempty"` // RPC clients to register validator each epoch + PreferMEVRelays *AcceptRelayMap `toml:",omitempty"` // Prefer blocks from MEV relays + ProposedBlockUri string `toml:",omitempty"` // received proposedBlocks on that uri + ProposedBlockNamespace string `toml:",omitempty"` // define the namespace of proposedBlock + RegisterValidatorSignedHash []byte `toml:"-"` // signed value of crypto.Keccak256([]byte(ProposedBlockUri)) } // DefaultConfig contains default settings for miner. @@ -70,6 +76,7 @@ var DefaultConfig = Config{ Recommit: 3 * time.Second, NewPayloadTimeout: 2 * time.Second, DelayLeftOver: 50 * time.Millisecond, + PreferMEVRelays: NewAcceptRelayMap(), } // Miner creates blocks and searches for proof-of-work values. @@ -83,6 +90,11 @@ type Miner struct { worker *worker wg sync.WaitGroup + + mevRelays *ClientMapping + proposedBlockUri string + proposedBlockNamespace string + signedProposedBlockUri []byte } func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool) *Miner { @@ -94,6 +106,11 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even startCh: make(chan struct{}), stopCh: make(chan struct{}), worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, false), + + mevRelays: NewClientMap(config.MEVRelays), + proposedBlockUri: config.ProposedBlockUri, + proposedBlockNamespace: config.ProposedBlockNamespace, + signedProposedBlockUri: config.RegisterValidatorSignedHash, } miner.wg.Add(1) go miner.update() @@ -114,9 +131,18 @@ func (miner *Miner) update() { } }() + chainBlockCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + + chainBlockSub := miner.eth.BlockChain().SubscribeChainBlockEvent(chainBlockCh) + defer chainBlockSub.Unsubscribe() + shouldStart := false canStart := true dlEventCh := events.Chan() + + // miner started at the middle of an epoch, we want to register it + miner.registerValidator() + for { select { case ev := <-dlEventCh: @@ -159,12 +185,19 @@ func (miner *Miner) update() { miner.worker.start() } shouldStart = true + + case block := <-chainBlockCh: + if isNewEpoch(block.Block) { + miner.registerValidator() + } case <-miner.stopCh: shouldStart = false miner.worker.stop() case <-miner.exitCh: miner.worker.close() return + case <-chainBlockSub.Err(): + return } } } diff --git a/miner/miner_blxr.go b/miner/miner_blxr.go new file mode 100644 index 0000000000..aa1fc8c0a1 --- /dev/null +++ b/miner/miner_blxr.go @@ -0,0 +1,307 @@ +package miner + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/bloXroute-Labs/bx-mev-tools/pkg/maputil" + "github.com/ethereum/go-ethereum/blxr/blxerr" + "github.com/ethereum/go-ethereum/blxr/version" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" + "github.com/pkg/errors" +) + +type ClientMap map[string]*rpc.Client + +type ClientMapping struct { + mx *sync.RWMutex + clientMap ClientMap +} + +func NewClientMap(relays []string) *ClientMapping { + c := &ClientMapping{ + mx: new(sync.RWMutex), + clientMap: make(ClientMap), + } + + for _, relay := range relays { + client, err := rpc.Dial(relay) + if err != nil { + log.Warn(log.MEVPrefix+"Failed to dial MEV relay", "dest", relay, "err", err) + continue + } + + c.clientMap[relay] = client + } + + return c +} + +func (c *ClientMapping) Len() int { + c.mx.RLock() + defer c.mx.RUnlock() + return len(c.clientMap) +} + +func (c *ClientMapping) Mapping() ClientMap { + clientMap := make(ClientMap, len(c.clientMap)) + + c.mx.RLock() + for k, v := range c.clientMap { + clientMap[k] = v + } + c.mx.RUnlock() + + return clientMap +} + +func (c *ClientMapping) Get(relay string) (*rpc.Client, bool) { + c.mx.RLock() + client, ok := c.clientMap[relay] + c.mx.RUnlock() + + return client, ok +} + +func (c *ClientMapping) Add(relay string) (*rpc.Client, error) { + c.mx.Lock() + defer c.mx.Unlock() + + client, err := rpc.Dial(relay) + if err != nil { + return nil, err + } + + c.clientMap[relay] = client + + return client, nil +} + +func (c *ClientMapping) Remove(relay string) error { + c.mx.Lock() + defer c.mx.Unlock() + + if _, ok := c.clientMap[relay]; !ok { + return fmt.Errorf("relay %s not found", relay) + } + + delete(c.clientMap, relay) + + return nil +} + +type ProposedBlockResult struct { + SimDuration time.Duration + Accepted bool + BestReward *big.Int +} + +// ProposedBlock add the block to the list of works. +// In case if the block cannot be accepted due to it is less profitable than the validator has, +// it returns blxerr.ProposedBlockLessProfitableErr error with a minimal reward threshold inside. +func (miner *Miner) ProposedBlock(ctx context.Context, mevRelay string, blockNumber *big.Int, prevBlockHash common.Hash, reward *big.Int, gasLimit uint64, gasUsed uint64, txs types.Transactions, unReverted map[common.Hash]struct{}) (simDuration time.Duration, err error) { + var ( + isBlockSkipped bool + simWork *bestProposedWork + ) + + endOfProposingWindow := time.Unix(int64(miner.eth.BlockChain().CurrentBlock().Time+miner.worker.chainConfig.Parlia.Period), 0).Add(-miner.worker.config.DelayLeftOver) + + timeout := time.Until(endOfProposingWindow) + if timeout <= 0 { + err = fmt.Errorf("proposed block is too late, end of proposing window %s, appeared %s later", endOfProposingWindow, common.PrettyDuration(timeout)) + return + } + + proposingCtx, proposingCancel := context.WithTimeout(ctx, timeout) + defer proposingCancel() + + currentGasLimit := atomic.LoadUint64(miner.worker.currentGasLimit) + previousBlockGasLimit := atomic.LoadUint64(miner.worker.prevBlockGasLimit) + defer func() { + logCtx := []any{ + "blockNumber", blockNumber, + "mevRelay", mevRelay, + "prevBlockHash", prevBlockHash.Hex(), + "proposedReward", reward, + "gasLimit", gasLimit, + "gasUsed", gasUsed, + "txCount", len(txs), + "unRevertedCount", len(unReverted), + "isBlockSkipped", isBlockSkipped, + "currentGasLimit", currentGasLimit, + "timestamp", time.Now().UTC().Format(timestampFormat), + "simDuration", simDuration, + } + + if err != nil { + logCtx = append(logCtx, "err", err) + } + + log.Debug(log.MEVPrefix+"Received proposedBlock", logCtx...) + }() + isBlockSkipped = gasUsed > currentGasLimit + if isBlockSkipped { + err = fmt.Errorf("proposed block gasUsed %v exceeds the current block gas limit %v", gasUsed, currentGasLimit) + return + } + desiredGasLimit := core.CalcGasLimit(previousBlockGasLimit, miner.worker.config.GasCeil) + if desiredGasLimit != gasLimit { + log.Warn(log.MEVPrefix+"proposedBlock has wrong gasLimit", "MEVRelay", mevRelay, "blockNumber", blockNumber, "validatorGasLimit", desiredGasLimit, "proposedBlockGasLimit", gasLimit) + err = fmt.Errorf("proposed block gasLimit %v is different than the validator gasLimit %v", gasLimit, desiredGasLimit) + return + } + args := &ProposedBlockArgs{ + mevRelay: mevRelay, + blockNumber: blockNumber, + prevBlockHash: prevBlockHash, + blockReward: reward, + gasLimit: gasLimit, + gasUsed: gasUsed, + txs: txs, + unReverted: unReverted, + } + simWork, simDuration, err = miner.worker.simulateProposedBlock(proposingCtx, args) + if err != nil { + err = fmt.Errorf("processing and simulating proposedBlock failed, %v", err) + return + } + if simWork == nil { + // do not return error, when the block is skipped + return + } + + select { + case <-proposingCtx.Done(): + return simDuration, errors.WithMessage(proposingCtx.Err(), "failed to propose block due to context timeout") + default: + // unblock and continue + } + + accepted, bestReward := miner.worker.handleProposedBlock(&ProposedBlock{args: args, simulatedWork: simWork, simDuration: simDuration}) + if !accepted { + return simDuration, blxerr.ProposedBlockLessProfitableErr{RewardThreshold: bestReward} + } + + return simDuration, nil +} + +func (miner *Miner) registerValidator() { + log.Info(log.MEVPrefix + "register validator to MEV relays") + registerValidatorArgs := ðapi.RegisterValidatorArgs{ + Data: []byte(miner.proposedBlockUri), + Signature: miner.signedProposedBlockUri, + Namespace: miner.proposedBlockNamespace, + CommitHash: version.CommitHash(), + GasCeil: miner.worker.config.GasCeil, + } + for dest, destClient := range miner.mevRelays.Mapping() { + go func(dest string, destinationClient *rpc.Client, registerValidatorArgs *ethapi.RegisterValidatorArgs) { + var result any + + if err := destinationClient.Call( + &result, "eth_registerValidator", registerValidatorArgs, + ); err != nil { + log.Warn(log.MEVPrefix+"Failed to register validator to MEV relay", "dest", dest, "err", err) + return + } + + log.Debug(log.MEVPrefix+"register validator to MEV relay", "dest", dest, "result", result) + }(dest, destClient, registerValidatorArgs) + } +} + +func (miner *Miner) AddRelay(ctx context.Context, relay string) error { + client, err := miner.mevRelays.Add(relay) + if err != nil { + return err + } + + log.Info(log.MEVPrefix+"register validator to MEV relay", "dest", relay) + registerValidatorArgs := ðapi.RegisterValidatorArgs{ + Data: []byte(miner.proposedBlockUri), + Signature: miner.signedProposedBlockUri, + Namespace: miner.proposedBlockNamespace, + CommitHash: version.CommitHash(), + GasCeil: miner.worker.config.GasCeil, + } + + var result any + + if err = client.CallContext( + ctx, &result, "eth_registerValidator", registerValidatorArgs, + ); err != nil { + log.Warn(log.MEVPrefix+"Failed to register validator to MEV relay", "dest", relay, "err", err) + return err + } + + log.Debug(log.MEVPrefix+"register validator to MEV relay", "dest", relay, "result", result) + + return nil +} + +func (miner *Miner) RemoveRelay(relay string) error { + return miner.mevRelays.Remove(relay) +} + +func isNewEpoch(block *types.Block) bool { + return block.NumberU64()%params.BSCChainConfig.Parlia.Epoch == 0 +} + +// AcceptRelayMap is a map of MEV relays that the miner accepts +type AcceptRelayMap struct { + relays *maputil.Set[string] + len int +} + +// NewAcceptRelayMap creates a new AcceptRelayMap +func NewAcceptRelayMap(relays ...string) *AcceptRelayMap { + return new(AcceptRelayMap).fromSlice(relays...) +} + +func (m *AcceptRelayMap) fromSlice(relays ...string) *AcceptRelayMap { + m.relays = maputil.SetOf(relays...) + m.len = m.relays.Size() + + return m +} + +// Accept returns true if the miner accepts the MEV relay +func (m *AcceptRelayMap) Accept(mevRelay string) bool { + if m.len == 0 || m.relays == nil { + return true + } + + return m.relays.Has(mevRelay) +} + +// MarshalText converts the AcceptRelayMap object into a text representation. +func (m *AcceptRelayMap) MarshalText() (text []byte, err error) { + acceptRelays := make([]string, 0, m.len) + m.relays.Each(func(key string) bool { acceptRelays = append(acceptRelays, key); return true }) + + return json.Marshal(acceptRelays) +} + +// UnmarshalText parses the text representation back into an AcceptRelayMap object. +func (m *AcceptRelayMap) UnmarshalText(text []byte) (err error) { + var acceptRelays []string + if err = json.Unmarshal(text, &acceptRelays); err != nil { + return + } + + m.fromSlice(acceptRelays...) + + return +} diff --git a/miner/miner_blxr_test.go b/miner/miner_blxr_test.go new file mode 100644 index 0000000000..32f13b9ef1 --- /dev/null +++ b/miner/miner_blxr_test.go @@ -0,0 +1,46 @@ +package miner + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAcceptRelayMap(t *testing.T) { + t.Run("MarshalUnmarshal", func(t *testing.T) { + originalRelays := []string{"relay1", "relay2", "relay3"} + arm := NewAcceptRelayMap(originalRelays...) + + // Marshal + text, err := arm.MarshalText() + require.NoError(t, err) + + // Unmarshal + arm2 := NewAcceptRelayMap() + require.NoError(t, arm2.UnmarshalText(text)) + require.Equal(t, arm.len, arm2.len) + + for _, relay := range append(originalRelays, "relay4") { + require.Equal(t, arm.Accept(relay), arm2.Accept(relay)) + } + }) + + t.Run("Empty", func(t *testing.T) { + arm := NewAcceptRelayMap() + + // Marshal and Unmarshal an empty map + text, err := arm.MarshalText() + require.NoError(t, err) + + arm2 := NewAcceptRelayMap() + require.NoError(t, arm2.UnmarshalText(text)) + require.Equal(t, arm.len, arm2.len) + require.Equal(t, arm.relays.Size(), arm2.relays.Size()) + require.Equal(t, arm.relays, arm2.relays) + }) + + t.Run("UnmarshalError", func(t *testing.T) { + // Check if an error is returned due to invalid JSON + require.Error(t, new(AcceptRelayMap).UnmarshalText([]byte("not a valid json"))) + }) +} diff --git a/miner/worker.go b/miner/worker.go index 0b8f248742..b4c0bd75e4 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -219,31 +219,42 @@ type worker struct { fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. recentMinedBlocks *lru.Cache + + // Proposed block + bestProposedBlockLock sync.RWMutex + // Key (k): The block number (blockNum) + // Value (v): A nested map representing the mapping between (k - prevBlockHash) and the corresponding proposed work(v - environment,reward) + bestProposedBlockInfo map[uint64]bestProposedWorks + currentGasLimit *uint64 + prevBlockGasLimit *uint64 } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { recentMinedBlocks, _ := lru.New(recentMinedCacheLimit) worker := &worker{ - prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), - config: config, - chainConfig: chainConfig, - engine: engine, - eth: eth, - chain: eth.BlockChain(), - mux: mux, - isLocalBlock: isLocalBlock, - coinbase: config.Etherbase, - extra: config.ExtraData, - pendingTasks: make(map[common.Hash]*task), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *types.Block, resultQueueSize), - startCh: make(chan struct{}, 1), - exitCh: make(chan struct{}), - resubmitIntervalCh: make(chan time.Duration), - recentMinedBlocks: recentMinedBlocks, + prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), + config: config, + chainConfig: chainConfig, + engine: engine, + eth: eth, + chain: eth.BlockChain(), + mux: mux, + isLocalBlock: isLocalBlock, + coinbase: config.Etherbase, + extra: config.ExtraData, + pendingTasks: make(map[common.Hash]*task), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *types.Block, resultQueueSize), + startCh: make(chan struct{}, 1), + exitCh: make(chan struct{}), + resubmitIntervalCh: make(chan time.Duration), + recentMinedBlocks: recentMinedBlocks, + currentGasLimit: new(uint64), + prevBlockGasLimit: new(uint64), + bestProposedBlockInfo: make(map[uint64]bestProposedWorks), } // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) @@ -267,11 +278,12 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus } worker.newpayloadTimeout = newpayloadTimeout - worker.wg.Add(4) + worker.wg.Add(5) go worker.mainLoop() go worker.newWorkLoop(recommit) go worker.resultLoop() go worker.taskLoop() + go worker.proposedLoop() // Submit first work to initialize pending state. if init { @@ -408,7 +420,14 @@ func (w *worker) newWorkLoop(recommit time.Duration) { for { select { case <-w.startCh: - clearPending(w.chain.CurrentBlock().Number.Uint64()) + currentChainNumber := w.chain.CurrentBlock().Number.Uint64() + clearPending(currentChainNumber) + w.bestProposedBlockLock.Lock() + if proposedWorks, ok := w.bestProposedBlockInfo[currentChainNumber]; ok { + proposedWorks.discard() + delete(w.bestProposedBlockInfo, currentChainNumber) + } + w.bestProposedBlockLock.Unlock() timestamp = time.Now().Unix() commit(commitInterruptNewHead) @@ -689,18 +708,23 @@ func (w *worker) updateSnapshot(env *environment) { } func (w *worker) commitTransaction(env *environment, tx *txpool.Transaction, receiptProcessors ...core.ReceiptProcessor) ([]*types.Log, error) { + return w.commitTransactionOld(env, tx.Tx, receiptProcessors...) +} + +// TODO: (Michael Kalashnikov) [BEFORE v1.3.5] check how can it handled in native way instead of creating separate method. +func (w *worker) commitTransactionOld(env *environment, tx *types.Transaction, receiptProcessors ...core.ReceiptProcessor) ([]*types.Log, error) { var ( snap = env.state.Snapshot() gp = env.gasPool.Gas() ) - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx.Tx, &env.header.GasUsed, *w.chain.GetVMConfig(), receiptProcessors...) + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig(), receiptProcessors...) if err != nil { env.state.RevertToSnapshot(snap) env.gasPool.SetGas(gp) return nil, err } - env.txs = append(env.txs, tx.Tx) + env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) return receipt.Logs, nil @@ -967,8 +991,12 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout)) } } - fees := work.state.GetBalance(consensus.SystemAddress) - block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, nil, work.receipts, params.withdrawals) + + bestWork := work.copy() + defer func() { bestWork.discard() }() // wrapping with a function since we reassign bestWork later + + fees := bestWork.state.GetBalance(consensus.SystemAddress) + block, _, err := w.engine.FinalizeAndAssemble(w.chain, bestWork.header, bestWork.state, bestWork.txs, nil, bestWork.receipts, params.withdrawals) if err != nil { return nil, nil, err } @@ -1029,6 +1057,7 @@ LOOP: } prevWork = work workList = append(workList, work) + atomic.StoreUint64(w.currentGasLimit, work.header.GasLimit) delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) if delay == nil { @@ -1131,15 +1160,16 @@ LOOP: } // get the most profitable work bestWork := workList[0] - bestReward := new(big.Int) + bestReward := big.NewInt(0) for i, wk := range workList { balance := wk.state.GetBalance(consensus.SystemAddress) log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward) if balance.Cmp(bestReward) > 0 { bestWork = wk - bestReward = balance + bestReward.Set(balance) } } + bestWork = w.getBestWorkBetweenInternalAndProposedBlock(bestWork, callerTypeCommitWork) w.commit(bestWork, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover diff --git a/miner/worker_blxr.go b/miner/worker_blxr.go new file mode 100644 index 0000000000..6c07715ded --- /dev/null +++ b/miner/worker_blxr.go @@ -0,0 +1,403 @@ +package miner + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +const ( + // inTurnDifficulty when validator has main proposer duty + inTurnDifficulty = 2 + + //callerType to getBestWork + callerTypeGenerateWork = "generateWork" + callerTypeCommitWork = "commitWork" + + // timestamp format + timestampFormat = "2006-01-02 15:04:05.000000" +) + +var zeroReward = big.NewInt(0) + +// ProposedBlockArgs defines the argument of a proposed block +type ProposedBlockArgs struct { + mevRelay string + blockNumber *big.Int + prevBlockHash common.Hash + blockReward *big.Int + gasLimit uint64 + gasUsed uint64 + txs types.Transactions + unReverted map[common.Hash]struct{} +} + +// ProposedBlock defines the argument of a proposed block and simulated work +type ProposedBlock struct { + args *ProposedBlockArgs + simulatedWork *bestProposedWork + simDuration time.Duration +} + +type bestProposedWorks map[string]*bestProposedWork + +// discard is unsafe operation, caller needs to protect before performing discard +func (b bestProposedWorks) discard() { + for prevHash, w := range b { + if w.work != nil { + w.work.discard() + } + delete(b, prevHash) + } +} + +type bestProposedWork struct { + work *environment + reward *big.Int + mevRelay string +} + +func (w *worker) AcceptProposedBlock(mevRelay string) bool { + if w.config == nil || w.config.PreferMEVRelays == nil { + return true + } + + return w.config.PreferMEVRelays.Accept(mevRelay) +} + +// isBlockMoreProfitableUnsafe is a concurrency unsafe implementation of isBlockMoreProfitable. +func (w *worker) isBlockMoreProfitableUnsafe(num uint64, prevHash string, reward *big.Int) (bool, *big.Int, *environment) { + // NOTE: if you need to add more returning values, please create a structure to return to prevent growing values. + + bestWorks, ok := w.bestProposedBlockInfo[num] + if !ok || bestWorks == nil { + return true, big.NewInt(0), nil + } + + best, ok := bestWorks[prevHash] + if !ok || best == nil || best.reward == nil { + return true, big.NewInt(0), nil + } + + // compare if the best knowing reward is higher than the proposed block reward + return reward.Cmp(best.reward) == 1, best.reward, best.work +} + +// isBlockMoreProfitable is a concurrency safe function to return true if the given block is more profitable than knowing blocks. +// The function also returns reward and work of the best known block. +func (w *worker) isBlockMoreProfitable(num uint64, prevHash string, reward *big.Int) (bool, *big.Int, *environment) { + w.bestProposedBlockLock.RLock() + defer w.bestProposedBlockLock.RUnlock() + + return w.isBlockMoreProfitableUnsafe(num, prevHash, reward) +} + +// return true if block was accepted and previous best reward +func (w *worker) handleProposedBlock(block *ProposedBlock) (bool, *big.Int) { + var ( + blockNum = block.args.blockNumber.Uint64() + prevBlockHash = block.args.prevBlockHash.String() + blockReward = block.simulatedWork.reward + ) + + if !w.AcceptProposedBlock(block.args.mevRelay) { + log.Debug(log.MEVPrefix+"Received proposedBlock from non-preferred mevRelay", "blockNumber", blockNum, "prevBlockHash", prevBlockHash, "newProposedBlockReward", blockReward, "newProposedBlockGasLimit", block.args.gasLimit, "newProposedBlockGasUsed", block.args.gasUsed, "newProposedBlockTxCount", len(block.args.txs), "mevRelay", block.args.mevRelay, "timestamp", time.Now().UTC().Format(timestampFormat)) + return false, blockReward + } + + // Skip if the proposed block is less profitable than a known one. + if profitable, bestReward, _ := w.isBlockMoreProfitable(blockNum, prevBlockHash, blockReward); !profitable { + log.Debug(log.MEVPrefix+"Skipping proposedBlock", "blockNumber", blockNum, "prevBlockHash", prevBlockHash, "newProposedBlockReward", blockReward, "previousProposedBlockReward", bestReward, "newProposedBlockGasLimit", block.args.gasLimit, "newProposedBlockGasUsed", block.args.gasUsed, "newProposedBlockTxCount", len(block.args.txs), "mevRelay", block.args.mevRelay, "timestamp", time.Now().UTC().Format(timestampFormat)) + return false, bestReward + } + + w.bestProposedBlockLock.Lock() + defer w.bestProposedBlockLock.Unlock() + + bestWorks, ok := w.bestProposedBlockInfo[blockNum] + // It's the first block for this block number + if (!ok || bestWorks == nil) && block.simulatedWork.work != nil { + w.bestProposedBlockInfo[blockNum] = bestProposedWorks{prevBlockHash: block.simulatedWork} + log.Info(log.MEVPrefix+"Received proposedBlock, this is the first proposed block for this block number and previous block hash", "blockNumber", block.args.blockNumber, "prevBlockHash", block.args.prevBlockHash.String(), "newProposedBlockReward", blockReward, "newProposedBlockGasUsed", block.args.gasUsed, "mevRelay", block.args.mevRelay, "newProposedBlockTxCount", len(block.args.txs), "simulatedDuration", block.simDuration, "timestamp", time.Now().UTC().Format(timestampFormat)) + return true, zeroReward + } + + // Double check if the proposed block is more profitable than a known block. + // We need this check because of a possible map mutations between mutex RUnlock and mutex Lock. + // bestReward and bestWork are Reward and Work of the best block we know excluding the proposed one. + profitable, bestReward, bestWork := w.isBlockMoreProfitableUnsafe(blockNum, prevBlockHash, blockReward) + if !profitable { + log.Info(log.MEVPrefix+"Received proposedBlock reward is not higher than previously proposed block", "blockNumber", blockNum, "prevBlockHash", prevBlockHash, "previousProposedBlockReward", bestReward, "newProposedBlockReward", blockReward, "newProposedBlockTxCount", len(block.simulatedWork.work.txs), "newProposedBlockGasUsed", block.args.gasUsed, "mevRelay", block.args.mevRelay, "simulatedDuration", block.simDuration, "timestamp", time.Now().UTC().Format(timestampFormat)) + return false, bestReward + } + + // The proposed block is more profitable than an existing one. + // Override the best block with the current one. + log.Info(log.MEVPrefix+"Received proposedBlock, replacing previously proposedBlock after simulation", "blockNumber", blockNum, "prevBlockHash", prevBlockHash, "previousProposedBlockReward", bestReward, "newProposedBlockReward", blockReward, "newProposedBlockTxCount", len(block.simulatedWork.work.txs), "mevRelay", block.args.mevRelay, "newProposedBlockGasUsed", block.args.gasUsed, "simulatedDuration", block.simDuration, "timestamp", time.Now().UTC().Format(timestampFormat)) + w.bestProposedBlockInfo[blockNum][prevBlockHash] = block.simulatedWork + + if bestWork != nil { + bestWork.discard() + } + + return true, bestReward +} + +// proposedLoop is responsible for generating and submitting sealing work based on +// proposed blocks +func (w *worker) proposedLoop() { + chainBlockCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + + chainBlockSub := w.eth.BlockChain().SubscribeChainBlockEvent(chainBlockCh) + + defer w.wg.Done() + defer chainBlockSub.Unsubscribe() + defer func() { + if w.current != nil { + w.current.discard() + } + }() + + for { + select { + case block := <-chainBlockCh: + // each block will have its own interruptCh to stop work with a reason + atomic.StoreUint64(w.prevBlockGasLimit, block.Block.GasLimit()) + worksToDiscard := make([]bestProposedWorks, 0) + w.bestProposedBlockLock.Lock() + for blockNumber, works := range w.bestProposedBlockInfo { + if blockNumber <= w.chain.CurrentBlock().Number.Uint64() { + worksToDiscard = append(worksToDiscard, works) + delete(w.bestProposedBlockInfo, blockNumber) + } + } + w.bestProposedBlockLock.Unlock() + for _, works := range worksToDiscard { + works.discard() + } + + // System stopped + case <-w.exitCh: + return + case <-chainBlockSub.Err(): + return + } + } +} + +func (w *worker) getBestWorkBetweenInternalAndProposedBlock(internalWork *environment, callerType string) *environment { + var ( + preferProposedBlock bool + internalBlockReward = new(big.Int).Set(internalWork.state.GetBalance(consensus.SystemAddress)) // added for logging + bestReward = new(big.Int).Set(internalBlockReward) + bestWork = internalWork + proposedBlockReward = new(big.Int) + validatorHasMainProposerDuty = internalWork.header.Difficulty.Cmp(new(big.Int).SetInt64(inTurnDifficulty)) == 0 + ) + + if !validatorHasMainProposerDuty { + // To prevent bundle leakage + return bestWork + } + + w.bestProposedBlockLock.RLock() + defer w.bestProposedBlockLock.RUnlock() + + logCtx := []any{ + "blockNumber", bestWork.header.Number, + "prevBlockHash", bestWork.header.ParentHash.String(), + "type", callerType, + "timestamp", time.Now().UTC().Format(timestampFormat), + } + + works, ok := w.bestProposedBlockInfo[internalWork.header.Number.Uint64()] + if !ok || works == nil { + log.Info(log.MEVPrefix+"Prefer internal or proposedBlock", append(logCtx, "internalBlockReward", internalBlockReward, "preferProposedBlock", preferProposedBlock)...) + return bestWork + } + + if proposedBlock, exist := works[internalWork.header.ParentHash.String()]; exist && proposedBlock != nil && proposedBlock.work != nil { + preferProposedBlock = proposedBlock.reward != nil && proposedBlock.reward.Cmp(bestReward) > 0 && w.AcceptProposedBlock(proposedBlock.mevRelay) + proposedBlockReward.Set(proposedBlock.reward) + if preferProposedBlock { + logCtx = append(logCtx, "mevRelay", proposedBlock.mevRelay) + if bestWork != nil { + bestWork.discard() + } + bestWork = proposedBlock.work + bestReward.Set(proposedBlock.reward) + } + } + + log.Info(log.MEVPrefix+"Prefer internal or proposedBlock", append(logCtx, "internalBlockReward", internalBlockReward, "preferProposedBlock", preferProposedBlock, "proposedBlockReward", proposedBlockReward)...) + return bestWork +} + +// fillTransactionsProposedBlock retrieves the pending transactions from the txpool and fills them +// into the given sealing block. The transaction selection and ordering strategy can +// be customized with the plugin in the future. +func (w *worker) fillTransactionsProposedBlock(ctx context.Context, env *environment, block *ProposedBlockArgs) (error, *big.Int) { + gasLimit := env.header.GasLimit + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit) + if w.chain.Config().IsEuler(env.header.Number) { + env.gasPool.SubGas(params.SystemTxsGas * 3) + } else { + env.gasPool.SubGas(params.SystemTxsGas) + } + } + + var coalescedLogs []*types.Log + // initialize bloom processors + processorCapacity := 100 + bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) + + var blockReward *big.Int + + signal := commitInterruptNone + for i, tx := range block.txs { + select { + default: + case <-ctx.Done(): + err := ctx.Err() + log.Trace("Filling of transaction stopped", "canceledAt", fmt.Sprintf("%d/%d", i, len(block.txs)), "err", err) + return err, nil + } + + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + signal = commitInterruptOutOfGas + break + } + + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { + return errors.New("block payload is incorrect"), nil + } + + txHash := tx.Hash() + + // Start executing the transaction + env.state.SetTxContext(txHash, env.tcount) + + _, err := w.commitTransactionOld(env, tx, bloomProcessors) + if err != nil { + log.Error(log.MEVPrefix+"Failed to commit transaction on proposedBlock", "blockNumber", block.blockNumber.String(), "fromMevRelay", block.mevRelay, "proposedTxs", len(block.txs), "failedTx", txHash.String()) + return err, nil + } + + if env.receipts[len(env.receipts)-1].Status == types.ReceiptStatusFailed { + if _, ok := block.unReverted[txHash]; ok { + log.Warn(log.MEVPrefix+"bundle reverted proposedBlock", "blockNumber", block.blockNumber, "fromMevRelay", block.mevRelay, "revertedTx", txHash.Hex()) + return errors.New("bundle reverted"), nil + } + } + } + + tcount := len(env.txs) + blockReward = env.state.GetBalance(consensus.SystemAddress) + log.Debug(log.MEVPrefix+"Processing proposedBlock", "blockNumber", block.blockNumber.String(), + "fromMevRelay", block.mevRelay, + "proposedReward", block.blockReward, "actualReward", blockReward, + "proposedGasUsed", block.gasUsed, "actualGasUsed", env.receipts[len(env.receipts)-1].CumulativeGasUsed, + "proposedTxsCount", len(block.txs), "actualTxsCount", tcount) + + if tcount < len(block.txs) { + return errors.New("block parameters mismatch"), nil + } + + bloomProcessors.Close() + if !w.isRunning() && len(coalescedLogs) > 0 { + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + return signalToErr(signal), blockReward +} + +// simulateProposedBlock generates a sealing block based on a proposed block. +func (w *worker) simulateProposedBlock(ctx context.Context, proposedBlock *ProposedBlockArgs) (*bestProposedWork, time.Duration, error) { + var ( + start = time.Now() + reward = new(big.Int).Set(proposedBlock.blockReward) + ) + + w.bestProposedBlockLock.RLock() + if bestWorks, ok := w.bestProposedBlockInfo[proposedBlock.blockNumber.Uint64()]; ok && bestWorks != nil { + previousProposedBlockWork, exist := bestWorks[proposedBlock.prevBlockHash.String()] + if exist && previousProposedBlockWork != nil && previousProposedBlockWork.reward != nil { + if previousProposedBlockWork.reward.Cmp(reward) > 0 { + log.Debug(log.MEVPrefix+"Skipping proposedBlock", "blockNumber", proposedBlock.blockNumber, "prevBlockHash", proposedBlock.prevBlockHash.String(), "newProposedBlockReward", reward, "previousProposedBlockReward", previousProposedBlockWork.reward, "newProposedBlockGasLimit", proposedBlock.gasLimit, "newProposedBlockGasUsed", proposedBlock.gasUsed, "newProposedBlockTxCount", len(proposedBlock.txs), "mevRelay", proposedBlock.mevRelay, "timestamp", time.Now().UTC().Format(timestampFormat)) + w.bestProposedBlockLock.RUnlock() + return nil, 0, nil + } + } + } + w.bestProposedBlockLock.RUnlock() + + // Set the coinbase if the worker is running or it's required + var coinbase common.Address + if w.isRunning() { + if w.coinbase == (common.Address{}) { + return nil, time.Since(start), errors.New("refusing to mine without etherbase") + } + coinbase = w.coinbase // Use the preset address as the fee recipient + } + + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(time.Now().Unix()), + coinbase: coinbase, + }) + if err != nil { + return nil, time.Since(start), err + } + defer work.discard() + + select { + default: + case <-ctx.Done(): + return nil, time.Since(start), ctx.Err() + } + + // Fill transactions from the proposed block + err, blockReward := w.fillTransactionsProposedBlock(ctx, work, proposedBlock) + if err != nil { + return nil, time.Since(start), err + } + + nextBlock := big.NewInt(0).Add(big.NewInt(1), w.eth.BlockChain().CurrentBlock().Number) + + if nextBlock.Cmp(proposedBlock.blockNumber) != 0 { + // block was changed during validation, need to ignore this proposedBlock + return nil, time.Since(start), errors.New("chain changed") + } + + bestWork := &bestProposedWork{ + work: work.copy(), + reward: new(big.Int).Set(blockReward), + mevRelay: proposedBlock.mevRelay, + } + totalDuration := time.Since(start) + log.Debug(log.MEVPrefix+"simulated proposedBlock", "blockNumber", proposedBlock.blockNumber, "blockReward", blockReward, "fromMevRelay", proposedBlock.mevRelay, "duration", totalDuration, "timestamp", time.Now().UTC().Format(timestampFormat)) + return bestWork, totalDuration, nil +} diff --git a/miner/worker_blxr_test.go b/miner/worker_blxr_test.go new file mode 100644 index 0000000000..975da19068 --- /dev/null +++ b/miner/worker_blxr_test.go @@ -0,0 +1,331 @@ +package miner + +import ( + "context" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/assert" +) + +const ( + testPreferableRelay = "preferable" + testNonPreferableRelay = "non-preferable" +) + +var ( + testConfigWithPreferRelay = &Config{ + Recommit: time.Second, + GasCeil: params.GenesisGasLimit, + PreferMEVRelays: NewAcceptRelayMap(testPreferableRelay), + } +) + +func Test_worker_isBlockMoreProfitable(t *testing.T) { + t.Run("no blocks information", func(t *testing.T) { + var ( + w = worker{} + blockNum uint64 = 200 + prevHash = hash("HASH-199").String() + blockReward = big.NewInt(1000) + ) + profitable, reward, work := w.isBlockMoreProfitableUnsafe(blockNum, prevHash, blockReward) + + assert.True(t, profitable) + assert.Equal(t, uint64(0), reward.Uint64()) + assert.Nil(t, work) + }) + + t.Run("no info about the block", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockInfo: blockInfoBuilder(199, hash("HASH-198"), 100, testPreferableRelay), + } + blockNum uint64 = 200 + prevHash = hash("HASH-199").String() + blockReward = big.NewInt(1000) + ) + profitable, reward, work := w.isBlockMoreProfitableUnsafe(blockNum, prevHash, blockReward) + + assert.True(t, profitable) + assert.Equal(t, uint64(0), reward.Uint64()) + assert.Nil(t, work) + }) + + t.Run("no info about the hash", func(t *testing.T) { + var ( + w = worker{ + // imagine we know Block-200 on top of Block-198 + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-198"), 100, testPreferableRelay), + } + blockNum uint64 = 200 + prevHash = hash("HASH-199").String() + blockReward = big.NewInt(1000) + ) + profitable, reward, work := w.isBlockMoreProfitableUnsafe(blockNum, prevHash, blockReward) + + assert.True(t, profitable) + assert.Equal(t, uint64(0), reward.Uint64()) + assert.Nil(t, work) + }) + + t.Run("more profitable", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-199"), 100, testPreferableRelay), + } + blockNum uint64 = 200 + prevHash = hash("HASH-199").String() + blockReward = big.NewInt(1000) + ) + profitable, reward, work := w.isBlockMoreProfitableUnsafe(blockNum, prevHash, blockReward) + + assert.True(t, profitable) + assert.Equal(t, uint64(100), reward.Uint64()) + assert.NotNil(t, work) + }) + + t.Run("less profitable", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-199"), 5000, testPreferableRelay), + } + blockNum uint64 = 200 + prevHash = hash("HASH-199").String() + blockReward = big.NewInt(1000) + ) + profitable, reward, work := w.isBlockMoreProfitableUnsafe(blockNum, prevHash, blockReward) + + assert.False(t, profitable) + assert.Equal(t, uint64(5000), reward.Uint64()) + assert.NotNil(t, work) + }) +} + +func requireMutexIsNotBlocked(t *testing.T, w *worker, block ProposedBlock) { + t.Run("test if previous call has not blocked mutex", func(t *testing.T) { + var ( + done = make(chan struct{}) + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + ) + defer cancel() + + go func() { + w.handleProposedBlock(&block) + close(done) + }() + + select { + case <-ctx.Done(): + t.Fatalf("prevous call didn't unlock mutex") + case <-done: + // fine + } + }) +} + +func Test_worker_handleProposedBlock(t *testing.T) { + t.Run("no blocks information", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: make(map[uint64]bestProposedWorks), + config: testConfigWithPreferRelay, + } + block = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block) + assert.True(t, accepted) + assert.Equal(t, zeroReward, bestReward) + + acceptedBlock := w.bestProposedBlockInfo[200][block.args.prevBlockHash.String()] + assert.Equal(t, block.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block) + }) + + t.Run("no current block information", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: blockInfoBuilder(199, hash("HASH-198"), 1000, testPreferableRelay), + config: testConfigWithPreferRelay, + } + block = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block) + assert.True(t, accepted) + assert.Equal(t, zeroReward, bestReward) + + acceptedBlock := w.bestProposedBlockInfo[200][block.args.prevBlockHash.String()] + assert.Equal(t, block.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block) + }) + + t.Run("no blockNumber+prevBlockHash information", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-198"), 1000, testPreferableRelay), + config: testConfigWithPreferRelay, + } + block = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block) + assert.True(t, accepted) + assert.Equal(t, zeroReward, bestReward) + + acceptedBlock := w.bestProposedBlockInfo[200][block.args.prevBlockHash.String()] + assert.Equal(t, block.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block) + }) + + t.Run("more profitable proposal", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-199"), 500, testPreferableRelay), + config: testConfigWithPreferRelay, + } + block = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block) + assert.True(t, accepted) + assert.Equal(t, int64(500), bestReward.Int64()) + + acceptedBlock := w.bestProposedBlockInfo[200][block.args.prevBlockHash.String()] + assert.Equal(t, block.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block) + }) + + t.Run("less profitable proposal", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-199"), 5000, testPreferableRelay), + config: testConfigWithPreferRelay, + } + block = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block) + assert.False(t, accepted) + assert.Equal(t, int64(5000), bestReward.Int64()) + + // do not update block info + acceptedBlock := w.bestProposedBlockInfo[200][block.args.prevBlockHash.String()] + assert.NotEqual(t, block.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block) + }) + + t.Run("prefer bloXroute's blocks", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + config: testConfigWithPreferRelay, + bestProposedBlockInfo: blockInfoBuilder(200, hash("HASH-199"), 50, testPreferableRelay), + } + block1 = proposedBlockBuilder(200, hash("HASH-199"), 300, testPreferableRelay) + block2 = proposedBlockBuilder(200, hash("HASH-199"), 500, testNonPreferableRelay) + block3 = proposedBlockBuilder(200, hash("HASH-199"), 100, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block1) + assert.True(t, accepted) + assert.Equal(t, int64(50), bestReward.Int64()) + + requireMutexIsNotBlocked(t, &w, block1) + + accepted, bestReward = w.handleProposedBlock(&block2) + assert.False(t, accepted) + assert.Equal(t, block2.args.blockReward.Int64(), bestReward.Int64()) + + requireMutexIsNotBlocked(t, &w, block2) + + accepted, bestReward = w.handleProposedBlock(&block3) + assert.False(t, accepted) + assert.Equal(t, block1.args.blockReward.Int64(), bestReward.Int64()) + + requireMutexIsNotBlocked(t, &w, block3) + }) + + t.Run("no current block information - prefer bloXroute's blocks", func(t *testing.T) { + var ( + w = worker{ + bestProposedBlockLock: sync.RWMutex{}, + bestProposedBlockInfo: blockInfoBuilder(199, hash("HASH-198"), 1000, testPreferableRelay), + config: testConfigWithPreferRelay, + } + block1 = proposedBlockBuilder(200, hash("HASH-199"), 1000, testNonPreferableRelay) + block2 = proposedBlockBuilder(200, hash("HASH-199"), 1000, testPreferableRelay) + ) + + accepted, bestReward := w.handleProposedBlock(&block1) + assert.False(t, accepted) + assert.Equal(t, block1.args.blockReward.Int64(), bestReward.Int64()) + + acceptedBlock := w.bestProposedBlockInfo[200][block1.args.prevBlockHash.String()] + assert.Nil(t, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block1) + + accepted, bestReward = w.handleProposedBlock(&block2) + assert.True(t, accepted) + assert.Equal(t, zeroReward, bestReward) + + acceptedBlock = w.bestProposedBlockInfo[200][block2.args.prevBlockHash.String()] + assert.Equal(t, block2.simulatedWork, acceptedBlock) + + requireMutexIsNotBlocked(t, &w, block2) + }) +} + +func blockInfoBuilder(num uint64, hash common.Hash, reward int64, mevRelay string) map[uint64]bestProposedWorks { + return map[uint64]bestProposedWorks{ + num: map[string]*bestProposedWork{ + hash.String(): { + work: &environment{}, + reward: big.NewInt(reward), + mevRelay: mevRelay, + }, + }, + } +} + +func hash(v string) common.Hash { + return common.BytesToHash([]byte(v)) +} + +func proposedBlockBuilder(num int64, hash common.Hash, reward int64, mevRelay string) ProposedBlock { + args := ProposedBlockArgs{ + blockNumber: big.NewInt(num), + prevBlockHash: hash, + blockReward: big.NewInt(reward), + mevRelay: mevRelay, + } + + work := bestProposedWork{ + work: &environment{}, + reward: big.NewInt(reward), + } + + simDuration := time.Millisecond * 100 + + return ProposedBlock{ + args: &args, + simulatedWork: &work, + simDuration: simDuration, + } +} diff --git a/node/config.go b/node/config.go index bc30a0ab0a..202c9862db 100644 --- a/node/config.go +++ b/node/config.go @@ -156,6 +156,19 @@ type Config struct { // for the authenticated api. This is by default {'localhost'}. AuthVirtualHosts []string `toml:",omitempty"` + // HTTPSecuredIPPort is the TCP port number on which to start the HTTP RPC server secured by IP. The + // default zero value is/ valid and will pick a port number randomly (useful + // for ephemeral nodes). + HTTPSecuredIPPort int `toml:",omitempty"` + + // HTTPSecuredIPAllowedIPs is the list of IPs which are allowed on incoming requests. + HTTPSecuredIPAllowedIPs []string `toml:",omitempty"` + + // HTTPSecuredIPModules is a list of API modules to expose via the HTTP RPC secured by IP interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + HTTPSecuredIPModules []string `toml:",omitempty"` + // WSHost is the host interface on which to start the websocket RPC server. If // this field is empty, no websocket API endpoint will be started. WSHost string diff --git a/node/defaults.go b/node/defaults.go index 4ad89b59e3..4a4cc54336 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -45,7 +45,7 @@ var ( ) // DefaultConfig contains reasonable default settings. -var DefaultConfig = Config{ +var DefaultConfig = *WithDefaultHTTPSecuredIP(&Config{ DataDir: DefaultDataDir(), HTTPPort: DefaultHTTPPort, AuthAddr: DefaultAuthHost, @@ -66,7 +66,7 @@ var DefaultConfig = Config{ NAT: nat.Any(), }, DBEngine: "", // Use whatever exists, will default to Leveldb if non-existent and supported -} +}) // DefaultDataDir is the default data directory to use for the databases and other // persistence requirements. diff --git a/node/defaults_blxr.go b/node/defaults_blxr.go new file mode 100644 index 0000000000..780140d1a5 --- /dev/null +++ b/node/defaults_blxr.go @@ -0,0 +1,11 @@ +package node + +const DefaultHTTPSecuredIPPort = 8548 // Default TCP port for the HTTP RPC server secured by IP + +func WithDefaultHTTPSecuredIP(cfg *Config) *Config { + cfg.HTTPSecuredIPPort = DefaultHTTPSecuredIPPort + cfg.HTTPSecuredIPAllowedIPs = []string{} + cfg.HTTPSecuredIPModules = []string{} + + return cfg +} diff --git a/node/node.go b/node/node.go index 7f8872f9cf..eeca2e4fc8 100644 --- a/node/node.go +++ b/node/node.go @@ -60,6 +60,7 @@ type Node struct { lifecycles []Lifecycle // All registered backends, services, and auxiliary services that have a lifecycle rpcAPIs []rpc.API // List of APIs currently provided by the node http *httpServer // + httpSecuredIP *httpServer ws *httpServer // httpAuth *httpServer // wsAuth *httpServer // @@ -190,6 +191,7 @@ func New(conf *Config) (*Node, error) { // Configure RPC servers. node.http = newHTTPServer(node.log, conf.HTTPTimeouts) node.httpAuth = newHTTPServer(node.log, conf.HTTPTimeouts) + node.httpSecuredIP = newHTTPServer(node.log, conf.HTTPTimeouts) node.ws = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts) node.wsAuth = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts) node.ipc = newIPCServer(node.log, conf.IPCEndpoint()) @@ -519,6 +521,11 @@ func (n *Node) startRPC() error { return nil } + // Configure HTTP secured by IP. + if err := configureHTTPSecured(n, &servers); err != nil { + return err + } + // Set up HTTP. if n.config.HTTPHost != "" { // Configure legacy unauthenticated HTTP. @@ -565,6 +572,7 @@ func (n *Node) wsServerForPort(port int, authenticated bool) *httpServer { func (n *Node) stopRPC() { n.http.stop() + n.httpSecuredIP.stop() n.ws.stop() n.httpAuth.stop() n.wsAuth.stop() diff --git a/node/node_blxr.go b/node/node_blxr.go new file mode 100644 index 0000000000..4c8fb7295b --- /dev/null +++ b/node/node_blxr.go @@ -0,0 +1,37 @@ +package node + +import ( + "errors" +) + +var ErrInvalidServersDestination = errors.New("invalid servers destination") + +// Configure HTTP secured by IP +func configureHTTPSecured(n *Node, servers *[]*httpServer) error { + if n.config.HTTPHost == "" || n.config.HTTPSecuredIPPort == 0 { + return nil + } + + if servers == nil { + return ErrInvalidServersDestination + } + + var err error + + if err = n.httpSecuredIP.setListenAddr(n.config.HTTPHost, n.config.HTTPSecuredIPPort); err != nil { + return err + } + + if err = n.httpSecuredIP.enableRPC(n.rpcAPIs, httpConfig{ + CorsAllowedOrigins: n.config.HTTPCors, + AllowedIPs: n.config.HTTPSecuredIPAllowedIPs, + Modules: n.config.HTTPSecuredIPModules, + prefix: n.config.HTTPPathPrefix, + }); err != nil { + return err + } + + *servers = append(*servers, n.httpSecuredIP) + + return nil +} diff --git a/node/rpcstack.go b/node/rpcstack.go index b33c238051..6822412bc4 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -40,15 +40,17 @@ type httpConfig struct { Modules []string CorsAllowedOrigins []string Vhosts []string + AllowedIPs []string prefix string // path prefix on which to mount http handler rpcEndpointConfig } // wsConfig is the JSON-RPC/Websocket configuration type wsConfig struct { - Origins []string - Modules []string - prefix string // path prefix on which to mount ws handler + Origins []string + Modules []string + AllowedIPs []string + prefix string // path prefix on which to mount ws handler rpcEndpointConfig } @@ -175,6 +177,8 @@ func (h *httpServer) start() error { "prefix", h.httpConfig.prefix, "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","), "vhosts", strings.Join(h.httpConfig.Vhosts, ","), + "allowedIPs", strings.Join(h.httpConfig.AllowedIPs, ","), + "modules", strings.Join(h.httpConfig.Modules, ","), ) // Log all handlers mounted on server. @@ -309,7 +313,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { } h.httpConfig = config h.httpHandler.Store(&rpcHandler{ - Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts, config.jwtSecret), + Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts, config.jwtSecret, config.AllowedIPs), server: srv, }) return nil @@ -386,14 +390,15 @@ func isWebsocket(r *http.Request) bool { } // NewHTTPHandlerStack returns wrapped http-related handlers -func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string, jwtSecret []byte) http.Handler { +func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string, jwtSecret []byte, allowedIPs []string) http.Handler { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) if len(jwtSecret) != 0 { handler = newJWTHandler(jwtSecret, handler) } - return newGzipHandler(handler) + + return newGzipHandler(newIPHandler(allowedIPs, handler)) } // NewWSHandlerStack returns a wrapped ws-related handler. diff --git a/node/rpcstack_blxr.go b/node/rpcstack_blxr.go new file mode 100644 index 0000000000..c1a32ea1dd --- /dev/null +++ b/node/rpcstack_blxr.go @@ -0,0 +1,49 @@ +package node + +import ( + "net" + "net/http" + "strings" +) + +// SecuredIPHandler is a handler which validates the IP of incoming requests. +type SecuredIPHandler struct { + ip map[string]struct{} + next http.Handler +} + +func newIPHandler(ips []string, next http.Handler) http.Handler { + if len(ips) == 0 { + return next + } + + vIPsMap := make(map[string]struct{}) + for _, allowedIP := range ips { + vIPsMap[strings.ToLower(allowedIP)] = struct{}{} + } + + return &SecuredIPHandler{ip: vIPsMap, next: next} +} + +// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler +func (h *SecuredIPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // if r.Host is not set, we can continue serving since a browser would set the Host header + ip, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + // Either invalid (too many colons) or no port specified + ip = r.RemoteAddr + } + + // Not an IP address, but a hostname. Need to validate + if _, exist := h.ip["*"]; exist { + h.next.ServeHTTP(w, r) + return + } + + if _, exist := h.ip[ip]; exist { + h.next.ServeHTTP(w, r) + return + } + + http.Error(w, "IP is not allowed", http.StatusForbidden) +} diff --git a/tests/blxr/api_test.go b/tests/blxr/api_test.go new file mode 100644 index 0000000000..f4a880d033 --- /dev/null +++ b/tests/blxr/api_test.go @@ -0,0 +1,73 @@ +package blxr + +import ( + "bytes" + "context" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/console" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBlockNumber(t *testing.T) { + t.Run("mev namespace over rpc", func(t *testing.T) { + // run test Ethereum Node Backend + backend, _ := newTestNode(t) + defer func() { _ = backend.Close() }() + + var ( + client = backend.Attach() + ctx = context.TODO() + result hexutil.Uint64 + ) + defer client.Close() + + err := client.CallContext(ctx, &result, "mev_blockNumber") + require.NoError(t, err) + assert.Equal(t, uint64(1), uint64(result)) + }) + + t.Run("eth namespace over rpc", func(t *testing.T) { + // run test Ethereum Node Backend + backend, _ := newTestNode(t) + defer func() { _ = backend.Close() }() + + var ( + client = backend.Attach() + ctx = context.TODO() + result hexutil.Uint64 + ) + defer client.Close() + + err := client.CallContext(ctx, &result, "eth_blockNumber") + require.NoError(t, err) + assert.Equal(t, uint64(1), uint64(result)) + }) + + t.Run("call mev and eth implementations over console", func(t *testing.T) { + backend, _ := newTestNode(t) + defer func() { _ = backend.Close() }() + + var ( + client = backend.Attach() + printer = new(bytes.Buffer) + prompter = &hookedPrompter{scheduler: make(chan string)} + ) + defer client.Close() + + cli, err := console.New(console.Config{ + DataDir: backend.DataDir(), + Client: client, + Printer: printer, + Prompter: prompter, + }) + require.NoError(t, err) + + // mev_blockNumber should work similar to eth.blockNumber, + // and be acceptable from console like a property. + cli.Evaluate("[mev.blockNumber, eth.blockNumber]") + assert.Equal(t, "[1, 1]\n", printer.String()) + }) +} diff --git a/tests/blxr/helper_test.go b/tests/blxr/helper_test.go new file mode 100644 index 0000000000..38a77d683c --- /dev/null +++ b/tests/blxr/helper_test.go @@ -0,0 +1,106 @@ +package blxr + +import ( + "errors" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/console/prompt" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" +) + +var ( + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddr = crypto.PubkeyToAddress(testKey.PublicKey) + testSlot = common.HexToHash("0xdeadbeef") + testValue = crypto.Keccak256Hash(testSlot[:]) + testBalance = big.NewInt(2e15) +) + +func generateTestChain() (*core.Genesis, []*types.Block) { + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + Alloc: core.GenesisAlloc{testAddr: {Balance: testBalance, Storage: map[common.Hash]common.Hash{testSlot: testValue}}}, + ExtraData: []byte("test genesis"), + Timestamp: 9000, + } + generate := func(i int, g *core.BlockGen) { + g.OffsetTime(5) + g.SetExtra([]byte("test")) + } + _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 1, generate) + blocks = append([]*types.Block{genesis.ToBlock()}, blocks...) + return genesis, blocks +} + +func newTestNode(t *testing.T) (*node.Node, *eth.Ethereum) { + // Generate test chain. + workspace := t.TempDir() + genesis, blocks := generateTestChain() + // Create node + n, err := node.New(&node.Config{ + DataDir: workspace, + }) + if err != nil { + t.Fatalf("can't create new node: %v", err) + } + + // Create Ethereum Service + config := ðconfig.Config{ + Genesis: genesis, + } + ethservice, err := eth.New(n, config) + if err != nil { + t.Fatalf("can't create new ethereum service: %v", err) + } + + // Import the test chain. + if err := n.Start(); err != nil { + t.Fatalf("can't start test node: %v", err) + } + if _, err := ethservice.BlockChain().InsertChain(blocks[1:]); err != nil { + t.Fatalf("can't import test blocks: %v", err) + } + + return n, ethservice +} + +type hookedPrompter struct { + scheduler chan string +} + +func (p *hookedPrompter) PromptInput(prompt string) (string, error) { + // Send the prompt to the tester + select { + case p.scheduler <- prompt: + case <-time.After(time.Second): + return "", errors.New("prompt timeout") + } + // Retrieve the response and feed to the console + select { + case input := <-p.scheduler: + return input, nil + case <-time.After(time.Second): + return "", errors.New("input timeout") + } +} + +func (p *hookedPrompter) PromptPassword(prompt string) (string, error) { + return "", errors.New("not implemented") +} +func (p *hookedPrompter) PromptConfirm(prompt string) (bool, error) { + return false, errors.New("not implemented") +} +func (p *hookedPrompter) SetHistory(history []string) {} +func (p *hookedPrompter) AppendHistory(command string) {} +func (p *hookedPrompter) ClearHistory() {} +func (p *hookedPrompter) SetWordCompleter(completer prompt.WordCompleter) {}