diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index f2961537181..55297af4d5d 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -285,13 +285,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*TestData, bool, bool, error) { - v4Enabled := clusterInfo.podV4NetworkCIDR != "" - v6Enabled := clusterInfo.podV6NetworkCIDR != "" - testData, err := setupTest(tb) - if err != nil { - return testData, v4Enabled, v6Enabled, err - } +func setupFlowAggregator(tb testing.TB, testData *TestData, o flowVisibilityTestOptions) error { // Create pod using ipfix collector image if err := NewPodBuilder("ipfix-collector", testData.testNamespace, ipfixCollectorImage).InHostNetwork().Create(testData); err != nil { tb.Errorf("Error when creating the ipfix collector Pod: %v", err) @@ -299,10 +293,10 @@ func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*Te ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testData.testNamespace) if err != nil || len(ipfixCollectorIP.IPStrings) == 0 { tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err) - return nil, v4Enabled, v6Enabled, err + return err } var ipStr string - if v6Enabled && ipfixCollectorIP.IPv6 != nil { + if isIPv6Enabled() && ipfixCollectorIP.IPv6 != nil { ipStr = ipfixCollectorIP.IPv6.String() } else { ipStr = ipfixCollectorIP.IPv4.String() @@ -312,17 +306,17 @@ func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*Te tb.Logf("Deploying ClickHouse") chSvcIP, err := testData.deployFlowVisibilityClickHouse(o) if err != nil { - return testData, v4Enabled, v6Enabled, err + return err } tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) tb.Logf("Applying flow aggregator YAML with ipfix collector: %s and clickHouse enabled", ipfixCollectorAddr) if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil { - return testData, v4Enabled, v6Enabled, err + return err } - return testData, v4Enabled, v6Enabled, nil + return nil } func exportLogsForSubtest(tb testing.TB, data *TestData) func() { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 9977a5c756c..24bae0af777 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -178,6 +178,28 @@ type IPFIXCollectorResponse struct { FlowRecords []string `json:"flowRecords"` } +func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*TestData, bool, bool) { + teardownFuncs := make([]func(), 0) + t.Cleanup(func() { + for _, fn := range teardownFuncs { + fn() + } + }) + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + teardownFuncs = append(teardownFuncs, func() { teardownTest(t, data) }) + err = setupFlowAggregator(t, data, options) + if err != nil { + t.Fatalf("Error when setting up FlowAggregator: %v", err) + } + // Execute teardownFlowAggregator later than teardownTest to ensure that the logs of Flow + // Aggregator has been exported. + teardownFuncs = append(teardownFuncs, func() { teardownFlowAggregator(t, data) }) + return data, isIPv4Enabled(), isIPv6Enabled() +} + func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) @@ -215,17 +237,9 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { }, } for _, o := range testCases { - data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, o.flowVisibilityTestOptions) - if err != nil { - t.Fatalf("Error when setting up test: %v", err) - } t.Run(o.name, func(t *testing.T) { - defer func() { - teardownTest(t, data) - // Execute teardownFlowAggregator later than teardownTest to ensure that the log - // of Flow Aggregator has been exported. - teardownFlowAggregator(t, data) - }() + var err error + data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, o.flowVisibilityTestOptions) podAIPs, podBIPs, _, _, _, err = createPerftestPods(data) if err != nil { t.Fatalf("Error when creating perftest Pods: %v", err) @@ -244,21 +258,13 @@ func TestFlowAggregator(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) - data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, flowVisibilityTestOptions{ + var err error + data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, flowVisibilityTestOptions{ databaseURL: defaultCHDatabaseURL, }) - if err != nil { - t.Fatalf("Error when setting up test: %v", err) - } if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) } - defer func() { - teardownTest(t, data) - // Execute teardownFlowAggregator later than teardownTest to ensure that the log - // of Flow Aggregator has been exported. - teardownFlowAggregator(t, data) - }() k8sUtils, err = NewKubernetesUtils(data) if err != nil { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 39266a4a01f..2335698931d 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -431,6 +431,16 @@ func nodeIP(idx int) string { return node.ip() } +// isIPv4Enabled returns true if and only if IPv4 is enabled in the cluster. +func isIPv4Enabled() bool { + return clusterInfo.podV4NetworkCIDR != "" +} + +// isIPv6Enabled returns true if and only if IPv6 is enabled in the cluster. +func isIPv6Enabled() bool { + return clusterInfo.podV6NetworkCIDR != "" +} + func labelNodeRoleControlPlane() string { // TODO: return labelNodeRoleControlPlane unconditionally when the min K8s version // requirement to run Antrea becomes K8s v1.20 @@ -889,7 +899,7 @@ func (data *TestData) deployFlowVisibilityClickHouse(o flowVisibilityTestOptions } // check for clickhouse pod Ready. Wait for 2x timeout as ch operator needs to be running first to handle chi - if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil { + if err := data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil { return "", err } @@ -903,21 +913,30 @@ func (data *TestData) deployFlowVisibilityClickHouse(o flowVisibilityTestOptions return true, nil } }); err != nil { - return "", fmt.Errorf("timeout waiting for ClickHouse Service: %v", err) + return "", fmt.Errorf("timeout waiting for ClickHouse Service: %w", err) } + const probePodName = "ch-svc-probe" + if err := NewPodBuilder(probePodName, flowVisibilityNamespace, agnhostImage).Create(testData); err != nil { + return "", fmt.Errorf("failed to create ClickHouse Service probe Pod: %w", err) + } + defer testData.DeletePod(flowVisibilityNamespace, probePodName) + if err := data.podWaitForReady(defaultTimeout, probePodName, flowVisibilityNamespace); err != nil { + return "", err + } + + cmd := []string{"/agnhost", "connect", net.JoinHostPort(chSvc.Spec.ClusterIP, clickHouseHTTPPort), "--timeout=5s"} if err := wait.PollUntilContextTimeout(context.TODO(), defaultInterval, defaultTimeout, true, func(ctx context.Context) (bool, error) { - rc, stdout, stderr, err := testData.RunCommandOnNode(controlPlaneNodeName(), - fmt.Sprintf("curl -Ss %s:%s", chSvc.Spec.ClusterIP, clickHouseHTTPPort)) - if rc != 0 || err != nil { - log.Infof("Failed to curl clickhouse Service: %s", strings.Trim(stderr, "\n")) + _, stderr, err := testData.RunCommandFromPod(flowVisibilityNamespace, probePodName, agnhostContainerName, cmd) + if err != nil { + log.Infof("Failed to connnect to clickhouse Service, err: %v, stderr: %s", err, strings.Trim(stderr, "\n")) return false, nil } else { - log.Infof("Successfully curl'ed clickhouse Service: %s", strings.Trim(stdout, "\n")) + log.Infof("Successfully connected to clickhouse Service") return true, nil } }); err != nil { - return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %v", err) + return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %w", err) } return chSvc.Spec.ClusterIP, nil