Skip to content

Commit

Permalink
Add new pending transactions subscription.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcdee committed Jan 7, 2022
1 parent 4b3bc71 commit 85eccae
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 89 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/fatih/color v1.13.0 // indirect
github.com/goccy/go-yaml v1.9.4
github.com/gorilla/websocket v1.4.2 // indirect
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/goccy/go-yaml v1.9.4 h1:S0GCYjwHKVI6IHqio7QWNKNThUl6NLzFd/g8Z65Axw8=
github.com/goccy/go-yaml v1.9.4/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
Expand Down
18 changes: 17 additions & 1 deletion jsonrpc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,35 @@
package jsonrpc_test

import (
"encoding/hex"
"os"
"strings"
"testing"
"time"

"github.com/attestantio/go-execution-client/types"
"github.com/rs/zerolog"
)

// timeout for tests.
var timeout = 60 * time.Second

func TestMain(m *testing.M) {
zerolog.SetGlobalLevel(zerolog.Disabled)
zerolog.SetGlobalLevel(zerolog.TraceLevel)
if os.Getenv("JSONRPC_ADDRESS") != "" {
os.Exit(m.Run())
}
}

// strToHash is a helper to create a hash given a string representation.
func strToHash(input string) types.Hash {
bytes, err := hex.DecodeString(strings.TrimPrefix(input, "0x"))
if err != nil {
panic(err)
}

var hash types.Hash
copy(hash[:], bytes)

return hash
}
159 changes: 159 additions & 0 deletions jsonrpc/newpendingtransactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright © 2022 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jsonrpc

import (
"context"
"encoding/json"
"fmt"

"github.com/attestantio/go-execution-client/spec"
"github.com/attestantio/go-execution-client/types"
"github.com/attestantio/go-execution-client/util"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)

// NewPendingTransactions returns a subscription for pending transactions.
func (s *Service) NewPendingTransactions(ctx context.Context, ch chan *spec.Transaction) (*util.Subscription, error) {
conn, _, err := websocket.DefaultDialer.Dial(s.webSocketAddress, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to server")
}

// Set up the subscription.
if err := conn.WriteMessage(websocket.TextMessage, []byte(`{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}`)); err != nil {
return nil, errors.Wrap(err, "failed to request subscription")
}

// Read the response to obtain the subscription ID.
_, msg, err := conn.ReadMessage()
if err != nil {
return nil, errors.Wrap(err, "failed to obtain subscription response")
}
log.Trace().Str("msg", string(msg)).Msg("Received subscription response")
res := newPendingTransactionsResult{}
if err := json.Unmarshal(msg, &res); err != nil {
return nil, errors.Wrap(err, "failed to obtain subscription ID")
}
log.Trace().Str("subscription", fmt.Sprintf("%#x", res.Result)).Msg("Received subscription ID")

// Handle incoming messages.
go s.receiveNewPendingTransactionMsg(ctx, conn, ch)

// Close the websocket when the context is done.
go s.closeSocketOnCtxDone(ctx, conn)

return &util.Subscription{
ID: res.Result,
}, nil
}

func (s *Service) receiveNewPendingTransactionMsg(ctx context.Context, conn *websocket.Conn, ch chan *spec.Transaction) {
for {
_, msg, err := conn.ReadMessage()
if ctx.Err() != nil {
// Context is done; leave.
return
}
if err != nil {
log.Error().Err(err).Msg("Failed to read received message")
continue
}
log.Info().Str("msg", string(msg)).Msg("Received message")
res := newPendingTransactionsEvent{}
if err := json.Unmarshal(msg, &res); err != nil {
log.Error().Err(err).Msg("Failed to unmarshal message")
continue
}

tx, err := s.Transaction(ctx, res.Params.Result)
if err != nil {
log.Error().Err(err).Str("tx_hash", fmt.Sprintf("%#x", res.Params.Result)).Msg("Failed to obtain transaction")
continue
}
ch <- tx
}
}

func (s *Service) closeSocketOnCtxDone(ctx context.Context, conn *websocket.Conn) {
<-ctx.Done()
log.Trace().Msg("Context done; closing websocket connection")

// Close the connection.
err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error().Err(err).Msg("Failed to close websocket")
return
}

log.Trace().Msg("Websocket connection closed")
}

type newPendingTransactionsResult struct {
Result []byte
}
type newPendingTransactionsResultJSON struct {
Result string `json:"result"`
}

// UnmarshalJSON implements json.Unmarshaler.
func (r *newPendingTransactionsResult) UnmarshalJSON(input []byte) error {
var data newPendingTransactionsResultJSON
if err := json.Unmarshal(input, &data); err != nil {
return errors.Wrap(err, "invalid JSON")
}

var err error
r.Result, err = util.StrToByteArray("result", data.Result)
if err != nil {
return err
}

return nil
}

type newPendingTransactionsEvent struct {
Params *newPendingTransactionsEventParams `json:"params"`
}

type newPendingTransactionsEventParams struct {
Subscription []byte
Result types.Hash
}

type newPendingTransactionsResultParamsJSON struct {
Subscription string `json:"subscription"`
Result string `json:"result"`
}

