-
Notifications
You must be signed in to change notification settings - Fork 368
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cache TTLs for individual IP addresses in DNS responses. #6732
base: main
Are you sure you want to change the base?
Changes from 7 commits
e98030f
fe43d74
a22bef9
d8fe685
8b05b7e
2822742
8a03d3a
cfcc0e0
e3e25df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,21 +17,22 @@ package networkpolicy | |
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"net" | ||
"os" | ||
"regexp" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"antrea.io/libOpenflow/protocol" | ||
"antrea.io/ofnet/ofctrl" | ||
"github.com/miekg/dns" | ||
"k8s.io/apimachinery/pkg/util/errors" | ||
"k8s.io/apimachinery/pkg/util/sets" | ||
"k8s.io/client-go/util/workqueue" | ||
"k8s.io/klog/v2" | ||
"k8s.io/utils/clock" | ||
|
||
"antrea.io/libOpenflow/protocol" | ||
"antrea.io/ofnet/ofctrl" | ||
|
||
"antrea.io/antrea/pkg/agent/openflow" | ||
"antrea.io/antrea/pkg/agent/types" | ||
|
@@ -76,11 +77,15 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool { | |
// expirationTime of the records, which is the DNS response | ||
// receiving time plus lowest applicable TTL. | ||
type dnsMeta struct { | ||
expirationTime time.Time | ||
// Key for responseIPs is the string representation of the IP. | ||
// It helps to quickly identify IP address updates when a | ||
// new DNS response is received. | ||
responseIPs map[string]net.IP | ||
responseIPs map[string]ipWithExpiration | ||
} | ||
|
||
type ipWithExpiration struct { | ||
ip net.IP | ||
expirationTime time.Time | ||
} | ||
|
||
// subscriber is a entity that subsribes for datapath rule realization | ||
|
@@ -152,9 +157,11 @@ type fqdnController struct { | |
ipv4Enabled bool | ||
ipv6Enabled bool | ||
gwPort uint32 | ||
// clock allows injecting a custom (fake) clock in unit tests. | ||
clock clock.Clock | ||
} | ||
|
||
func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32) (*fqdnController, error) { | ||
func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServerOverride string, dirtyRuleHandler func(string), v4Enabled, v6Enabled bool, gwPort uint32, clock clock.WithTicker) (*fqdnController, error) { | ||
controller := &fqdnController{ | ||
ofClient: client, | ||
dirtyRuleHandler: dirtyRuleHandler, | ||
|
@@ -163,7 +170,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer | |
dnsQueryQueue: workqueue.NewTypedRateLimitingQueueWithConfig( | ||
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay), | ||
workqueue.TypedRateLimitingQueueConfig[string]{ | ||
Name: "fqdn", | ||
Name: "fqdn", | ||
Clock: clock, | ||
}, | ||
), | ||
dnsEntryCache: map[string]dnsMeta{}, | ||
|
@@ -174,6 +182,7 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer | |
ipv4Enabled: v4Enabled, | ||
ipv6Enabled: v6Enabled, | ||
gwPort: gwPort, | ||
clock: clock, | ||
} | ||
if controller.ofClient != nil { | ||
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil { | ||
|
@@ -253,8 +262,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP { | |
} | ||
for fqdn := range fqdnsMatched { | ||
if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok { | ||
for _, ip := range dnsMeta.responseIPs { | ||
matchedIPs = append(matchedIPs, ip) | ||
for _, ipData := range dnsMeta.responseIPs { | ||
matchedIPs = append(matchedIPs, ipData.ip) | ||
} | ||
} | ||
} | ||
|
@@ -405,80 +414,109 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error { | |
|
||
func (f *fqdnController) onDNSResponse( | ||
fqdn string, | ||
responseIPs map[string]net.IP, | ||
lowestTTL uint32, | ||
newIPsWithExpiration map[string]ipWithExpiration, | ||
waitCh chan error, | ||
) { | ||
if len(responseIPs) == 0 { | ||
if len(newIPsWithExpiration) == 0 { | ||
klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn) | ||
if waitCh != nil { | ||
waitCh <- nil | ||
} | ||
return | ||
} | ||
// mustCacheResponse is only true if the FQDN is already tracked by this | ||
// controller, or it matches at least one fqdnSelectorItem from the policy rules. | ||
// addressUpdate is only true if there has been an update in IP addresses | ||
// corresponded with the FQDN. | ||
mustCacheResponse, addressUpdate := false, false | ||
recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second) | ||
|
||
addressUpdate := false | ||
currentTime := f.clock.Now() | ||
ipWithExpirationMap := make(map[string]ipWithExpiration) | ||
|
||
// timeToRequery establishes the time after which a new DNS query should be sent for the FQDN. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I had suggested a second line for this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ping |
||
var timeToRequery *time.Time | ||
|
||
updateIPWithExpiration := func(ip string, ipMeta ipWithExpiration) { | ||
ipWithExpirationMap[ip] = ipMeta | ||
if timeToRequery == nil || ipMeta.expirationTime.Before(*timeToRequery) { | ||
timeToRequery = &ipMeta.expirationTime | ||
} | ||
} | ||
|
||
f.fqdnSelectorMutex.Lock() | ||
defer f.fqdnSelectorMutex.Unlock() | ||
oldDNSMeta, exist := f.dnsEntryCache[fqdn] | ||
cachedDNSMeta, exist := f.dnsEntryCache[fqdn] | ||
if exist { | ||
mustCacheResponse = true | ||
for ipStr := range responseIPs { | ||
if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok { | ||
// check for new IPs. | ||
for newIPStr, newIPMeta := range newIPsWithExpiration { | ||
if _, exist := cachedDNSMeta.responseIPs[newIPStr]; !exist { | ||
updateIPWithExpiration(newIPStr, newIPMeta) | ||
addressUpdate = true | ||
break | ||
} | ||
} | ||
for oldIPStr, oldIP := range oldDNSMeta.responseIPs { | ||
if _, ok := responseIPs[oldIPStr]; !ok { | ||
if oldDNSMeta.expirationTime.Before(time.Now()) { | ||
// This IP entry has already expired and not seen in the latest DNS response. | ||
// It should be removed from the cache. | ||
|
||
// check for presence of already cached IPs in the new response. | ||
for cachedIPStr, cachedIPMeta := range cachedDNSMeta.responseIPs { | ||
if newIPMeta, exist := newIPsWithExpiration[cachedIPStr]; !exist { | ||
// The IP was not found in current response. | ||
if cachedIPMeta.expirationTime.Before(currentTime) { | ||
// this IP is expired and stale, remove it by not including it but also signal an update to syncRules. | ||
addressUpdate = true | ||
} else { | ||
// Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed. | ||
responseIPs[oldIPStr] = oldIP | ||
if oldDNSMeta.expirationTime.Before(recordTTL) { | ||
recordTTL = oldDNSMeta.expirationTime | ||
} | ||
// It hasn't expired yet, so just retain it with its existing expirationTime. | ||
updateIPWithExpiration(cachedIPStr, cachedIPMeta) | ||
} | ||
} else { | ||
// This already cached IP is part of the current response, so update it with max time between received time and its old cached time. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wrap line at ~100 chars |
||
expTime := laterOf(newIPMeta.expirationTime, cachedIPMeta.expirationTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: no need to define new variable |
||
updateIPWithExpiration(cachedIPStr, ipWithExpiration{ | ||
ip: cachedIPMeta.ip, | ||
expirationTime: expTime, | ||
}) | ||
} | ||
} | ||
|
||
} else { | ||
// First time seeing this domain. | ||
// check if this needs to be tracked, by checking if it matches any Antrea FQDN policy selectors. | ||
|
||
// iterate over current rules mapping | ||
addToCache := false | ||
for selectorItem := range f.selectorItemToRuleIDs { | ||
// Only track the FQDN if there is at least one fqdnSelectorItem matching it. | ||
if selectorItem.matches(fqdn) { | ||
mustCacheResponse, addressUpdate = true, true | ||
// A FQDN can have multiple selectorItems mapped, hence we do not break the loop upon a match, but | ||
// keep iterating to create mapping of multiple selectorItems against same FQDN. | ||
addToCache = true | ||
f.setFQDNMatchSelector(fqdn, selectorItem) | ||
} | ||
} | ||
if addToCache { | ||
for ipStr, ipMeta := range newIPsWithExpiration { | ||
updateIPWithExpiration(ipStr, ipMeta) | ||
} | ||
addressUpdate = true | ||
} | ||
} | ||
if mustCacheResponse { | ||
|
||
if len(ipWithExpirationMap) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would add a comment saying that the only way this condition can be false is if the FQDN doesn't match any selector. Based on my understanding of the code I don't see any other case where that could happen. Am I missing something @Dyanngg ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "or the IPs are not in response and have expired" can you give an example? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to clarify, this is just the comment that I think is wrong here, not the implementation |
||
f.dnsEntryCache[fqdn] = dnsMeta{ | ||
expirationTime: recordTTL, | ||
responseIPs: responseIPs, | ||
responseIPs: ipWithExpirationMap, | ||
} | ||
f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now())) | ||
// timeToRequery is guaranteed not to be nil because ipWithExpirationMap is not empty, and adding an entry to ipWithExpirationMap requires updating timeToRequery | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wrap line at ~100 chars, add trailing period to comment |
||
f.dnsQueryQueue.AddAfter(fqdn, timeToRequery.Sub(currentTime)) | ||
} | ||
|
||
f.syncDirtyRules(fqdn, waitCh, addressUpdate) | ||
} | ||
|
||
// onDNSResponseMsg handles a DNS response message intercepted. | ||
func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) { | ||
fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg) | ||
fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg) | ||
if err != nil { | ||
klog.V(2).InfoS("Failed to parse DNS response") | ||
if waitCh != nil { | ||
waitCh <- fmt.Errorf("failed to parse DNS response: %v", err) | ||
} | ||
return | ||
} | ||
f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh) | ||
f.onDNSResponse(fqdn, responseIPs, waitCh) | ||
} | ||
|
||
// syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response | ||
|
@@ -594,38 +632,39 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) { | |
} | ||
|
||
// parseDNSResponse returns the FQDN, IP query result and lowest applicable TTL of a DNS response. | ||
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) { | ||
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithExpiration, error) { | ||
if len(msg.Question) == 0 { | ||
return "", nil, 0, fmt.Errorf("invalid DNS message") | ||
return "", nil, fmt.Errorf("invalid DNS message") | ||
} | ||
fqdn := strings.ToLower(msg.Question[0].Name) | ||
lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs | ||
responseIPs := map[string]net.IP{} | ||
responseIPs := map[string]ipWithExpiration{} | ||
currentTime := f.clock.Now() | ||
for _, ans := range msg.Answer { | ||
switch r := ans.(type) { | ||
case *dns.A: | ||
if f.ipv4Enabled { | ||
responseIPs[r.A.String()] = r.A | ||
if r.Header().Ttl < lowestTTL { | ||
lowestTTL = r.Header().Ttl | ||
responseIPs[r.A.String()] = ipWithExpiration{ | ||
ip: r.A, | ||
expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), | ||
} | ||
|
||
} | ||
case *dns.AAAA: | ||
if f.ipv6Enabled { | ||
responseIPs[r.AAAA.String()] = r.AAAA | ||
if r.Header().Ttl < lowestTTL { | ||
lowestTTL = r.Header().Ttl | ||
responseIPs[r.AAAA.String()] = ipWithExpiration{ | ||
ip: r.AAAA, | ||
expirationTime: currentTime.Add(time.Duration(r.Header().Ttl) * time.Second), | ||
} | ||
} | ||
} | ||
} | ||
if len(responseIPs) > 0 { | ||
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL) | ||
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs) | ||
} | ||
if strings.HasSuffix(fqdn, ".") { | ||
fqdn = fqdn[:len(fqdn)-1] | ||
} | ||
return fqdn, responseIPs, lowestTTL, nil | ||
return fqdn, responseIPs, nil | ||
} | ||
|
||
func (f *fqdnController) worker() { | ||
|
@@ -662,24 +701,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error { | |
|
||
var errs []error | ||
|
||
makeResponseIPs := func(ips []net.IP) map[string]net.IP { | ||
responseIPs := make(map[string]net.IP) | ||
makeResponseIPs := func(ips []net.IP) map[string]ipWithExpiration { | ||
responseIPs := make(map[string]ipWithExpiration) | ||
for _, ip := range ips { | ||
responseIPs[ip.String()] = ip | ||
responseIPs[ip.String()] = ipWithExpiration{ | ||
ip: ip, | ||
expirationTime: f.clock.Now().Add(time.Duration(defaultTTL) * time.Second), | ||
} | ||
} | ||
return responseIPs | ||
} | ||
|
||
if f.ipv4Enabled { | ||
if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil { | ||
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) | ||
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) | ||
} else { | ||
errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err)) | ||
} | ||
} | ||
if f.ipv6Enabled { | ||
if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil { | ||
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil) | ||
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil) | ||
} else { | ||
errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err)) | ||
} | ||
|
@@ -818,3 +860,11 @@ func (f *fqdnController) HandlePacketIn(pktIn *ofctrl.PacketIn) error { | |
return f.ofClient.ResumePausePacket(pktIn) | ||
} | ||
} | ||
|
||
// laterOf returns the later of the two given time.Time values. | ||
func laterOf(t1, t2 time.Time) time.Time { | ||
if t1.After(t2) { | ||
return t1 | ||
} | ||
return t2 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may not be looking in the right place, but do we need a ticker anywhere? Asking because you are using
clock.WithTicker
instead ofclock.Clock
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my comment was wrong: the workqueue requires a ticker