Skip to content

Commit

Permalink
[Flaky unit test] Fix TestEndpointResolver (#6461)
Browse files Browse the repository at this point in the history
Wait for informers to have synced before creating new runtime objects,
to avoid the race condition between List and Watch with the fake
clientset.

Fixes #6460

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Jun 19, 2024
1 parent 4926f2d commit c1b514f
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions pkg/agent/client/endpoint_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)

const (
Expand Down Expand Up @@ -92,18 +93,25 @@ func getEndpointURL(ip string) *url.URL {
}
}

func runTestEndpointResolver(ctx context.Context, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) {
func runTestEndpointResolver(ctx context.Context, t *testing.T, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) {
k8sClient := fake.NewSimpleClientset(objects...)
resolver := NewEndpointResolver(k8sClient, testNamespace, testServiceName, testServicePort)
go resolver.Run(ctx)
// Wait for informers to sync to avoid race condition between List and Watch with the fake clientset.
// Note that we cannot call resolver.informerFactory.WaitForCacheSync instead, as that only
// waits until *started* informers' caches have been synced, and at this point
// resolver.informerFactory.Start may not have been called yet.
// We also check the return value of cache.WaitForCacheSync even though it should only be
// true if the context was cancelled. which should not happen in our test cases.
require.True(t, cache.WaitForCacheSync(ctx.Done(), resolver.serviceListerSynced, resolver.endpointsListerSynced))
return k8sClient, resolver
}

func TestEndpointResolver(t *testing.T) {
t.Run("add Service and Endpoints", func(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
k8sClient, resolver := runTestEndpointResolver(ctx)
k8sClient, resolver := runTestEndpointResolver(ctx, t)
require.Nil(t, resolver.CurrentEndpointURL())
svc, endpoints := getTestObjects()
k8sClient.CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{})
Expand All @@ -117,7 +125,7 @@ func TestEndpointResolver(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
svc, endpoints := getTestObjects()
k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
}, 2*time.Second, 50*time.Millisecond)
Expand All @@ -132,7 +140,7 @@ func TestEndpointResolver(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
svc, endpoints := getTestObjects()
k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
}, 2*time.Second, 50*time.Millisecond)
Expand All @@ -147,7 +155,7 @@ func TestEndpointResolver(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
svc, endpoints := getTestObjects()
k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints)
k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL())
}, 2*time.Second, 50*time.Millisecond)
Expand Down

0 comments on commit c1b514f

Please sign in to comment.