Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary DNS queries for FQDN rule of NetworkPolicy #6200

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 43 additions & 55 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"
"github.com/miekg/dns"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -142,7 +143,7 @@ type fqdnController struct {
fqdnSelectorMutex sync.Mutex
// fqdnToSelectorItem stores known FQDNSelectorItems that select the FQDN, for each
// FQDN tracked by this controller.
fqdnToSelectorItem map[string]map[fqdnSelectorItem]struct{}
fqdnToSelectorItem map[string]sets.Set[fqdnSelectorItem]
// selectorItemToFQDN is a reversed map of fqdnToSelectorItem. It stores all known
// FQDNs that match the fqdnSelectorItem.
selectorItemToFQDN map[fqdnSelectorItem]sets.Set[string]
Expand All @@ -162,7 +163,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer
dnsQueryQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "fqdn"),
dnsEntryCache: map[string]dnsMeta{},
fqdnRuleToSelectedPods: map[string]sets.Set[int32]{},
fqdnToSelectorItem: map[string]map[fqdnSelectorItem]struct{}{},
fqdnToSelectorItem: map[string]sets.Set[fqdnSelectorItem]{},
selectorItemToFQDN: map[fqdnSelectorItem]sets.Set[string]{},
selectorItemToRuleIDs: map[fqdnSelectorItem]sets.Set[string]{},
ipv4Enabled: v4Enabled,
Expand Down Expand Up @@ -220,11 +221,9 @@ func toRegex(pattern string) string {
func (f *fqdnController) setFQDNMatchSelector(fqdn string, selectorItem fqdnSelectorItem) {
matchedSelectorItems, ok := f.fqdnToSelectorItem[fqdn]
if !ok {
f.fqdnToSelectorItem[fqdn] = map[fqdnSelectorItem]struct{}{
selectorItem: {},
}
f.fqdnToSelectorItem[fqdn] = sets.New(selectorItem)
} else {
matchedSelectorItems[selectorItem] = struct{}{}
matchedSelectorItems.Insert(selectorItem)
}
matchedFQDNs, ok := f.selectorItemToFQDN[selectorItem]
if !ok {
Expand Down Expand Up @@ -272,22 +271,25 @@ func (f *fqdnController) addFQDNSelector(ruleID string, fqdns []string) {
fqdnSelectorItem := fqdnToSelectorItem(fqdn)
ruleIDs, exists := f.selectorItemToRuleIDs[fqdnSelectorItem]
if !exists {
// This is a new fqdnSelectorItem. All existing FQDNs in the cache needs to be matched
// against this fqdnSelectorItem to update the mapping.
// This is a new fqdnSelectorItem.
f.selectorItemToRuleIDs[fqdnSelectorItem] = sets.New[string](ruleID)
for fqdn := range f.dnsEntryCache {
if fqdnSelectorItem.matches(fqdn) {
f.setFQDNMatchSelector(fqdn, fqdnSelectorItem)
// Existing FQDNs in the cache need to be matched against this fqdnSelectorItem to update the mapping.
if fqdnSelectorItem.matchRegex != "" {
// As the selector matches regex, all existing FQDNs can potentially match it.
for fqdn := range f.dnsEntryCache {
if fqdnSelectorItem.matches(fqdn) {
f.setFQDNMatchSelector(fqdn, fqdnSelectorItem)
}
}
} else {
// As the selector matches name, only the FQDN of this name matches it.
f.setFQDNMatchSelector(fqdnSelectorItem.matchName, fqdnSelectorItem)
// Trigger a DNS query immediately for the FQDN.
f.dnsQueryQueue.Add(fqdnSelectorItem.matchName)
}
} else {
f.selectorItemToRuleIDs[fqdnSelectorItem] = ruleIDs.Insert(ruleID)
}
if fqdnSelectorItem.matchName != "" {
// Start a DNS query immediately for matchName selectors.
f.setFQDNMatchSelector(fqdnSelectorItem.matchName, fqdnSelectorItem)
f.dnsQueryQueue.Add(fqdnSelectorItem.matchName)
}
}
}

Expand Down Expand Up @@ -353,15 +355,15 @@ func (f *fqdnController) deleteFQDNSelector(ruleID string, fqdns []string) {
func (f *fqdnController) cleanupFQDNSelectorItem(fs fqdnSelectorItem) {
for fqdn := range f.selectorItemToFQDN[fs] {
selectors := f.fqdnToSelectorItem[fqdn]
if _, ok := selectors[fs]; ok {
if len(selectors) == 1 {
if selectors.Has(fs) {
selectors.Delete(fs)
if len(selectors) == 0 {
// the fqdnSelectorItem being deleted is the last fqdnSelectorItem
// that selects this FQDN. Hence this FQDN no longer needs to be
// tracked by the fqdnController.
delete(f.fqdnToSelectorItem, fqdn)
delete(f.dnsEntryCache, fqdn)
}
delete(selectors, fs)
}
}
delete(f.selectorItemToFQDN, fs)
Expand Down Expand Up @@ -400,7 +402,6 @@ func (f *fqdnController) onDNSResponse(
fqdn string,
responseIPs map[string]net.IP,
lowestTTL uint32,
lookupTime time.Time,
waitCh chan error,
) {
if len(responseIPs) == 0 {
Expand All @@ -415,7 +416,7 @@ func (f *fqdnController) onDNSResponse(
// addressUpdate is only true if there has been an update in the IP addresses
// corresponding to the FQDN.
mustCacheResponse, addressUpdate := false, false
recordTTL := lookupTime.Add(time.Duration(lowestTTL) * time.Second)
recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second)

f.fqdnSelectorMutex.Lock()
defer f.fqdnSelectorMutex.Unlock()
Expand Down Expand Up @@ -463,7 +464,7 @@ func (f *fqdnController) onDNSResponse(
}

// onDNSResponseMsg handles a DNS response message intercepted.
func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, lookupTime time.Time, waitCh chan error) {
func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) {
fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg)
if err != nil {
klog.V(2).InfoS("Failed to parse DNS response")
Expand All @@ -472,7 +473,7 @@ func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, lookupTime time.Time,
}
return
}
f.onDNSResponse(fqdn, responseIPs, lowestTTL, lookupTime, waitCh)
f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh)
}

// syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response
Expand Down Expand Up @@ -654,7 +655,7 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error {
const defaultTTL = 600 // 600 seconds, 10 minutes
resolver := net.DefaultResolver

v4ok, v6ok := true, true
var errs []error

makeResponseIPs := func(ips []net.IP) map[string]net.IP {
responseIPs := make(map[string]net.IP)
Expand All @@ -665,26 +666,21 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error {
}

if f.ipv4Enabled {
lookupTime := time.Now()
if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, lookupTime, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
} else {
v4ok = false
errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err))
}
}
if f.ipv6Enabled {
lookupTime := time.Now()
if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, lookupTime, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
} else {
v6ok = false
errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err))
}
}

if !v4ok || !v6ok {
return fmt.Errorf("DNS request failed for at least one network (v4 and/or v6)")
}
return nil
return errors.NewAggregate(errs)
}

// makeDNSRequest makes a proactive query for a FQDN to the coreDNS service.
Expand All @@ -700,39 +696,31 @@ func (f *fqdnController) makeDNSRequest(ctx context.Context, fqdn string) error
if fqdn[len(fqdn)-1] != '.' {
fqdnToQuery = fqdn + "."
}
query := func(m *dns.Msg) (*dns.Msg, error) {
query := func(qtype uint16) (*dns.Msg, error) {
m := &dns.Msg{}
m.SetQuestion(fqdnToQuery, qtype)
r, _, err := dnsClient.ExchangeContext(ctx, m, f.dnsServerAddr)
if err != nil {
klog.ErrorS(err, "DNS exchange failed")
return nil, err
}
return r, nil
}
v4ok, v6ok := true, true
var errs []error
if f.ipv4Enabled {
m := dns.Msg{}
m.SetQuestion(fqdnToQuery, dns.TypeA)
lookupTime := time.Now()
if res, err := query(&m); err == nil {
f.onDNSResponseMsg(res, lookupTime, nil)
if res, err := query(dns.TypeA); err == nil {
f.onDNSResponseMsg(res, nil)
} else {
v4ok = false
errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err))
}
}
if f.ipv6Enabled {
m := dns.Msg{}
m.SetQuestion(fqdnToQuery, dns.TypeAAAA)
lookupTime := time.Now()
if res, err := query(&m); err == nil {
f.onDNSResponseMsg(res, lookupTime, nil)
if res, err := query(dns.TypeAAAA); err == nil {
f.onDNSResponseMsg(res, nil)
} else {
v6ok = false
errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err))
}
}
if !v4ok || !v6ok {
return fmt.Errorf("DNS request failed for at least one of type A or AAAA queries")
}
return nil
return errors.NewAggregate(errs)
}

// HandlePacketIn implements openflow.PacketInHandler
Expand All @@ -746,7 +734,7 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
waitCh <- nil
return
}
f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh)
f.onDNSResponseMsg(&dnsMsg, waitCh)
}
handleTCP := func(tcpPkt *protocol.TCP) {
dnsData, dataLength, err := binding.GetTCPDNSData(tcpPkt)
Expand Down Expand Up @@ -774,7 +762,7 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
waitCh <- nil
return
}
f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh)
f.onDNSResponseMsg(&dnsMsg, waitCh)
}
go func() {
ethernetPkt, err := openflow.GetEthernetPacket(pktIn)
Expand Down
Loading
Loading