Skip to content

Commit

Permalink
fix(gateway/connector): await gateway connection before request (#15686)
Browse files Browse the repository at this point in the history
* fix(gateway/connector): enable await before send to gateway

* chore(webapi+gateway): lint and test fixes
  • Loading branch information
MStreet3 authored and george-dorin committed Jan 13, 2025
1 parent f2affc2 commit fa890e8
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 17 deletions.
2 changes: 2 additions & 0 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -188,6 +189,7 @@ func TestComputeFetch(t *testing.T) {
th := setup(t, defaultConfig)

th.connector.EXPECT().DonID().Return("don-id")
th.connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
th.connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

msgID := strings.Join([]string{
Expand Down
11 changes: 9 additions & 2 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
sort.Strings(gatewayIDs)

err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
selectedGateway := gatewayIDs[0]

l.Infow("selected gateway, awaiting connection", "gatewayID", selectedGateway)

if err := c.gc.AwaitConnection(ctx, selectedGateway); err != nil {
return nil, errors.Wrap(err, "await connection canceled")
}

if err := c.gc.SignAndSendToGateway(ctx, selectedGateway, body); err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/webapi/outgoing_connector_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
Expand All @@ -36,6 +37,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the default timeout
Expand Down Expand Up @@ -82,6 +84,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the defined timeout
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/webapi/target/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestCapability_Execute(t *testing.T) {
require.NoError(t, err)

gatewayResp := gatewayResponse(t, msgID)

th.connector.EXPECT().AwaitConnection(mock.Anything, "gateway1").Return(nil)
th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Once()
Expand Down
45 changes: 39 additions & 6 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ type GatewayConnector interface {

AddHandler(methods []string, handler GatewayConnectorHandler) error
// SendToGateway takes a signed message as argument and sends it to the specified gateway
SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error
SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error
// SignAndSendToGateway signs the message and sends the message to the specified gateway
SignAndSendToGateway(ctx context.Context, gatewayID string, msg *api.MessageBody) error
// GatewayIDs returns the list of Gateway IDs
GatewayIDs() []string
// DonID returns the DON ID
DonID() string
AwaitConnection(ctx context.Context, gatewayID string) error
}

// Signer implementation needs to be provided by a GatewayConnector user (node)
Expand Down Expand Up @@ -78,12 +79,30 @@ func (c *gatewayConnector) HealthReport() map[string]error {
func (c *gatewayConnector) Name() string { return c.lggr.Name() }

type gatewayState struct {
// signal channel is closed once the gateway is connected
signalCh chan struct{}

conn network.WSConnectionWrapper
config ConnectorGatewayConfig
url *url.URL
wsClient network.WebSocketClient
}

// A gatewayState is connected when the signal channel is closed
func (gs *gatewayState) signal() {
close(gs.signalCh)
}

// awaitConn blocks until the gateway is connected or the context is done
func (gs *gatewayState) awaitConn(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("await connection failed: %w", ctx.Err())
case <-gs.signalCh:
return nil
}
}

func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork.Clock, lggr logger.Logger) (GatewayConnector, error) {
if config == nil || signer == nil || clock == nil || lggr == nil {
return nil, errors.New("nil dependency")
Expand Down Expand Up @@ -125,6 +144,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork
config: gw,
url: parsedURL,
wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr),
signalCh: make(chan struct{}),
}
gateways[gw.Id] = gateway
urlToId[gw.URL] = gw.Id
Expand All @@ -150,17 +170,25 @@ func (c *gatewayConnector) AddHandler(methods []string, handler GatewayConnector
return nil
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error {
func (c *gatewayConnector) AwaitConnection(ctx context.Context, gatewayID string) error {
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
return gateway.awaitConn(ctx)
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error {
data, err := c.codec.EncodeResponse(msg)
if err != nil {
return fmt.Errorf("error encoding response for gateway %s: %v", gatewayId, err)
return fmt.Errorf("error encoding response for gateway %s: %w", gatewayID, err)
}
gateway, ok := c.gateways[gatewayId]
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayId)
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
if gateway.conn == nil {
return fmt.Errorf("connector not started")
return errors.New("connector not started")
}
return gateway.conn.Write(ctx, websocket.BinaryMessage, data)
}
Expand Down Expand Up @@ -242,10 +270,15 @@ func (c *gatewayConnector) reconnectLoop(gatewayState *gatewayState) {
} else {
c.lggr.Infow("connected successfully", "url", gatewayState.url)
closeCh := gatewayState.conn.Reset(conn)
gatewayState.signal()
<-closeCh
c.lggr.Infow("connection closed", "url", gatewayState.url)

// reset backoff
redialBackoff = utils.NewRedialBackoff()

// reset signal channel
gatewayState.signalCh = make(chan struct{})
}
select {
case <-c.shutdownCh:
Expand Down
63 changes: 55 additions & 8 deletions core/services/gateway/connector/mocks/gateway_connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/services/workflows/syncer/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"
)

type wrapper struct {
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestNewFetcherService(t *testing.T) {
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

payload, err := fetcher.Fetch(ctx, url)
Expand Down

0 comments on commit fa890e8

Please sign in to comment.