diff --git a/cmd/ip-masq-agent/ip-masq-agent.go b/cmd/ip-masq-agent/ip-masq-agent.go index d56fa7a4..1624267f 100644 --- a/cmd/ip-masq-agent/ip-masq-agent.go +++ b/cmd/ip-masq-agent/ip-masq-agent.go @@ -29,6 +29,7 @@ import ( utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/component-base/logs" + "k8s.io/component-base/logs/logreduction" "k8s.io/component-base/version/verflag" "k8s.io/ip-masq-agent/cmd/ip-masq-agent/testing/fakefs" "k8s.io/ip-masq-agent/pkg/interval" @@ -44,6 +45,8 @@ const ( linkLocalCIDRIPv6 = "fe80::/10" // path to a yaml or json file configPath = "/etc/config/ip-masq-agent" + // How frequently to write identical logs at verbosity 2 (otherwise 4) + identicalLogDelay = 24 * time.Hour ) var ( @@ -116,9 +119,10 @@ func NewMasqConfig(masqAllReservedRanges bool) *MasqConfig { // MasqDaemon object type MasqDaemon struct { - config *MasqConfig - iptables utiliptables.Interface - ip6tables utiliptables.Interface + config *MasqConfig + iptables utiliptables.Interface + ip6tables utiliptables.Interface + logReduction *logreduction.LogReduction } // NewMasqDaemon returns a MasqDaemon with default values, including an initialized utiliptables.Interface @@ -129,9 +133,10 @@ func NewMasqDaemon(c *MasqConfig) *MasqDaemon { iptables := utiliptables.New(execer, protocolv4) ip6tables := utiliptables.New(execer, protocolv6) return &MasqDaemon{ - config: c, - iptables: iptables, - ip6tables: ip6tables, + config: c, + iptables: iptables, + ip6tables: ip6tables, + logReduction: logreduction.NewLogReduction(identicalLogDelay), } } @@ -174,17 +179,17 @@ func (m *MasqDaemon) Run() { defer time.Sleep(time.Duration(m.config.ResyncInterval)) // resync config if err := m.osSyncConfig(); err != nil { - klog.Errorf("error syncing configuration: %v", err) + klog.Errorf("Error syncing configuration: %v", err) return } // resync rules if err := m.syncMasqRules(); err != nil { - klog.Errorf("error syncing masquerade rules: %v", err) + klog.Errorf("Error syncing masquerade rules: %v", err) return } // resync ipv6 rules if err := m.syncMasqRulesIPv6(); err != nil { - klog.Errorf("error syncing masquerade rules for ipv6: %v", err) + klog.Errorf("Error syncing masquerade rules for IPv6: %v", err) return } }() @@ -198,15 +203,30 @@ func (m *MasqDaemon) osSyncConfig() error { return m.syncConfig(fs) } +func (m *MasqDaemon) logVerbose(message string, parentID string) klog.Verbose { + if m.logReduction.ShouldMessageBePrinted(message, parentID) { + return klog.V(2) + } + return klog.V(4) +} + // Syncs the config to the file at ConfigPath, or uses defaults if the file could not be found // Error if the file is found but cannot be parsed. func (m *MasqDaemon) syncConfig(fs fakefs.FileSystem) error { + logSyncParentID := "config-sync" + logFileParentID := "config-file" + + var yaml []byte var err error c := NewMasqConfig(*noMasqueradeAllReservedRangesFlag) defer func() { + // Calculating verbosity here (outside the `if` below) and using `yaml` + // (instead of `json`) allows reprinting at file change (even if the parsed + // json doesn't change) as well as any error condition change. + v := m.logVerbose(fmt.Sprintf("%v\x00%s", err, yaml), logFileParentID) if err == nil { json, _ := utiljson.Marshal(c) - klog.V(2).Infof("using config: %s", string(json)) + v.Infof("Using config: %s", string(json)) } }() @@ -218,13 +238,13 @@ func (m *MasqDaemon) syncConfig(fs fakefs.FileSystem) error { m.config.MasqLinkLocal = c.MasqLinkLocal m.config.MasqLinkLocalIPv6 = c.MasqLinkLocalIPv6 m.config.ResyncInterval = c.ResyncInterval - klog.V(2).Infof("no config file found at %q, using default values", configPath) + m.logVerbose("not-found", logSyncParentID).Infof("No config file found at %q, using default values.", configPath) return nil } - klog.V(2).Infof("config file found at %q", configPath) + m.logVerbose("found", logSyncParentID).Infof("Config file found at %q", configPath) // file exists, read and parse file - yaml, err := fs.ReadFile(configPath) + yaml, err = fs.ReadFile(configPath) if err != nil { return err } @@ -287,6 +307,8 @@ func validateCIDR(cidr string) error { } func (m *MasqDaemon) syncMasqRules() error { + logParentID := "ipv4" + // make sure our custom chain for non-masquerade exists if _, err := m.iptables.EnsureChain(utiliptables.TableNAT, masqChain); err != nil { return err @@ -318,6 +340,7 @@ func (m *MasqDaemon) syncMasqRules() error { writeMasqRules(lines, toPorts) writeLine(lines, "COMMIT") + m.logVerbose(lines.String(), logParentID).Infof("IPv4 masquerading rules: %q", lines) if err := m.iptables.RestoreAll(lines.Bytes(), utiliptables.NoFlushTables, utiliptables.NoRestoreCounters); err != nil { return err @@ -327,40 +350,45 @@ func (m *MasqDaemon) syncMasqRules() error { func (m *MasqDaemon) syncMasqRulesIPv6() error { isIPv6Enabled := *enableIPv6 + logParentID := "ipv6" - if isIPv6Enabled { - // make sure our custom chain for ipv6 non-masquerade exists - if _, err := m.ip6tables.EnsureChain(utiliptables.TableNAT, masqChain); err != nil { - return err - } - // ensure that any non-local in POSTROUTING jumps to masqChain - if err := m.ensurePostroutingJumpIPv6(); err != nil { - return err - } - // build up lines to pass to ip6tables-restore - lines6 := bytes.NewBuffer(nil) - writeLine(lines6, "*nat") - writeLine(lines6, utiliptables.MakeChainLine(masqChain)) // effectively flushes masqChain atomically with rule restore - - // link-local IPv6 CIDR is non-masquerade by default - if !m.config.MasqLinkLocalIPv6 { - writeNonMasqRule(lines6, linkLocalCIDRIPv6) - } + if !isIPv6Enabled { + m.logVerbose("", logParentID).Infof("IPv6 masquerading rules: not enabled.") + return nil + } - for _, cidr := range m.config.NonMasqueradeCIDRs { - if isIPv6CIDR(cidr) { - writeNonMasqRule(lines6, cidr) - } + // make sure our custom chain for ipv6 non-masquerade exists + if _, err := m.ip6tables.EnsureChain(utiliptables.TableNAT, masqChain); err != nil { + return err + } + // ensure that any non-local in POSTROUTING jumps to masqChain + if err := m.ensurePostroutingJumpIPv6(); err != nil { + return err + } + // build up lines to pass to ip6tables-restore + lines6 := bytes.NewBuffer(nil) + writeLine(lines6, "*nat") + writeLine(lines6, utiliptables.MakeChainLine(masqChain)) // effectively flushes masqChain atomically with rule restore + + // link-local IPv6 CIDR is non-masquerade by default + if !m.config.MasqLinkLocalIPv6 { + writeNonMasqRule(lines6, linkLocalCIDRIPv6) + } + + for _, cidr := range m.config.NonMasqueradeCIDRs { + if isIPv6CIDR(cidr) { + writeNonMasqRule(lines6, cidr) } + } - // masquerade all other traffic that is not bound for a --dst-type LOCAL destination - writeMasqRules(lines6, toPorts) + // masquerade all other traffic that is not bound for a --dst-type LOCAL destination + writeMasqRules(lines6, toPorts) - writeLine(lines6, "COMMIT") + writeLine(lines6, "COMMIT") + m.logVerbose(lines6.String(), logParentID).Infof("IPv6 masquerading rules: %q", lines6) - if err := m.ip6tables.RestoreAll(lines6.Bytes(), utiliptables.NoFlushTables, utiliptables.NoRestoreCounters); err != nil { - return err - } + if err := m.ip6tables.RestoreAll(lines6.Bytes(), utiliptables.NoFlushTables, utiliptables.NoRestoreCounters); err != nil { + return err } return nil } diff --git a/cmd/ip-masq-agent/ip-masq-agent_test.go b/cmd/ip-masq-agent/ip-masq-agent_test.go index e04cdea2..0955575f 100644 --- a/cmd/ip-masq-agent/ip-masq-agent_test.go +++ b/cmd/ip-masq-agent/ip-masq-agent_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "k8s.io/component-base/logs/logreduction" "k8s.io/ip-masq-agent/cmd/ip-masq-agent/testing/fakefs" "k8s.io/ip-masq-agent/pkg/interval" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -93,9 +94,10 @@ func NewFakeMasqDaemon() *MasqDaemon { }, } return &MasqDaemon{ - config: &MasqConfig{}, - iptables: iptables, - ip6tables: ip6tables, + config: &MasqConfig{}, + iptables: iptables, + ip6tables: ip6tables, + logReduction: logreduction.NewLogReduction(0), } } diff --git a/vendor/k8s.io/component-base/logs/logreduction/logreduction.go b/vendor/k8s.io/component-base/logs/logreduction/logreduction.go new file mode 100644 index 00000000..6534a5a6 --- /dev/null +++ b/vendor/k8s.io/component-base/logs/logreduction/logreduction.go @@ -0,0 +1,78 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logreduction + +import ( + "sync" + "time" +) + +var nowfunc = func() time.Time { return time.Now() } + +// LogReduction provides a filter for consecutive identical log messages; +// a message will be printed no more than once per interval. +// If a string of messages is interrupted by a different message, +// the interval timer will be reset. +type LogReduction struct { + lastError map[string]string + errorPrinted map[string]time.Time + errorMapLock sync.Mutex + identicalErrorDelay time.Duration +} + +// NewLogReduction returns an initialized LogReduction +func NewLogReduction(identicalErrorDelay time.Duration) *LogReduction { + l := new(LogReduction) + l.lastError = make(map[string]string) + l.errorPrinted = make(map[string]time.Time) + l.identicalErrorDelay = identicalErrorDelay + return l +} + +func (l *LogReduction) cleanupErrorTimeouts() { + for name, timeout := range l.errorPrinted { + if nowfunc().Sub(timeout) >= l.identicalErrorDelay { + delete(l.errorPrinted, name) + delete(l.lastError, name) + } + } +} + +// ShouldMessageBePrinted determines whether a message should be printed based +// on how long ago this particular message was last printed +func (l *LogReduction) ShouldMessageBePrinted(message string, parentID string) bool { + l.errorMapLock.Lock() + defer l.errorMapLock.Unlock() + l.cleanupErrorTimeouts() + lastMsg, ok := l.lastError[parentID] + lastPrinted, ok1 := l.errorPrinted[parentID] + if !ok || !ok1 || message != lastMsg || nowfunc().Sub(lastPrinted) >= l.identicalErrorDelay { + l.errorPrinted[parentID] = nowfunc() + l.lastError[parentID] = message + return true + } + return false +} + +// ClearID clears out log reduction records pertaining to a particular parent +// (e. g. container ID) +func (l *LogReduction) ClearID(parentID string) { + l.errorMapLock.Lock() + defer l.errorMapLock.Unlock() + delete(l.lastError, parentID) + delete(l.errorPrinted, parentID) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d4800b59..302f0731 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -160,6 +160,7 @@ k8s.io/component-base/logs k8s.io/component-base/logs/api/v1 k8s.io/component-base/logs/internal/setverbositylevel k8s.io/component-base/logs/klogflags +k8s.io/component-base/logs/logreduction k8s.io/component-base/metrics k8s.io/component-base/metrics/legacyregistry k8s.io/component-base/metrics/prometheus/feature