Skip to content

Commit

Permalink
[FlowAggregator e2e tests] Collect logs when setup fails (#6444)
Browse files Browse the repository at this point in the history
* Collect logs when setup fails

Logs were not being collected in case of failure while setting up the
Flow Aggregator and its "dependencies" (e.g., ClickHouse). We make sure
that logs are now collected, making it possible to troubleshoot such
test failures.

* Use agnhost to check connectivity to ClickHouse service

---------

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Jun 18, 2024
1 parent 5c1141e commit 4926f2d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 40 deletions.
18 changes: 6 additions & 12 deletions test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,24 +285,18 @@ func setupTest(tb testing.TB) (*TestData, error) {
return testData, nil
}

func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*TestData, bool, bool, error) {
v4Enabled := clusterInfo.podV4NetworkCIDR != ""
v6Enabled := clusterInfo.podV6NetworkCIDR != ""
testData, err := setupTest(tb)
if err != nil {
return testData, v4Enabled, v6Enabled, err
}
func setupFlowAggregator(tb testing.TB, testData *TestData, o flowVisibilityTestOptions) error {
// Create pod using ipfix collector image
if err := NewPodBuilder("ipfix-collector", testData.testNamespace, ipfixCollectorImage).InHostNetwork().Create(testData); err != nil {
tb.Errorf("Error when creating the ipfix collector Pod: %v", err)
}
ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testData.testNamespace)
if err != nil || len(ipfixCollectorIP.IPStrings) == 0 {
tb.Errorf("Error when waiting to get ipfix collector Pod IP: %v", err)
return nil, v4Enabled, v6Enabled, err
return err
}
var ipStr string
if v6Enabled && ipfixCollectorIP.IPv6 != nil {
if isIPv6Enabled() && ipfixCollectorIP.IPv6 != nil {
ipStr = ipfixCollectorIP.IPv6.String()
} else {
ipStr = ipfixCollectorIP.IPv4.String()
Expand All @@ -312,17 +306,17 @@ func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*Te
tb.Logf("Deploying ClickHouse")
chSvcIP, err := testData.deployFlowVisibilityClickHouse(o)
if err != nil {
return testData, v4Enabled, v6Enabled, err
return err
}
tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP)
tb.Logf("Applying flow aggregator YAML with ipfix collector: %s and clickHouse enabled",
ipfixCollectorAddr)

if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil {
return testData, v4Enabled, v6Enabled, err
return err
}

return testData, v4Enabled, v6Enabled, nil
return nil
}

func exportLogsForSubtest(tb testing.TB, data *TestData) func() {
Expand Down
46 changes: 26 additions & 20 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,28 @@ type IPFIXCollectorResponse struct {
FlowRecords []string `json:"flowRecords"`
}

// setupFlowAggregatorTest runs setupTest and then setupFlowAggregator with the
// provided options, failing the test immediately (t.Fatalf) if either step
// returns an error. On success it returns the test data along with the results
// of isIPv4Enabled() and isIPv6Enabled().
//
// A single cleanup is registered with t before any setup work starts. It runs
// whichever teardown steps have been queued by the time the test completes, in
// the order they were queued, so teardownTest is still executed when
// setupFlowAggregator fails.
func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*TestData, bool, bool) {
	var cleanups []func()
	t.Cleanup(func() {
		for i := range cleanups {
			cleanups[i]()
		}
	})

	td, err := setupTest(t)
	if err != nil {
		t.Fatalf("Error when setting up test: %v", err)
	}
	cleanups = append(cleanups, func() { teardownTest(t, td) })

	if err := setupFlowAggregator(t, td, options); err != nil {
		t.Fatalf("Error when setting up FlowAggregator: %v", err)
	}
	// teardownFlowAggregator is queued after teardownTest on purpose: it must
	// run later, so that the logs of the Flow Aggregator have been exported
	// before it executes.
	cleanups = append(cleanups, func() { teardownFlowAggregator(t, td) })

	v4Enabled, v6Enabled := isIPv4Enabled(), isIPv6Enabled()
	return td, v4Enabled, v6Enabled
}

func TestFlowAggregatorSecureConnection(t *testing.T) {
skipIfNotFlowVisibilityTest(t)
skipIfHasWindowsNodes(t)
Expand Down Expand Up @@ -215,17 +237,9 @@ func TestFlowAggregatorSecureConnection(t *testing.T) {
},
}
for _, o := range testCases {
data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, o.flowVisibilityTestOptions)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
t.Run(o.name, func(t *testing.T) {
defer func() {
teardownTest(t, data)
// Execute teardownFlowAggregator later than teardownTest to ensure that the log
// of Flow Aggregator has been exported.
teardownFlowAggregator(t, data)
}()
var err error
data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, o.flowVisibilityTestOptions)
podAIPs, podBIPs, _, _, _, err = createPerftestPods(data)
if err != nil {
t.Fatalf("Error when creating perftest Pods: %v", err)
Expand All @@ -244,21 +258,13 @@ func TestFlowAggregator(t *testing.T) {
skipIfNotFlowVisibilityTest(t)
skipIfHasWindowsNodes(t)

data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, flowVisibilityTestOptions{
var err error
data, v4Enabled, v6Enabled := setupFlowAggregatorTest(t, flowVisibilityTestOptions{
databaseURL: defaultCHDatabaseURL,
})
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil {
t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err)
}
defer func() {
teardownTest(t, data)
// Execute teardownFlowAggregator later than teardownTest to ensure that the log
// of Flow Aggregator has been exported.
teardownFlowAggregator(t, data)
}()

k8sUtils, err = NewKubernetesUtils(data)
if err != nil {
Expand Down
35 changes: 27 additions & 8 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,16 @@ func nodeIP(idx int) string {
return node.ip()
}

// isIPv4Enabled reports whether IPv4 is enabled in the cluster, i.e. whether
// clusterInfo holds a non-empty IPv4 Pod network CIDR.
func isIPv4Enabled() bool {
	return len(clusterInfo.podV4NetworkCIDR) > 0
}

// isIPv6Enabled reports whether IPv6 is enabled in the cluster, i.e. whether
// clusterInfo holds a non-empty IPv6 Pod network CIDR.
func isIPv6Enabled() bool {
	return len(clusterInfo.podV6NetworkCIDR) > 0
}

func labelNodeRoleControlPlane() string {
// TODO: return labelNodeRoleControlPlane unconditionally when the min K8s version
// requirement to run Antrea becomes K8s v1.20
Expand Down Expand Up @@ -889,7 +899,7 @@ func (data *TestData) deployFlowVisibilityClickHouse(o flowVisibilityTestOptions
}

// check for clickhouse pod Ready. Wait for 2x timeout as ch operator needs to be running first to handle chi
if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil {
if err := data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil {
return "", err
}

Expand All @@ -903,21 +913,30 @@ func (data *TestData) deployFlowVisibilityClickHouse(o flowVisibilityTestOptions
return true, nil
}
}); err != nil {
return "", fmt.Errorf("timeout waiting for ClickHouse Service: %v", err)
return "", fmt.Errorf("timeout waiting for ClickHouse Service: %w", err)
}

const probePodName = "ch-svc-probe"
if err := NewPodBuilder(probePodName, flowVisibilityNamespace, agnhostImage).Create(testData); err != nil {
return "", fmt.Errorf("failed to create ClickHouse Service probe Pod: %w", err)
}
defer testData.DeletePod(flowVisibilityNamespace, probePodName)
if err := data.podWaitForReady(defaultTimeout, probePodName, flowVisibilityNamespace); err != nil {
return "", err
}

cmd := []string{"/agnhost", "connect", net.JoinHostPort(chSvc.Spec.ClusterIP, clickHouseHTTPPort), "--timeout=5s"}
if err := wait.PollUntilContextTimeout(context.TODO(), defaultInterval, defaultTimeout, true, func(ctx context.Context) (bool, error) {
rc, stdout, stderr, err := testData.RunCommandOnNode(controlPlaneNodeName(),
fmt.Sprintf("curl -Ss %s:%s", chSvc.Spec.ClusterIP, clickHouseHTTPPort))
if rc != 0 || err != nil {
log.Infof("Failed to curl clickhouse Service: %s", strings.Trim(stderr, "\n"))
_, stderr, err := testData.RunCommandFromPod(flowVisibilityNamespace, probePodName, agnhostContainerName, cmd)
if err != nil {
log.Infof("Failed to connnect to clickhouse Service, err: %v, stderr: %s", err, strings.Trim(stderr, "\n"))
return false, nil
} else {
log.Infof("Successfully curl'ed clickhouse Service: %s", strings.Trim(stdout, "\n"))
log.Infof("Successfully connected to clickhouse Service")
return true, nil
}
}); err != nil {
return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %v", err)
return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %w", err)
}

return chSvc.Spec.ClusterIP, nil
Expand Down

0 comments on commit 4926f2d

Please sign in to comment.