Skip to content

Commit

Permalink
refactor client
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
  • Loading branch information
joshua-kim committed Dec 11, 2023
1 parent 159aafb commit 02e7588
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 96 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.10-rc.0
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211044900-cc5732617957
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.10-rc.0 h1:qmuom7rtH5hc1E3lnqrMFNLFL1TMnEVa/2O8poB1YLU=
github.com/ava-labs/coreth v0.12.10-rc.0/go.mod h1:plFm/xzvWmx1+qJ3JQSTzF8+FdaA2xu7GgY/AdaZDfk=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211044900-cc5732617957 h1:8Tz+sbIMQ8rm06/Qp1nUxJ8P1aWqVQbMys/+zh789Fo=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231211044900-cc5732617957/go.mod h1:lLwZaypzvvmWLEdatvINIpokXJxCXclODuyezsc938Y=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
25 changes: 16 additions & 9 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/set"
)

Expand Down Expand Up @@ -40,11 +41,15 @@ type CrossChainAppResponseCallback func(
)

type Client struct {
handlerID uint64
handlerPrefix []byte
router *router
sender common.AppSender
options *clientOptions
handlerID uint64
handlerPrefix []byte
router *router
sender common.AppSender
appRequestFailedTime metric.Averager
appResponseTime metric.Averager
crossChainAppRequestFailedTime metric.Averager
crossChainAppResponseTime metric.Averager
options *clientOptions
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
Expand Down Expand Up @@ -96,8 +101,9 @@ func (c *Client) AppRequest(
}

c.router.pendingAppRequests[requestID] = pendingAppRequest{
AppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
callback: onResponse,
appRequestFailedTime: c.appRequestFailedTime,
appResponseTime: c.appResponseTime,
}
c.router.requestID += 2
}
Expand Down Expand Up @@ -159,8 +165,9 @@ func (c *Client) CrossChainAppRequest(
}

c.router.pendingCrossChainAppRequests[requestID] = pendingCrossChainAppRequest{
CrossChainAppResponseCallback: onResponse,
metrics: c.router.handlers[c.handlerID].metrics,
callback: onResponse,
crossChainAppRequestFailedTime: c.crossChainAppRequestFailedTime,
crossChainAppResponseTime: c.crossChainAppResponseTime,
}
c.router.requestID += 2

Expand Down
5 changes: 2 additions & 3 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func TestGossiperGossip(t *testing.T) {

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseNetwork.NewAppProtocol(0x0, handler)
require.NoError(err)
require.NoError(responseNetwork.AddHandler(0x0, handler))

requestSender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error {
Expand Down Expand Up @@ -162,7 +161,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestNetwork.NewAppProtocol(0x0, nil)
requestClient, err := requestNetwork.NewClient(0x0)
require.NoError(err)

config := Config{
Expand Down
58 changes: 52 additions & 6 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package p2p
import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"

Expand All @@ -15,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)
Expand Down Expand Up @@ -117,12 +119,47 @@ func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
return nil
}

// NewAppProtocol reserves an identifier for an application protocol handler and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
if err := n.router.addHandler(handlerID, handler); err != nil {
return nil, err
// NewClient returns a Client that can be used to send messages for the
// corresponding protocol.
func (n *Network) NewClient(handlerID uint64, options ...ClientOption) (*Client, error) {
appRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("client_%d_app_request_failed_time", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app request failed metric for client %d: %w", handlerID, err)
}

appResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("client_%d_app_response_time", handlerID),
"app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app response metric for client %d: %w", handlerID, err)
}

crossChainAppRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("client_%d_cross_chain_app_request_failed_time", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app request failed metric for client %d: %w", handlerID, err)
}

crossChainAppResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("client_%d_cross_chain_app_response_time", handlerID),
"cross chain app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app response metric for client %d: %w", handlerID, err)
}

client := &Client{
Expand All @@ -135,6 +172,10 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C
peers: n.Peers,
},
},
appRequestFailedTime: appRequestFailedTime,
appResponseTime: appResponseTime,
crossChainAppRequestFailedTime: crossChainAppRequestFailedTime,
crossChainAppResponseTime: crossChainAppResponseTime,
}

for _, option := range options {
Expand All @@ -144,6 +185,11 @@ func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...C
return client, nil
}

// AddHandler reserves an identifier for an application protocol
func (n *Network) AddHandler(handlerID uint64, handler Handler) error {
return n.router.addHandler(handlerID, handler)
}

