Skip to content

Commit

Permalink
chore: fix jetstream flaky test (kedacore#5372)
Browse files Browse the repository at this point in the history
* chore: fix jetstream flaky test

Signed-off-by: Jorge Turrado <[email protected]>

* use specific instances instead of default client

Signed-off-by: Jorge Turrado <[email protected]>

* Update mock responses

Signed-off-by: Jorge Turrado <[email protected]>

---------

Signed-off-by: Jorge Turrado <[email protected]>
Signed-off-by: Siva Guruvareddiar <[email protected]>
  • Loading branch information
JorTurFer authored and sguruvar committed Jan 15, 2024
1 parent a37620e commit f58f9a6
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
}

var testNATSJetStreamServerMockResponses = map[string][]byte{
"not-leader-1.localhost:8222": []byte(`{"server_name": "not-leader-1", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`),
"not-leader-2.localhost:8222": []byte(`{"server_name": "not-leader-2", "cluster": {"urls": ["leader.localhost.nats.svc:8222", "not-leader-1.localhost.nats.svc:8222"]}}`),
"leader.localhost:8222": []byte(`{"server_name": "leader", "cluster": {"urls": ["not-leader-1.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`),
"localhost:8222": []byte(`{"server_name": "leader", "cluster": {"urls": ["leader.localhost.nats.svc:8222","not-leader-1.localhost.nats.svc:8222", "not-leader-2.localhost.nats.svc:8222"]}}`),
}

func TestNATSJetStreamIsActive(t *testing.T) {
Expand All @@ -234,8 +232,7 @@ func TestNATSJetStreamIsActive(t *testing.T) {
t.Fatal("Could not parse mock response struct:", err)
}

srv := natsMockHTTPJetStreamServer(t, mockResponseJSON)
defer srv.Close()
client, srv := natsMockHTTPJetStreamServer(t, mockResponseJSON)

ctx := context.Background()
meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, TriggerIndex: mockResponse.metadata.triggerIndex})
Expand All @@ -246,7 +243,7 @@ func TestNATSJetStreamIsActive(t *testing.T) {
mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
httpClient: client,
logger: InitializeLogger(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, TriggerIndex: mockResponse.metadata.triggerIndex}, "nats_jetstream_scaler"),
}

Expand Down Expand Up @@ -285,12 +282,7 @@ func TestNATSJetStreamGetMetrics(t *testing.T) {
t.Fatal("Could not parse mock response struct:", err)
}

tr := http.DefaultTransport.(*http.Transport).Clone()
srv := natsMockHTTPJetStreamServer(t, mockResponseJSON)
defer func() {
srv.Close()
http.DefaultTransport = tr
}()
client, srv := natsMockHTTPJetStreamServer(t, mockResponseJSON)

ctx := context.Background()
meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, TriggerIndex: mockResponse.metadata.triggerIndex})
Expand All @@ -301,7 +293,7 @@ func TestNATSJetStreamGetMetrics(t *testing.T) {
mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
httpClient: client,
logger: InitializeLogger(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, TriggerIndex: mockResponse.metadata.triggerIndex}, "nats_jetstream_scaler"),
}

Expand All @@ -316,14 +308,17 @@ func TestNATSJetStreamGetMetrics(t *testing.T) {
}
}

func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptest.Server {
func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) (*http.Client, *httptest.Server) {
dialer := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}

// redirect leader.localhost for the clustered test
http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
client := &http.Client{
Transport: &http.Transport{},
}
client.Transport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
if strings.HasSuffix(addr, ".localhost:8222") {
addr = "127.0.0.1:8222"
}
Expand Down Expand Up @@ -367,16 +362,13 @@ func natsMockHTTPJetStreamServer(t *testing.T, mockResponseJSON []byte) *httptes
srv.Listener = l
srv.Start()

return srv
return client, srv
}

func TestNATSJetStreamgetNATSJetstreamMonitoringData(t *testing.T) {
tr := http.DefaultTransport.(*http.Transport).Clone()

invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
client, invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
defer func() {
invalidJSONServer.Close()
http.DefaultTransport = tr
}()

ctx := context.Background()
Expand All @@ -388,7 +380,7 @@ func TestNATSJetStreamgetNATSJetstreamMonitoringData(t *testing.T) {
mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
httpClient: client,
logger: InitializeLogger(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}, "nats_jetstream_scaler"),
}

Expand All @@ -399,7 +391,7 @@ func TestNATSJetStreamgetNATSJetstreamMonitoringData(t *testing.T) {
}

func TestNATSJetStreamGetNATSJetstreamNodeURL(t *testing.T) {
invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
client, invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
defer invalidJSONServer.Close()

meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
Expand All @@ -410,7 +402,7 @@ func TestNATSJetStreamGetNATSJetstreamNodeURL(t *testing.T) {
mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
httpClient: client,
logger: InitializeLogger(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}, "nats_jetstream_scaler"),
}

Expand All @@ -423,7 +415,7 @@ func TestNATSJetStreamGetNATSJetstreamNodeURL(t *testing.T) {
}

func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {
invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
client, invalidJSONServer := natsMockHTTPJetStreamServer(t, []byte(`{invalidJSON}`))
defer invalidJSONServer.Close()

meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
Expand All @@ -434,7 +426,7 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {
mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
httpClient: client,
logger: InitializeLogger(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}, "nats_jetstream_scaler"),
}

Expand Down

0 comments on commit f58f9a6

Please sign in to comment.