Skip to content

Commit

Permalink
[backport] [NPM] Backport NPM Changes for v1.5.41 (#3344)
Browse files Browse the repository at this point in the history
* test: [NPM] fix windows unit test for policymanager (#3161)

* test: fix windows unit test for policymanager

Signed-off-by: Hunter Gregory <[email protected]>

* fix(test): flip bool

Signed-off-by: Hunter Gregory <[email protected]>

---------

Signed-off-by: Hunter Gregory <[email protected]>

* fix: [NPM] close telemetry handler before crashing (#3333)

fix: close telemetry handler before crashing

Signed-off-by: Hunter Gregory <[email protected]>

* fix(log): time waiting for appinsights to close was unbounded (#3337)

* fix: time waiting for appinsights to close was unbounded

Signed-off-by: Hunter Gregory <[email protected]>

* fix: close timer in case it hasn't fired yet

Signed-off-by: Hunter Gregory <[email protected]>

---------

Signed-off-by: Hunter Gregory <[email protected]>

* resolved merge conflict as there isnt npm lite in v1.5

---------

Signed-off-by: Hunter Gregory <[email protected]>
Co-authored-by: Hunter Gregory <[email protected]>
  • Loading branch information
rayaisaiah and huntergregory authored Jan 15, 2025
1 parent 269fc2d commit b84dbfd
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 8 deletions.
30 changes: 29 additions & 1 deletion aitelemetry/telemetrywrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
azurePublicCloudStr = "AzurePublicCloud"
hostNameKey = "hostname"
defaultTimeout = 10
maxCloseTimeoutInSeconds = 30
defaultBatchIntervalInSecs = 15
defaultBatchSizeInBytes = 32768
defaultGetEnvRetryCount = 5
Expand Down Expand Up @@ -330,8 +331,35 @@ func (th *telemetryHandle) Close(timeout int) {
timeout = defaultTimeout
}

// max wait is the minimum of the timeout and maxCloseTimeoutInSeconds
maxWaitTimeInSeconds := timeout
if maxWaitTimeInSeconds < maxCloseTimeoutInSeconds {
maxWaitTimeInSeconds = maxCloseTimeoutInSeconds
}

// wait for items to be sent otherwise timeout
<-th.client.Channel().Close(time.Duration(timeout) * time.Second)
// similar to the example in the appinsights-go repo: https://github.com/microsoft/ApplicationInsights-Go#shutdown
timer := time.NewTimer(time.Duration(maxWaitTimeInSeconds) * time.Second)
defer timer.Stop()
select {
case <-th.client.Channel().Close(time.Duration(timeout) * time.Second):
// timeout specified for retries.

// If we got here, then all telemetry was submitted
// successfully, and we can proceed to exiting.

case <-timer.C:
// absolute timeout. This covers any
// previous telemetry submission that may not have
// completed before Close was called.

// There are a number of reasons we could have
// reached here. We gave it a go, but telemetry
// submission failed somewhere. Perhaps old events
// were still retrying, or perhaps we're throttled.
// Either way, we don't want to wait around for it
// to complete, so let's just exit.
}

// Remove diganostic message listener
if th.diagListener != nil {
Expand Down
16 changes: 10 additions & 6 deletions npm/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func newStartNPMCmd() *cobra.Command {
KubeConfigPath: viper.GetString(flagKubeConfigPath),
}

return start(*config, flags)
// start is blocking, unless there's an error
err = start(*config, flags)
metrics.Close()
return err
},
}

Expand Down Expand Up @@ -117,7 +120,10 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
klog.Infof("Resync period for NPM pod is set to %d.", int(resyncPeriod/time.Minute))
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)

k8sServerVersion := k8sServerVersion(clientset)
err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata())
if err != nil {
klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
}

var dp dataplane.GenericDataplane
stopChannel := wait.NeverStop
Expand Down Expand Up @@ -181,11 +187,9 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
}
dp.RunPeriodicTasks()
}

k8sServerVersion := k8sServerVersion(clientset)
npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion)
err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata())
if err != nil {
klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
}

go restserver.NPMRestServerListenAndServe(config, npMgr)

Expand Down
11 changes: 11 additions & 0 deletions npm/metrics/ai-utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"k8s.io/klog"
)

const telemetryCloseWaitTimeSeconds = 10

var (
th aitelemetry.TelemetryHandle
npmVersion int
Expand Down Expand Up @@ -54,6 +56,15 @@ func CreateTelemetryHandle(npmVersionNum int, imageVersion, aiMetadata string) e
return nil
}

// Close cleans up the telemetry handle, which effectively waits for all telemetry data to be sent
func Close() {
if th == nil {
return
}

th.Close(telemetryCloseWaitTimeSeconds)
}

// SendErrorLogAndMetric sends a metric through AI telemetry and sends a log to the Kusto Messages table
func SendErrorLogAndMetric(operationID int, format string, args ...interface{}) {
// Send error metrics
Expand Down
1 change: 1 addition & 0 deletions npm/pkg/dataplane/ipsets/ipsetmanager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (iMgr *IPSetManager) applyIPSets() error {
msg := fmt.Sprintf("exceeded max consecutive failures (%d) when applying ipsets. final error: %s", maxConsecutiveFailures, restoreError.Error())
klog.Error(msg)
metrics.SendErrorLogAndMetric(util.IpsmID, msg)
metrics.Close()
panic(msg)
}

Expand Down
4 changes: 3 additions & 1 deletion npm/pkg/dataplane/policies/policymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestBootup(t *testing.T) {
metrics.IncNumACLRules()

require.NoError(t, pMgr.Bootup(epIDs))
require.Equal(t, util.IptablesNft, util.Iptables)
if !util.IsWindowsDP() {
require.Equal(t, util.IptablesNft, util.Iptables)
}

expectedNumACLs := 11
if util.IsWindowsDP() {
Expand Down

0 comments on commit b84dbfd

Please sign in to comment.