Skip to content

Commit

Permalink
Fix error logs for FlowAggregator e2e tests (#6730)
Browse files Browse the repository at this point in the history
When the IPFIX collector has not received the expected flow records, we
try to dump the actual list of records to facilitate
troubleshooting. However, the output is to large to be handled by the
testify assertions (and IMO it is too large to be useful for
troubleshooting). With this change, if the expected records are not
found in the collector output, we only print the 20 most recent records,
in a format that should be easier to leverage for troubleshooting.

Fixes #6478

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Oct 15, 2024
1 parent f5996af commit 9c7424a
Showing 1 changed file with 55 additions and 45 deletions.
100 changes: 55 additions & 45 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,12 +1004,12 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
}

func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) {
collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter)
records := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter)
// Checking only data records as data records cannot be decoded without template
// record.
assert.GreaterOrEqualf(t, len(recordSlices), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput)
assert.GreaterOrEqualf(t, len(records), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records, filtered records: %v", records)
// Iterate over recordSlices and build some results to test with expected results
for _, record := range recordSlices {
for _, record := range records {
// Check if record has both Pod name of source and destination Pod.
if isIntraNode {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace)
Expand Down Expand Up @@ -1151,8 +1151,8 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
}
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, toolboxContainerName, strings.Fields(cmd))
require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)
_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter)
for _, record := range recordSlices {
records := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter)
for _, record := range records {
checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace)
checkFlowType(t, record, ipfixregistry.FlowTypeToExternal)
if egressName != "" {
Expand Down Expand Up @@ -1200,13 +1200,13 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
}

func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) {
_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter)
_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter)
recordSlices := append(recordSlices1, recordSlices2...)
records1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter)
records2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter)
records := append(records1, records2...)
src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6)
src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6)
// Iterate over recordSlices and build some results to test with expected results
for _, record := range recordSlices {
// Iterate over records and build some results to test with expected results
for _, record := range records {
var srcPodName, dstPodName string
var checkDstSvc bool
if strings.Contains(record, src_flow1) && strings.Contains(record, dst_flow1) {
Expand Down Expand Up @@ -1276,9 +1276,9 @@ func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1,
func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) {
clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, labelFilter)
clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, labelFilter)
recordSlices := append(clickHouseRecords1, clickHouseRecords2...)
// Iterate over recordSlices and build some results to test with expected results
for _, record := range recordSlices {
records := append(clickHouseRecords1, clickHouseRecords2...)
// Iterate over records and build some results to test with expected results
for _, record := range records {
var srcPodName, dstPodName string
var checkDstSvc bool
if record.SourceIP == testFlow1.srcIP && (record.DestinationIP == testFlow1.dstIP || record.DestinationClusterIP == testFlow1.dstIP) {
Expand Down Expand Up @@ -1426,9 +1426,8 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the
// iperf test.
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, labelFilter string) (string, []string) {
var collectorOutput string
var recordSlices []string
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, lookForFlowEnd bool, isIPv6 bool, data *TestData, labelFilter string) []string {
var allRecords, records []string
// In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command
// We set the timeout to 9s (5.5s plus one more aggregatorActiveFlowRecordTimeout) to make the ToExternalFlows test more stable
err := wait.PollUntilContextTimeout(context.Background(), 500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, true, func(ctx context.Context) (bool, error) {
Expand All @@ -1444,15 +1443,20 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
} else {
cmd = fmt.Sprintf("curl http://[%s]:8080/records", ipfixCollectorIP.IPv6.String())
}
rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), cmd)
rc, collectorOutput, _, err := data.RunCommandOnNode(controlPlaneNodeName(), cmd)
if err != nil || rc != 0 {
return false, err
return false, fmt.Errorf("failed to run curl command to retrieve flow records, rc: %d - err: %v", rc, err)
}
// Checking that all the data records which correspond to the iperf flow are received
src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6)
recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort)
if checkAllRecords {
for _, record := range recordSlices {
var response IPFIXCollectorResponse
if err := json.Unmarshal([]byte(collectorOutput), &response); err != nil {
return false, fmt.Errorf("error when unmarshalling output from IPFIX collector Pod: %w", err)
}
allRecords = response.FlowRecords
records = filterCollectorRecords(allRecords, labelFilter, src, dst, srcPort)
if lookForFlowEnd {
for _, record := range records {
flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason"))
// flowEndReason == 3 means the end of flow detected
if flowEndReason == 3 {
Expand All @@ -1461,17 +1465,25 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
}
return false, nil
}
return len(recordSlices) != 0, nil
return len(records) > 0, nil
})
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector, recordSlices ares: %v, output: %v iperf source port: %s", recordSlices, collectorOutput, srcPort)
return collectorOutput, recordSlices
// In case of a timeout, print some debug information.
if err == context.DeadlineExceeded {
const numRecordsToPrint = 20
fmt.Printf("Last %d records received by IPFIX collector:\n", numRecordsToPrint)
for i := 0; i < len(records) && i < numRecordsToPrint; i++ {
fmt.Println(records[i])
}
}
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records, source IP: %s, dest IP: %s, source port: %s, total records count: %d, filtered records count: %d", srcIP, dstIP, srcPort, len(allRecords), len(records))
return records
}

// getClickHouseOutput queries clickhouse with built-in client and checks if we have
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the iperf test.
// Polling timeout is coded assuming IPFIX output has been checked first.
func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, labelFilter string) []*ClickHouseFullRow {
func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, lookForFlowEnd bool, labelFilter string) []*ClickHouseFullRow {
var flowRecords []*ClickHouseFullRow
var queryOutput string

Expand Down Expand Up @@ -1513,7 +1525,7 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
flowRecords = append(flowRecords, &flowRecord)
}

if checkAllRecords {
if lookForFlowEnd {
for _, record := range flowRecords {
// flowEndReason == 3 means the end of flow detected
if record.FlowEndReason == 3 {
Expand All @@ -1528,28 +1540,26 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
return flowRecords
}

func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort string) []string {
var response IPFIXCollectorResponse
err := json.Unmarshal([]byte(output), &response)
if err != nil {
require.NoErrorf(t, err, "error when unmarshall output from IPFIX collector Pod")
}
recordSlices := response.FlowRecords
records := []string{}
for _, recordSlice := range recordSlices {
func filterCollectorRecords(records []string, filters ...string) []string {
filteredRecords := []string{}
match := func(record string) bool {
// We don't check the last record.
if strings.Contains(recordSlice, "octetDeltaCount: 0") {
continue
if strings.Contains(record, "octetDeltaCount: 0") {
return false
}
// We don't check the record that can't match the srcIP, dstIP and srcPort.
if !strings.Contains(recordSlice, src) || !strings.Contains(recordSlice, dst) || !strings.Contains(recordSlice, srcPort) {
continue
for _, filter := range filters {
if filter != "" && !strings.Contains(record, filter) {
return false
}
}
if labelFilter == "" || strings.Contains(recordSlice, labelFilter) {
records = append(records, recordSlice)
return true
}
for _, record := range records {
if match(record) {
filteredRecords = append(filteredRecords, record)
}
}
return records
return filteredRecords
}

func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
Expand Down Expand Up @@ -1908,8 +1918,8 @@ func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) {
cmd := []string{"curl", getHTTPURLFromIPPort(testFlow1.dstIP, serverPodPort)}
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "l7flowexporter", cmd)
require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr)
_, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, true, isIPv6, data, "")
for _, record := range recordSlices {
records := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, true, isIPv6, data, "")
for _, record := range records {
assert := assert.New(t)
assert.Contains(record, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName)
assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
Expand Down

0 comments on commit 9c7424a

Please sign in to comment.