diff --git a/pkg/agent/client/endpoint_resolver_test.go b/pkg/agent/client/endpoint_resolver_test.go index e3fc6017667..30bb757ad3b 100644 --- a/pkg/agent/client/endpoint_resolver_test.go +++ b/pkg/agent/client/endpoint_resolver_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) const ( @@ -92,10 +93,17 @@ func getEndpointURL(ip string) *url.URL { } } -func runTestEndpointResolver(ctx context.Context, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) { +func runTestEndpointResolver(ctx context.Context, t *testing.T, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) { k8sClient := fake.NewSimpleClientset(objects...) resolver := NewEndpointResolver(k8sClient, testNamespace, testServiceName, testServicePort) go resolver.Run(ctx) + // Wait for informers to sync to avoid race condition between List and Watch with the fake clientset. + // Note that we cannot call resolver.informerFactory.WaitForCacheSync instead, as that only + // waits until *started* informers' caches have been synced, and at this point + // resolver.informerFactory.Start may not have been called yet. + // We also check the return value of cache.WaitForCacheSync even though it should only be + // true if the context was cancelled. which should not happen in our test cases. + require.True(t, cache.WaitForCacheSync(ctx.Done(), resolver.serviceListerSynced, resolver.endpointsListerSynced)) return k8sClient, resolver } @@ -103,7 +111,7 @@ func TestEndpointResolver(t *testing.T) { t.Run("add Service and Endpoints", func(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() - k8sClient, resolver := runTestEndpointResolver(ctx) + k8sClient, resolver := runTestEndpointResolver(ctx, t) require.Nil(t, resolver.CurrentEndpointURL()) svc, endpoints := getTestObjects() k8sClient.CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{}) @@ -117,7 +125,7 @@ func TestEndpointResolver(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() svc, endpoints := getTestObjects() - k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) + k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints) assert.EventuallyWithT(t, func(t *assert.CollectT) { assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) }, 2*time.Second, 50*time.Millisecond) @@ -132,7 +140,7 @@ func TestEndpointResolver(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() svc, endpoints := getTestObjects() - k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) + k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints) assert.EventuallyWithT(t, func(t *assert.CollectT) { assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) }, 2*time.Second, 50*time.Millisecond) @@ -147,7 +155,7 @@ func TestEndpointResolver(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() svc, endpoints := getTestObjects() - k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) + k8sClient, resolver := runTestEndpointResolver(ctx, t, svc, endpoints) assert.EventuallyWithT(t, func(t *assert.CollectT) { assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) }, 2*time.Second, 50*time.Millisecond)