From e98030f3b6ad255b1fabc85392e25526c4c5d3a2 Mon Sep 17 00:00:00 2001 From: Hemant Date: Fri, 11 Oct 2024 01:47:21 +0530 Subject: [PATCH 1/8] This patch enables the caching of TTLs for individual IPs in DNS responses. - Implement tracking of individual IP TTLs for FQDNs in the DNS cache. - Upon expiration of an IP's TTL, trigger a new DNS query. - Evict only those IPs that are no longer present in the latest DNS response and have exceeded their original TTL. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 149 +++++++++------ .../controller/networkpolicy/fqdn_test.go | 178 +++++++++++++++++- 2 files changed, 270 insertions(+), 57 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 1a54290fc83..1cf8177ddaf 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -17,7 +17,6 @@ package networkpolicy import ( "context" "fmt" - "math" "net" "os" "regexp" @@ -76,11 +75,15 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool { // expirationTime of the records, which is the DNS response // receiving time plus lowest applicable TTL. type dnsMeta struct { - expirationTime time.Time // Key for responseIPs is the string representation of the IP. // It helps to quickly identify IP address updates when a // new DNS response is received. - responseIPs map[string]net.IP + responseIPs map[string]ipWithExpiration +} + +type ipWithExpiration struct { + ip net.IP + expirationTime time.Time } // subscriber is a entity that subsribes for datapath rule realization @@ -253,8 +256,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP { } for fqdn := range fqdnsMatched { if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok { - for _, ip := range dnsMeta.responseIPs { - matchedIPs = append(matchedIPs, ip) + for _, ipData := range dnsMeta.responseIPs { + matchedIPs = append(matchedIPs, ipData.ip) } } } @@ -405,72 +408,100 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { func (f *fqdnController) onDNSResponse( fqdn string, - responseIPs map[string]net.IP, - lowestTTL uint32, + newIPWithExpiration map[string]ipWithExpiration, waitCh chan error, ) { - if len(responseIPs) == 0 { + if len(newIPWithExpiration) == 0 { klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) if waitCh != nil { waitCh <- nil } return } - // mustCacheResponse is only true if the FQDN is already tracked by this - // controller, or it matches at least one fqdnSelectorItem from the policy rules. - // addressUpdate is only true if there has been an update in IP addresses - // corresponded with the FQDN. - mustCacheResponse, addressUpdate := false, false - recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second) + + addressUpdate := false + currentTime := time.Now() + ipWithExpirationMap := make(map[string]ipWithExpiration) + + // timeToReQuery establishes a maximum reference time for tracking the minimum re-query time to DNS, as IPs expire. + var timeToReQuery *time.Time + + updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { + ipWithExpirationMap[ip] = ipMeta + if timeToReQuery == nil || ipMeta.expirationTime.Before(*timeToReQuery) { + timeToReQuery = &ipMeta.expirationTime + } + } f.fqdnSelectorMutex.Lock() defer f.fqdnSelectorMutex.Unlock() - oldDNSMeta, exist := f.dnsEntryCache[fqdn] + cachedDNSMeta, exist := f.dnsEntryCache[fqdn] if exist { - mustCacheResponse = true - for ipStr := range responseIPs { - if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok { + // check for new IPs. + for newIPStr, newIPMeta := range newIPWithExpiration { + if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { + updateIPWithExpiration(newIPStr, newIPMeta) addressUpdate = true - break } } - for oldIPStr, oldIP := range oldDNSMeta.responseIPs { - if _, ok := responseIPs[oldIPStr]; !ok { - if oldDNSMeta.expirationTime.Before(time.Now()) { - // This IP entry has already expired and not seen in the latest DNS response. - // It should be removed from the cache. + + // check for presence of already cached IPs in the new response. + for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { + if newIPMeta, exist := newIPWithExpiration[cachedIPStr]; !exist { + // The IP was not found in current response. + if cachedIPMeta.expirationTime.Before(currentTime) { + // this IP is expired and stale, remove it by not including it but also signal an update to syncRules. addressUpdate = true } else { - // Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed. - responseIPs[oldIPStr] = oldIP - if oldDNSMeta.expirationTime.Before(recordTTL) { - recordTTL = oldDNSMeta.expirationTime - } + // It hasn't expired yet, so just retain it with its existing expirationTime. + updateIPWithExpiration(cachedIPStr, cachedIPMeta) } + } else { + // This already cached IP is part of the current response, so update it with max time between received time and its old cached time. + expTime := laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime) + updateIPWithExpiration(cachedIPStr, ipWithExpiration{ + ip: cachedIPMeta.ip, + expirationTime: expTime, + }) } } + } else { + // First time seeing this domain. + // check if this needs to be tracked, by checking if it matches any Antrea FQDN policy selectors. + + // iterate over current rules mapping + addToCache := false for selectorItem := range f.selectorItemToRuleIDs { // Only track the FQDN if there is at least one fqdnSelectorItem matching it. if selectorItem.matches(fqdn) { - mustCacheResponse, addressUpdate = true, true + // A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but + // keep iterating to create mapping of multiple selectorItems against same FQDN. + addToCache = true f.setFQDNMatchSelector(fqdn, selectorItem) } } + if addToCache { + for ipStr, ipMeta := range newIPWithExpiration { + updateIPWithExpiration(ipStr, ipMeta) + } + addressUpdate = true + } } - if mustCacheResponse { + + if len(ipWithExpirationMap) > 0 { f.dnsEntryCache[fqdn] = dnsMeta{ - expirationTime: recordTTL, - responseIPs: responseIPs, + responseIPs: ipWithExpirationMap, } - f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now())) + f.dnsQueryQueue.AddAfter(fqdn, timeToReQuery.Sub(currentTime)) } + f.syncDirtyRules(fqdn, waitCh, addressUpdate) } // onDNSResponseMsg handles a DNS response message intercepted. func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { - fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg) + fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg) if err != nil { klog.V(2).InfoS("Failed to parse DNS response") if waitCh != nil { @@ -478,7 +509,7 @@ func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { } return } - f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh) + f.onDNSResponse(fqdn, responseIPs, waitCh) } // syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response @@ -594,38 +625,39 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) { } // parseDNSResponse returns the FQDN, IP query result and lowest applicable TTL of a DNS response. -func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) { +func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) { if len(msg.Question) == 0 { - return "", nil, 0, fmt.Errorf("invalid DNS message") + return "", nil, fmt.Errorf("invalid DNS message") } fqdn := strings.ToLower(msg.Question[0].Name) - lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs - responseIPs := map[string]net.IP{} + responseIPs := map[string]ipWithExpiration{} + currentTime := time.Now() for _, ans := range msg.Answer { switch r := ans.(type) { case *dns.A: if f.ipv4Enabled { - responseIPs[r.A.String()] = r.A - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.A.String()] = ipWithExpiration{ + ip: r.A, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } + } case *dns.AAAA: if f.ipv6Enabled { - responseIPs[r.AAAA.String()] = r.AAAA - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.AAAA.String()] = ipWithExpiration{ + ip: r.AAAA, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } } } } if len(responseIPs) > 0 { - klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL) + klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs) } if strings.HasSuffix(fqdn, ".") { fqdn = fqdn[:len(fqdn)-1] } - return fqdn, responseIPs, lowestTTL, nil + return fqdn, responseIPs, nil } func (f *fqdnController) worker() { @@ -662,24 +694,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { var errs []error - makeResponseIPs := func(ips []net.IP) map[string]net.IP { - responseIPs := make(map[string]net.IP) + makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration { + responseIPs := make(map[string]ipWithExpiration) for _, ip := range ips { - responseIPs[ip.String()] = ip + responseIPs[ip.String()] = ipWithExpiration{ + ip: ip, + expirationTime: time.Now().Add(time.Duration(defaultTTL) * time.Second), + } } return responseIPs } if f.ipv4Enabled { if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) } } if f.ipv6Enabled { if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) } @@ -818,3 +853,11 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return f.ofClient.ResumePausePacket(pktIn) } } + +// laterOf returns the later of the two given time.Time values. +func laterOf(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 46bcffa53b0..602a3791b80 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + clocktesting "k8s.io/utils/clock/testing" "antrea.io/antrea/pkg/agent/config" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" @@ -402,10 +404,10 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { }, existingDNSCache: map[string]dnsMeta{ "test.antrea.io": { - responseIPs: map[string]net.IP{ - "127.0.0.1": net.ParseIP("127.0.0.1"), - "192.155.12.1": net.ParseIP("192.155.12.1"), - "192.158.1.38": net.ParseIP("192.158.1.38"), + responseIPs: map[string]ipWithExpiration{ + "127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()}, + "192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()}, + "192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()}, }, }, }, @@ -584,3 +586,171 @@ func TestSyncDirtyRules(t *testing.T) { }) } } + +func TestOnDNSResponse(t *testing.T) { + testFQDN := "fqdn-test-pod.lfx.test" + selectorItem1 := fqdnSelectorItem{ + matchName: testFQDN, + } + selectorItem2 := fqdnSelectorItem{ + matchName: "random-domain.com", + } + fakeClock := clocktesting.NewFakeClock(time.Now()) + currentTime := fakeClock.Now() + + tests := []struct { + name string + existingDNSCache map[string]dnsMeta + dnsResponseIPs map[string]ipWithExpiration + expectedIPs map[string]ipWithExpiration + expectedItem string + mockSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] + }{ + { + name: "new IP added", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedItem: testFQDN, + }, + { + name: "existing DNS cache not impacted", + existingDNSCache: map[string]dnsMeta{ + "fqdn-test-pod.lfx.test": { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{}, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + { + name: "old IP resent in DNS response is retained with an updated TTL fetched from response", + existingDNSCache: map[string]dnsMeta{ + "fqdn-test-pod.lfx.test": { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + }, + { + name: "stale IP with expired TTL is evicted", + existingDNSCache: map[string]dnsMeta{ + "fqdn-test-pod.lfx.test": { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(-1 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedItem: testFQDN, + }, + { + name: "stale IP with unexpired TTL are retained", + existingDNSCache: map[string]dnsMeta{ + "fqdn-test-pod.lfx.test": { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{}, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + }, + { + name: "existingDNSCache is empty, the new response matches a selector.", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedItem: testFQDN, + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + }, + { + name: "existingDNSCache is empty, the new response doesn't match any selector", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem2: sets.New[string]("mockRule2"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + f, _ := newMockFQDNController(t, controller, nil) + mockDNSQueryQueue := workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "fqdn-test", + Clock: fakeClock, + }, + ) + f.dnsQueryQueue = mockDNSQueryQueue + f.dnsEntryCache = tc.existingDNSCache + if tc.mockSelectorToRuleIDs != nil { + f.selectorItemToRuleIDs = tc.mockSelectorToRuleIDs + } + + f.onDNSResponse(testFQDN, tc.dnsResponseIPs, nil) + + cachedDnsMetaData, _ := f.dnsEntryCache[testFQDN] + + if _, exists := f.selectorItemToRuleIDs[selectorItem2]; !exists { + assert.Equal(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "Expected %+v in cache, got %+v", tc.expectedIPs, cachedDnsMetaData.responseIPs) + } else { + assert.NotEqual(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "Did not Expect %+v in cache, got %+v", tc.expectedIPs, cachedDnsMetaData.responseIPs) + } + + if tc.expectedItem != "" { + fakeClock.Step(5 * time.Second) + actualItem, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(actualItem) + assert.Equal(t, tc.expectedItem, actualItem) + } + }) + } +} From fe43d744957140b6da7b9b77d42293d9393d128d Mon Sep 17 00:00:00 2001 From: Hemant Date: Wed, 30 Oct 2024 02:12:59 +0530 Subject: [PATCH 2/8] Added clock field of type clock.Clock in fqdnController type. - Replace time.Now() usages with the clock set in fqdnController. - Inject fakeClock for tests. Removed mockDnsQueryQueue implementation. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 27 ++++++++---- .../controller/networkpolicy/fqdn_test.go | 41 ++++++++++--------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 1cf8177ddaf..17b68c4bc3d 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -17,6 +17,7 @@ package networkpolicy import ( "context" "fmt" + "k8s.io/utils/clock" "net" "os" "regexp" @@ -155,6 +156,12 @@ type fqdnController struct { ipv4Enabled bool ipv6Enabled bool gwPort uint32 + // clock allows for time-dependent operations within the fqdnController. + // By using a clock.Clock as a field, we ensure that all time-related calls throughout the + // implementation of the fqdnController utilize this clock, for example , calls to clock.Now() instead of time.Now() + // The default implementation uses clock.RealClock{}, this also allows to + // inject a custom clock (like fake clock) when writing tests via the newMockFQDNController. + clock clock.Clock } func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32) (*fqdnController, error) { @@ -177,6 +184,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer ipv4Enabled: v4Enabled, ipv6Enabled: v6Enabled, gwPort: gwPort, + clock: clock.RealClock{}, } if controller.ofClient != nil { if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil { @@ -420,16 +428,16 @@ func (f *fqdnController) onDNSResponse( } addressUpdate := false - currentTime := time.Now() + currentTime := f.clock.Now() ipWithExpirationMap := make(map[string]ipWithExpiration) - // timeToReQuery establishes a maximum reference time for tracking the minimum re-query time to DNS, as IPs expire. - var timeToReQuery *time.Time + // timeToRequery establishes the time after which a new DNS query should be sent for the FQDN. + var timeToRequery *time.Time updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { ipWithExpirationMap[ip] = ipMeta - if timeToReQuery == nil || ipMeta.expirationTime.Before(*timeToReQuery) { - timeToReQuery = &ipMeta.expirationTime + if timeToRequery == nil || ipMeta.expirationTime.Before(*timeToRequery) { + timeToRequery = &ipMeta.expirationTime } } @@ -493,7 +501,10 @@ func (f *fqdnController) onDNSResponse( f.dnsEntryCache[fqdn] = dnsMeta{ responseIPs: ipWithExpirationMap, } - f.dnsQueryQueue.AddAfter(fqdn, timeToReQuery.Sub(currentTime)) + // timeToRequery passed below is guaranteed to be non-nil here because updateIPWithExpiration() is called on each IP in + // newIPWithExpiration that either isn't cached in the existing DNS cache or it remains unexpired. This ensures that timeToReQuery is + // always set to the earliest expiration time in the response, hence also enabling proper DNS re-query scheduling. + f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } f.syncDirtyRules(fqdn, waitCh, addressUpdate) @@ -631,7 +642,7 @@ func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWi } fqdn := strings.ToLower(msg.Question[0].Name) responseIPs := map[string]ipWithExpiration{} - currentTime := time.Now() + currentTime := f.clock.Now() for _, ans := range msg.Answer { switch r := ans.(type) { case *dns.A: @@ -699,7 +710,7 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { for _, ip := range ips { responseIPs[ip.String()] = ipWithExpiration{ ip: ip, - expirationTime: time.Now().Add(time.Duration(defaultTTL) * time.Second), + expirationTime: f.clock.Now().Add(time.Duration(defaultTTL) * time.Second), } } return responseIPs diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 602a3791b80..219ab21708c 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" clocktesting "k8s.io/utils/clock/testing" "antrea.io/antrea/pkg/agent/config" @@ -595,15 +594,18 @@ func TestOnDNSResponse(t *testing.T) { selectorItem2 := fqdnSelectorItem{ matchName: "random-domain.com", } - fakeClock := clocktesting.NewFakeClock(time.Now()) - currentTime := fakeClock.Now() + currentTime := time.Now() + + getDuration := func(d time.Duration) *time.Duration { + return &d + } tests := []struct { name string existingDNSCache map[string]dnsMeta dnsResponseIPs map[string]ipWithExpiration expectedIPs map[string]ipWithExpiration - expectedItem string + expectedRequeryAfter *time.Duration mockSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] }{ { @@ -624,7 +626,7 @@ func TestOnDNSResponse(t *testing.T) { "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, }, - expectedItem: testFQDN, + expectedRequeryAfter: getDuration(5 * time.Second), }, { name: "existing DNS cache not impacted", @@ -673,7 +675,7 @@ func TestOnDNSResponse(t *testing.T) { expectedIPs: map[string]ipWithExpiration{ "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, }, - expectedItem: testFQDN, + expectedRequeryAfter: getDuration(5 * time.Second), }, { name: "stale IP with unexpired TTL are retained", @@ -698,7 +700,7 @@ func TestOnDNSResponse(t *testing.T) { expectedIPs: map[string]ipWithExpiration{ "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, }, - expectedItem: testFQDN, + expectedRequeryAfter: getDuration(5 * time.Second), mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ selectorItem1: sets.New[string]("mockRule1"), }, @@ -720,16 +722,10 @@ func TestOnDNSResponse(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + fakeClock := clocktesting.NewFakeClock(currentTime) controller := gomock.NewController(t) f, _ := newMockFQDNController(t, controller, nil) - mockDNSQueryQueue := workqueue.NewTypedRateLimitingQueueWithConfig( - workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), - workqueue.TypedRateLimitingQueueConfig[string]{ - Name: "fqdn-test", - Clock: fakeClock, - }, - ) - f.dnsQueryQueue = mockDNSQueryQueue + f.clock = fakeClock f.dnsEntryCache = tc.existingDNSCache if tc.mockSelectorToRuleIDs != nil { f.selectorItemToRuleIDs = tc.mockSelectorToRuleIDs @@ -745,11 +741,16 @@ func TestOnDNSResponse(t *testing.T) { assert.NotEqual(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "Did not Expect %+v in cache, got %+v", tc.expectedIPs, cachedDnsMetaData.responseIPs) } - if tc.expectedItem != "" { - fakeClock.Step(5 * time.Second) - actualItem, _ := f.dnsQueryQueue.Get() - f.dnsQueryQueue.Done(actualItem) - assert.Equal(t, tc.expectedItem, actualItem) + if tc.expectedRequeryAfter != nil { + fakeClock.Step(*tc.expectedRequeryAfter) + // needed to avoid blocking on Get() in case of failure + require.Eventually(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 1*time.Second, 10*time.Millisecond) + item, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(item) + assert.Equal(t, testFQDN, item) + } else { + // make sure that there is no requery + assert.Never(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 100*time.Millisecond, 10*time.Millisecond) } }) } From a22bef95237860d19360eb622d4be36454725b20 Mon Sep 17 00:00:00 2001 From: Hemant Date: Fri, 11 Oct 2024 01:47:21 +0530 Subject: [PATCH 3/8] Enable caching of individual IP TTLs in DNS responses and add clock support - Implement tracking of TTLs for individual IPs linked to FQDNs in the DNS cache. - Automatically trigger a new DNS query when an IP's TTL expires. - Evict only those IPs that are no longer present in the latest DNS response and have surpassed their original TTL. - Added clock field of type clock.Clock in fqdnController. - Pass clock while initializing fqdn controller. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 170 +++++++++++------ .../controller/networkpolicy/fqdn_test.go | 171 +++++++++++++++++- .../networkpolicy/networkpolicy_controller.go | 3 +- .../networkpolicy/pod_reconciler_test.go | 2 +- 4 files changed, 277 insertions(+), 69 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 1a54290fc83..dde8396d270 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -17,7 +17,6 @@ package networkpolicy import ( "context" "fmt" - "math" "net" "os" "regexp" @@ -25,13 +24,15 @@ import ( "sync" "time" - "antrea.io/libOpenflow/protocol" - "antrea.io/ofnet/ofctrl" "github.com/miekg/dns" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" + + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/types" @@ -76,11 +77,15 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool { // expirationTime of the records, which is the DNS response // receiving time plus lowest applicable TTL. type dnsMeta struct { - expirationTime time.Time // Key for responseIPs is the string representation of the IP. // It helps to quickly identify IP address updates when a // new DNS response is received. - responseIPs map[string]net.IP + responseIPs map[string]ipWithExpiration +} + +type ipWithExpiration struct { + ip net.IP + expirationTime time.Time } // subscriber is a entity that subsribes for datapath rule realization @@ -152,9 +157,15 @@ type fqdnController struct { ipv4Enabled bool ipv6Enabled bool gwPort uint32 + // clock allows for time-dependent operations within the fqdnController. + // By using a clock.Clock as a field, we ensure that all time-related calls throughout the + // implementation of the fqdnController utilize this clock + // The default implementation uses clock.RealClock{}, this also allows to + // inject a custom clock (like fake clock) when writing tests via the newMockFQDNController. + clock clock.Clock } -func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32) (*fqdnController, error) { +func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.WithTicker) (*fqdnController, error) { controller := &fqdnController{ ofClient: client, dirtyRuleHandler: dirtyRuleHandler, @@ -163,7 +174,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer dnsQueryQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), workqueue.TypedRateLimitingQueueConfig[string]{ - Name: "fqdn", + Name: "fqdn", + Clock: clock, }, ), dnsEntryCache: map[string]dnsMeta{}, @@ -174,6 +186,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer ipv4Enabled: v4Enabled, ipv6Enabled: v6Enabled, gwPort: gwPort, + clock: clock, } if controller.ofClient != nil { if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil { @@ -253,8 +266,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP { } for fqdn := range fqdnsMatched { if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok { - for _, ip := range dnsMeta.responseIPs { - matchedIPs = append(matchedIPs, ip) + for _, ipData := range dnsMeta.responseIPs { + matchedIPs = append(matchedIPs, ipData.ip) } } } @@ -405,72 +418,103 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { func (f *fqdnController) onDNSResponse( fqdn string, - responseIPs map[string]net.IP, - lowestTTL uint32, + newIPWithExpiration map[string]ipWithExpiration, waitCh chan error, ) { - if len(responseIPs) == 0 { + if len(newIPWithExpiration) == 0 { klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) if waitCh != nil { waitCh <- nil } return } - // mustCacheResponse is only true if the FQDN is already tracked by this - // controller, or it matches at least one fqdnSelectorItem from the policy rules. - // addressUpdate is only true if there has been an update in IP addresses - // corresponded with the FQDN. - mustCacheResponse, addressUpdate := false, false - recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second) + + addressUpdate := false + currentTime := f.clock.Now() + ipWithExpirationMap := make(map[string]ipWithExpiration) + + // timeToRequery establishes the time after which a new DNS query should be sent for the FQDN. + var timeToRequery *time.Time + + updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { + ipWithExpirationMap[ip] = ipMeta + if timeToRequery == nil || ipMeta.expirationTime.Before(*timeToRequery) { + timeToRequery = &ipMeta.expirationTime + } + } f.fqdnSelectorMutex.Lock() defer f.fqdnSelectorMutex.Unlock() - oldDNSMeta, exist := f.dnsEntryCache[fqdn] + cachedDNSMeta, exist := f.dnsEntryCache[fqdn] if exist { - mustCacheResponse = true - for ipStr := range responseIPs { - if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok { + // check for new IPs. + for newIPStr, newIPMeta := range newIPWithExpiration { + if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { + updateIPWithExpiration(newIPStr, newIPMeta) addressUpdate = true - break } } - for oldIPStr, oldIP := range oldDNSMeta.responseIPs { - if _, ok := responseIPs[oldIPStr]; !ok { - if oldDNSMeta.expirationTime.Before(time.Now()) { - // This IP entry has already expired and not seen in the latest DNS response. - // It should be removed from the cache. + + // check for presence of already cached IPs in the new response. + for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { + if newIPMeta, exist := newIPWithExpiration[cachedIPStr]; !exist { + // The IP was not found in current response. + if cachedIPMeta.expirationTime.Before(currentTime) { + // this IP is expired and stale, remove it by not including it but also signal an update to syncRules. addressUpdate = true } else { - // Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed. - responseIPs[oldIPStr] = oldIP - if oldDNSMeta.expirationTime.Before(recordTTL) { - recordTTL = oldDNSMeta.expirationTime - } + // It hasn't expired yet, so just retain it with its existing expirationTime. + updateIPWithExpiration(cachedIPStr, cachedIPMeta) } + } else { + // This already cached IP is part of the current response, so update it with max time between received time and its old cached time. + expTime := laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime) + updateIPWithExpiration(cachedIPStr, ipWithExpiration{ + ip: cachedIPMeta.ip, + expirationTime: expTime, + }) } } + } else { + // First time seeing this domain. + // check if this needs to be tracked, by checking if it matches any Antrea FQDN policy selectors. + + // iterate over current rules mapping + addToCache := false for selectorItem := range f.selectorItemToRuleIDs { // Only track the FQDN if there is at least one fqdnSelectorItem matching it. if selectorItem.matches(fqdn) { - mustCacheResponse, addressUpdate = true, true + // A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but + // keep iterating to create mapping of multiple selectorItems against same FQDN. + addToCache = true f.setFQDNMatchSelector(fqdn, selectorItem) } } + if addToCache { + for ipStr, ipMeta := range newIPWithExpiration { + updateIPWithExpiration(ipStr, ipMeta) + } + addressUpdate = true + } } - if mustCacheResponse { + + if len(ipWithExpirationMap) > 0 { f.dnsEntryCache[fqdn] = dnsMeta{ - expirationTime: recordTTL, - responseIPs: responseIPs, + responseIPs: ipWithExpirationMap, } - f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now())) + // timeToRequery passed below is guaranteed to be non-nil here because updateIPWithExpiration() is called on each IP in + // newIPWithExpiration that either isn't cached in the existing DNS cache or it remains unexpired. This ensures that timeToReQuery is + // always set to the earliest expiration time in the response, hence also enabling proper DNS re-query scheduling. + f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } + f.syncDirtyRules(fqdn, waitCh, addressUpdate) } // onDNSResponseMsg handles a DNS response message intercepted. func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { - fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg) + fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg) if err != nil { klog.V(2).InfoS("Failed to parse DNS response") if waitCh != nil { @@ -478,7 +522,7 @@ func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { } return } - f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh) + f.onDNSResponse(fqdn, responseIPs, waitCh) } // syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response @@ -594,38 +638,39 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) { } // parseDNSResponse returns the FQDN, IP query result and lowest applicable TTL of a DNS response. -func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) { +func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) { if len(msg.Question) == 0 { - return "", nil, 0, fmt.Errorf("invalid DNS message") + return "", nil, fmt.Errorf("invalid DNS message") } fqdn := strings.ToLower(msg.Question[0].Name) - lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs - responseIPs := map[string]net.IP{} + responseIPs := map[string]ipWithExpiration{} + currentTime := f.clock.Now() for _, ans := range msg.Answer { switch r := ans.(type) { case *dns.A: if f.ipv4Enabled { - responseIPs[r.A.String()] = r.A - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.A.String()] = ipWithExpiration{ + ip: r.A, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } + } case *dns.AAAA: if f.ipv6Enabled { - responseIPs[r.AAAA.String()] = r.AAAA - if r.Header().Ttl < lowestTTL { - lowestTTL = r.Header().Ttl + responseIPs[r.AAAA.String()] = ipWithExpiration{ + ip: r.AAAA, + expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), } } } } if len(responseIPs) > 0 { - klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL) + klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs) } if strings.HasSuffix(fqdn, ".") { fqdn = fqdn[:len(fqdn)-1] } - return fqdn, responseIPs, lowestTTL, nil + return fqdn, responseIPs, nil } func (f *fqdnController) worker() { @@ -662,24 +707,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { var errs []error - makeResponseIPs := func(ips []net.IP) map[string]net.IP { - responseIPs := make(map[string]net.IP) + makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration { + responseIPs := make(map[string]ipWithExpiration) for _, ip := range ips { - responseIPs[ip.String()] = ip + responseIPs[ip.String()] = ipWithExpiration{ + ip: ip, + expirationTime: f.clock.Now().Add(time.Duration(defaultTTL) * time.Second), + } } return responseIPs } if f.ipv4Enabled { if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) } } if f.ipv6Enabled { if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) } else { errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) } @@ -818,3 +866,11 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return f.ofClient.ResumePausePacket(pktIn) } } + +// laterOf returns the later of the two given time.Time values. +func laterOf(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 46bcffa53b0..384eb992069 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -25,12 +25,14 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + clocktesting "k8s.io/utils/clock/testing" + "k8s.io/utils/ptr" "antrea.io/antrea/pkg/agent/config" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ) -func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string) (*fqdnController, *openflowtest.MockClient) { +func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string, fakeClock *clocktesting.FakeClock) (*fqdnController, *openflowtest.MockClient) { mockOFClient := openflowtest.NewMockClient(controller) mockOFClient.EXPECT().NewDNSPacketInConjunction(gomock.Any()).Return(nil).AnyTimes() dirtyRuleHandler := func(rule string) {} @@ -46,6 +48,7 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe true, false, config.DefaultHostGatewayOFPort, + fakeClock, ) require.NoError(t, err) return f, mockOFClient @@ -164,7 +167,7 @@ func TestAddFQDNRule(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { controller := gomock.NewController(t) - f, c := newMockFQDNController(t, controller, nil) + f, c := newMockFQDNController(t, controller, nil, nil) if tt.addressAdded { c.EXPECT().AddAddressToDNSConjunction(dnsInterceptRuleID, gomock.Any()).Times(1) } @@ -325,7 +328,7 @@ func TestDeleteFQDNRule(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { controller := gomock.NewController(t) - f, c := newMockFQDNController(t, controller, nil) + f, c := newMockFQDNController(t, controller, nil, nil) c.EXPECT().AddAddressToDNSConjunction(dnsInterceptRuleID, gomock.Any()).Times(len(tt.previouslyAddedRules)) f.dnsEntryCache = tt.existingDNSCache if tt.addressRemoved { @@ -344,7 +347,7 @@ func TestDeleteFQDNRule(t *testing.T) { func TestLookupIPFallback(t *testing.T) { controller := gomock.NewController(t) dnsServer := "" // force a fallback to local resolver - f, _ := newMockFQDNController(t, controller, &dnsServer) + f, _ := newMockFQDNController(t, controller, &dnsServer, nil) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // not ideal as a unit test because it requires the ability to resolve @@ -402,10 +405,10 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { }, existingDNSCache: map[string]dnsMeta{ "test.antrea.io": { - responseIPs: map[string]net.IP{ - "127.0.0.1": net.ParseIP("127.0.0.1"), - "192.155.12.1": net.ParseIP("192.155.12.1"), - "192.158.1.38": net.ParseIP("192.158.1.38"), + responseIPs: map[string]ipWithExpiration{ + "127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()}, + "192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()}, + "192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()}, }, }, }, @@ -421,7 +424,7 @@ func TestGetIPsForFQDNSelectors(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) if tc.existingSelectorItemToFQDN != nil { f.selectorItemToFQDN = tc.existingSelectorItemToFQDN } @@ -539,7 +542,7 @@ func TestSyncDirtyRules(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) var dirtyRuleSyncCalls []string f.dirtyRuleHandler = func(s string) { dirtyRuleSyncCalls = append(dirtyRuleSyncCalls, s) @@ -584,3 +587,151 @@ func TestSyncDirtyRules(t *testing.T) { }) } } + +func TestOnDNSResponse(t *testing.T) { + testFQDN := "fqdn-test-pod.lfx.test" + selectorItem1 := fqdnSelectorItem{ + matchName: testFQDN, + } + selectorItem2 := fqdnSelectorItem{ + matchName: "random-domain.com", + } + currentTime := time.Now() + + tests := []struct { + name string + existingDNSCache map[string]dnsMeta + dnsResponseIPs map[string]ipWithExpiration + expectedIPs map[string]ipWithExpiration + expectedRequeryAfter *time.Duration + mockSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] + }{ + { + name: "new IP added", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(10 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + }, + { + name: "existing DNS cache not impacted", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{}, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(6 * time.Second)}, + }, + }, + { + name: "old IP present in DNS response is retained with an updated TTL fetched from response", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(1 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, + "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + }, + { + name: "stale IP with expired TTL is evicted", + existingDNSCache: map[string]dnsMeta{ + testFQDN: { + responseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(-1 * time.Second)}, + }, + }, + }, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.3": {ip: net.ParseIP("192.1.1.3"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + }, + { + name: "existingDNSCache is empty, the new response matches a selector.", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedRequeryAfter: ptr.To(5 * time.Second), + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + }, + { + name: "existingDNSCache is empty, the new response doesn't match any selector", + existingDNSCache: map[string]dnsMeta{}, + dnsResponseIPs: map[string]ipWithExpiration{ + "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(5 * time.Second)}, + }, + expectedIPs: nil, + mockSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem2: sets.New[string]("mockRule2"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakeClock := clocktesting.NewFakeClock(currentTime) + controller := gomock.NewController(t) + f, _ := newMockFQDNController(t, controller, nil, fakeClock) + f.dnsEntryCache = tc.existingDNSCache + if tc.mockSelectorToRuleIDs != nil { + f.selectorItemToRuleIDs = tc.mockSelectorToRuleIDs + } + + f.onDNSResponse(testFQDN, tc.dnsResponseIPs, nil) + + cachedDnsMetaData, _ := f.dnsEntryCache[testFQDN] + + assert.Equal(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "Expected %+v in cache, got %+v", tc.expectedIPs, cachedDnsMetaData.responseIPs) + + if tc.expectedRequeryAfter != nil { + fakeClock.Step(*tc.expectedRequeryAfter) + // needed to avoid blocking on Get() in case of failure + require.Eventually(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 100*time.Second, 10*time.Millisecond) + item, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(item) + assert.Equal(t, testFQDN, item) + } else { + // make sure that there is no requery + assert.Never(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 100*time.Millisecond, 10*time.Millisecond) + } + }) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index ae0afe124c4..9b5308e549f 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/config" @@ -226,7 +227,7 @@ func NewNetworkPolicyController(antreaClientGetter client.AntreaClientProvider, var err error if antreaPolicyEnabled { - if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { + if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort, clock.RealClock{}); err != nil { return nil, err } diff --git a/pkg/agent/controller/networkpolicy/pod_reconciler_test.go b/pkg/agent/controller/networkpolicy/pod_reconciler_test.go index ec31137a918..390be056682 100644 --- a/pkg/agent/controller/networkpolicy/pod_reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/pod_reconciler_test.go @@ -108,7 +108,7 @@ func newCIDR(cidrStr string) *net.IPNet { } func newTestReconciler(t *testing.T, controller *gomock.Controller, ifaceStore interfacestore.InterfaceStore, ofClient *openflowtest.MockClient, v4Enabled, v6Enabled bool) *podReconciler { - f, _ := newMockFQDNController(t, controller, nil) + f, _ := newMockFQDNController(t, controller, nil, nil) ch := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch)} From 8b05b7e8a3aec30590d8fcce0bf84bf80615eb0a Mon Sep 17 00:00:00 2001 From: Hemant Date: Thu, 31 Oct 2024 02:07:18 +0530 Subject: [PATCH 4/8] group antrea imports Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index dde8396d270..48ae0000bc8 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -31,14 +31,13 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" - "antrea.io/libOpenflow/protocol" - "antrea.io/ofnet/ofctrl" - "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" utilsets "antrea.io/antrea/pkg/util/sets" dnsutil "antrea.io/antrea/third_party/dns" + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" ) const ( From 2822742bfff292f927499ddc05e2781c13e9aed5 Mon Sep 17 00:00:00 2001 From: Hemant Date: Thu, 31 Oct 2024 02:28:28 +0530 Subject: [PATCH 5/8] fix imports using golangci-fix Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 48ae0000bc8..dde8396d270 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -31,13 +31,14 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/clock" + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" + "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" utilsets "antrea.io/antrea/pkg/util/sets" dnsutil "antrea.io/antrea/third_party/dns" - "antrea.io/libOpenflow/protocol" - "antrea.io/ofnet/ofctrl" ) const ( From 8a03d3a43b3c9b731b2e5aaea571be1ac4cba2d1 Mon Sep 17 00:00:00 2001 From: Hemant Date: Thu, 31 Oct 2024 02:56:10 +0530 Subject: [PATCH 6/8] rephrase various comments as per review and correct poll time in eventuality test to 1 second Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 20 +++++++------------ .../controller/networkpolicy/fqdn_test.go | 2 +- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index dde8396d270..d5a6ac228fb 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -157,11 +157,7 @@ type fqdnController struct { ipv4Enabled bool ipv6Enabled bool gwPort uint32 - // clock allows for time-dependent operations within the fqdnController. - // By using a clock.Clock as a field, we ensure that all time-related calls throughout the - // implementation of the fqdnController utilize this clock - // The default implementation uses clock.RealClock{}, this also allows to - // inject a custom clock (like fake clock) when writing tests via the newMockFQDNController. + // clock allows injecting a custom (fake) clock in unit tests. clock clock.Clock } @@ -418,10 +414,10 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { func (f *fqdnController) onDNSResponse( fqdn string, - newIPWithExpiration map[string]ipWithExpiration, + newIPsWithExpiration map[string]ipWithExpiration, waitCh chan error, ) { - if len(newIPWithExpiration) == 0 { + if len(newIPsWithExpiration) == 0 { klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) if waitCh != nil { waitCh <- nil @@ -448,7 +444,7 @@ func (f *fqdnController) onDNSResponse( cachedDNSMeta, exist := f.dnsEntryCache[fqdn] if exist { // check for new IPs. - for newIPStr, newIPMeta := range newIPWithExpiration { + for newIPStr, newIPMeta := range newIPsWithExpiration { if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { updateIPWithExpiration(newIPStr, newIPMeta) addressUpdate = true @@ -457,7 +453,7 @@ func (f *fqdnController) onDNSResponse( // check for presence of already cached IPs in the new response. for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { - if newIPMeta, exist := newIPWithExpiration[cachedIPStr]; !exist { + if newIPMeta, exist := newIPsWithExpiration[cachedIPStr]; !exist { // The IP was not found in current response. if cachedIPMeta.expirationTime.Before(currentTime) { // this IP is expired and stale, remove it by not including it but also signal an update to syncRules. @@ -492,7 +488,7 @@ func (f *fqdnController) onDNSResponse( } } if addToCache { - for ipStr, ipMeta := range newIPWithExpiration { + for ipStr, ipMeta := range newIPsWithExpiration { updateIPWithExpiration(ipStr, ipMeta) } addressUpdate = true @@ -503,9 +499,7 @@ func (f *fqdnController) onDNSResponse( f.dnsEntryCache[fqdn] = dnsMeta{ responseIPs: ipWithExpirationMap, } - // timeToRequery passed below is guaranteed to be non-nil here because updateIPWithExpiration() is called on each IP in - // newIPWithExpiration that either isn't cached in the existing DNS cache or it remains unexpired. This ensures that timeToReQuery is - // always set to the earliest expiration time in the response, hence also enabling proper DNS re-query scheduling. + // timeToRequery is guaranteed not to be nil because ipWithExpirationMap is not empty, and adding an entry to ipWithExpirationMap requires updating timeToRequery f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 384eb992069..0a9b129e3d7 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -724,7 +724,7 @@ func TestOnDNSResponse(t *testing.T) { if tc.expectedRequeryAfter != nil { fakeClock.Step(*tc.expectedRequeryAfter) // needed to avoid blocking on Get() in case of failure - require.Eventually(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 100*time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return f.dnsQueryQueue.Len() > 0 }, 1*time.Second, 10*time.Millisecond) item, _ := f.dnsQueryQueue.Get() f.dnsQueryQueue.Done(item) assert.Equal(t, testFQDN, item) From cfcc0e052344bc8a15ae6ef330dcd98100e6bab9 Mon Sep 17 00:00:00 2001 From: Hemant Date: Thu, 31 Oct 2024 23:50:27 +0530 Subject: [PATCH 7/8] Refactored newFQDNController() to pass a clock to the DNS query workqueue. Updated onDNSResponse() to requeue DNS queries even when same IPs are receieved in a response. Added and updated comments as per review. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn.go | 47 +++++++++++++++---- .../controller/networkpolicy/fqdn_test.go | 20 ++++++-- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index d5a6ac228fb..650bbe4679d 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -161,7 +161,32 @@ type fqdnController struct { clock clock.Clock } -func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.WithTicker) (*fqdnController, error) { +// CustomWithTickerClock wraps clock.Clock and provides NewTicker to implement clock.WithTicker +type CustomWithTickerClock struct { + clock.Clock +} + +// NewTicker is implemented to fulfill the clock.WithTicker interface. +func (c *CustomWithTickerClock) NewTicker(d time.Duration) clock.Ticker { + return &realTicker{time.NewTicker(d)} + +} + +// realTicker wraps time.Ticker to implement clock.Ticker interface +type realTicker struct { + *time.Ticker +} + +func (t *realTicker) C() <-chan time.Time { + return t.Ticker.C +} + +func (t *realTicker) Stop() { + t.Ticker.Stop() +} + +func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.Clock) (*fqdnController, error) { + customClock := &CustomWithTickerClock{Clock: clock} controller := &fqdnController{ ofClient: client, dirtyRuleHandler: dirtyRuleHandler, @@ -171,7 +196,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), workqueue.TypedRateLimitingQueueConfig[string]{ Name: "fqdn", - Clock: clock, + Clock: customClock, }, ), dnsEntryCache: map[string]dnsMeta{}, @@ -429,7 +454,8 @@ func (f *fqdnController) onDNSResponse( currentTime := f.clock.Now() ipWithExpirationMap := make(map[string]ipWithExpiration) - // timeToRequery establishes the time after which a new DNS query should be sent for the FQDN. + // timeToRequery sets the interval for sending a new DNS query for the FQDN, + // based on the shortest expiration time of cached IPs. var timeToRequery *time.Time updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { @@ -463,20 +489,18 @@ func (f *fqdnController) onDNSResponse( updateIPWithExpiration(cachedIPStr, cachedIPMeta) } } else { - // This already cached IP is part of the current response, so update it with max time between received time and its old cached time. - expTime := laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime) + // The cached IP is included in the current response; update its expiration time to the later of the new and existing values. updateIPWithExpiration(cachedIPStr, ipWithExpiration{ ip: cachedIPMeta.ip, - expirationTime: expTime, + expirationTime: laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime), }) } } } else { - // First time seeing this domain. - // check if this needs to be tracked, by checking if it matches any Antrea FQDN policy selectors. + // This domain is being encountered for the first time. + // Check if it should be tracked by matching it against existing selectorItemToRuleIDs. - // iterate over current rules mapping addToCache := false for selectorItem := range f.selectorItemToRuleIDs { // Only track the FQDN if there is at least one fqdnSelectorItem matching it. @@ -495,11 +519,14 @@ func (f *fqdnController) onDNSResponse( } } + // ipWithExpirationMap remains empty only when FQDN doesn't match any selector or the IPs are not in response and have expired. if len(ipWithExpirationMap) > 0 { f.dnsEntryCache[fqdn] = dnsMeta{ responseIPs: ipWithExpirationMap, } - // timeToRequery is guaranteed not to be nil because ipWithExpirationMap is not empty, and adding an entry to ipWithExpirationMap requires updating timeToRequery + } + if timeToRequery != nil { + // timeToRequery is never nil since ipWithExpirationMap is non-empty and updates when adding entries. f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) } diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index 0a9b129e3d7..aef15716422 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -17,6 +17,7 @@ package networkpolicy import ( "context" "fmt" + "k8s.io/utils/clock" "net" "testing" "time" @@ -32,7 +33,7 @@ import ( openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" ) -func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string, fakeClock *clocktesting.FakeClock) (*fqdnController, *openflowtest.MockClient) { +func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServer *string, clockToInject clock.Clock) (*fqdnController, *openflowtest.MockClient) { mockOFClient := openflowtest.NewMockClient(controller) mockOFClient.EXPECT().NewDNSPacketInConjunction(gomock.Any()).Return(nil).AnyTimes() dirtyRuleHandler := func(rule string) {} @@ -40,6 +41,9 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe if dnsServer != nil { dnsServerAddr = *dnsServer } + if clockToInject == nil { + clockToInject = clock.RealClock{} + } f, err := newFQDNController( mockOFClient, newIDAllocator(testAsyncDeleteInterval), @@ -48,7 +52,7 @@ func newMockFQDNController(t *testing.T, controller *gomock.Controller, dnsServe true, false, config.DefaultHostGatewayOFPort, - fakeClock, + clockToInject, ) require.NoError(t, err) return f, mockOFClient @@ -627,7 +631,7 @@ func TestOnDNSResponse(t *testing.T) { expectedRequeryAfter: ptr.To(5 * time.Second), }, { - name: "existing DNS cache not impacted", + name: "empty DNS response", existingDNSCache: map[string]dnsMeta{ testFQDN: { responseIPs: map[string]ipWithExpiration{ @@ -660,6 +664,7 @@ func TestOnDNSResponse(t *testing.T) { "192.1.1.1": {ip: net.ParseIP("192.1.1.1"), expirationTime: currentTime.Add(10 * time.Second)}, "192.1.1.2": {ip: net.ParseIP("192.1.1.2"), expirationTime: currentTime.Add(5 * time.Second)}, }, + expectedRequeryAfter: ptr.To(5 * time.Second), }, { name: "stale IP with expired TTL is evicted", @@ -715,11 +720,18 @@ func TestOnDNSResponse(t *testing.T) { f.selectorItemToRuleIDs = tc.mockSelectorToRuleIDs } + for selectorItem := range f.selectorItemToRuleIDs { + if selectorItem.matches(testFQDN) { + t.Logf("SELECTOR ITEM MATCHS") + + } + } + f.onDNSResponse(testFQDN, tc.dnsResponseIPs, nil) cachedDnsMetaData, _ := f.dnsEntryCache[testFQDN] - assert.Equal(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "Expected %+v in cache, got %+v", tc.expectedIPs, cachedDnsMetaData.responseIPs) + assert.Equal(t, tc.expectedIPs, cachedDnsMetaData.responseIPs, "FQDN cache doesn't match expected entries") if tc.expectedRequeryAfter != nil { fakeClock.Step(*tc.expectedRequeryAfter) From e3e25df33b66764bd94b1387e219972c3862c831 Mon Sep 17 00:00:00 2001 From: Hemant Date: Fri, 1 Nov 2024 00:01:28 +0530 Subject: [PATCH 8/8] correct grouping of imports in test file. Signed-off-by: Hemant --- pkg/agent/controller/networkpolicy/fqdn_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index aef15716422..047d30f9ed3 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -17,7 +17,6 @@ package networkpolicy import ( "context" "fmt" - "k8s.io/utils/clock" "net" "testing" "time" @@ -26,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/ptr"