// UnmarshalJSON implements json.Unmarshaler.
func (e *newPendingTransactionsEventParams) UnmarshalJSON(input []byte) error {
var data newPendingTransactionsResultParamsJSON
if err := json.Unmarshal(input, &data); err != nil {
return errors.Wrap(err, "invalid JSON")
}

var err error
e.Subscription, err = util.StrToByteArray("subscription", data.Subscription)
if err != nil {
return err
}
e.Result, err = util.StrToHash("result", data.Result)
if err != nil {
return err
}

return nil
}
50 changes: 50 additions & 0 deletions jsonrpc/newpendingtransactions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright © 2021 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jsonrpc_test

import (
"context"
"os"
"testing"

execclient "github.com/attestantio/go-execution-client"
"github.com/attestantio/go-execution-client/jsonrpc"
"github.com/attestantio/go-execution-client/spec"
"github.com/stretchr/testify/require"
)

// TestNewPendingTransactions tests the TestNewPendingTransactions function.
func TestNewPendingTransactions(t *testing.T) {
ctx := context.Background()
s, err := jsonrpc.New(ctx,
jsonrpc.WithAddress(os.Getenv("JSONRPC_ADDRESS")),
jsonrpc.WithTimeout(timeout),
)
require.NoError(t, err)

ch := make(chan *spec.Transaction)
subscription, err := s.(execclient.NewPendingTransactionsProvider).NewPendingTransactions(ctx, ch)
require.NoError(t, err)
require.NotNil(t, subscription)

// Wait to see a transaction.
tx := <-ch
require.NotNil(t, tx)
// // TODO remove.
// {
// data, err := json.Marshal(tx)
// require.NoError(t, err)
// fmt.Printf("%s\n", string(data))
// }
}
18 changes: 15 additions & 3 deletions jsonrpc/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
)

type parameters struct {
logLevel zerolog.Level
address string
timeout time.Duration
logLevel zerolog.Level
address string
webSocketAddress string
timeout time.Duration
}

// Parameter is the interface for service parameters.
Expand Down Expand Up @@ -51,6 +52,14 @@ func WithAddress(address string) Parameter {
})
}

// WithWebSocketAddress provides the address for the websocket endpoint.
// If not supplied it will use the value supplied as the address.
func WithWebSocketAddress(address string) Parameter {
return parameterFunc(func(p *parameters) {
p.webSocketAddress = address
})
}

// WithTimeout sets the maximum duration for all requests to the endpoint.
func WithTimeout(timeout time.Duration) Parameter {
return parameterFunc(func(p *parameters) {
Expand All @@ -73,6 +82,9 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
if parameters.address == "" {
return nil, errors.New("no address specified")
}
if parameters.webSocketAddress == "" {
parameters.webSocketAddress = parameters.address
}
if parameters.timeout == 0 {
return nil, errors.New("no timeout specified")
}
Expand Down
28 changes: 21 additions & 7 deletions jsonrpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ type Service struct {
// Hold the initialising context to use for streams.
ctx context.Context

address string
client jsonrpc.RPCClient
timeout time.Duration
address string
webSocketAddress string
client jsonrpc.RPCClient
timeout time.Duration

// Client capability information.
isIssuanceProvider bool
Expand Down Expand Up @@ -75,15 +76,28 @@ func New(ctx context.Context, params ...Parameter) (execclient.Service, error) {
address = fmt.Sprintf("http://%s", parameters.address)
}

webSocketAddress := parameters.webSocketAddress
if strings.HasPrefix(webSocketAddress, "http://") {
webSocketAddress = fmt.Sprintf("ws://%s", webSocketAddress[7:])
}
if strings.HasPrefix(webSocketAddress, "https://") {
webSocketAddress = fmt.Sprintf("wss://%s", webSocketAddress[8:])
}
if !strings.HasPrefix(webSocketAddress, "ws") {
webSocketAddress = fmt.Sprintf("ws://%s", webSocketAddress)
}
log.Trace().Str("address", address).Str("web_socket_address", webSocketAddress).Msg("Addresses configured")

rpcClient := jsonrpc.NewClientWithOpts(address, &jsonrpc.RPCClientOpts{
HTTPClient: client,
})

s := &Service{
ctx: ctx,
client: rpcClient,
address: parameters.address,
timeout: parameters.timeout,
ctx: ctx,
client: rpcClient,
address: address,
webSocketAddress: webSocketAddress,
timeout: parameters.timeout,
}

// Fetch static values to confirm the connection is good.
Expand Down
37 changes: 37 additions & 0 deletions jsonrpc/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright © 2022 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jsonrpc

import (
"context"
"fmt"

"github.com/attestantio/go-execution-client/spec"
"github.com/attestantio/go-execution-client/types"
"github.com/pkg/errors"
)

// Transaction returns the transaction for the given transaction hash.
func (s *Service) Transaction(ctx context.Context, hash types.Hash) (*spec.Transaction, error) {
if len(hash) == 0 {
return nil, errors.New("hash nil")
}

var transaction spec.Transaction
if err := s.client.CallFor(&transaction, "eth_getTransactionByHash", fmt.Sprintf("%#x", hash)); err != nil {
return nil, err
}

return &transaction, nil
}
Loading

0 comments on commit 85eccae

Please sign in to comment.