diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 2cbb28d361d..b8e28be41c2 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -28,6 +28,7 @@ import ( "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" "github.com/miekg/dns" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -142,7 +143,7 @@ type fqdnController struct { fqdnSelectorMutex sync.Mutex // fqdnToSelectorItem stores known FQDNSelectorItems that selects the FQDN, for each // FQDN tracked by this controller. - fqdnToSelectorItem map[string]map[fqdnSelectorItem]struct{} + fqdnToSelectorItem map[string]sets.Set[fqdnSelectorItem] // selectorItemToFQDN is a reversed map of fqdnToSelectorItem. It stores all known // FQDNs that match the fqdnSelectorItem. selectorItemToFQDN map[fqdnSelectorItem]sets.Set[string] @@ -162,7 +163,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer dnsQueryQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "fqdn"), dnsEntryCache: map[string]dnsMeta{}, fqdnRuleToSelectedPods: map[string]sets.Set[int32]{}, - fqdnToSelectorItem: map[string]map[fqdnSelectorItem]struct{}{}, + fqdnToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{}, selectorItemToFQDN: map[fqdnSelectorItem]sets.Set[string]{}, selectorItemToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{}, ipv4Enabled: v4Enabled, @@ -220,11 +221,9 @@ func toRegex(pattern string) string { func (f *fqdnController) setFQDNMatchSelector(fqdn string, selectorItem fqdnSelectorItem) { matchedSelectorItems, ok := f.fqdnToSelectorItem[fqdn] if !ok { - f.fqdnToSelectorItem[fqdn] = map[fqdnSelectorItem]struct{}{ - selectorItem: {}, - } + f.fqdnToSelectorItem[fqdn] = sets.New(selectorItem) } else { - matchedSelectorItems[selectorItem] = struct{}{} + matchedSelectorItems.Insert(selectorItem) } matchedFQDNs, ok := f.selectorItemToFQDN[selectorItem] if !ok { @@ -272,22 +271,25 @@ func (f *fqdnController) addFQDNSelector(ruleID string, fqdns []string) { fqdnSelectorItem := fqdnToSelectorItem(fqdn) ruleIDs, exists := f.selectorItemToRuleIDs[fqdnSelectorItem] if !exists { - // This is a new fqdnSelectorItem. All existing FQDNs in the cache needs to be matched - // against this fqdnSelectorItem to update the mapping. + // This is a new fqdnSelectorItem. f.selectorItemToRuleIDs[fqdnSelectorItem] = sets.New[string](ruleID) - for fqdn := range f.dnsEntryCache { - if fqdnSelectorItem.matches(fqdn) { - f.setFQDNMatchSelector(fqdn, fqdnSelectorItem) + // Existing FQDNs in the cache needs to be matched against this fqdnSelectorItem to update the mapping. + if fqdnSelectorItem.matchRegex != "" { + // As the selector matches regex, all existing FQDNs can potentially match it. + for fqdn := range f.dnsEntryCache { + if fqdnSelectorItem.matches(fqdn) { + f.setFQDNMatchSelector(fqdn, fqdnSelectorItem) + } } + } else { + // As the selector matches name, only the FQDN of this name matches it. + f.setFQDNMatchSelector(fqdnSelectorItem.matchName, fqdnSelectorItem) + // Trigger a DNS query immediately for the FQDN. + f.dnsQueryQueue.Add(fqdnSelectorItem.matchName) } } else { f.selectorItemToRuleIDs[fqdnSelectorItem] = ruleIDs.Insert(ruleID) } - if fqdnSelectorItem.matchName != "" { - // Start a DNS query immediately for matchName selectors. - f.setFQDNMatchSelector(fqdnSelectorItem.matchName, fqdnSelectorItem) - f.dnsQueryQueue.Add(fqdnSelectorItem.matchName) - } } } @@ -353,15 +355,15 @@ func (f *fqdnController) deleteFQDNSelector(ruleID string, fqdns []string) { func (f *fqdnController) cleanupFQDNSelectorItem(fs fqdnSelectorItem) { for fqdn := range f.selectorItemToFQDN[fs] { selectors := f.fqdnToSelectorItem[fqdn] - if _, ok := selectors[fs]; ok { - if len(selectors) == 1 { + if selectors.Has(fs) { + selectors.Delete(fs) + if len(selectors) == 0 { // the fqdnSelectorItem being deleted is the last fqdnSelectorItem // that selects this FQDN. Hence this FQDN no longer needs to be // tracked by the fqdnController. delete(f.fqdnToSelectorItem, fqdn) delete(f.dnsEntryCache, fqdn) } - delete(selectors, fs) } } delete(f.selectorItemToFQDN, fs) @@ -400,7 +402,6 @@ func (f *fqdnController) onDNSResponse( fqdn string, responseIPs map[string]net.IP, lowestTTL uint32, - lookupTime time.Time, waitCh chan error, ) { if len(responseIPs) == 0 { @@ -415,7 +416,7 @@ func (f *fqdnController) onDNSResponse( // addressUpdate is only true if there has been an update in IP addresses // corresponded with the FQDN. mustCacheResponse, addressUpdate := false, false - recordTTL := lookupTime.Add(time.Duration(lowestTTL) * time.Second) + recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second) f.fqdnSelectorMutex.Lock() defer f.fqdnSelectorMutex.Unlock() @@ -463,7 +464,7 @@ func (f *fqdnController) onDNSResponse( } // onDNSResponseMsg handles a DNS response message intercepted. -func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, lookupTime time.Time, waitCh chan error) { +func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg) if err != nil { klog.V(2).InfoS("Failed to parse DNS response") @@ -472,7 +473,7 @@ func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, lookupTime time.Time, } return } - f.onDNSResponse(fqdn, responseIPs, lowestTTL, lookupTime, waitCh) + f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh) } // syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response @@ -654,7 +655,7 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { const defaultTTL = 600 // 600 seconds, 10 minutes resolver := net.DefaultResolver - v4ok, v6ok := true, true + var errs []error makeResponseIPs := func(ips []net.IP) map[string]net.IP { responseIPs := make(map[string]net.IP) @@ -665,26 +666,21 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { } if f.ipv4Enabled { - lookupTime := time.Now() if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, lookupTime, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) } else { - v4ok = false + errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) } } if f.ipv6Enabled { - lookupTime := time.Now() if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil { - f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, lookupTime, nil) + f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) } else { - v6ok = false + errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) } } - if !v4ok || !v6ok { - return fmt.Errorf("DNS request failed for at least one network (v4 and/or v6)") - } - return nil + return errors.NewAggregate(errs) } // makeDNSRequest makes a proactive query for a FQDN to the coreDNS service. @@ -700,39 +696,31 @@ func (f *fqdnController) makeDNSRequest(ctx context.Context, fqdn string) error if fqdn[len(fqdn)-1] != '.' { fqdnToQuery = fqdn + "." } - query := func(m *dns.Msg) (*dns.Msg, error) { + query := func(qtype uint16) (*dns.Msg, error) { + m := &dns.Msg{} + m.SetQuestion(fqdnToQuery, qtype) r, _, err := dnsClient.ExchangeContext(ctx, m, f.dnsServerAddr) if err != nil { - klog.ErrorS(err, "DNS exchange failed") return nil, err } return r, nil } - v4ok, v6ok := true, true + var errs []error if f.ipv4Enabled { - m := dns.Msg{} - m.SetQuestion(fqdnToQuery, dns.TypeA) - lookupTime := time.Now() - if res, err := query(&m); err == nil { - f.onDNSResponseMsg(res, lookupTime, nil) + if res, err := query(dns.TypeA); err == nil { + f.onDNSResponseMsg(res, nil) } else { - v4ok = false + errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) } } if f.ipv6Enabled { - m := dns.Msg{} - m.SetQuestion(fqdnToQuery, dns.TypeAAAA) - lookupTime := time.Now() - if res, err := query(&m); err == nil { - f.onDNSResponseMsg(res, lookupTime, nil) + if res, err := query(dns.TypeAAAA); err == nil { + f.onDNSResponseMsg(res, nil) } else { - v6ok = false + errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) } } - if !v4ok || !v6ok { - return fmt.Errorf("DNS request failed for at least one of type A or AAAA queries") - } - return nil + return errors.NewAggregate(errs) } // HandlePacketIn implements openflow.PacketInHandler @@ -746,7 +734,7 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { waitCh <- nil return } - f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh) + f.onDNSResponseMsg(&dnsMsg, waitCh) } handleTCP := func(tcpPkt *protocol.TCP) { dnsData, dataLength, err := binding.GetTCPDNSData(tcpPkt) @@ -774,7 +762,7 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { waitCh <- nil return } - f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh) + f.onDNSResponseMsg(&dnsMsg, waitCh) } go func() { ethernetPkt, err := openflow.GetEthernetPacket(pktIn) diff --git a/pkg/agent/controller/networkpolicy/fqdn_test.go b/pkg/agent/controller/networkpolicy/fqdn_test.go index d2ffeaa74b6..aa5f93c3d38 100644 --- a/pkg/agent/controller/networkpolicy/fqdn_test.go +++ b/pkg/agent/controller/networkpolicy/fqdn_test.go @@ -62,59 +62,103 @@ func TestAddFQDNRule(t *testing.T) { name string existingSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] existingDNSCache map[string]dnsMeta - existingFQDNToSelectorItem map[string]map[fqdnSelectorItem]struct{} + existingFQDNToSelectorItem map[string]sets.Set[fqdnSelectorItem] + existingFQDNToSelectedPods map[string]sets.Set[int32] ruleID string fqdns []string podAddrs sets.Set[int32] finalSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] - finalFQDNToSelectorItem map[string]map[fqdnSelectorItem]struct{} + finalFQDNToSelectorItem map[string]sets.Set[fqdnSelectorItem] addressAdded bool addressRemoved bool + enqueuedFQDNs []string }{ { - "addNewFQDNSelector", - nil, - nil, - nil, - "mockRule1", - []string{"test.antrea.io"}, - sets.New[int32](1), - map[fqdnSelectorItem]sets.Set[string]{ + name: "addNewMatchNameSelector", + ruleID: "mockRule1", + fqdns: []string{"test.antrea.io"}, + podAddrs: sets.New[int32](1), + finalSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ selectorItem1: sets.New[string]("mockRule1"), }, - map[string]map[fqdnSelectorItem]struct{}{ - "test.antrea.io": {selectorItem1: struct{}{}}, + finalFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), }, - true, - false, + addressAdded: true, + addressRemoved: false, + enqueuedFQDNs: []string{"test.antrea.io"}, }, { - "addNewFQDNSelectorMatchExisting", - map[fqdnSelectorItem]sets.Set[string]{ + name: "addSameMatchNameSelector", + existingSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ selectorItem1: sets.New[string]("mockRule1"), }, - map[string]dnsMeta{ + existingDNSCache: map[string]dnsMeta{ "test.antrea.io": {}, }, - map[string]map[fqdnSelectorItem]struct{}{ - "test.antrea.io": { - selectorItem1: struct{}{}, - }, + existingFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), }, - "mockRule2", - []string{"*antrea.io"}, - sets.New[int32](2), - map[fqdnSelectorItem]sets.Set[string]{ + existingFQDNToSelectedPods: map[string]sets.Set[int32]{ + "test.antrea.io": sets.New[int32](1), + }, + ruleID: "mockRule1", + fqdns: []string{"test.antrea.io"}, + podAddrs: sets.New[int32](1), + finalSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + finalFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), + }, + addressAdded: false, + addressRemoved: false, + }, + { + name: "addNewMatchNameSelectorMatchingExisting", + existingSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + existingDNSCache: map[string]dnsMeta{ + "test.antrea.io": {}, + }, + existingFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), + }, + ruleID: "mockRule2", + fqdns: []string{"test.antrea.io"}, + podAddrs: sets.New[int32](2), + finalSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1", "mockRule2"), + }, + finalFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), + }, + addressAdded: true, + addressRemoved: false, + }, + { + name: "addNewMatchRegexSelectorMatchExisting", + existingSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ + selectorItem1: sets.New[string]("mockRule1"), + }, + existingDNSCache: map[string]dnsMeta{ + "test.antrea.io": {}, + }, + existingFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), + }, + ruleID: "mockRule2", + fqdns: []string{"*antrea.io"}, + podAddrs: sets.New[int32](2), + finalSelectorToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{ selectorItem1: sets.New[string]("mockRule1"), selectorItem2: sets.New[string]("mockRule2")}, - map[string]map[fqdnSelectorItem]struct{}{ - "test.antrea.io": { - selectorItem1: struct{}{}, - selectorItem2: struct{}{}, - }, + finalFQDNToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1, selectorItem2), }, - true, - false, + addressAdded: true, + addressRemoved: false, }, } for _, tt := range tests { @@ -131,12 +175,22 @@ func TestAddFQDNRule(t *testing.T) { f.selectorItemToRuleIDs = tt.existingSelectorToRuleIDs f.fqdnToSelectorItem = tt.existingFQDNToSelectorItem } + if tt.existingFQDNToSelectedPods != nil { + f.fqdnRuleToSelectedPods = tt.existingFQDNToSelectedPods + } if tt.existingDNSCache != nil { f.dnsEntryCache = tt.existingDNSCache } require.NoError(t, f.addFQDNRule(tt.ruleID, tt.fqdns, tt.podAddrs), "Error when adding FQDN rule") assert.Equal(t, tt.finalSelectorToRuleIDs, f.selectorItemToRuleIDs) assert.Equal(t, tt.finalFQDNToSelectorItem, f.fqdnToSelectorItem) + var enqueuedFQDNs []string + for f.dnsQueryQueue.Len() > 0 { + item, _ := f.dnsQueryQueue.Get() + f.dnsQueryQueue.Done(item) + enqueuedFQDNs = append(enqueuedFQDNs, item.(string)) + } + assert.ElementsMatch(t, tt.enqueuedFQDNs, enqueuedFQDNs) }) } } @@ -164,7 +218,7 @@ func TestDeleteFQDNRule(t *testing.T) { ruleID string fqdns []string finalSelectorToRuleIDs map[fqdnSelectorItem]sets.Set[string] - finalFQDNToSelectorItem map[string]map[fqdnSelectorItem]struct{} + finalFQDNToSelectorItem map[string]sets.Set[fqdnSelectorItem] addressRemoved bool }{ { @@ -182,7 +236,7 @@ func TestDeleteFQDNRule(t *testing.T) { "mockRule1", []string{"test.antrea.io"}, map[fqdnSelectorItem]sets.Set[string]{}, - map[string]map[fqdnSelectorItem]struct{}{}, + map[string]sets.Set[fqdnSelectorItem]{}, true, }, { @@ -207,10 +261,8 @@ func TestDeleteFQDNRule(t *testing.T) { map[fqdnSelectorItem]sets.Set[string]{ selectorItem1: sets.New[string]("mockRule2"), }, - map[string]map[fqdnSelectorItem]struct{}{ - "test.antrea.io": { - selectorItem1: struct{}{}, - }, + map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem1), }, true, }, @@ -236,10 +288,8 @@ func TestDeleteFQDNRule(t *testing.T) { map[fqdnSelectorItem]sets.Set[string]{ selectorItem2: sets.New[string]("mockRule2"), }, - map[string]map[fqdnSelectorItem]struct{}{ - "test.antrea.io": { - selectorItem2: struct{}{}, - }, + map[string]sets.Set[fqdnSelectorItem]{ + "test.antrea.io": sets.New(selectorItem2), }, true, }, @@ -266,10 +316,8 @@ func TestDeleteFQDNRule(t *testing.T) { map[fqdnSelectorItem]sets.Set[string]{ selectorItem3: sets.New[string]("mockRule1"), }, - map[string]map[fqdnSelectorItem]struct{}{ - "maps.google.com": { - selectorItem3: struct{}{}, - }, + map[string]sets.Set[fqdnSelectorItem]{ + "maps.google.com": sets.New(selectorItem3), }, true, },