// Peers contains metadata about the current set of connected peers
type Peers struct {
lock sync.RWMutex
Expand Down
10 changes: 6 additions & 4 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func TestAppRequestResponse(t *testing.T) {
handler := mocks.NewMockHandler(ctrl)
n := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(n.Connected(context.Background(), nodeID, nil))
client, err := n.NewAppProtocol(handlerID, handler)
require.NoError(n.AddHandler(handlerID, handler))
client, err := n.NewClient(handlerID)
require.NoError(err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -350,7 +351,8 @@ func TestAppRequestDuplicateRequestIDs(t *testing.T) {
}).AnyTimes()

require.NoError(network.Connected(context.Background(), nodeID, nil))
client, err := network.NewAppProtocol(0x1, handler)
require.NoError(network.AddHandler(0x1, handler))
client, err := network.NewClient(0x1)
require.NoError(err)

onResponse := func(ctx context.Context, nodeID ids.NodeID, got []byte, err error) {
Expand Down Expand Up @@ -493,7 +495,7 @@ func TestAppRequestAnyNodeSelection(t *testing.T) {
require.NoError(n.Connected(context.Background(), peer, &version.Application{}))
}

client, err := n.NewAppProtocol(1, nil)
client, err := n.NewClient(1)
require.NoError(err)

err = client.AppRequestAny(context.Background(), []byte("foobar"), nil)
Expand Down Expand Up @@ -582,7 +584,7 @@ func TestNodeSamplerClientOption(t *testing.T) {
require.NoError(network.Connected(ctx, peer, nil))
}

client, err := network.NewAppProtocol(0x0, nil, tt.option(t, network))
client, err := network.NewClient(0, tt.option(t, network))
require.NoError(err)

if err = client.AppRequestAny(ctx, []byte("request"), nil); err != nil {
Expand Down
90 changes: 19 additions & 71 deletions network/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,24 @@ var (
_ common.AppHandler = (*router)(nil)
)

type metrics struct {
appRequestTime metric.Averager
appRequestFailedTime metric.Averager
appResponseTime metric.Averager
appGossipTime metric.Averager
crossChainAppRequestTime metric.Averager
crossChainAppRequestFailedTime metric.Averager
crossChainAppResponseTime metric.Averager
}

type pendingAppRequest struct {
*metrics
AppResponseCallback
callback AppResponseCallback
appRequestFailedTime metric.Averager
appResponseTime metric.Averager
}

type pendingCrossChainAppRequest struct {
*metrics
CrossChainAppResponseCallback
callback CrossChainAppResponseCallback
crossChainAppRequestFailedTime metric.Averager
crossChainAppResponseTime metric.Averager
}

// meteredHandler emits metrics for a Handler
type meteredHandler struct {
*responder
*metrics
appRequestTime metric.Averager
appGossipTime metric.Averager
crossChainAppRequestTime metric.Averager
}

// router routes incoming application messages to the corresponding registered
Expand Down Expand Up @@ -109,26 +103,6 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error {
return fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err)
}

appRequestFailedTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_request_failed", handlerID),
"app request failed time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err)
}

appResponseTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_response", handlerID),
"app response time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err)
}

appGossipTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_app_gossip", handlerID),
Expand All @@ -149,42 +123,16 @@ func (r *router) addHandler(handlerID uint64, handler Handler) error {
return fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err)
}

crossChainAppRequestFailedTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID),
"app request failed time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err)
}

crossChainAppResponseTime, err := metric.NewAverager(
r.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID),
"cross chain app response time (ns)",
r.metrics,
)
if err != nil {
return fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err)
}

r.handlers[handlerID] = &meteredHandler{
responder: &responder{
Handler: handler,
handlerID: handlerID,
log: r.log,
sender: r.sender,
},
metrics: &metrics{
appRequestTime: appRequestTime,
appRequestFailedTime: appRequestFailedTime,
appResponseTime: appResponseTime,
appGossipTime: appGossipTime,
crossChainAppRequestTime: crossChainAppRequestTime,
crossChainAppRequestFailedTime: crossChainAppRequestFailedTime,
crossChainAppResponseTime: crossChainAppResponseTime,
},
appRequestTime: appRequestTime,
appGossipTime: appGossipTime,
crossChainAppRequestTime: crossChainAppRequestTime,
}

return nil
Expand Down Expand Up @@ -214,7 +162,7 @@ func (r *router) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID ui
return err
}

handler.metrics.appRequestTime.Observe(float64(time.Since(start)))
handler.appRequestTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -231,7 +179,7 @@ func (r *router) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reques
return ErrUnrequestedResponse
}

pending.AppResponseCallback(ctx, nodeID, nil, ErrAppRequestFailed)
pending.callback(ctx, nodeID, nil, ErrAppRequestFailed)
pending.appRequestFailedTime.Observe(float64(time.Since(start)))
return nil
}
Expand All @@ -249,7 +197,7 @@ func (r *router) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID u
return ErrUnrequestedResponse
}

pending.AppResponseCallback(ctx, nodeID, response, nil)
pending.callback(ctx, nodeID, response, nil)
pending.appResponseTime.Observe(float64(time.Since(start)))
return nil
}
Expand All @@ -273,7 +221,7 @@ func (r *router) AppGossip(ctx context.Context, nodeID ids.NodeID, gossip []byte

handler.AppGossip(ctx, nodeID, parsedMsg)

handler.metrics.appGossipTime.Observe(float64(time.Since(start)))
handler.appGossipTime.Observe(float64(time.Since(start)))
return nil
}

Expand Down Expand Up @@ -307,7 +255,7 @@ func (r *router) CrossChainAppRequest(
return err
}

handler.metrics.crossChainAppRequestTime.Observe(float64(time.Since(start)))
handler.crossChainAppRequestTime.Observe(float64(time.Since(start)))
return nil
}

Expand All @@ -324,7 +272,7 @@ func (r *router) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID,
return ErrUnrequestedResponse
}

pending.CrossChainAppResponseCallback(ctx, chainID, nil, ErrAppRequestFailed)
pending.callback(ctx, chainID, nil, ErrAppRequestFailed)
pending.crossChainAppRequestFailedTime.Observe(float64(time.Since(start)))
return nil
}
Expand All @@ -342,7 +290,7 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ
return ErrUnrequestedResponse
}

pending.CrossChainAppResponseCallback(ctx, chainID, response, nil)
pending.callback(ctx, chainID, response, nil)
pending.crossChainAppResponseTime.Observe(float64(time.Since(start)))
return nil
}
Expand Down

0 comments on commit 02e7588

Please sign in to comment.